// sui_core/authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::accumulators::coin_reservations::CachingCoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::transaction_rewriting::rewrite_transaction_for_coin_reservations;
10use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
11use crate::checkpoints::CheckpointBuilderError;
12use crate::checkpoints::CheckpointBuilderResult;
13use crate::congestion_tracker::CongestionTracker;
14use crate::execution_cache::ExecutionCacheTraitPointers;
15use crate::execution_cache::TransactionCacheRead;
16use crate::execution_cache::writeback_cache::WritebackCache;
17use crate::execution_scheduler::ExecutionScheduler;
18use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
19use crate::gasless_rate_limiter::ConsensusGaslessCounter;
20use crate::jsonrpc_index::CoinIndexKey2;
21use crate::rpc_index::RpcIndexStore;
22use crate::traffic_controller::TrafficController;
23use crate::traffic_controller::metrics::TrafficControllerMetrics;
24use crate::transaction_outputs::TransactionOutputs;
25use crate::verify_indexes::{fix_indexes, verify_indexes};
26use arc_swap::{ArcSwap, ArcSwapOption, Guard};
27use async_trait::async_trait;
28use authority_per_epoch_store::CertLockGuard;
29use dashmap::DashMap;
30use fastcrypto::encoding::Base58;
31use fastcrypto::encoding::Encoding;
32use fastcrypto::hash::MultisetHash;
33use itertools::Itertools;
34use move_binary_format::CompiledModule;
35use move_binary_format::binary_config::BinaryConfig;
36use move_core_types::annotated_value::MoveStructLayout;
37use move_core_types::language_storage::ModuleId;
38use mysten_common::ZipDebugEqIteratorExt;
39use mysten_common::{assert_reachable, fatal};
40use parking_lot::Mutex;
41use prometheus::{
42    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
43    register_histogram_vec_with_registry, register_histogram_with_registry,
44    register_int_counter_vec_with_registry, register_int_counter_with_registry,
45    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
46};
47use serde::de::DeserializeOwned;
48use serde::{Deserialize, Serialize};
49use shared_object_version_manager::AssignedVersions;
50use shared_object_version_manager::Schedulable;
51use std::collections::BTreeMap;
52use std::collections::BTreeSet;
53use std::fs::File;
54use std::io::Write;
55use std::path::{Path, PathBuf};
56use std::sync::atomic::Ordering;
57use std::time::Duration;
58use std::time::Instant;
59use std::time::SystemTime;
60use std::time::UNIX_EPOCH;
61use std::{
62    collections::{HashMap, HashSet},
63    fs,
64    pin::Pin,
65    str::FromStr,
66    sync::Arc,
67    vec,
68};
69use sui_config::NodeConfig;
70use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
71use sui_execution::Executor;
72use sui_protocol_config::PerObjectCongestionControlMode;
73use sui_types::accumulator_root::AccumulatorObjId;
74use sui_types::crypto::RandomnessRound;
75use sui_types::dynamic_field::visitor as DFV;
76use sui_types::execution::ExecutionOutput;
77use sui_types::execution::ExecutionTimeObservationKey;
78use sui_types::execution::ExecutionTiming;
79use sui_types::execution_params::ExecutionOrEarlyError;
80use sui_types::execution_params::FundsWithdrawStatus;
81use sui_types::execution_params::get_early_execution_error;
82use sui_types::execution_status::ExecutionStatus;
83use sui_types::inner_temporary_store::PackageStoreWithFallback;
84use sui_types::layout_resolver::LayoutResolver;
85use sui_types::layout_resolver::into_struct_layout;
86use sui_types::messages_consensus::AuthorityCapabilitiesV2;
87use sui_types::node_role::NodeRole;
88use sui_types::object::bounded_visitor::BoundedVisitor;
89use sui_types::storage::ChildObjectResolver;
90use sui_types::storage::InputKey;
91use sui_types::storage::TrackingBackingStore;
92use sui_types::traffic_control::{
93    PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
94};
95use sui_types::transaction_executor::SimulateTransactionResult;
96use sui_types::transaction_executor::TransactionChecks;
97use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
98use tap::TapFallible;
99use tokio::sync::RwLock;
100use tokio::sync::mpsc::unbounded_channel;
101use tokio::sync::watch::error::RecvError;
102use tokio::sync::{mpsc, oneshot};
103use tokio::task::JoinHandle;
104use tokio::time::timeout;
105use tracing::{debug, error, info, instrument, warn};
106
107use self::authority_store::ExecutionLockWriteGuard;
108use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
109pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
110use mysten_metrics::{monitored_scope, spawn_monitored_task};
111
112use crate::jsonrpc_index::IndexStore;
113use crate::jsonrpc_index::{
114    CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
115};
116use mysten_common::{debug_fatal, debug_fatal_no_invariant};
117use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
118use sui_config::genesis::Genesis;
119use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
120use sui_framework::{BuiltInFramework, SystemPackage};
121use sui_json_rpc_types::{
122    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
123    SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
124    SuiTransactionBlockEvents, TransactionFilter,
125};
126use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
127use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
128use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
129use sui_types::accumulator_root::AccumulatorValue;
130use sui_types::authenticator_state::get_authenticator_state;
131use sui_types::balance::Balance;
132use sui_types::coin_reservation;
133use sui_types::committee::{EpochId, ProtocolVersion};
134use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
135use sui_types::deny_list_v1::check_coin_deny_list_v1;
136use sui_types::digests::ChainIdentifier;
137use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
138use sui_types::effects::{
139    InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
140    TransactionEvents, VerifiedSignedTransactionEffects,
141};
142use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
143use sui_types::event::EventID;
144use sui_types::executable_transaction::VerifiedExecutableTransaction;
145use sui_types::execution_status::ExecutionErrorKind;
146use sui_types::gas::{GasCostSummary, SuiGasStatus};
147use sui_types::inner_temporary_store::{
148    InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
149};
150use sui_types::message_envelope::Message;
151use sui_types::messages_checkpoint::{
152    CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
153    CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
154    CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
155    CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
156};
157use sui_types::messages_grpc::{
158    LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
159    TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
160};
161use sui_types::metrics::{BytecodeVerifierMetrics, ExecutionMetrics};
162use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
163use sui_types::signature::GenericSignature;
164use sui_types::storage::{
165    BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
166};
167use sui_types::sui_system_state::SuiSystemStateTrait;
168use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
169use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
170use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
171use sui_types::{
172    SUI_SYSTEM_ADDRESS,
173    base_types::*,
174    committee::Committee,
175    crypto::AuthoritySignature,
176    error::{SuiError, SuiResult},
177    object::{Object, ObjectRead},
178    transaction::*,
179};
180use sui_types::{TypeTag, is_system_package};
181use typed_store::TypedStoreError;
182use typed_store::rocks::StagedBatch;
183
184use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
185use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
186use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
187use crate::authority::authority_store_pruner::{
188    AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
189};
190use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
191use crate::authority::epoch_start_configuration::EpochStartConfiguration;
192use crate::checkpoints::CheckpointStore;
193use crate::epoch::committee_store::CommitteeStore;
194use crate::execution_cache::{
195    CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
196    ObjectCacheRead, StateSyncAPI,
197};
198use crate::execution_driver::execution_process;
199use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
200use crate::metrics::LatencyObserver;
201use crate::metrics::RateTracker;
202use crate::module_cache_metrics::ResolverMetrics;
203use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
204use crate::stake_aggregator::StakeAggregator;
205use crate::subscription_handler::SubscriptionHandler;
206use crate::transaction_input_loader::TransactionInputLoader;
207
208#[cfg(msim)]
209pub use crate::checkpoints::checkpoint_executor::utils::{
210    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
211};
212
213#[cfg(msim)]
214use sui_types::committee::CommitteeTrait;
215use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
216
217#[cfg(test)]
218#[path = "unit_tests/authority_tests.rs"]
219pub mod authority_tests;
220
221#[cfg(test)]
222#[path = "unit_tests/transaction_tests.rs"]
223pub mod transaction_tests;
224
225#[cfg(test)]
226#[path = "unit_tests/batch_transaction_tests.rs"]
227mod batch_transaction_tests;
228
229#[cfg(test)]
230#[path = "unit_tests/move_integration_tests.rs"]
231pub mod move_integration_tests;
232
233#[cfg(test)]
234#[path = "unit_tests/gas_tests.rs"]
235mod gas_tests;
236
237#[cfg(test)]
238#[path = "unit_tests/gas_data_tests.rs"]
239mod gas_data_tests;
240
241#[cfg(test)]
242#[path = "unit_tests/batch_verification_tests.rs"]
243mod batch_verification_tests;
244
245#[cfg(test)]
246#[path = "unit_tests/coin_deny_list_tests.rs"]
247mod coin_deny_list_tests;
248
249#[cfg(test)]
250#[path = "unit_tests/auth_unit_test_utils.rs"]
251pub mod auth_unit_test_utils;
252
253pub mod authority_test_utils;
254
255pub mod authority_per_epoch_store;
256pub mod authority_per_epoch_store_pruner;
257
258pub mod authority_store_pruner;
259pub mod authority_store_tables;
260pub mod authority_store_types;
261pub mod congestion_log;
262pub mod consensus_tx_status_cache;
263pub(crate) mod epoch_marker_key;
264pub mod epoch_start_configuration;
265pub mod execution_time_estimator;
266pub mod shared_object_congestion_tracker;
267pub mod shared_object_version_manager;
268pub mod submitted_transaction_cache;
269pub mod test_authority_builder;
270pub mod transaction_deferral;
271pub mod transaction_reject_reason_cache;
272mod weighted_moving_average;
273
274pub(crate) mod authority_store;
275pub mod backpressure;
276
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
pub struct AuthorityMetrics {
    /// Total number of transaction orders received.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Total number of transaction effects produced; equals total transactions finished.
    total_effects: IntCounter,
    // TODO: this tracks consensus object tx, not just shared. Consider renaming.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders already processed previously.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    // TODO: this tracks consensus object count, not just shared. Consider renaming.
    num_shared_objects: Histogram,
    /// Distribution of transaction batch sizes.
    batch_size: Histogram,

    /// Latency of voting on transactions without signing.
    authority_state_handle_vote_transaction_latency: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing the databases.
    db_checkpoint_latency: Histogram,

    // TODO: Rename these metrics.
    /// Certificates enqueued to ExecutionScheduler, labeled by enqueue result.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Certificates pending in ExecutionScheduler with at least one missing input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Executing certificates, including queued and actually running ones.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Time a transaction spends waiting in the queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Cumulative number of transactions paused by the execution driver.
    pub(crate) execution_driver_paused_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Queueing delay between a transaction becoming ready for execution and starting to execute.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas to VM execution latency when preparing a certificate.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas to full certificate execution latency, including commit.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Consensus transactions skipped because of a local cache hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
    /// Number of times each transaction appears in its first consensus commit.
    pub(crate) consensus_handler_duplicate_tx_count: Histogram,

    /// Whether the authority is currently overloaded and has entered load shedding mode.
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while in load shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each source indicated transaction overload, labeled by source.
    pub(crate) transaction_overload_sources: IntCounterVec,

    /// Post processing metrics
    post_processing_total_events_emitted: IntCounter,
    post_processing_total_tx_indexed: IntCounter,
    post_processing_total_tx_had_event_processed: IntCounter,
    post_processing_total_failures: IntCounter,

    /// Consensus commit and transaction handler metrics
    pub consensus_handler_processed: IntCounterVec,
    pub consensus_handler_transaction_sizes: HistogramVec,
    pub consensus_handler_deferred_transactions: IntCounter,
    pub consensus_handler_congested_transactions: IntCounter,
    pub consensus_handler_unpaid_amplification_deferrals: IntCounter,
    pub consensus_handler_cancelled_transactions: IntCounter,
    pub consensus_handler_max_object_costs: IntGaugeVec,
    pub consensus_committed_subdags: IntCounterVec,
    /// Accumulator deposit (merge) events processed in settlement.
    pub accumulator_deposits: IntCounter,
    /// Accumulator withdrawal (split) events processed in settlement.
    pub accumulator_withdrawals: IntCounter,
    pub consensus_committed_messages: IntGaugeVec,
    pub consensus_committed_user_transactions: IntGaugeVec,
    pub consensus_finalized_user_transactions: IntGaugeVec,
    pub consensus_rejected_user_transactions: IntGaugeVec,
    // NOTE(review): the registrations for the next five fields are outside this view;
    // names suggest throughput calculation and fastpath block handling — confirm
    // against the rest of the constructor.
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,
    pub consensus_block_handler_block_processed: IntCounter,
    pub consensus_block_handler_txn_processed: IntCounterVec,
    pub consensus_block_handler_fastpath_executions: IntCounter,
    /// Consensus timestamp bias in seconds (bucketed by TIMESTAMP_BIAS_SEC_BUCKETS).
    pub consensus_timestamp_bias: Histogram,

    pub execution_metrics: Arc<ExecutionMetrics>,

    /// bytecode verifier metrics for tracking timeouts
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zklogin signatures
    pub zklogin_sig_count: IntCounter,
    /// Count of multisig signatures
    pub multisig_sig_count: IntCounter,

    // Tracks recent average txn queueing delay between when it is ready for execution
    // until it starts executing.
    pub execution_queueing_latency: LatencyObserver,

    // Tracks the rate of transactions become ready for execution in transaction manager.
    // The need for the Mutex is that the tracker is updated in transaction manager and read
    // in the overload_monitor. There should be low mutex contention because
    // transaction manager is single threaded and the read rate in overload_monitor is
    // low. In the case where transaction manager becomes multi-threaded, we can
    // create one rate tracker per thread.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    // Tracks the rate of transactions starts execution in execution driver.
    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
375
/// Histogram buckets for positive integer counts, overriding the default
/// Prometheus buckets. Covers 1 to 10M with a 1-2-5-7 progression per decade.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 7.0,
    10.0, 20.0, 50.0, 70.0,
    100.0, 200.0, 500.0, 700.0,
    1_000.0, 2_000.0, 5_000.0, 7_000.0,
    10_000.0, 20_000.0, 50_000.0, 70_000.0,
    100_000.0, 200_000.0, 500_000.0, 700_000.0,
    1_000_000.0, 2_000_000.0, 5_000_000.0, 7_000_000.0,
    10_000_000.0,
];
382
/// Histogram buckets for latencies in seconds, ranging from 0.5ms to 90s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
    1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
    10.0, 20.0, 30.0, 60.0, 90.0,
];
387
/// Histogram buckets for low-latency samples, starting at 10us and ending at 100s.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.000_01, 0.000_02, 0.000_05,
    0.000_1, 0.000_2, 0.000_5,
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0,
];
393
/// Histogram buckets for consensus timestamp bias, in seconds.
/// The expected bias is roughly 200-300ms, so buckets cluster tightly there;
/// the extremes (-2.0 and 30.0) catch large skew in either direction.
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    -2.0, 0.0,
    0.2, 0.22, 0.24, 0.26, 0.28, 0.3, 0.32, 0.34, 0.36, 0.38, 0.4,
    0.45, 0.5, 0.8,
    2.0, 10.0, 30.0,
];
400
/// Histogram buckets for the ratio of computation gas to execution latency,
/// ranging from 10 to 1M.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0,
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0,
    1_000.0, 2_000.0, 3_000.0, 4_000.0, 5_000.0, 6_000.0, 7_000.0, 8_000.0, 9_000.0,
    10_000.0, 50_000.0, 100_000.0, 1_000_000.0,
];
405
406pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;
407
/// How long to wait for fastpath input objects before giving up (2 seconds).
///
/// The transaction author should already have observed the input objects as
/// finalized output, so the wait is usually short. When the transaction comes
/// in via TransactionDriver, the driver retries quickly anyway if this
/// validator does not respond.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_millis(2_000);
412
413impl AuthorityMetrics {
414    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
415        Self {
416            tx_orders: register_int_counter_with_registry!(
417                "total_transaction_orders",
418                "Total number of transaction orders",
419                registry,
420            )
421            .unwrap(),
422            total_certs: register_int_counter_with_registry!(
423                "total_transaction_certificates",
424                "Total number of transaction certificates handled",
425                registry,
426            )
427            .unwrap(),
428            // total_effects == total transactions finished
429            total_effects: register_int_counter_with_registry!(
430                "total_transaction_effects",
431                "Total number of transaction effects produced",
432                registry,
433            )
434            .unwrap(),
435
436            shared_obj_tx: register_int_counter_with_registry!(
437                "num_shared_obj_tx",
438                "Number of transactions involving shared objects",
439                registry,
440            )
441            .unwrap(),
442
443            sponsored_tx: register_int_counter_with_registry!(
444                "num_sponsored_tx",
445                "Number of sponsored transactions",
446                registry,
447            )
448            .unwrap(),
449
450            tx_already_processed: register_int_counter_with_registry!(
451                "num_tx_already_processed",
452                "Number of transaction orders already processed previously",
453                registry,
454            )
455            .unwrap(),
456            num_input_objs: register_histogram_with_registry!(
457                "num_input_objects",
458                "Distribution of number of input TX objects per TX",
459                POSITIVE_INT_BUCKETS.to_vec(),
460                registry,
461            )
462            .unwrap(),
463            num_shared_objects: register_histogram_with_registry!(
464                "num_shared_objects",
465                "Number of shared input objects per TX",
466                POSITIVE_INT_BUCKETS.to_vec(),
467                registry,
468            )
469            .unwrap(),
470            batch_size: register_histogram_with_registry!(
471                "batch_size",
472                "Distribution of size of transaction batch",
473                POSITIVE_INT_BUCKETS.to_vec(),
474                registry,
475            )
476            .unwrap(),
477            authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
478                "authority_state_handle_vote_transaction_latency",
479                "Latency of voting on transactions without signing",
480                LATENCY_SEC_BUCKETS.to_vec(),
481                registry,
482            )
483            .unwrap(),
484            internal_execution_latency: register_histogram_with_registry!(
485                "authority_state_internal_execution_latency",
486                "Latency of actual certificate executions",
487                LATENCY_SEC_BUCKETS.to_vec(),
488                registry,
489            )
490            .unwrap(),
491            execution_load_input_objects_latency: register_histogram_with_registry!(
492                "authority_state_execution_load_input_objects_latency",
493                "Latency of loading input objects for execution",
494                LOW_LATENCY_SEC_BUCKETS.to_vec(),
495                registry,
496            )
497            .unwrap(),
498            prepare_certificate_latency: register_histogram_with_registry!(
499                "authority_state_prepare_certificate_latency",
500                "Latency of executing certificates, before committing the results",
501                LATENCY_SEC_BUCKETS.to_vec(),
502                registry,
503            )
504            .unwrap(),
505            commit_certificate_latency: register_histogram_with_registry!(
506                "authority_state_commit_certificate_latency",
507                "Latency of committing certificate execution results",
508                LATENCY_SEC_BUCKETS.to_vec(),
509                registry,
510            )
511            .unwrap(),
512            db_checkpoint_latency: register_histogram_with_registry!(
513                "db_checkpoint_latency",
514                "Latency of checkpointing dbs",
515                LATENCY_SEC_BUCKETS.to_vec(),
516                registry,
517            ).unwrap(),
518            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
519                "transaction_manager_num_enqueued_certificates",
520                "Current number of certificates enqueued to ExecutionScheduler",
521                &["result"],
522                registry,
523            )
524            .unwrap(),
525            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
526                "transaction_manager_num_pending_certificates",
527                "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
528                registry,
529            )
530            .unwrap(),
531            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
532                "transaction_manager_num_executing_certificates",
533                "Number of executing certificates, including queued and actually running certificates",
534                registry,
535            )
536            .unwrap(),
537            authority_overload_status: register_int_gauge_with_registry!(
538                "authority_overload_status",
539                "Whether authority is current experiencing overload and enters load shedding mode.",
540                registry)
541            .unwrap(),
542            authority_load_shedding_percentage: register_int_gauge_with_registry!(
543                "authority_load_shedding_percentage",
544                "The percentage of transactions is shed when the authority is in load shedding mode.",
545                registry)
546            .unwrap(),
547            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
548                "transaction_manager_transaction_queue_age_s",
549                "Time spent in waiting for transaction in the queue",
550                LATENCY_SEC_BUCKETS.to_vec(),
551                registry,
552            )
553            .unwrap(),
554            transaction_overload_sources: register_int_counter_vec_with_registry!(
555                "transaction_overload_sources",
556                "Number of times each source indicates transaction overload.",
557                &["source"],
558                registry)
559            .unwrap(),
560            execution_driver_executed_transactions: register_int_counter_with_registry!(
561                "execution_driver_executed_transactions",
562                "Cumulative number of transaction executed by execution driver",
563                registry,
564            )
565            .unwrap(),
566            execution_driver_paused_transactions: register_int_counter_with_registry!(
567                "execution_driver_paused_transactions",
568                "Cumulative number of transactions paused by execution driver",
569                registry,
570            )
571            .unwrap(),
572            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
573                "execution_driver_dispatch_queue",
574                "Number of transaction pending in execution driver dispatch queue",
575                registry,
576            )
577            .unwrap(),
578            execution_queueing_delay_s: register_histogram_with_registry!(
579                "execution_queueing_delay_s",
580                "Queueing delay between a transaction is ready for execution until it starts executing.",
581                LATENCY_SEC_BUCKETS.to_vec(),
582                registry
583            )
584            .unwrap(),
585            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
586                "prepare_cert_gas_latency_ratio",
587                "The ratio of computation gas divided by VM execution latency.",
588                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
589                registry
590            )
591            .unwrap(),
592            execution_gas_latency_ratio: register_histogram_with_registry!(
593                "execution_gas_latency_ratio",
594                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
595                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
596                registry
597            )
598            .unwrap(),
599            skipped_consensus_txns: register_int_counter_with_registry!(
600                "skipped_consensus_txns",
601                "Total number of consensus transactions skipped",
602                registry,
603            )
604            .unwrap(),
605            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
606                "skipped_consensus_txns_cache_hit",
607                "Total number of consensus transactions skipped because of local cache hit",
608                registry,
609            )
610            .unwrap(),
611            consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
612                "consensus_handler_duplicate_tx_count",
613                "Number of times each transaction appears in its first consensus commit",
614                POSITIVE_INT_BUCKETS.to_vec(),
615                registry,
616            )
617            .unwrap(),
618            post_processing_total_events_emitted: register_int_counter_with_registry!(
619                "post_processing_total_events_emitted",
620                "Total number of events emitted in post processing",
621                registry,
622            )
623            .unwrap(),
624            post_processing_total_tx_indexed: register_int_counter_with_registry!(
625                "post_processing_total_tx_indexed",
626                "Total number of txes indexed in post processing",
627                registry,
628            )
629            .unwrap(),
630            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
631                "post_processing_total_tx_had_event_processed",
632                "Total number of txes finished event processing in post processing",
633                registry,
634            )
635            .unwrap(),
636            post_processing_total_failures: register_int_counter_with_registry!(
637                "post_processing_total_failures",
638                "Total number of failure in post processing",
639                registry,
640            )
641            .unwrap(),
642            consensus_handler_processed: register_int_counter_vec_with_registry!(
643                "consensus_handler_processed",
644                "Number of transactions processed by consensus handler",
645                &["class"],
646                registry
647            ).unwrap(),
648            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
649                "consensus_handler_transaction_sizes",
650                "Sizes of each type of transactions processed by consensus handler",
651                &["class"],
652                POSITIVE_INT_BUCKETS.to_vec(),
653                registry
654            ).unwrap(),
655            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
656                "consensus_handler_deferred_transactions",
657                "Number of transactions deferred by consensus handler",
658                registry,
659            ).unwrap(),
660            consensus_handler_congested_transactions: register_int_counter_with_registry!(
661                "consensus_handler_congested_transactions",
662                "Number of transactions deferred by consensus handler due to congestion",
663                registry,
664            ).unwrap(),
665            consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
666                "consensus_handler_unpaid_amplification_deferrals",
667                "Number of transactions deferred due to unpaid consensus amplification",
668                registry,
669            ).unwrap(),
670            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
671                "consensus_handler_cancelled_transactions",
672                "Number of transactions cancelled by consensus handler",
673                registry,
674            ).unwrap(),
675            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
676                "consensus_handler_max_congestion_control_object_costs",
677                "Max object costs for congestion control in the current consensus commit",
678                &["commit_type"],
679                registry,
680            ).unwrap(),
681            consensus_committed_subdags: register_int_counter_vec_with_registry!(
682                "consensus_committed_subdags",
683                "Number of committed subdags, sliced by author",
684                &["authority"],
685                registry,
686            ).unwrap(),
687            accumulator_deposits: register_int_counter_with_registry!(
688                "accumulator_deposits",
689                "Total accumulator deposit (merge) events processed in settlement",
690                registry,
691            ).unwrap(),
692            accumulator_withdrawals: register_int_counter_with_registry!(
693                "accumulator_withdrawals",
694                "Total accumulator withdrawal (split) events processed in settlement",
695                registry,
696            ).unwrap(),
697            consensus_committed_messages: register_int_gauge_vec_with_registry!(
698                "consensus_committed_messages",
699                "Total number of committed consensus messages, sliced by author",
700                &["authority"],
701                registry,
702            ).unwrap(),
703            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
704                "consensus_committed_user_transactions",
705                "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
706                &["authority"],
707                registry,
708            ).unwrap(),
709            consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
710                "consensus_finalized_user_transactions",
711                "Number of user transactions finalized, sliced by submitter",
712                &["authority"],
713                registry,
714            ).unwrap(),
715            consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
716                "consensus_rejected_user_transactions",
717                "Number of user transactions rejected, sliced by submitter",
718                &["authority"],
719                registry,
720            ).unwrap(),
721            execution_metrics: Arc::new(ExecutionMetrics::new(registry)),
722            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
723            zklogin_sig_count: register_int_counter_with_registry!(
724                "zklogin_sig_count",
725                "Count of zkLogin signatures",
726                registry,
727            )
728            .unwrap(),
729            multisig_sig_count: register_int_counter_with_registry!(
730                "multisig_sig_count",
731                "Count of zkLogin signatures",
732                registry,
733            )
734            .unwrap(),
735            consensus_calculated_throughput: register_int_gauge_with_registry!(
736                "consensus_calculated_throughput",
737                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
738                registry,
739            ).unwrap(),
740            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
741                "consensus_calculated_throughput_profile",
742                "The current active calculated throughput profile",
743                registry
744            ).unwrap(),
745            consensus_block_handler_block_processed: register_int_counter_with_registry!(
746                "consensus_block_handler_block_processed",
747                "Number of blocks processed by consensus block handler.",
748                registry
749            ).unwrap(),
750            consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
751                "consensus_block_handler_txn_processed",
752                "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
753                &["outcome"],
754                registry
755            ).unwrap(),
756            consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
757                "consensus_block_handler_fastpath_executions",
758                "Number of fastpath transactions sent for execution by consensus transaction handler",
759                registry,
760            ).unwrap(),
761            consensus_timestamp_bias: register_histogram_with_registry!(
762                "consensus_timestamp_bias",
763                "Bias/delay from consensus timestamp to system time",
764                TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
765                registry
766            ).unwrap(),
767            execution_queueing_latency: LatencyObserver::new(),
768            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
769            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
770        }
771    }
772}
773
/// A trait object for `Signer` that is:
/// - `Pin`ned, i.e. confined to one place in memory (we don't want to copy private keys).
/// - `Sync`, i.e. can be safely shared between threads.
///
/// Typically instantiated with `Arc::pin(keypair)` where keypair is a `KeyPair`
/// (the alias wraps `Arc`, so `Arc::pin` — not `Box::pin` — produces this type).
///
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
781
/// Execution env contains the "environment" for the transaction to be executed in, that is,
/// all the information necessary for execution that is not specified by the transaction itself.
///
/// Build one with `ExecutionEnv::new()` / `Default` and customize it via the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct ExecutionEnv {
    /// The assigned version of each shared object for the transaction.
    pub assigned_versions: AssignedVersions,
    /// The expected digest of the effects of the transaction, if executing from checkpoint or
    /// other sources where the effects are known in advance.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    /// Status of the address funds withdraw scheduling of the transaction,
    /// including both address and object funds withdraws.
    /// Defaults to `FundsWithdrawStatus::MaybeSufficient`.
    pub funds_withdraw_status: FundsWithdrawStatus,
    /// Transactions that must finish before this transaction can be executed.
    /// Used to schedule barrier transactions after non-exclusive writes.
    pub barrier_dependencies: Vec<TransactionDigest>,
}
798
799impl Default for ExecutionEnv {
800    fn default() -> Self {
801        Self {
802            assigned_versions: Default::default(),
803            expected_effects_digest: None,
804            funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
805            barrier_dependencies: Default::default(),
806        }
807    }
808}
809
810impl ExecutionEnv {
811    pub fn new() -> Self {
812        Default::default()
813    }
814
815    pub fn with_expected_effects_digest(
816        mut self,
817        expected_effects_digest: TransactionEffectsDigest,
818    ) -> Self {
819        self.expected_effects_digest = Some(expected_effects_digest);
820        self
821    }
822
823    pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
824        self.assigned_versions = assigned_versions;
825        self
826    }
827
828    pub fn with_insufficient_funds(mut self) -> Self {
829        self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
830        self
831    }
832
833    pub fn with_barrier_dependencies(
834        mut self,
835        barrier_dependencies: BTreeSet<TransactionDigest>,
836    ) -> Self {
837        self.barrier_dependencies = barrier_dependencies.into_iter().collect();
838        self
839    }
840}
841
/// Fork recovery state for handling equivocation after forks: records, per
/// transaction digest, the effects digest that overrides the locally computed
/// one (see `get_transaction_override`). Populated from
/// `sui_config::node::ForkRecoveryConfig`.
#[derive(Debug)]
pub struct ForkRecoveryState {
    /// Transaction digest to effects digest overrides
    transaction_overrides:
        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
}
848
849impl Default for ForkRecoveryState {
850    fn default() -> Self {
851        Self {
852            transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
853        }
854    }
855}
856
857impl ForkRecoveryState {
858    pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
859        let Some(config) = config else {
860            return Ok(Self::default());
861        };
862
863        let mut transaction_overrides = HashMap::new();
864        for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
865            let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
866                SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
867            })?;
868            let effects_digest =
869                TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
870                    SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
871                })?;
872            transaction_overrides.insert(tx_digest, effects_digest);
873        }
874
875        Ok(Self {
876            transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
877        })
878    }
879
880    pub fn get_transaction_override(
881        &self,
882        tx_digest: &TransactionDigest,
883    ) -> Option<TransactionEffectsDigest> {
884        self.transaction_overrides.read().get(tx_digest).copied()
885    }
886}
887
/// Output of a transaction's post-processing task: the staged index batch plus the
/// index-store cache updates, collected via `pending_post_processing` on
/// `AuthorityState` and committed atomically at checkpoint boundaries.
pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
889
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads input and receiving objects for transaction signing and execution.
    input_loader: TransactionInputLoader,
    /// Trait-object handles into the execution cache layers (object reads,
    /// account funds reads, cache writer/committer, etc.).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    /// Resolves coin reservations when processing declared funds withdrawals
    /// at signing time.
    coin_reservation_resolver: Arc<CachingCoinReservationResolver>,

    /// The store for the current epoch; swapped atomically at reconfiguration.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it until all writes are complete.
    /// Reconfiguration acquires write lock, changes the epoch and revert all transactions
    /// from previous epoch that are executed but did not make into checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional JSON-RPC index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional RPC index store.
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    /// Publishes transaction events to subscribers.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint storage, including per-epoch state commitments.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Store of committees across epochs.
    committee_store: Arc<CommitteeStore>,

    /// Schedules transaction execution.
    execution_scheduler: Arc<ExecutionScheduler>,

    /// Shuts down the execution task. Used only in testing.
    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Prometheus metrics for this authority.
    pub metrics: Arc<AuthorityMetrics>,
    // Retained (not read) so their pruning keeps running for the lifetime of
    // this state — NOTE(review): inferred from the `_`-prefixed bindings; confirm.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// Full node configuration (overload, deny-list, verifier, cache settings, ...).
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// The chain identifier is derived from the digest of the genesis checkpoint.
    chain_identifier: ChainIdentifier,

    /// Congestion state used for congestion control.
    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Consumed by gasless tx rate limiter.
    pub(crate) consensus_gasless_counter: Arc<ConsensusGaslessCounter>,

    /// Traffic controller for Sui core servers (json-rpc, validator service)
    pub traffic_controller: Option<Arc<TrafficController>>,

    /// Fork recovery state for handling equivocation after forks
    fork_recovery_state: Option<ForkRecoveryState>,

    /// Notification channel for reconfiguration
    notify_epoch: tokio::sync::watch::Sender<EpochId>,

    /// Object funds checker; `None` until one is installed (swappable at runtime).
    pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
    /// Metrics for the object funds checker.
    object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,

    /// Tracks transactions whose post-processing (indexing/events) is still in flight.
    /// CheckpointExecutor removes entries and collects the index batches before committing
    /// them atomically at checkpoint boundaries.
    pending_post_processing:
        Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,

    /// Limits the number of concurrent post-processing tasks to avoid overwhelming
    /// the blocking thread pool. Defaults to the number of available CPUs.
    post_processing_semaphore: Arc<tokio::sync::Semaphore>,
}
967
968/// The authority state encapsulates all state, drives execution, and ensures safety.
969///
970/// Note the authority operations can be accessed through a read ref (&) and do not
971/// require &mut. Internally a database is synchronized through a mutex lock.
972///
973/// Repeating valid commands should produce no changes and return no error.
974impl AuthorityState {
    /// Returns this node's role (validator or fullnode) per the epoch store.
    pub fn node_role(&self, epoch_store: &AuthorityPerEpochStore) -> NodeRole {
        epoch_store.node_role()
    }
978
    /// True if this node acts as a validator in the epoch described by `epoch_store`.
    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
        epoch_store.node_role().is_validator()
    }
982
    /// True if this node acts as a fullnode in the epoch described by `epoch_store`.
    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
        epoch_store.node_role().is_fullnode()
    }
986
    /// Borrows the committee store.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
990
    /// Returns a cloned `Arc` handle to the committee store.
    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.committee_store.clone()
    }
994
    /// The authority overload configuration from the node config.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
998
    /// Returns the checkpoint state commitments recorded for `epoch`, if any,
    /// as stored in the checkpoint store.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
1005
    /// Runs deny list checks and processes funds withdrawals. Called before loading input
    /// objects, since these checks don't depend on object state.
    ///
    /// Returns the declared withdrawals keyed by accumulator object id, each with its
    /// withdrawn amount and type tag, after verifying that the amounts are available
    /// (and, for gasless transactions when the protocol config requires it, that the
    /// minimum balances of the allowed token types remain after the withdrawal).
    fn pre_object_load_checks(
        &self,
        tx_data: &TransactionData,
        tx_signatures: &[GenericSignature],
        input_object_kinds: &[InputObjectKind],
        receiving_objects_refs: &[ObjectRef],
        protocol_config: &ProtocolConfig,
    ) -> SuiResult<BTreeMap<AccumulatorObjId, (u64, TypeTag)>> {
        // Note: the deny checks may do redundant package loads but:
        // - they only load packages when there is an active package deny map
        // - the loads are cached anyway
        sui_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            tx_signatures,
            input_object_kinds,
            receiving_objects_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Resolve the withdrawals declared by the transaction; coin reservations
        // go through the caching resolver.
        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
            self.chain_identifier,
            self.coin_reservation_resolver.as_ref(),
        )?;

        // Fail fast if the declared amounts exceed what the accounts hold.
        self.execution_cache_trait_pointers
            .account_funds_read
            .check_amounts_available(&declared_withdrawals)?;

        // Gasless transactions may additionally need to keep minimum balances of
        // the allowed token types after the withdrawal.
        if protocol_config.gasless_verify_remaining_balance() && tx_data.is_gasless_transaction() {
            let min_amounts =
                sui_types::transaction::get_gasless_allowed_token_types(protocol_config);
            self.execution_cache_trait_pointers
                .account_funds_read
                .check_remaining_amounts_after_withdrawal(&declared_withdrawals, &min_amounts)?;
        }

        Ok(declared_withdrawals)
    }
1047
    /// Performs signing-time validation for a transaction: object-independent deny
    /// checks, input object loading, full transaction input checks, and coin
    /// deny-list checks. Returns the checked input objects on success.
    fn handle_transaction_deny_checks(
        &self,
        transaction: &VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<CheckedInputObjects> {
        let tx_digest = transaction.digest();
        let tx_data = transaction.data().transaction_data();

        let input_object_kinds = tx_data.input_objects()?;
        let receiving_objects_refs = tx_data.receiving_objects();

        // Object-independent checks run first, before any objects are loaded.
        self.pre_object_load_checks(
            tx_data,
            transaction.tx_signatures(),
            &input_object_kinds,
            &receiving_objects_refs,
            epoch_store.protocol_config(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            Some(tx_digest),
            &input_object_kinds,
            &receiving_objects_refs,
            epoch_store.epoch(),
        )?;

        // Full input validation; only the checked inputs are needed here, so the
        // computed gas status is discarded.
        let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
            epoch_store.protocol_config(),
            epoch_store.reference_gas_price(),
            tx_data,
            input_objects,
            &receiving_objects,
            &self.metrics.bytecode_verifier_metrics,
            &self.config.verifier_signing_config,
        )?;

        self.handle_coin_deny_list_checks(
            tx_data,
            &checked_input_objects,
            &receiving_objects,
            epoch_store,
        )?;

        Ok(checked_input_objects)
    }
1093
1094    fn handle_coin_deny_list_checks(
1095        &self,
1096        tx_data: &TransactionData,
1097        checked_input_objects: &CheckedInputObjects,
1098        receiving_objects: &ReceivingObjects,
1099        epoch_store: &Arc<AuthorityPerEpochStore>,
1100    ) -> SuiResult<()> {
1101        use sui_types::balance::Balance;
1102
1103        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1104            self.chain_identifier,
1105            self.coin_reservation_resolver.as_ref(),
1106        )?;
1107
1108        let funds_withdraw_types = declared_withdrawals
1109            .values()
1110            .filter_map(|(_, type_tag)| {
1111                Balance::maybe_get_balance_type_param(type_tag)
1112                    .map(|ty| ty.to_canonical_string(false))
1113            })
1114            .collect::<BTreeSet<_>>();
1115
1116        if epoch_store.coin_deny_list_v1_enabled() {
1117            check_coin_deny_list_v1(
1118                tx_data.sender(),
1119                checked_input_objects,
1120                receiving_objects,
1121                funds_withdraw_types.clone(),
1122                &self.get_object_store(),
1123            )?;
1124        }
1125
1126        if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1127            check_coin_deny_list_v2_during_signing(
1128                tx_data.sender(),
1129                checked_input_objects,
1130                receiving_objects,
1131                funds_withdraw_types.clone(),
1132                &self.get_object_store(),
1133            )?;
1134        }
1135
1136        Ok(())
1137    }
1138
    /// Vote for a transaction, either when validator receives a submit_transaction request,
    /// or sees a transaction from consensus. Performs the same types of checks as
    /// transaction signing, but does not explicitly sign the transaction.
    /// Note that if the transaction has been executed, we still go through the
    /// same checks. If the transaction has only been executed through mysticeti fastpath,
    /// but not yet finalized, the signing will still work since the objects are still available.
    /// But if the transaction has been finalized, the signing will fail.
    /// TODO(mysticeti-fastpath): Assess whether we want to optimize the case when the transaction
    /// has already been finalized executed.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    pub fn handle_vote_transaction(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
    ) -> SuiResult<()> {
        debug!("handle_vote_transaction");

        let _metrics_guard = self
            .metrics
            .authority_state_handle_vote_transaction_latency
            .start_timer();
        self.metrics.tx_orders.inc();

        // The should_accept_user_certs check here is best effort, because
        // between a validator signs a tx and a cert is formed, the validator
        // could close the window.
        if !epoch_store
            .get_reconfig_state_read_lock_guard()
            .should_accept_user_certs()
        {
            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
        }

        // Accept finalized transactions, instead of voting to reject them.
        // Checking executed transactions is limited to the current epoch.
        // Otherwise there can be a race where the transaction is accepted because it has executed,
        // but the executed effects are pruned post consensus, leading to failures.
        let tx_digest = *transaction.digest();
        if epoch_store.is_recently_finalized(&tx_digest)
            || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
        {
            assert_reachable!("transaction recently executed");
            return Ok(());
        }

        // A transaction executed in the previous epoch is reported as already
        // executed rather than voted on again.
        if self
            .get_transaction_cache_reader()
            .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
        {
            return Err(SuiErrorKind::TransactionAlreadyExecuted {
                digest: (*transaction.digest()),
            }
            .into());
        }

        // Ensure that validator cannot reconfigure while we are validating the transaction.
        let _execution_lock = self.execution_lock_for_validation()?;

        let checked_input_objects =
            self.handle_transaction_deny_checks(&transaction, epoch_store)?;

        let owned_objects = checked_input_objects.inner().filter_owned_objects();

        // Validate owned object versions without acquiring locks. Locking happens post-consensus
        // in the consensus handler. Validation still runs to prevent spam transactions with
        // invalid object versions, and is necessary to handle recently created objects.
        self.get_cache_writer()
            .validate_owned_object_versions(&owned_objects)
    }
1208
    /// Used for early client validation check for transactions before submission to server.
    /// Performs the same validation checks as handle_vote_transaction without acquiring locks.
    /// This allows for fast failure feedback to clients for non-retriable errors.
    ///
    /// The key addition is checking that owned object versions match live object versions.
    /// This is necessary because handle_transaction_deny_checks fetches objects at their
    /// requested version (which may exist in storage as historical versions), whereas
    /// validators check against the live/current version during locking (verify_live_object).
    pub fn check_transaction_validity(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: &VerifiedTransaction,
        enforce_live_input_objects: bool,
    ) -> SuiResult<()> {
        if !epoch_store
            .get_reconfig_state_read_lock_guard()
            .should_accept_user_certs()
        {
            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
        }

        // Transaction-level validity checks against the epoch's validation context.
        transaction.validity_check(&epoch_store.tx_validity_check_context())?;

        let checked_input_objects =
            self.handle_transaction_deny_checks(transaction, epoch_store)?;

        // Check that owned object versions match live objects
        let owned_objects = checked_input_objects.inner().filter_owned_objects();
        let cache_reader = self.get_object_cache_reader();

        for obj_ref in &owned_objects {
            // Objects absent from the cache are skipped; server-side checks will
            // surface any real problem with them.
            if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
                // Only reject if transaction references an old version. Allow newer
                // versions that may exist on validators but not yet synced to fullnode.
                if obj_ref.1 < live_object.version() {
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: live_object.version(),
                        },
                    }
                    .into());
                }

                if enforce_live_input_objects && live_object.version() < obj_ref.1 {
                    // Returns a non-retriable error when the input object version is required to be live.
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: live_object.version(),
                        },
                    }
                    .into());
                }

                // If version matches, verify digest also matches
                if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::InvalidObjectDigest {
                            object_id: obj_ref.0,
                            expected_digest: live_object.digest(),
                        },
                    }
                    .into());
                }
            }
        }

        Ok(())
    }
1279
    /// Whether overload checks should run at transaction signing (from node config).
    pub fn check_system_overload_at_signing(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_signing
    }
1285
    /// Whether overload checks should run at transaction execution (from node config).
    pub fn check_system_overload_at_execution(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_execution
    }
1291
    /// Rejects `tx_data` when the system is overloaded. Applies, in order:
    /// 1. (optional) authority-level load shedding,
    /// 2. the execution scheduler's overload check,
    /// 3. backpressure based on the approximate pending transaction commit count.
    /// Each rejection increments the corresponding overload-source metric.
    pub(crate) fn check_system_overload(
        &self,
        tx_data: &SenderSignedData,
        do_authority_overload_check: bool,
    ) -> SuiResult {
        if do_authority_overload_check {
            self.check_authority_overload(tx_data).tap_err(|_| {
                self.update_overload_metrics("execution_queue");
            })?;
        }
        self.execution_scheduler
            .check_execution_overload(self.overload_config(), tx_data)
            .tap_err(|_| {
                self.update_overload_metrics("execution_pending");
            })?;
        // Consensus overload is handled by the admission queue in authority_server.rs.

        let pending_tx_count = self
            .get_cache_commit()
            .approximate_pending_transaction_count();
        if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
            // Too many pending commits: ask the client to retry later.
            return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
                retry_after_secs: 10,
            }
            .into());
        }

        Ok(())
    }
1321
1322    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1323        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1324            return Ok(());
1325        }
1326
1327        let load_shedding_percentage = self
1328            .overload_info
1329            .load_shedding_percentage
1330            .load(Ordering::Relaxed);
1331        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1332    }
1333
    /// Increments the `transaction_overload_sources` counter labeled with `source`.
    pub(crate) fn update_overload_metrics(&self, source: &str) {
        self.metrics
            .transaction_overload_sources
            .with_label_values(&[source])
            .inc();
    }
1340
    /// Waits for fastpath (owned, package) dependency objects to become available.
    /// Returns true if a chosen set of fastpath dependency objects are available,
    /// returns false otherwise after an internal timeout.
    pub(crate) async fn wait_for_fastpath_dependency_objects(
        &self,
        transaction: &VerifiedTransaction,
        epoch: EpochId,
    ) -> SuiResult<bool> {
        let txn_data = transaction.data().transaction_data();
        let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;

        // Gather and filter input objects to wait for.
        // Packages are always waited on; move objects only when
        // should_wait_for_dependency_object says so.
        let fastpath_dependency_objects: Vec<_> = move_objects
            .into_iter()
            .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
            .chain(
                packages
                    .into_iter()
                    .map(|package_id| InputKey::Package { id: package_id }),
            )
            .collect();
        let receiving_keys: HashSet<_> = receiving_objects
            .into_iter()
            .filter_map(|receiving_obj_ref| {
                self.should_wait_for_dependency_object(receiving_obj_ref)
            })
            .collect();
        // Nothing to wait for: report availability immediately.
        if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
            return Ok(true);
        }

        // Use shorter wait timeout in simtests to exercise server-side error paths and
        // client-side retry logic.
        let max_wait = if cfg!(msim) {
            Duration::from_millis(200)
        } else {
            WAIT_FOR_FASTPATH_INPUT_TIMEOUT
        };

        match timeout(
            max_wait,
            self.get_object_cache_reader().notify_read_input_objects(
                &fastpath_dependency_objects,
                &receiving_keys,
                epoch,
            ),
        )
        .await
        {
            Ok(()) => Ok(true),
            // Maybe return an error for unavailable input objects,
            // and allow the caller to skip the rest of input checks?
            Err(_) => Ok(false),
        }
    }
1396
1397    /// Returns Some(inputKey) if the object reference should be waited on until it is
1398    /// finalized, before proceeding to input checks.
1399    ///
1400    /// Incorrect decisions here should only affect user experience, not safety:
1401    /// - Waiting unnecessarily adds latency to transaction signing and submission.
1402    /// - Not waiting when needed may cause the transaction to be rejected because input object is unavailable.
1403    fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1404        let (obj_id, cur_version, _digest) = obj_ref;
1405        let Some(latest_obj_ref) = self
1406            .get_object_cache_reader()
1407            .get_latest_object_ref_or_tombstone(obj_id)
1408        else {
1409            // Object might not have been created.
1410            return Some(InputKey::VersionedObject {
1411                id: FullObjectID::new(obj_id, None),
1412                version: cur_version,
1413            });
1414        };
1415        let latest_digest = latest_obj_ref.2;
1416        if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1417            // Do not wait for deleted object and rely on input check instead.
1418            return None;
1419        }
1420        let latest_version = latest_obj_ref.1;
1421        if cur_version <= latest_version {
1422            // Do not wait for version that already exists or has been consumed.
1423            // Let the input check to handle them and return the proper error.
1424            return None;
1425        }
1426        // Wait for the object version to become available.
1427        Some(InputKey::VersionedObject {
1428            id: FullObjectID::new(obj_id, None),
1429            version: cur_version,
1430        })
1431    }
1432
1433    /// Wait for a transaction to be executed. Transactions need to be sequenced by consensus
1434    /// before being enqueued for execution.
1435    ///
1436    /// Only use this in tests.
1437    #[instrument(level = "trace", skip_all)]
1438    pub async fn wait_for_transaction_execution_for_testing(
1439        &self,
1440        transaction: &VerifiedExecutableTransaction,
1441    ) -> TransactionEffects {
1442        self.notify_read_effects_for_testing(
1443            "AuthorityState::wait_for_transaction_execution_for_testing",
1444            *transaction.digest(),
1445        )
1446        .await
1447    }
1448
    /// Internal logic to execute a certificate.
    ///
    /// Guarantees that
    /// - If input objects are available, return no permanent failure.
    /// - Execution and output commit are atomic. i.e. outputs are only written to storage,
    /// on successful execution; crashed execution has no observable effect and can be retried.
    ///
    /// It is caller's responsibility to ensure input objects are available and locks are set.
    /// If this cannot be satisfied by the caller, execute_certificate() should be called instead.
    ///
    /// Should only be called within sui-core.
    #[instrument(level = "trace", skip_all)]
    pub async fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        mut execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // If fork recovery is configured and has an override for this transaction,
        // replace the expected effects digest with the operator-supplied one.
        if let Some(fork_recovery) = &self.fork_recovery_state
            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
        {
            warn!(
                ?tx_digest,
                original_digest = ?execution_env.expected_effects_digest,
                override_digest = ?override_digest,
                "Applying fork recovery override for transaction effects digest"
            );
            execution_env.expected_effects_digest = Some(override_digest);
        }

        // Prevent concurrent executions of the same tx.
        // NOTE: every early-return path below must release this guard explicitly.
        let tx_guard = epoch_store.acquire_tx_guard(certificate);

        // Fast path: if the transaction was already executed, return its effects
        // (after verifying they match any expected digest).
        let tx_cache_reader = self.get_transaction_cache_reader();
        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest,
                    "Unexpected effects digest for transaction {:?}",
                    tx_digest
                );
            }
            tx_guard.release();
            return ExecutionOutput::Success((effects, None));
        }

        let execution_start_time = Instant::now();

        // Any caller that verifies the signatures on the certificate will have already checked the
        // epoch. But paths that don't verify sigs (e.g. execution from checkpoint, reading from db)
        // present the possibility of an epoch mismatch. If this cert is not finalized in previous
        // epoch, then it's invalid.
        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
        else {
            tx_guard.release();
            return ExecutionOutput::EpochEnded;
        };
        // Since we obtain a reference to the epoch store before taking the execution lock, it's
        // possible that reconfiguration has happened and they no longer match.
        // TODO: We may not need the following check anymore since the scheduler
        // should have checked that the certificate is from the same epoch as epoch_store.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return ExecutionOutput::EpochEnded;
        }

        // Captured before execution_env is moved into process_certificate; used below
        // when handling accumulator barrier settlement transactions.
        let accumulator_version = execution_env.assigned_versions.accumulator_version;

        let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
            &tx_guard,
            &execution_guard,
            certificate,
            execution_env,
            epoch_store,
        ) {
            ExecutionOutput::Success(result) => result,
            // Non-success outputs (EpochEnded, RetryLater, Fatal) are propagated as-is.
            output => return output.unwrap_err(),
        };
        let transaction_outputs = Arc::new(transaction_outputs);

        // Allow testing crash recovery between execution and commit.
        fail_point!("crash");

        let effects = transaction_outputs.effects.clone();
        debug!(
            ?tx_digest,
            fx_digest=?effects.digest(),
            "process_certificate succeeded in {:.3}ms",
            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
        );

        let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
        if let Err(err) = commit_result {
            error!(?tx_digest, "Error committing transaction: {err}");
            tx_guard.release();
            return ExecutionOutput::Fatal(err);
        }

        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // double check that the signature verifier always matches the authenticator state
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        // For accumulator barrier settlement transactions, notify the funds checker
        // that the next accumulator version has been settled.
        if certificate
            .transaction_data()
            .kind()
            .is_accumulator_barrier_settle_tx()
        {
            let object_funds_checker = self.object_funds_checker.load();
            if let Some(object_funds_checker) = object_funds_checker.as_ref() {
                // unwrap safe because we assign accumulator version for every transaction
                // when accumulator is enabled.
                let next_accumulator_version = accumulator_version.unwrap().next();
                object_funds_checker.settle_accumulator_version(next_accumulator_version);
            }
        }

        tx_guard.commit_tx();

        let elapsed = execution_start_time.elapsed();
        epoch_store.record_local_execution_time(
            certificate.data().transaction_data(),
            &effects,
            timings,
            elapsed,
        );

        // Record the ratio of computation cost to wall-clock execution time.
        let elapsed_us = elapsed.as_micros() as f64;
        if elapsed_us > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
        };
        ExecutionOutput::Success((effects, execution_error_opt))
    }
1616
1617    pub fn read_objects_for_execution(
1618        &self,
1619        tx_lock: &CertLockGuard,
1620        certificate: &VerifiedExecutableTransaction,
1621        assigned_shared_object_versions: &AssignedVersions,
1622        epoch_store: &Arc<AuthorityPerEpochStore>,
1623    ) -> SuiResult<InputObjects> {
1624        let _scope = monitored_scope("Execution::load_input_objects");
1625        let _metrics_guard = self
1626            .metrics
1627            .execution_load_input_objects_latency
1628            .start_timer();
1629        let input_objects = &certificate.data().transaction_data().input_objects()?;
1630        self.input_loader.read_objects_for_execution(
1631            &certificate.key(),
1632            tx_lock,
1633            input_objects,
1634            assigned_shared_object_versions,
1635            epoch_store.epoch(),
1636        )
1637    }
1638
1639    /// Test only wrapper for `try_execute_immediately()` above, useful for executing change epoch
1640    /// transactions. Assumes execution will always succeed.
1641    pub async fn try_execute_for_test(
1642        &self,
1643        certificate: &VerifiedCertificate,
1644        execution_env: ExecutionEnv,
1645    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1646        let epoch_store = self.epoch_store_for_testing();
1647        let (effects, execution_error_opt) = self
1648            .try_execute_immediately(
1649                &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1650                execution_env,
1651                &epoch_store,
1652            )
1653            .await
1654            .unwrap();
1655        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1656        (signed_effects, execution_error_opt)
1657    }
1658
1659    pub async fn try_execute_executable_for_test(
1660        &self,
1661        executable: &VerifiedExecutableTransaction,
1662        execution_env: ExecutionEnv,
1663    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1664        let epoch_store = self.epoch_store_for_testing();
1665        let (effects, execution_error_opt) = self
1666            .try_execute_immediately(executable, execution_env, &epoch_store)
1667            .await
1668            .unwrap();
1669        self.flush_post_processing(executable.digest()).await;
1670        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1671        (signed_effects, execution_error_opt)
1672    }
1673
1674    /// Wait until the effects of the given transaction are available and return them.
1675    /// Panics if the effects are not found.
1676    ///
1677    /// Only use this in tests where effects are expected to exist.
1678    pub async fn notify_read_effects_for_testing(
1679        &self,
1680        task_name: &'static str,
1681        digest: TransactionDigest,
1682    ) -> TransactionEffects {
1683        self.get_transaction_cache_reader()
1684            .notify_read_executed_effects(task_name, &[digest])
1685            .await
1686            .pop()
1687            .expect("must return correct number of effects")
1688    }
1689
1690    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1691        self.get_object_cache_reader()
1692            .check_owned_objects_are_live(owned_object_refs)
1693    }
1694
1695    /// This function captures the required state to debug a forked transaction.
1696    /// The dump is written to a file in dir `path`, with name prefixed by the transaction digest.
1697    /// NOTE: Since this info escapes the validator context,
1698    /// make sure not to leak any private info here
1699    pub(crate) fn debug_dump_transaction_state(
1700        &self,
1701        tx_digest: &TransactionDigest,
1702        effects: &TransactionEffects,
1703        expected_effects_digest: TransactionEffectsDigest,
1704        inner_temporary_store: &InnerTemporaryStore,
1705        certificate: &VerifiedExecutableTransaction,
1706        debug_dump_config: &StateDebugDumpConfig,
1707    ) -> SuiResult<PathBuf> {
1708        let dump_dir = debug_dump_config
1709            .dump_file_directory
1710            .as_ref()
1711            .cloned()
1712            .unwrap_or(std::env::temp_dir());
1713        let epoch_store = self.load_epoch_store_one_call_per_task();
1714
1715        NodeStateDump::new(
1716            tx_digest,
1717            effects,
1718            expected_effects_digest,
1719            self.get_object_store().as_ref(),
1720            &epoch_store,
1721            inner_temporary_store,
1722            certificate,
1723        )?
1724        .write_to_file(&dump_dir)
1725        .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1726    }
1727
    /// Loads input objects for the certificate, determines the effects digest we must not
    /// contradict (from the execution environment or a previously published effects
    /// signature), and runs execution via `execute_certificate`.
    ///
    /// Returns the transaction outputs, per-command timings, and any execution error on
    /// success; input/lock read failures are returned as `Fatal`.
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn process_certificate(
        &self,
        tx_guard: &CertTxGuard,
        execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(
        TransactionOutputs,
        Vec<ExecutionTiming>,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::process_certificate");
        let tx_digest = *certificate.digest();

        let input_objects = match self.read_objects_for_execution(
            tx_guard.as_lock_guard(),
            certificate,
            &execution_env.assigned_versions,
            epoch_store,
        ) {
            Ok(objects) => objects,
            Err(e) => return ExecutionOutput::Fatal(e),
        };

        let expected_effects_digest = match execution_env.expected_effects_digest {
            Some(expected_effects_digest) => Some(expected_effects_digest),
            None => {
                // We could be re-executing a previously executed but uncommitted transaction, perhaps after
                // restarting with a new binary. In this situation, if we have published an effects signature,
                // we must be sure not to equivocate.
                match epoch_store.get_signed_effects_digest(&tx_digest) {
                    Ok(digest) => digest,
                    Err(e) => return ExecutionOutput::Fatal(e),
                }
            }
        };

        // Simulation hook: occasionally kill the current node here to exercise
        // crash recovery between input loading and execution.
        fail_point_if!("correlated-crash-process-certificate", || {
            if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        // Errors originating from prepare_certificate may be transient (failure to read locks) or
        // non-transient (transaction input is invalid, move vm errors). However, all errors from
        // this function occur before we have written anything to the db, so we commit the tx
        // guard and rely on the client to retry the tx (if it was transient).
        self.execute_certificate(
            execution_guard,
            certificate,
            input_objects,
            expected_effects_digest,
            execution_env,
            epoch_store,
        )
    }
1786
1787    pub async fn reconfigure_traffic_control(
1788        &self,
1789        params: TrafficControlReconfigParams,
1790    ) -> Result<TrafficControlReconfigParams, SuiError> {
1791        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1792            traffic_controller.admin_reconfigure(params).await
1793        } else {
1794            Err(SuiErrorKind::InvalidAdminRequest(
1795                "Traffic controller is not configured on this node".to_string(),
1796            )
1797            .into())
1798        }
1799    }
1800
    /// Commits the outputs of an executed certificate: records execution in the epoch
    /// store, writes the transaction outputs to the cache, and handles special cases
    /// (accumulator barrier notification, end-of-epoch system package reload).
    ///
    /// The write ordering here is crash-safety sensitive — see the inline comments.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        transaction_outputs: Arc<TransactionOutputs>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_digest = certificate.digest();

        // The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
        // we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_executed_in_epoch(tx_digest);
        let key = certificate.key();
        // Non-digest keys (e.g. synthetic transaction keys) need an explicit
        // key -> digest mapping recorded in the epoch store.
        if !matches!(key, TransactionKey::Digest(_)) {
            epoch_store.insert_tx_key(key, *tx_digest)?;
        }

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        self.get_cache_writer()
            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);

        // Fire an in-memory-only notification so the scheduler can detect that this
        // barrier transaction has been executed (e.g. by the checkpoint executor) and
        // stop waiting for the checkpoint builder.  We intentionally do NOT persist
        // this to the DB to avoid stale entries surviving a crash while effects may not.
        if let Some(settlement_key) = certificate
            .transaction_data()
            .kind()
            .accumulator_barrier_settlement_key()
        {
            epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
        }

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        Ok(())
    }
1849
1850    fn update_metrics(
1851        &self,
1852        certificate: &VerifiedExecutableTransaction,
1853        inner_temporary_store: &InnerTemporaryStore,
1854        effects: &TransactionEffects,
1855    ) {
1856        // count signature by scheme, for zklogin and multisig
1857        if certificate.has_zklogin_sig() {
1858            self.metrics.zklogin_sig_count.inc();
1859        } else if certificate.has_upgraded_multisig() {
1860            self.metrics.multisig_sig_count.inc();
1861        }
1862
1863        self.metrics.total_effects.inc();
1864        self.metrics.total_certs.inc();
1865
1866        let consensus_object_count = effects.input_consensus_objects().len();
1867        if consensus_object_count > 0 {
1868            self.metrics.shared_obj_tx.inc();
1869        }
1870
1871        if certificate.is_sponsored_tx() {
1872            self.metrics.sponsored_tx.inc();
1873        }
1874
1875        let input_object_count = inner_temporary_store.input_objects.len();
1876        self.metrics
1877            .num_input_objs
1878            .observe(input_object_count as f64);
1879        self.metrics
1880            .num_shared_objects
1881            .observe(consensus_object_count as f64);
1882        self.metrics.batch_size.observe(
1883            certificate
1884                .data()
1885                .intent_message()
1886                .value
1887                .kind()
1888                .num_commands() as f64,
1889        );
1890    }
1891
1892    /// Runs the executor on an already-prepared transaction; the caller is responsible for any
1893    /// coin reservation rewriting.
1894    fn execute_transaction_to_effects(
1895        &self,
1896        executor: &dyn Executor,
1897        store: &dyn BackingStore,
1898        protocol_config: &ProtocolConfig,
1899        enable_expensive_checks: bool,
1900        execution_params: ExecutionOrEarlyError,
1901        epoch_id: &EpochId,
1902        epoch_timestamp_ms: u64,
1903        input_objects: CheckedInputObjects,
1904        gas_data: GasData,
1905        gas_status: SuiGasStatus,
1906        kind: TransactionKind,
1907        rewritten_inputs: Option<Vec<bool>>,
1908        signer: SuiAddress,
1909        tx_digest: TransactionDigest,
1910    ) -> (
1911        InnerTemporaryStore,
1912        SuiGasStatus,
1913        TransactionEffects,
1914        Vec<ExecutionTiming>,
1915        Result<(), ExecutionError>,
1916    ) {
1917        let (inner_temp_store, gas_status, effects, timings, execution_error) = executor
1918            // TODO only run this function on FullNodes, use `execute_transaction_to_effects` on validators.
1919            .execute_transaction_to_effects_and_execution_error(
1920                store,
1921                protocol_config,
1922                self.metrics.execution_metrics.clone(),
1923                enable_expensive_checks,
1924                execution_params,
1925                epoch_id,
1926                epoch_timestamp_ms,
1927                input_objects,
1928                gas_data,
1929                gas_status,
1930                kind,
1931                rewritten_inputs,
1932                signer,
1933                tx_digest,
1934                &mut None,
1935            );
1936
1937        (
1938            inner_temp_store,
1939            gas_status,
1940            effects,
1941            timings,
1942            execution_error,
1943        )
1944    }
1945
    /// execute_certificate validates the transaction input, and executes the certificate,
    /// returning transaction outputs.
    ///
    /// It reads state from the db (both owned and shared locks), but it has no side effects.
    ///
    /// The function can fail with Fatal errors (e.g., the transaction input is invalid,
    /// locks are not held correctly, etc.) or transient errors (e.g., db read errors).
    /// If execution produces effects that contradict an expected effects digest, a fork
    /// has occurred and the process aborts via `fatal!` after dumping debug state.
    #[instrument(level = "trace", skip_all)]
    fn execute_certificate(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        input_objects: InputObjects,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(
        TransactionOutputs,
        Vec<ExecutionTiming>,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::prepare_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        let prepare_certificate_start_time = tokio::time::Instant::now();

        // TODO: We need to move this to a more appropriate place to avoid redundant checks.
        let tx_data = certificate.data().transaction_data();

        if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
            return ExecutionOutput::Fatal(e);
        }

        // The cost of partially re-auditing a transaction before execution is tolerated.
        // This step is required for correctness because, for example, ConsensusAddressOwner
        // object owner may have changed between signing and execution.
        let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
            certificate,
            input_objects,
            epoch_store.protocol_config(),
            epoch_store.reference_gas_price(),
        ) {
            Ok(result) => result,
            Err(e) => return ExecutionOutput::Fatal(e),
        };

        // Owned input objects must still be live (locks valid) at execution time.
        let owned_object_refs = input_objects.inner().filter_owned_objects();
        if let Err(e) = self.check_owned_locks(&owned_object_refs) {
            return ExecutionOutput::Fatal(e);
        }
        let tx_digest = *certificate.digest();
        let protocol_config = epoch_store.protocol_config();
        let transaction_data = &certificate.data().intent_message().value;
        let sender = transaction_data.sender();
        let (mut kind, signer, gas_data) = transaction_data.execution_parts();
        // A denied certificate or failed funds withdraw forces execution to fail early
        // with a predetermined error instead of running the Move VM.
        let early_execution_error = get_early_execution_error(
            &tx_digest,
            &input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            &execution_env.funds_withdraw_status,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        // Skip on early error: the tx will fail anyway and rewriting may fail if the accumulator
        // was deleted.
        let rewritten_inputs = if execution_params.is_ok() {
            rewrite_transaction_for_coin_reservations(
                self.chain_identifier,
                &*self.coin_reservation_resolver,
                sender,
                &mut kind,
                execution_env.assigned_versions.accumulator_version,
            )
            .expect("rewriting must succeed for a certified transaction")
        } else {
            None
        };

        // Wrap the backing store so we can record which objects execution read;
        // the recorded reads feed `unchanged_loaded_runtime_objects` below.
        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());

        #[allow(unused_mut)]
        let (inner_temp_store, _, mut effects, timings, execution_error_opt) = self
            .execute_transaction_to_effects(
                &**epoch_store.executor(),
                &tracking_store,
                protocol_config,
                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
                // cyclic dependency w/ sui-adapter
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_sui_conservation_check(),
                execution_params,
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                input_objects,
                gas_data,
                gas_status,
                kind,
                rewritten_inputs,
                signer,
                tx_digest,
            );

        // Object funds withdraws may not be committable yet; if so, the transaction is
        // retried later rather than committed now.
        let object_funds_checker = self.object_funds_checker.load();
        if let Some(object_funds_checker) = object_funds_checker.as_ref()
            && !object_funds_checker.should_commit_object_funds_withdraws(
                certificate,
                effects.status(),
                &inner_temp_store.accumulator_running_max_withdraws,
                &execution_env,
                self.get_account_funds_read(),
                &self.execution_scheduler,
                epoch_store,
            )
        {
            assert_reachable!("retry object withdraw later");
            return ExecutionOutput::RetryLater;
        }

        // Fork detection: our computed effects must match the expected digest (from
        // consensus/checkpoint or a previously signed digest). A mismatch is fatal.
        if let Some(expected_effects_digest) = expected_effects_digest
            && effects.digest() != expected_effects_digest
        {
            // We don't want to mask the original error, so we log it and continue.
            match self.debug_dump_transaction_state(
                &tx_digest,
                &effects,
                expected_effects_digest,
                &inner_temp_store,
                certificate,
                &self.config.state_debug_dump_config,
            ) {
                Ok(out_path) => {
                    info!(
                        "Dumped node state for transaction {} to {}",
                        tx_digest,
                        out_path.as_path().display().to_string()
                    );
                }
                Err(e) => {
                    error!("Error dumping state for transaction {}: {e}", tx_digest);
                }
            }
            let expected_effects = self
                .get_transaction_cache_reader()
                .get_effects(&expected_effects_digest);
            error!(
                ?tx_digest,
                ?expected_effects_digest,
                actual_effects = ?effects,
                expected_effects = ?expected_effects,
                "fork detected!"
            );
            // Persist the fork record before aborting so it survives the crash.
            if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
                tx_digest,
                expected_effects_digest,
                effects.digest(),
            ) {
                error!("Failed to record transaction fork: {e}");
            }

            fail_point_if!("kill_transaction_fork_node", || {
                #[cfg(msim)]
                {
                    tracing::error!(
                        fatal = true,
                        "Fork recovery test: killing node due to transaction effects fork for digest: {}",
                        tx_digest
                    );
                    sui_simulator::task::shutdown_current_node();
                }
            });

            fatal!(
                "Transaction {} is expected to have effects digest {}, but got {}!",
                tx_digest,
                expected_effects_digest,
                effects.digest()
            );
        }

        // Simulation hook: optionally mutate effects to simulate a fork on selected
        // validators (msim builds only).
        fail_point_arg!("simulate_fork_during_execution", |(
            forked_validators,
            full_halt,
            effects_overrides,
            fork_probability,
        ): (
            std::sync::Arc<
                std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
            >,
            bool,
            std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
            f32,
        )| {
            #[cfg(msim)]
            self.simulate_fork_during_execution(
                certificate,
                epoch_store,
                &mut effects,
                forked_validators,
                full_halt,
                effects_overrides,
                fork_probability,
            );
        });

        let unchanged_loaded_runtime_objects =
            crate::transaction_outputs::unchanged_loaded_runtime_objects(
                certificate.transaction_data(),
                &effects,
                &tracking_store.into_read_objects(),
            );

        // index certificate
        let _ = self
            .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        self.update_metrics(certificate, &inner_temp_store, &effects);

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects,
            inner_temp_store,
            unchanged_loaded_runtime_objects,
        );

        // Record the ratio of computation cost to preparation wall-clock time.
        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics.prepare_cert_gas_latency_ratio.observe(
                transaction_outputs
                    .effects
                    .gas_cost_summary()
                    .computation_cost as f64
                    / elapsed,
            );
        }

        ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
    }
2194
2195    pub fn prepare_certificate_for_benchmark(
2196        &self,
2197        certificate: &VerifiedExecutableTransaction,
2198        input_objects: InputObjects,
2199        epoch_store: &Arc<AuthorityPerEpochStore>,
2200    ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2201        let lock = RwLock::new(epoch_store.epoch());
2202        let execution_guard = lock.try_read().unwrap();
2203
2204        let (transaction_outputs, _timings, execution_error_opt) = self
2205            .execute_certificate(
2206                &execution_guard,
2207                certificate,
2208                input_objects,
2209                None,
2210                ExecutionEnv::default(),
2211                epoch_store,
2212            )
2213            .unwrap();
2214        Ok((transaction_outputs, execution_error_opt))
2215    }
2216
2217    #[instrument(skip_all)]
2218    #[allow(clippy::type_complexity)]
2219    pub async fn dry_exec_transaction(
2220        &self,
2221        transaction: TransactionData,
2222        transaction_digest: TransactionDigest,
2223    ) -> SuiResult<(
2224        DryRunTransactionBlockResponse,
2225        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2226        TransactionEffects,
2227        Option<ObjectID>,
2228    )> {
2229        let epoch_store = self.load_epoch_store_one_call_per_task();
2230        if !self.is_fullnode(&epoch_store) {
2231            return Err(SuiErrorKind::UnsupportedFeatureError {
2232                error: "dry-exec is only supported on fullnodes".to_string(),
2233            }
2234            .into());
2235        }
2236
2237        if transaction.kind().is_system_tx() {
2238            return Err(SuiErrorKind::UnsupportedFeatureError {
2239                error: "dry-exec does not support system transactions".to_string(),
2240            }
2241            .into());
2242        }
2243
2244        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2245    }
2246
2247    #[allow(clippy::type_complexity)]
2248    pub fn dry_exec_transaction_for_benchmark(
2249        &self,
2250        transaction: TransactionData,
2251        transaction_digest: TransactionDigest,
2252    ) -> SuiResult<(
2253        DryRunTransactionBlockResponse,
2254        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2255        TransactionEffects,
2256        Option<ObjectID>,
2257    )> {
2258        let epoch_store = self.load_epoch_store_one_call_per_task();
2259        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2260    }
2261
    /// Shared implementation behind `dry_exec_transaction` and
    /// `dry_exec_transaction_for_benchmark`: executes `transaction` against
    /// current state and returns the results without committing anything.
    ///
    /// Returns, in order: the dry-run RPC response, the written objects keyed
    /// by ID with their `WriteKind`, the raw `TransactionEffects`, and the ID
    /// of the mock gas coin if one was injected (`None` when the caller
    /// supplied gas or the transaction is gasless).
    #[allow(clippy::type_complexity)]
    fn dry_exec_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> SuiResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )> {
        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Apply this node's transaction deny configuration before loading objects.
        sui_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dry run.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // make a gas object if one was not provided
        let mut gas_data = transaction.gas_data().clone();
        let is_gasless =
            epoch_store.protocol_config().enable_gasless() && transaction.is_gasless_transaction();
        let ((gas_status, checked_input_objects), mock_gas) = if is_gasless {
            // Gasless transactions don't use gas coins — skip mock gas object injection.
            (
                sui_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                None,
            )
        } else if transaction.gas().is_empty() {
            // No gas payment supplied: fabricate a sender-owned coin large
            // enough that gas checks pass, and record its ID as `mock_gas`.
            let sender = transaction.sender();
            // use a 1B sui coin
            const MIST_TO_SUI: u64 = 1_000_000_000;
            const DRY_RUN_SUI: u64 = 1_000_000_000;
            let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
            let gas_object_id = ObjectID::random();
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            // Point the gas data at the fabricated coin for execution below.
            gas_data.payment = vec![gas_object_ref];
            (
                sui_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            // Caller supplied real gas coins: run the standard input checks.
            (
                sui_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        // Gas data from execution_parts is discarded: `gas_data` above may
        // already carry the injected mock coin.
        let (mut kind, signer, _) = transaction.execution_parts();

        let silent = true;
        let executor = sui_execution::executor(protocol_config, silent)
            .expect("Creating an executor should not fail here");

        let expensive_checks = false;
        let early_execution_error = get_early_execution_error(
            &transaction_digest,
            &checked_input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            // TODO(address-balances): This does not currently support balance withdraws properly.
            // For address balance withdraws, this cannot detect insufficient balance. We need to
            // first check if the balance is sufficient, similar to how we schedule withdraws.
            // For object balance withdraws, we need to handle the case where object balance is
            // insufficient in the post-execution.
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        // Skip on early error: the tx will fail anyway.
        // NOTE: this may mutate `kind` to resolve coin reservations.
        let rewritten_inputs = if execution_params.is_ok() {
            rewrite_transaction_for_coin_reservations(
                self.chain_identifier,
                &*self.coin_reservation_resolver,
                transaction.sender(),
                &mut kind,
                // dry run reads the latest accumulator
                None,
            )?
        } else {
            None
        };

        // Execute; the temporary store and effects are only returned to the
        // caller, nothing is written back here.
        let (inner_temp_store, _, effects, _timings, execution_error) = self
            .execute_transaction_to_effects(
                executor.as_ref(),
                self.get_backing_store().as_ref(),
                protocol_config,
                expensive_checks,
                execution_params,
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_data,
                gas_status,
                kind,
                rewritten_inputs,
                signer,
                transaction_digest,
            );

        let tx_digest = *effects.transaction_digest();

        let module_cache =
            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

        // Layout resolver that can also see packages written by this execution.
        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
            epoch_store.protocol_config(),
            Box::new(PackageStoreWithFallback::new(
                &inner_temp_store,
                self.get_backing_package_store(),
            )),
        );
        // Returning empty vector here because we recalculate changes in the rpc layer.
        let object_changes = Vec::new();

        // Returning empty vector here because we recalculate changes in the rpc layer.
        let balance_changes = Vec::new();

        // Pair every created / unwrapped / mutated object from the effects with
        // its written object and the corresponding WriteKind, keyed by object ID.
        let written_with_kind = effects
            .created()
            .into_iter()
            .map(|(oref, _)| (oref, WriteKind::Create))
            .chain(
                effects
                    .unwrapped()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
            )
            .chain(
                effects
                    .mutated()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
            )
            .map(|(oref, kind)| {
                let obj = inner_temp_store.written.get(&oref.0).unwrap();
                // TODO: Avoid clones.
                (oref.0, (oref, obj.clone(), kind))
            })
            .collect();

        // Surface the underlying source of the execution error, if any, as a string.
        let execution_error_source = execution_error
            .as_ref()
            .err()
            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));

        Ok((
            DryRunTransactionBlockResponse {
                suggested_gas_price: self
                    .congestion_tracker
                    .get_suggested_gas_prices(&transaction),
                input: SuiTransactionBlockData::try_from_with_module_cache(
                    transaction,
                    &module_cache,
                )
                .map_err(|e| SuiErrorKind::TransactionSerializationError {
                    error: format!(
                        "Failed to convert transaction to SuiTransactionBlockData: {}",
                        e
                    ),
                })?, // TODO: replace the underlying try_from to SuiError. This one goes deep
                effects: effects.clone().try_into()?,
                events: SuiTransactionBlockEvents::try_from(
                    inner_temp_store.events.clone(),
                    tx_digest,
                    None,
                    layout_resolver.as_mut(),
                )?,
                object_changes,
                balance_changes,
                execution_error_source,
            },
            written_with_kind,
            effects,
            mock_gas,
        ))
    }
2493
    /// Simulates `transaction` against current state without committing anything.
    ///
    /// `checks` controls input checking: when checks are disabled this behaves
    /// like dev-inspect (and is subject to the node's `dev_inspect_disabled`
    /// config). When `allow_mock_gas_coin` is set and the transaction has no
    /// gas payment (and is not gasless), a mock gas coin is injected so
    /// execution can proceed; its ID is reported in the result.
    ///
    /// Only supported on fullnodes; system transactions are rejected.
    pub fn simulate_transaction(
        &self,
        mut transaction: TransactionData,
        checks: TransactionChecks,
        allow_mock_gas_coin: bool,
    ) -> SuiResult<SimulateTransactionResult> {
        if transaction.kind().is_system_tx() {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate does not support system transactions".to_string(),
            }
            .into());
        }

        let epoch_store = self.load_epoch_store_one_call_per_task();
        if !self.is_fullnode(&epoch_store) {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate is only supported on fullnodes".to_string(),
            }
            .into());
        }

        // Checks-disabled simulation is dev-inspect and may be disabled by config.
        let dev_inspect = checks.disabled();
        if dev_inspect && self.config.dev_inspect_disabled {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate with checks disabled is not allowed on this node".to_string(),
            }
            .into());
        }

        // Reject coin reservations in gas payment when the execution engine
        // doesn't support them.
        let protocol_config = epoch_store.protocol_config();
        if !protocol_config.enable_coin_reservation_obj_refs()
            && transaction.gas().iter().any(|obj_ref| {
                sui_types::coin_reservation::ParsedDigest::is_coin_reservation_digest(&obj_ref.2)
            })
        {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error:
                    "coin reservations in gas payment are not supported at this protocol version"
                        .to_string(),
            }
            .into());
        }

        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Create and inject mock gas coin before pre_object_load_checks so that
        // funds withdrawal processing sees non-empty payment and doesn't incorrectly
        // create an address balance withdrawal for gas.
        // Skip mock gas for gasless transactions — they don't use gas coins.
        let is_gasless = protocol_config.enable_gasless() && transaction.is_gasless_transaction();
        let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() && !is_gasless
        {
            let obj = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    ObjectID::MAX,
                    DEV_INSPECT_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(transaction.gas_data().owner),
                TransactionDigest::genesis_marker(),
            );
            // Rewrite the payment to reference the fabricated coin.
            transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
            Some(obj)
        } else {
            None
        };

        // Collect declared address-balance withdrawals; their account IDs are
        // excluded from the post-execution object-funds check below.
        let declared_withdrawals = self.pre_object_load_checks(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.protocol_config(),
        )?;
        let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a simulation.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Add mock gas to input objects after loading (it doesn't exist in the store).
        let mock_gas_id = mock_gas_object.map(|obj| {
            let id = obj.id();
            input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
            id
        });

        let protocol_config = epoch_store.protocol_config();

        // Lightweight input checks for dev-inspect; full checks otherwise.
        let (gas_status, checked_input_objects) = if dev_inspect {
            sui_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction,
                input_objects,
                receiving_objects,
                epoch_store.reference_gas_price(),
            )?
        } else {
            sui_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?
        };

        // TODO see if we can spin up a VM once and reuse it
        let executor = sui_execution::executor(
            protocol_config,
            true, // silent
        )
        .expect("Creating an executor should not fail here");

        // NOTE: rewriting may mutate `kind` to resolve coin reservations.
        let (mut kind, signer, gas_data) = transaction.execution_parts();
        let rewritten_inputs = rewrite_transaction_for_coin_reservations(
            self.chain_identifier,
            &*self.coin_reservation_resolver,
            signer,
            &mut kind,
            None,
        )?;
        let early_execution_error = get_early_execution_error(
            &transaction.digest(),
            &checked_input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        // Tracks objects read during execution so they can be reported back.
        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());

        // Clone inputs for potential retry if object funds check fails post-execution.
        let cloned_input_objects = checked_input_objects.clone();
        let cloned_gas = gas_data.clone();
        let cloned_kind = kind.clone();
        let tx_digest = transaction.digest();
        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_timestamp_ms = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            &tracking_store,
            protocol_config,
            self.metrics.execution_metrics.clone(),
            false, // expensive_checks
            execution_params,
            &epoch_id,
            epoch_timestamp_ms,
            checked_input_objects,
            gas_data,
            gas_status,
            kind,
            rewritten_inputs.clone(),
            signer,
            tx_digest,
            dev_inspect,
        );

        // Post-execution: check object funds (non-address withdrawals discovered during execution).
        let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
            // An account (not covered by a declared address withdrawal) whose
            // latest balance is below the running max withdraw means the
            // simulated execution would have overdrawn it.
            let has_insufficient_object_funds = inner_temp_store
                .accumulator_running_max_withdraws
                .iter()
                .filter(|(id, _)| !address_funds.contains(id))
                .any(|(id, max_withdraw)| {
                    let (balance, _) = self.get_account_funds_read().get_latest_account_amount(id);
                    balance < *max_withdraw
                });

            if has_insufficient_object_funds {
                // Fresh gas status for the retry; the original was consumed above.
                let retry_gas_status = SuiGasStatus::new(
                    cloned_gas.budget,
                    cloned_gas.price,
                    epoch_store.reference_gas_price(),
                    protocol_config,
                )?;
                // Re-run with a forced insufficient-funds error so the returned
                // effects and result reflect the shortfall.
                let (store, _, effects, result) = executor.dev_inspect_transaction(
                    &tracking_store,
                    protocol_config,
                    self.metrics.execution_metrics.clone(),
                    false,
                    ExecutionOrEarlyError::Err(ExecutionErrorKind::InsufficientFundsForWithdraw),
                    &epoch_id,
                    epoch_timestamp_ms,
                    cloned_input_objects,
                    cloned_gas,
                    retry_gas_status,
                    cloned_kind,
                    rewritten_inputs,
                    signer,
                    tx_digest,
                    dev_inspect,
                );
                (store, effects, result)
            } else {
                (inner_temp_store, effects, execution_result)
            }
        } else {
            (inner_temp_store, effects, execution_result)
        };

        let loaded_runtime_objects = tracking_store.into_read_objects();
        let unchanged_loaded_runtime_objects =
            crate::transaction_outputs::unchanged_loaded_runtime_objects(
                &transaction,
                &effects,
                &loaded_runtime_objects,
            );

        // Assemble the object set referenced by the transaction/effects from
        // runtime-loaded, input, and written objects.
        let object_set = {
            let objects = {
                let mut objects = loaded_runtime_objects;

                for o in inner_temp_store
                    .input_objects
                    .into_values()
                    .chain(inner_temp_store.written.into_values())
                {
                    objects.insert(o);
                }

                objects
            };

            let object_keys = sui_types::storage::get_transaction_object_set(
                &transaction,
                &effects,
                &unchanged_loaded_runtime_objects,
            );

            let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
            for k in object_keys {
                if let Some(o) = objects.get(&k) {
                    set.insert(o.clone());
                }
            }

            set
        };

        Ok(SimulateTransactionResult {
            objects: object_set,
            // Only surface events when the effects reference an events digest.
            events: effects.events_digest().map(|_| inner_temp_store.events),
            effects,
            execution_result,
            mock_gas_id,
            unchanged_loaded_runtime_objects,
            suggested_gas_price: self
                .congestion_tracker
                .get_suggested_gas_prices(&transaction),
        })
    }
2763
2764    /// The object ID for gas can be any object ID, even for an uncreated object
2765    #[allow(clippy::collapsible_else_if)]
2766    #[instrument(skip_all)]
2767    pub async fn dev_inspect_transaction_block(
2768        &self,
2769        sender: SuiAddress,
2770        transaction_kind: TransactionKind,
2771        gas_price: Option<u64>,
2772        gas_budget: Option<u64>,
2773        gas_sponsor: Option<SuiAddress>,
2774        gas_objects: Option<Vec<ObjectRef>>,
2775        show_raw_txn_data_and_effects: Option<bool>,
2776        skip_checks: Option<bool>,
2777    ) -> SuiResult<DevInspectResults> {
2778        let epoch_store = self.load_epoch_store_one_call_per_task();
2779
2780        if !self.is_fullnode(&epoch_store) {
2781            return Err(SuiErrorKind::UnsupportedFeatureError {
2782                error: "dev-inspect is only supported on fullnodes".to_string(),
2783            }
2784            .into());
2785        }
2786
2787        if transaction_kind.is_system_tx() {
2788            return Err(SuiErrorKind::UnsupportedFeatureError {
2789                error: "system transactions are not supported".to_string(),
2790            }
2791            .into());
2792        }
2793
2794        let skip_checks = skip_checks.unwrap_or(true);
2795        if skip_checks && self.config.dev_inspect_disabled {
2796            return Err(SuiErrorKind::UnsupportedFeatureError {
2797                error: "dev-inspect with skip_checks is not allowed on this node".to_string(),
2798            }
2799            .into());
2800        }
2801
2802        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2803        let reference_gas_price = epoch_store.reference_gas_price();
2804        let protocol_config = epoch_store.protocol_config();
2805        let max_tx_gas = protocol_config.max_tx_gas();
2806
2807        let price = gas_price.unwrap_or(reference_gas_price);
2808        let budget = gas_budget.unwrap_or(max_tx_gas);
2809        let owner = gas_sponsor.unwrap_or(sender);
2810        // Payment might be empty here, but it's fine we'll have to deal with it later after reading all the input objects.
2811        let payment = gas_objects.unwrap_or_default();
2812        let mut transaction = TransactionData::V1(TransactionDataV1 {
2813            kind: transaction_kind.clone(),
2814            sender,
2815            gas_data: GasData {
2816                payment,
2817                owner,
2818                price,
2819                budget,
2820            },
2821            expiration: TransactionExpiration::None,
2822        });
2823
2824        let raw_txn_data = if show_raw_txn_data_and_effects {
2825            bcs::to_bytes(&transaction).map_err(|_| {
2826                SuiErrorKind::TransactionSerializationError {
2827                    error: "Failed to serialize transaction during dev inspect".to_string(),
2828                }
2829            })?
2830        } else {
2831            vec![]
2832        };
2833
2834        transaction.validity_check_no_gas_check(protocol_config)?;
2835
2836        let input_object_kinds = transaction.input_objects()?;
2837        let receiving_object_refs = transaction.receiving_objects();
2838
2839        sui_transaction_checks::deny::check_transaction_for_signing(
2840            &transaction,
2841            &[],
2842            &input_object_kinds,
2843            &receiving_object_refs,
2844            &self.config.transaction_deny_config,
2845            self.get_backing_package_store().as_ref(),
2846        )?;
2847
2848        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2849            // We don't want to cache this transaction since it's a dev inspect.
2850            None,
2851            &input_object_kinds,
2852            &receiving_object_refs,
2853            epoch_store.epoch(),
2854        )?;
2855
2856        let (gas_status, checked_input_objects) = if skip_checks {
2857            // If we are skipping checks, then we call the check_dev_inspect_input function which will perform
2858            // only lightweight checks on the transaction input. And if the gas field is empty, that means we will
2859            // use the dummy gas object so we need to add it to the input objects vector.
2860            if transaction.gas().is_empty() {
2861                // Create and use a dummy gas object if there is no gas object provided.
2862                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2863                    DEV_INSPECT_GAS_COIN_VALUE,
2864                    transaction.gas_owner(),
2865                );
2866                let gas_object_ref = dummy_gas_object.compute_object_reference();
2867                transaction.gas_data_mut().payment = vec![gas_object_ref];
2868                input_objects.push(ObjectReadResult::new(
2869                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2870                    dummy_gas_object.into(),
2871                ));
2872            }
2873            sui_transaction_checks::check_dev_inspect_input(
2874                protocol_config,
2875                &transaction,
2876                input_objects,
2877                receiving_objects,
2878                reference_gas_price,
2879            )?
2880        } else {
2881            // If we are not skipping checks, then we call the check_transaction_input function and its dummy gas
2882            // variant which will perform full fledged checks just like a real transaction execution.
2883            if transaction.gas().is_empty() {
2884                // Create and use a dummy gas object if there is no gas object provided.
2885                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2886                    DEV_INSPECT_GAS_COIN_VALUE,
2887                    transaction.gas_owner(),
2888                );
2889                let gas_object_ref = dummy_gas_object.compute_object_reference();
2890                transaction.gas_data_mut().payment = vec![gas_object_ref];
2891                sui_transaction_checks::check_transaction_input_with_given_gas(
2892                    epoch_store.protocol_config(),
2893                    epoch_store.reference_gas_price(),
2894                    &transaction,
2895                    input_objects,
2896                    receiving_objects,
2897                    dummy_gas_object,
2898                    &self.metrics.bytecode_verifier_metrics,
2899                    &self.config.verifier_signing_config,
2900                )?
2901            } else {
2902                sui_transaction_checks::check_transaction_input(
2903                    epoch_store.protocol_config(),
2904                    epoch_store.reference_gas_price(),
2905                    &transaction,
2906                    input_objects,
2907                    &receiving_objects,
2908                    &self.metrics.bytecode_verifier_metrics,
2909                    &self.config.verifier_signing_config,
2910                )?
2911            }
2912        };
2913
2914        let executor = sui_execution::executor(protocol_config, /* silent */ true)
2915            .expect("Creating an executor should not fail here");
2916        let gas_data = transaction.gas_data().clone();
2917        let intent_msg = IntentMessage::new(
2918            Intent {
2919                version: IntentVersion::V0,
2920                scope: IntentScope::TransactionData,
2921                app_id: AppId::Sui,
2922            },
2923            transaction,
2924        );
2925        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2926        let early_execution_error = get_early_execution_error(
2927            &transaction_digest,
2928            &checked_input_objects,
2929            self.config.certificate_deny_config.certificate_deny_set(),
2930            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2931            &FundsWithdrawStatus::MaybeSufficient,
2932        );
2933        let execution_params = match early_execution_error {
2934            Some(error) => ExecutionOrEarlyError::Err(error),
2935            None => ExecutionOrEarlyError::Ok(()),
2936        };
2937
2938        let mut transaction_kind = transaction_kind;
2939        let rewritten_inputs = rewrite_transaction_for_coin_reservations(
2940            self.chain_identifier,
2941            &*self.coin_reservation_resolver,
2942            sender,
2943            &mut transaction_kind,
2944            None,
2945        )?;
2946        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2947            self.get_backing_store().as_ref(),
2948            protocol_config,
2949            self.metrics.execution_metrics.clone(),
2950            /* expensive checks */ false,
2951            execution_params,
2952            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2953            epoch_store
2954                .epoch_start_config()
2955                .epoch_data()
2956                .epoch_start_timestamp(),
2957            checked_input_objects,
2958            gas_data,
2959            gas_status,
2960            transaction_kind,
2961            rewritten_inputs,
2962            sender,
2963            transaction_digest,
2964            skip_checks,
2965        );
2966
2967        let raw_effects = if show_raw_txn_data_and_effects {
2968            bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2969                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2970            })?
2971        } else {
2972            vec![]
2973        };
2974
2975        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2976            epoch_store.protocol_config(),
2977            Box::new(PackageStoreWithFallback::new(
2978                &inner_temp_store,
2979                self.get_backing_package_store(),
2980            )),
2981        );
2982
2983        DevInspectResults::new(
2984            effects,
2985            inner_temp_store.events.clone(),
2986            execution_result,
2987            raw_txn_data,
2988            raw_effects,
2989            layout_resolver.as_mut(),
2990        )
2991    }
2992
2993    // Only used for testing because of how epoch store is loaded.
2994    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2995        let epoch_store = self.epoch_store_for_testing();
2996        Ok(epoch_store.reference_gas_price())
2997    }
2998
2999    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
3000        self.get_transaction_cache_reader()
3001            .is_tx_already_executed(digest)
3002    }
3003
    /// Builds the secondary-index updates for one executed transaction.
    ///
    /// First derives owner/dynamic-field changes via `process_object_index` (a
    /// failure there is logged and propagated, skipping indexing for this tx),
    /// then hands the transaction's senders, inputs, changed objects, Move
    /// calls, events and coin data to `IndexStore::index_tx`, which returns a
    /// staged write batch plus cache updates (with locks held when
    /// `acquire_locks` is set).
    ///
    /// `sequence` is the pre-allocated index sequence number used to preserve
    /// execution order; `tx_coins` is optional coin balance-change data
    /// (fullnode only).
    #[instrument(level = "debug", skip_all, err(level = "debug"))]
    fn index_tx(
        sequence: u64,
        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        indexes: &IndexStore,
        digest: &TransactionDigest,
        // TODO: index_tx really just need the transaction data here.
        cert: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        events: &TransactionEvents,
        timestamp_ms: u64,
        tx_coins: Option<TxCoins>,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        acquire_locks: bool,
    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
        let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;

        indexes.index_tx(
            sequence,
            cert.data().intent_message().value.sender(),
            // Input object ids; `input_objects()?` can fail and aborts indexing.
            cert.data()
                .intent_message()
                .value
                .input_objects()?
                .iter()
                .map(|o| o.object_id()),
            // Changed objects with their (possibly new) owners; write kind dropped.
            effects
                .all_changed_objects()
                .into_iter()
                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
            // (package, module, function) triples for the Move-call index.
            cert.data()
                .intent_message()
                .value
                .move_calls()
                .into_iter()
                .map(|(_cmd_idx, package, module, function)| {
                    (*package, module.to_owned(), function.to_owned())
                }),
            events,
            changes,
            digest,
            timestamp_ms,
            tx_coins,
            effects.accumulator_events(),
            acquire_locks,
        )
    }
3055
    /// (Simulation-only) Perturbs `effects` to emulate this validator forking.
    ///
    /// A thread-local accumulates the stake of validators that have chosen to
    /// fork. This validator joins the forked set only while the accumulated
    /// stake stays within the committee's validity threshold: at or below it
    /// when `full_halt` is set, strictly below it otherwise. Once in the
    /// forked set, a (non-system) transaction is forked either because another
    /// validator already recorded an override for its digest in
    /// `effects_overrides`, or — when no overrides exist yet — with
    /// `fork_probability` derived deterministically from the digest. Forking
    /// bumps the computation cost by 1, which changes the effects digest.
    #[cfg(msim)]
    fn simulate_fork_during_execution(
        &self,
        certificate: &VerifiedExecutableTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        effects: &mut TransactionEffects,
        forked_validators: std::sync::Arc<
            std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
        >,
        full_halt: bool,
        effects_overrides: std::sync::Arc<
            std::sync::Mutex<std::collections::BTreeMap<String, String>>,
        >,
        fork_probability: f32,
    ) {
        use std::cell::RefCell;
        // Per-thread running total of stake held by validators that have forked.
        thread_local! {
            static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
        }
        // System transactions are never forked.
        if !certificate.data().intent_message().value.is_system_tx() {
            let committee = epoch_store.committee();
            let cur_stake = (**committee).weight(&self.name);
            if cur_stake > 0 {
                TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
                    let already_forked = forked_validators
                        .lock()
                        .ok()
                        .map(|set| set.contains(&self.name))
                        .unwrap_or(false);

                    if !already_forked {
                        let should_fork = if full_halt {
                            // For full halt, fork enough nodes to reach validity threshold
                            *total_stake <= committee.validity_threshold()
                        } else {
                            // For partial fork, stay strictly below validity threshold
                            *total_stake + cur_stake < committee.validity_threshold()
                        };

                        if should_fork {
                            *total_stake += cur_stake;

                            if let Ok(mut external_set) = forked_validators.lock() {
                                external_set.insert(self.name);
                                info!("forked_validators: {:?}", external_set);
                            }
                        }
                    }

                    if let Ok(external_set) = forked_validators.lock() {
                        if external_set.contains(&self.name) {
                            // If effects_overrides is empty, deterministically select a tx_digest to fork with 1/100 probability.
                            // Fork this transaction and record the digest and the original effects to original_effects.
                            // If original_effects is nonempty and contains a key matching this transaction digest (i.e.
                            // the transaction was forked on a different validator), fork this txn as well.

                            let tx_digest = certificate.digest().to_string();
                            if let Ok(mut overrides) = effects_overrides.lock() {
                                if overrides.contains_key(&tx_digest)
                                    || overrides.is_empty()
                                        && sui_simulator::random::deterministic_probability(
                                            &tx_digest,
                                            fork_probability,
                                        )
                                {
                                    let original_effects_digest = effects.digest().to_string();
                                    overrides
                                        .insert(tx_digest.clone(), original_effects_digest.clone());
                                    info!(
                                        ?tx_digest,
                                        ?original_effects_digest,
                                        "Captured forked effects digest for transaction"
                                    );
                                    // Minimal mutation that still changes the effects digest.
                                    effects.gas_cost_summary_mut_for_testing().computation_cost +=
                                        1;
                                }
                            }
                        }
                    }
                });
            }
        }
    }
3139
    /// Computes the owner and dynamic-field index changes implied by one
    /// transaction's effects.
    ///
    /// For deleted/wrapped objects, the previous owner is looked up at the
    /// version recorded in `modified_at_versions`; since the transaction's new
    /// state has not been written yet, the old object must still be readable —
    /// a miss is an invariant violation and panics. For changed objects, a
    /// mutation that changed ownership first records a deletion under the old
    /// owner; then address/consensus-owned objects are indexed under their
    /// owner address, and object-owned objects are indexed as dynamic fields
    /// when field info can be derived (non-dynamic-field children are skipped).
    fn process_object_index(
        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        effects: &TransactionEffects,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<ObjectIndexChanges> {
        // Layout resolution prefers this tx's freshly written packages, falling
        // back to the backing package store.
        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
            epoch_store.protocol_config(),
            Box::new(PackageStoreWithFallback::new(
                inner_temporary_store,
                backing_package_store,
            )),
        );

        // Map: object id -> version it had before this transaction touched it.
        let modified_at_version = effects
            .modified_at_versions()
            .into_iter()
            .collect::<HashMap<_, _>>();

        let tx_digest = effects.transaction_digest();
        let mut deleted_owners = vec![];
        let mut deleted_dynamic_fields = vec![];
        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
            let old_version = modified_at_version.get(&id).unwrap();
            // When we process the index, the latest object hasn't been written yet so
            // the old object must be present.
            match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
                |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
            ) {
                Owner::AddressOwner(addr)
                | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
                Owner::ObjectOwner(object_id) => {
                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
                }
                _ => {}
            }
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];

        for (oref, owner, kind) in effects.all_changed_objects() {
            let id = &oref.0;
            // For mutated objects, retrieve old owner and delete old index if there is a owner change.
            if let WriteKind::Mutate = kind {
                let Some(old_version) = modified_at_version.get(id) else {
                    panic!(
                        "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
                        tx_digest
                    );
                };
                // When we process the index, the latest object hasn't been written yet so
                // the old object must be present.
                let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
                    panic!(
                        "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
                        tx_digest, id, old_version
                    );
                };
                if old_object.owner != owner {
                    match old_object.owner {
                        Owner::AddressOwner(addr)
                        | Owner::ConsensusAddressOwner { owner: addr, .. } => {
                            deleted_owners.push((addr, *id));
                        }
                        Owner::ObjectOwner(object_id) => {
                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                        }
                        _ => {}
                    }
                }
            }

            match owner {
                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
                    // TODO: We can remove the object fetching after we added ObjectType to TransactionEffects
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let type_ = new_object
                        .type_()
                        .map(|type_| ObjectType::Struct(type_.clone()))
                        .unwrap_or(ObjectType::Package);

                    new_owners.push((
                        (addr, *id),
                        ObjectInfo {
                            object_id: *id,
                            version: oref.1,
                            digest: oref.2,
                            type_,
                            owner,
                            previous_transaction: *effects.transaction_digest(),
                        },
                    ));
                }
                Owner::ObjectOwner(owner) => {
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let Some(df_info) = Self::try_create_dynamic_field_info(
                        object_store,
                        new_object,
                        written,
                        layout_resolver.as_mut(),
                    )
                    // NOTE: a failure here is logged but deliberately does not
                    // abort indexing — the object is simply not indexed.
                    .unwrap_or_else(|e| {
                        error!(
                            "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
                            e, new_object
                        );
                        None
                    }) else {
                        // Skip indexing for non dynamic field objects.
                        continue;
                    };
                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
                }
                _ => {}
            }
        }

        Ok(ObjectIndexChanges {
            deleted_owners,
            deleted_dynamic_fields,
            new_owners,
            new_dynamic_fields,
        })
    }
3291
3292    fn try_create_dynamic_field_info(
3293        object_store: &Arc<dyn ObjectStore + Send + Sync>,
3294        o: &Object,
3295        written: &WrittenObjects,
3296        resolver: &mut dyn LayoutResolver,
3297    ) -> SuiResult<Option<DynamicFieldInfo>> {
3298        // Skip if not a move object
3299        let Some(move_object) = o.data.try_as_move().cloned() else {
3300            return Ok(None);
3301        };
3302
3303        // We only index dynamic field objects
3304        if !move_object.type_().is_dynamic_field() {
3305            return Ok(None);
3306        }
3307
3308        let layout = resolver
3309            .get_annotated_layout(&move_object.type_().clone().into())?
3310            .into_layout();
3311
3312        let field =
3313            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3314                SuiErrorKind::ObjectDeserializationError {
3315                    error: e.to_string(),
3316                }
3317            })?;
3318
3319        let type_ = field.kind;
3320        let name_type: TypeTag = field.name_layout.into();
3321        let bcs_name = field.name_bytes.to_owned();
3322
3323        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3324            .map_err(|e| {
3325                warn!("{e}");
3326                SuiErrorKind::ObjectDeserializationError {
3327                    error: e.to_string(),
3328                }
3329            })?;
3330
3331        let name = DynamicFieldName {
3332            type_: name_type,
3333            value: SuiMoveValue::from(name_value).to_json_value(),
3334        };
3335
3336        let value_metadata = field.value_metadata().map_err(|e| {
3337            warn!("{e}");
3338            SuiErrorKind::ObjectDeserializationError {
3339                error: e.to_string(),
3340            }
3341        })?;
3342
3343        Ok(Some(match value_metadata {
3344            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3345                name,
3346                bcs_name,
3347                type_,
3348                object_type: object_type.to_canonical_string(/* with_prefix */ true),
3349                object_id: o.id(),
3350                version: o.version(),
3351                digest: o.digest(),
3352            },
3353
3354            DFV::ValueMetadata::DynamicObjectField(object_id) => {
3355                // Find the actual object from storage using the object id obtained from the wrapper.
3356
3357                // Try to find the object in the written objects first.
3358                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3359                    let version = object.version();
3360                    let digest = object.digest();
3361                    let object_type = object.data.type_().unwrap().clone();
3362                    (version, digest, object_type)
3363                } else {
3364                    // If not found, try to find it in the database.
3365                    let object = object_store
3366                        .get_object_by_key(&object_id, o.version())
3367                        .ok_or_else(|| UserInputError::ObjectNotFound {
3368                            object_id,
3369                            version: Some(o.version()),
3370                        })?;
3371                    let version = object.version();
3372                    let digest = object.digest();
3373                    let object_type = object.data.type_().unwrap().clone();
3374                    (version, digest, object_type)
3375                };
3376
3377                DynamicFieldInfo {
3378                    name,
3379                    bcs_name,
3380                    type_,
3381                    object_type: object_type.to_string(),
3382                    object_id,
3383                    version,
3384                    digest,
3385                }
3386            }
3387        }))
3388    }
3389
    /// Runs post-execution indexing and event emission for one executed tx.
    ///
    /// No-ops when this authority has no `IndexStore`. A sequence number is
    /// allocated on the calling thread so index entries preserve execution
    /// order. In synchronous mode (`config.sync_post_process_one_tx`) the work
    /// runs inline and the index batch is committed immediately with locks
    /// held; otherwise the work is dispatched to a blocking thread (bounded by
    /// `post_processing_semaphore`) and its output is delivered through a
    /// oneshot channel registered in `pending_post_processing`. Failures in the
    /// async path are logged and counted but do not fail the caller.
    #[instrument(level = "trace", skip_all, err(level = "debug"))]
    fn post_process_one_tx(
        &self,
        certificate: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let Some(indexes) = &self.indexes else {
            return Ok(());
        };

        let tx_digest = *certificate.digest();

        // Allocate sequence number on the calling thread to preserve execution order.
        let sequence = indexes.allocate_sequence_number();

        if self.config.sync_post_process_one_tx {
            // Synchronous mode: run post-processing inline on the calling thread
            // and commit the index batch immediately with locks held.
            // Used as a rollback mechanism and for testing correctness against async mode.
            // TODO: delete this branch once async mode has shipped
            let result = Self::post_process_one_tx_impl(
                sequence,
                indexes,
                &self.subscription_handler,
                &self.metrics,
                self.name,
                self.get_backing_package_store(),
                self.get_object_store(),
                certificate,
                effects,
                inner_temporary_store,
                epoch_store,
                true, // acquire_locks
            );

            match result {
                Ok((raw_batch, cache_updates_with_locks)) => {
                    let mut db_batch = indexes.new_db_batch();
                    db_batch
                        .concat(vec![raw_batch])
                        .expect("failed to absorb raw index batch");
                    // Destructure to keep _locks alive through commit_index_batch.
                    let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
                        cache_updates_with_locks;
                    indexes
                        .commit_index_batch(db_batch, vec![inner])
                        .expect("failed to commit index batch");
                }
                Err(e) => {
                    self.metrics.post_processing_total_failures.inc();
                    error!(?tx_digest, "tx post processing failed: {e}");
                    return Err(e);
                }
            }

            return Ok(());
        }

        // Async mode: register a channel so a later consumer can await this
        // transaction's staged index output.
        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
        self.pending_post_processing.insert(tx_digest, done_rx);

        // Clone everything the spawned task needs, since it outlives `self`'s borrows.
        let indexes = indexes.clone();
        let subscription_handler = self.subscription_handler.clone();
        let metrics = self.metrics.clone();
        let name = self.name;
        let backing_package_store = self.get_backing_package_store().clone();
        let object_store = self.get_object_store().clone();
        let semaphore = self.post_processing_semaphore.clone();

        let certificate = certificate.clone();
        let effects = effects.clone();
        let inner_temporary_store = inner_temporary_store.clone();
        let epoch_store = epoch_store.clone();

        // spawn post processing on a blocking thread
        tokio::spawn(async move {
            // Bound concurrency: hold a semaphore permit for the whole blocking task.
            let permit = {
                let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
                semaphore
                    .acquire_owned()
                    .await
                    .expect("post-processing semaphore should not be closed")
            };

            let _ = tokio::task::spawn_blocking(move || {
                let _permit = permit;

                let result = Self::post_process_one_tx_impl(
                    sequence,
                    &indexes,
                    &subscription_handler,
                    &metrics,
                    name,
                    &backing_package_store,
                    &object_store,
                    &certificate,
                    &effects,
                    &inner_temporary_store,
                    &epoch_store,
                    false, // acquire_locks
                );

                match result {
                    Ok((raw_batch, cache_updates_with_locks)) => {
                        fail_point!("crash-after-post-process-one-tx");
                        let output = (raw_batch, cache_updates_with_locks.into_inner());
                        // Receiver may have been dropped; that's fine.
                        let _ = done_tx.send(output);
                    }
                    Err(e) => {
                        metrics.post_processing_total_failures.inc();
                        error!(?tx_digest, "tx post processing failed: {e}");
                    }
                }
            })
            .await;
        });

        Ok(())
    }
3511
    /// Shared body of post-processing: indexes the transaction and publishes
    /// its events to subscribers.
    ///
    /// Builds optional coin-index data (fullnode only), calls `Self::index_tx`
    /// to produce the staged index batch and cache updates — an indexing error
    /// is fatal here (`expect`) — then converts effects/events to their RPC
    /// forms and pushes them through the subscription handler. Returns the
    /// staged batch plus cache updates for the caller to commit.
    fn post_process_one_tx_impl(
        sequence: u64,
        indexes: &Arc<IndexStore>,
        subscription_handler: &Arc<SubscriptionHandler>,
        metrics: &Arc<AuthorityMetrics>,
        name: AuthorityName,
        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        certificate: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        acquire_locks: bool,
    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
        let _scope = monitored_scope("Execution::post_process_one_tx");

        let tx_digest = certificate.digest();
        let timestamp_ms = Self::unixtime_now_ms();
        let events = &inner_temporary_store.events;
        let written = &inner_temporary_store.written;
        let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
            name,
            object_store,
            effects,
            inner_temporary_store,
            epoch_store,
        );

        let (raw_batch, cache_updates) = Self::index_tx(
            sequence,
            backing_package_store,
            object_store,
            indexes,
            tx_digest,
            certificate,
            effects,
            events,
            timestamp_ms,
            tx_coins,
            written,
            inner_temporary_store,
            epoch_store,
            acquire_locks,
        )
        .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
        .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
        .expect("Indexing tx should not fail");

        // Convert to the RPC-facing effects/events representations.
        let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
        let events = Self::make_transaction_block_events(
            backing_package_store,
            events.clone(),
            *tx_digest,
            timestamp_ms,
            epoch_store,
            inner_temporary_store,
        )?;
        // Emit events
        subscription_handler
            .process_tx(certificate.data().transaction_data(), &effects, &events)
            .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
            .tap_err(|e| {
                warn!(
                    ?tx_digest,
                    "Post processing - Couldn't process events for tx: {}", e
                )
            })?;

        metrics
            .post_processing_total_events_emitted
            .inc_by(events.data.len() as u64);

        Ok((raw_batch, cache_updates))
    }
3586
3587    fn make_transaction_block_events(
3588        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3589        transaction_events: TransactionEvents,
3590        digest: TransactionDigest,
3591        timestamp_ms: u64,
3592        epoch_store: &Arc<AuthorityPerEpochStore>,
3593        inner_temporary_store: &InnerTemporaryStore,
3594    ) -> SuiResult<SuiTransactionBlockEvents> {
3595        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3596            epoch_store.protocol_config(),
3597            Box::new(PackageStoreWithFallback::new(
3598                inner_temporary_store,
3599                backing_package_store,
3600            )),
3601        );
3602        SuiTransactionBlockEvents::try_from(
3603            transaction_events,
3604            digest,
3605            Some(timestamp_ms),
3606            layout_resolver.as_mut(),
3607        )
3608    }
3609
3610    pub fn unixtime_now_ms() -> u64 {
3611        let now = SystemTime::now()
3612            .duration_since(UNIX_EPOCH)
3613            .expect("Time went backwards")
3614            .as_millis();
3615        u64::try_from(now).expect("Travelling in time machine")
3616    }
3617
3618    // TODO(fastpath): update this handler for Mysticeti fastpath.
3619    // There will no longer be validator quorum signed transactions or effects.
3620    // The proof of finality needs to come from checkpoints.
3621    #[instrument(level = "trace", skip_all)]
3622    pub async fn handle_transaction_info_request(
3623        &self,
3624        request: TransactionInfoRequest,
3625    ) -> SuiResult<TransactionInfoResponse> {
3626        let epoch_store = self.load_epoch_store_one_call_per_task();
3627        let (transaction, status) = self
3628            .get_transaction_status(&request.transaction_digest, &epoch_store)?
3629            .ok_or(SuiErrorKind::TransactionNotFound {
3630                digest: request.transaction_digest,
3631            })?;
3632        Ok(TransactionInfoResponse {
3633            transaction,
3634            status,
3635        })
3636    }
3637
3638    #[instrument(level = "trace", skip_all)]
3639    pub async fn handle_object_info_request(
3640        &self,
3641        request: ObjectInfoRequest,
3642    ) -> SuiResult<ObjectInfoResponse> {
3643        let epoch_store = self.load_epoch_store_one_call_per_task();
3644
3645        let requested_object_seq = match request.request_kind {
3646            ObjectInfoRequestKind::LatestObjectInfo => {
3647                let (_, seq, _) = self
3648                    .get_object_or_tombstone(request.object_id)
3649                    .await
3650                    .ok_or_else(|| {
3651                        SuiError::from(UserInputError::ObjectNotFound {
3652                            object_id: request.object_id,
3653                            version: None,
3654                        })
3655                    })?;
3656                seq
3657            }
3658            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3659        };
3660
3661        let object = self
3662            .get_object_store()
3663            .get_object_by_key(&request.object_id, requested_object_seq)
3664            .ok_or_else(|| {
3665                SuiError::from(UserInputError::ObjectNotFound {
3666                    object_id: request.object_id,
3667                    version: Some(requested_object_seq),
3668                })
3669            })?;
3670
3671        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3672            (request.generate_layout, object.data.try_as_move())
3673        {
3674            let epoch_store = self.load_epoch_store_one_call_per_task();
3675            Some(into_struct_layout(
3676                epoch_store
3677                    .executor()
3678                    .type_layout_resolver(
3679                        epoch_store.protocol_config(),
3680                        Box::new(self.get_backing_package_store().as_ref()),
3681                    )
3682                    .get_annotated_layout(&move_obj.type_().clone().into())?,
3683            )?)
3684        } else {
3685            None
3686        };
3687
3688        let lock = if !object.is_address_owned() {
3689            // Only address owned objects have locks.
3690            None
3691        } else {
3692            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3693                .await?
3694                .map(|s| s.into_inner())
3695        };
3696
3697        Ok(ObjectInfoResponse {
3698            object,
3699            layout,
3700            lock_for_debugging: lock,
3701        })
3702    }
3703
3704    #[instrument(level = "trace", skip_all)]
3705    pub fn handle_checkpoint_request(
3706        &self,
3707        request: &CheckpointRequest,
3708    ) -> SuiResult<CheckpointResponse> {
3709        let summary = match request.sequence_number {
3710            Some(seq) => self
3711                .checkpoint_store
3712                .get_checkpoint_by_sequence_number(seq)?,
3713            None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3714        }
3715        .map(|v| v.into_inner());
3716        let contents = match &summary {
3717            Some(s) => self
3718                .checkpoint_store
3719                .get_checkpoint_contents(&s.content_digest)?,
3720            None => None,
3721        };
3722        Ok(CheckpointResponse {
3723            checkpoint: summary,
3724            contents,
3725        })
3726    }
3727
3728    #[instrument(level = "trace", skip_all)]
3729    pub fn handle_checkpoint_request_v2(
3730        &self,
3731        request: &CheckpointRequestV2,
3732    ) -> SuiResult<CheckpointResponseV2> {
3733        let summary = if request.certified {
3734            let summary = match request.sequence_number {
3735                Some(seq) => self
3736                    .checkpoint_store
3737                    .get_checkpoint_by_sequence_number(seq)?,
3738                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3739            }
3740            .map(|v| v.into_inner());
3741            summary.map(CheckpointSummaryResponse::Certified)
3742        } else {
3743            let summary = match request.sequence_number {
3744                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3745                None => self
3746                    .checkpoint_store
3747                    .get_latest_locally_computed_checkpoint()?,
3748            };
3749            summary.map(CheckpointSummaryResponse::Pending)
3750        };
3751        let contents = match &summary {
3752            Some(s) => self
3753                .checkpoint_store
3754                .get_checkpoint_contents(&s.content_digest())?,
3755            None => None,
3756        };
3757        Ok(CheckpointResponseV2 {
3758            checkpoint: summary,
3759            contents,
3760        })
3761    }
3762
3763    fn check_protocol_version(
3764        supported_protocol_versions: SupportedProtocolVersions,
3765        current_version: ProtocolVersion,
3766    ) {
3767        info!("current protocol version is now {:?}", current_version);
3768        info!("supported versions are: {:?}", supported_protocol_versions);
3769        if !supported_protocol_versions.is_version_supported(current_version) {
3770            let msg = format!(
3771                "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3772                current_version, supported_protocol_versions,
3773            );
3774
3775            error!("{}", msg);
3776            eprintln!("{}", msg);
3777
3778            #[cfg(not(msim))]
3779            std::process::exit(1);
3780
3781            #[cfg(msim)]
3782            sui_simulator::task::shutdown_current_node();
3783        }
3784    }
3785
    /// Constructs the `AuthorityState`, wiring together execution scheduling,
    /// pruning, traffic control, and indexing, and spawns the background
    /// execution and index-fixing tasks.
    ///
    /// Panics (or shuts down the node) if the current protocol version is
    /// unsupported, if genesis indexing fails, or if multi-epoch transaction
    /// expiration is enabled but the previous epoch's transaction data is
    /// missing from storage.
    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        chain_identifier: ChainIdentifier,
        policy_config: Option<PolicyConfig>,
        firewall_config: Option<RemoteFirewallConfig>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Arc<Self> {
        // Bail out early if this binary cannot speak the current protocol version.
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Channel feeding ready-to-execute certificates from the scheduler to
        // the execution task spawned further below.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let execution_scheduler = Arc::new(ExecutionScheduler::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers.account_funds_read.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            tx_ready_certificates,
            &epoch_store,
            config.funds_withdraw_scheduler_type,
            metrics.clone(),
            prometheus_registry,
        ));
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // Both pruners are kept alive by being stored in the AuthorityState below
        // (hence the underscore-prefixed names matching the struct fields).
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            rpc_index.clone(),
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            pruner_watermarks,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let traffic_controller_metrics =
            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
        // Traffic control is only enabled when a policy config was provided.
        let traffic_controller = if let Some(policy_config) = policy_config {
            Some(Arc::new(
                TrafficController::init(
                    policy_config,
                    traffic_controller_metrics,
                    firewall_config.clone(),
                )
                .await,
            ))
        } else {
            None
        };

        let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
            ForkRecoveryState::new(Some(fork_config))
                .expect("Failed to initialize fork recovery state")
        });

        let coin_reservation_resolver = Arc::new(CachingCoinReservationResolver::new(
            execution_cache_trait_pointers.child_object_resolver.clone(),
        ));

        let object_funds_checker_metrics =
            Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            coin_reservation_resolver,
            indexes,
            rpc_index,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            execution_scheduler,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new()),
            consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
            traffic_controller,
            fork_recovery_state,
            notify_epoch: tokio::sync::watch::channel(epoch).0,
            object_funds_checker: ArcSwapOption::empty(),
            object_funds_checker_metrics,
            pending_post_processing: Arc::new(DashMap::new()),
            post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
        });
        state.init_object_funds_checker().await;

        // Background tasks hold only weak references so the state can still be dropped.
        let state_clone = Arc::downgrade(&state);
        spawn_monitored_task!(fix_indexes(state_clone));
        // Start a task to execute ready certificates.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));
        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        // Sanity check: multi-epoch transaction expiration relies on the
        // executed_transaction_digests table containing the previous epoch's
        // data; a snapshot restored with an old sui-tool would be missing it.
        if epoch_store
            .protocol_config()
            .enable_multi_epoch_transaction_expiration()
            && epoch_store.epoch() > 0
        {
            use typed_store::Map;
            let previous_epoch = epoch_store.epoch() - 1;
            let start_key = (previous_epoch, TransactionDigest::ZERO);
            let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
            let has_previous_epoch_data = store
                .perpetual_tables
                .executed_transaction_digests
                .safe_range_iter(start_key..end_key)
                .next()
                .is_some();

            if !has_previous_epoch_data {
                panic!(
                    "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
                    This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
                    Please restore from a snapshot using the latest version of sui-tool.",
                    previous_epoch
                );
            }
        }

        state
    }
3946
3947    async fn init_object_funds_checker(&self) {
3948        let epoch_store = self.epoch_store.load();
3949        if self.is_validator(&epoch_store)
3950            && epoch_store.protocol_config().enable_object_funds_withdraw()
3951        {
3952            if self.object_funds_checker.load().is_none() {
3953                let inner = self
3954                    .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
3955                    .await
3956                    .map(|o| {
3957                        Arc::new(ObjectFundsChecker::new(
3958                            o.version(),
3959                            self.object_funds_checker_metrics.clone(),
3960                        ))
3961                    });
3962                self.object_funds_checker.store(inner);
3963            }
3964        } else {
3965            self.object_funds_checker.store(None);
3966        }
3967    }
3968
    // TODO: Consolidate our traits to reduce the number of methods here.
    /// Read access to the object cache.
    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
        &self.execution_cache_trait_pointers.object_cache_reader
    }
3973
    /// Read access to the transaction cache.
    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
        &self.execution_cache_trait_pointers.transaction_cache_reader
    }
3977
    /// Write access to the execution cache.
    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
        &self.execution_cache_trait_pointers.cache_writer
    }
3981
    /// The backing store used for execution.
    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_store
    }
3985
    /// Resolver for child (dynamic field) objects.
    pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
        &self.execution_cache_trait_pointers.child_object_resolver
    }
3989
    /// Read access to account funds (accumulator balances).
    pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
        &self.execution_cache_trait_pointers.account_funds_read
    }
3993
    /// Store used to resolve Move packages.
    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_package_store
    }
3997
    /// Generic object store view over the execution cache.
    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
        &self.execution_cache_trait_pointers.object_store
    }
4001
    /// Map of transactions whose post-processing output is still in flight,
    /// keyed by transaction digest.
    pub fn pending_post_processing(
        &self,
    ) -> &Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>> {
        &self.pending_post_processing
    }
4007
4008    pub async fn await_post_processing(
4009        &self,
4010        tx_digest: &TransactionDigest,
4011    ) -> Option<PostProcessingOutput> {
4012        if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
4013            // Tx was executed and post-processing is in flight.
4014            rx.await.ok()
4015        } else {
4016            // Tx was already persisted or post-processing already completed.
4017            None
4018        }
4019    }
4020
4021    /// Await post-processing for a transaction and commit the index batch immediately.
4022    /// Used in test helpers where there is no CheckpointExecutor to collect and commit
4023    /// index batches at checkpoint boundaries.
4024    pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
4025        if let Some(indexes) = &self.indexes
4026            && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
4027        {
4028            let mut db_batch = indexes.new_db_batch();
4029            db_batch
4030                .concat(vec![raw_batch])
4031                .expect("failed to build index batch");
4032            indexes
4033                .commit_index_batch(db_batch, vec![cache_updates])
4034                .expect("failed to commit index batch");
4035        }
4036    }
4037
    /// API used during epoch reconfiguration of the execution cache.
    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
        &self.execution_cache_trait_pointers.reconfig_api
    }
4041
    /// Store for global state hashes.
    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
        &self.execution_cache_trait_pointers.global_state_hash_store
    }
4045
    /// Cache used by checkpoint processing.
    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
        &self.execution_cache_trait_pointers.checkpoint_cache
    }
4049
    /// Store backing state sync.
    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
        &self.execution_cache_trait_pointers.state_sync_store
    }
4053
    /// API for committing execution cache contents.
    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
        &self.execution_cache_trait_pointers.cache_commit
    }
4057
    /// Test-only accessor for the underlying `AuthorityStore`.
    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
        self.execution_cache_trait_pointers
            .testing_api
            .database_for_testing()
    }
4063
    /// Test-only accessor for the underlying `WritebackCache`.
    pub fn cache_for_testing(&self) -> &WritebackCache {
        self.execution_cache_trait_pointers
            .testing_api
            .cache_for_testing()
    }
4069
    /// Test-only wrapper around
    /// `AuthorityStorePruner::prune_checkpoints_for_eligible_epochs`,
    /// run with default (empty) pruner watermarks.
    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
        &self,
        config: NodeConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        use crate::authority::authority_store_pruner::PrunerWatermarks;
        let watermarks = Arc::new(PrunerWatermarks::default());
        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
            &self.database_for_testing().perpetual_tables,
            &self.checkpoint_store,
            self.rpc_index.as_deref(),
            config.authority_store_pruning_config,
            metrics,
            EPOCH_DURATION_MS_FOR_TESTING,
            &watermarks,
        )
        .await
    }
4088
    /// The scheduler that feeds ready certificates to the execution task.
    pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
        &self.execution_scheduler
    }
4092
4093    fn create_owner_index_if_empty(
4094        &self,
4095        genesis_objects: &[Object],
4096        epoch_store: &Arc<AuthorityPerEpochStore>,
4097    ) -> SuiResult {
4098        let Some(index_store) = &self.indexes else {
4099            return Ok(());
4100        };
4101        if !index_store.is_empty() {
4102            return Ok(());
4103        }
4104
4105        let mut new_owners = vec![];
4106        let mut new_dynamic_fields = vec![];
4107        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
4108            epoch_store.protocol_config(),
4109            Box::new(self.get_backing_package_store().as_ref()),
4110        );
4111        for o in genesis_objects.iter() {
4112            match o.owner {
4113                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
4114                    new_owners.push((
4115                        (addr, o.id()),
4116                        ObjectInfo::new(&o.compute_object_reference(), o),
4117                    ))
4118                }
4119                Owner::ObjectOwner(object_id) => {
4120                    let id = o.id();
4121                    let Some(info) = Self::try_create_dynamic_field_info(
4122                        self.get_object_store(),
4123                        o,
4124                        &BTreeMap::new(),
4125                        layout_resolver.as_mut(),
4126                    )?
4127                    else {
4128                        continue;
4129                    };
4130                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
4131                }
4132                _ => {}
4133            }
4134        }
4135
4136        index_store.insert_genesis_objects(ObjectIndexChanges {
4137            deleted_owners: vec![],
4138            deleted_dynamic_fields: vec![],
4139            new_owners,
4140            new_dynamic_fields,
4141        })
4142    }
4143
4144    /// Attempts to acquire execution lock for an executable transaction.
4145    /// Returns Some(lock) if the transaction is matching current executed epoch.
4146    /// Returns None if validator is halted at epoch end or epoch mismatch.
4147    pub fn execution_lock_for_executable_transaction(
4148        &self,
4149        transaction: &VerifiedExecutableTransaction,
4150    ) -> Option<ExecutionLockReadGuard<'_>> {
4151        let lock = self.execution_lock.try_read().ok()?;
4152        if *lock == transaction.auth_sig().epoch() {
4153            Some(lock)
4154        } else {
4155            // TODO: Can this still happen?
4156            None
4157        }
4158    }
4159
4160    /// Acquires the execution lock for the duration of transaction validation.
4161    /// This prevents reconfiguration from starting until we are finished validating the transaction.
4162    fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
4163        self.execution_lock
4164            .try_read()
4165            .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
4166    }
4167
    /// Takes the execution write lock, waiting for all in-flight execution
    /// (which holds read locks) to finish.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
4171
    /// Transitions this authority to the epoch of `new_committee`, returning
    /// the newly opened per-epoch store.
    ///
    /// Waits for all in-flight execution to stop (via the execution write
    /// lock), terminates epoch-scoped tasks, optionally checkpoints the
    /// databases, reconfigures the caches and execution scheduler, and
    /// finally publishes the new epoch to `wait_for_epoch` subscribers.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        // The new epoch may carry a protocol version bump; shut down if this
        // binary cannot support it.
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );

        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // Safe to begin reconfiguration now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        {
            let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                // Need to change this so that consensus adapter does not accept certificates from users.
                // This can happen if our local validator did not initiate epoch change locally,
                // but 2f+1 nodes already concluded the epoch.
                //
                // This lock is essentially a barrier for
                // `epoch_store.pending_consensus_certificates` table we are reading on the line after this block
                cur_epoch_store.close_user_certs(state);
            }
            // lock is dropped here
        }

        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
        self.maybe_reaccumulate_state_hash(
            cur_epoch_store,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.get_reconfig_api()
            .set_epoch_start_configuration(&epoch_start_configuration);
        // Optionally snapshot all databases at the epoch boundary.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
            && self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
        {
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
            self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        self.init_object_funds_checker().await;
        *execution_lock = new_epoch;

        self.notify_epoch(new_epoch);
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
4266
    /// Publishes `new_epoch` to `wait_for_epoch` subscribers via the watch channel.
    fn notify_epoch(&self, new_epoch: EpochId) {
        self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
    }
4270
4271    pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4272        let mut rx = self.notify_epoch.subscribe();
4273        loop {
4274            let epoch = *rx.borrow();
4275            if epoch >= target_epoch {
4276                return Ok(epoch);
4277            }
4278            rx.changed().await?;
4279        }
4280    }
4281
4282    /// Executes accumulator settlement for testing purposes.
4283    /// Returns a list of (transaction, execution_env) pairs that can be replayed on another
4284    /// AuthorityState (e.g., a fullnode) using `replay_settlement_for_testing`.
4285    pub async fn settle_accumulator_for_testing(
4286        &self,
4287        effects: &[TransactionEffects],
4288        checkpoint_seq: Option<u64>,
4289    ) -> Vec<(VerifiedExecutableTransaction, ExecutionEnv)> {
4290        let accumulator_version = self
4291            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
4292            .await
4293            .unwrap()
4294            .version();
4295        // Use provided checkpoint sequence, or fall back to accumulator version.
4296        let ckpt_seq = checkpoint_seq.unwrap_or_else(|| accumulator_version.value());
4297        let builder = AccumulatorSettlementTxBuilder::new(
4298            Some(self.get_transaction_cache_reader().as_ref()),
4299            effects,
4300            ckpt_seq,
4301            0,
4302        );
4303        let balance_changes = builder.collect_funds_changes();
4304        let epoch_store = self.epoch_store_for_testing();
4305        let epoch = epoch_store.epoch();
4306        let accumulator_root_obj_initial_shared_version = epoch_store
4307            .epoch_start_config()
4308            .accumulator_root_obj_initial_shared_version()
4309            .unwrap();
4310        let settlements = builder.build_tx(
4311            epoch_store.protocol_config(),
4312            epoch,
4313            accumulator_root_obj_initial_shared_version,
4314            ckpt_seq,
4315            ckpt_seq,
4316        );
4317
4318        let settlements: Vec<_> = settlements
4319            .into_iter()
4320            .map(|tx| {
4321                VerifiedExecutableTransaction::new_system(
4322                    VerifiedTransaction::new_system_transaction(tx),
4323                    epoch,
4324                )
4325            })
4326            .collect();
4327
4328        let assigned_versions = epoch_store
4329            .assign_shared_object_versions_for_tests(
4330                self.get_object_cache_reader().as_ref(),
4331                &settlements,
4332            )
4333            .unwrap();
4334        let version_map = assigned_versions.into_map();
4335
4336        let mut replay_txns = Vec::new();
4337        let mut settlement_effects = Vec::with_capacity(settlements.len());
4338        for tx in settlements {
4339            let assigned = version_map.get(&tx.key()).unwrap().clone();
4340            let env = ExecutionEnv::new().with_assigned_versions(assigned);
4341            let (effects, _) = self
4342                .try_execute_immediately(&tx.clone(), env.clone(), &epoch_store)
4343                .await
4344                .unwrap();
4345            assert!(effects.status().is_ok());
4346            replay_txns.push((tx, env));
4347            settlement_effects.push(effects);
4348        }
4349
4350        let barrier = accumulators::build_accumulator_barrier_tx(
4351            epoch,
4352            accumulator_root_obj_initial_shared_version,
4353            ckpt_seq,
4354            &settlement_effects,
4355        );
4356        let barrier = VerifiedExecutableTransaction::new_system(
4357            VerifiedTransaction::new_system_transaction(barrier),
4358            epoch,
4359        );
4360
4361        let assigned_versions = epoch_store
4362            .assign_shared_object_versions_for_tests(
4363                self.get_object_cache_reader().as_ref(),
4364                std::slice::from_ref(&barrier),
4365            )
4366            .unwrap();
4367        let version_map = assigned_versions.into_map();
4368
4369        let barrier_assigned = version_map.get(&barrier.key()).unwrap().clone();
4370        let env = ExecutionEnv::new().with_assigned_versions(barrier_assigned);
4371        let (effects, _) = self
4372            .try_execute_immediately(&barrier.clone(), env.clone(), &epoch_store)
4373            .await
4374            .unwrap();
4375        assert!(effects.status().is_ok());
4376        replay_txns.push((barrier, env));
4377
4378        let next_accumulator_version = accumulator_version.next();
4379        self.execution_scheduler
4380            .settle_address_funds(FundsSettlement {
4381                funds_changes: balance_changes,
4382                next_accumulator_version,
4383            });
4384        // object funds are settled while executing the barrier transaction
4385
4386        replay_txns
4387    }
4388
4389    /// Replays settlement transactions on this AuthorityState.
4390    /// Used to sync a fullnode with settlement transactions executed on a validator.
4391    pub async fn replay_settlement_for_testing(
4392        &self,
4393        txns: &[(VerifiedExecutableTransaction, ExecutionEnv)],
4394    ) {
4395        let epoch_store = self.epoch_store_for_testing();
4396        for (tx, env) in txns {
4397            let (effects, _) = self
4398                .try_execute_immediately(tx, env.clone(), &epoch_store)
4399                .await
4400                .unwrap();
4401            assert!(effects.status().is_ok());
4402        }
4403    }
4404
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for testing.
    pub async fn reconfigure_for_testing(&self) {
        // Hold the reconfiguration write lock for the entire switch so that no
        // transaction can execute against a half-updated epoch state.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden and diverged from
        // the protocol config definitions. That override may have now been dropped when the initial guard was dropped.
        // We reapply the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly like to keep the protocol config the same
        // across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        // The new epoch starts after the last checkpoint of the terminating epoch
        // (defaulting to 0 when no checkpoint exists yet).
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        // Point the scheduler at the new epoch before publishing the store.
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        let new_epoch = new_epoch_store.epoch();
        // Publish the new epoch store, then let the old one wind down.
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;

        // Finally release execution for the new epoch.
        *execution_lock = new_epoch;
    }
4438
4439    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
4440    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
4441    #[instrument(level = "error", skip_all)]
4442    fn maybe_reaccumulate_state_hash(
4443        &self,
4444        cur_epoch_store: &AuthorityPerEpochStore,
4445        new_protocol_version: ProtocolVersion,
4446    ) {
4447        self.get_reconfig_api()
4448            .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4449    }
4450
    /// Runs end-of-epoch consistency checks: SUI conservation, root-state-hash vs.
    /// live object set, secondary index integrity, and (when enabled) presence of
    /// every checkpointed transaction in `transactions_seq`. Expensive; gated by
    /// `expensive_safety_check_config` flags where noted.
    #[instrument(level = "error", skip_all)]
    fn check_system_consistency(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
    ) {
        info!(
            "Performing sui conservation consistency check for epoch {}",
            cur_epoch_store.epoch()
        );

        // Conservation check failures panic only in debug builds.
        if let Err(err) = self
            .get_reconfig_api()
            .expensive_check_sui_conservation(cur_epoch_store)
        {
            if cfg!(debug_assertions) {
                panic!("{}", err);
            } else {
                // We cannot panic in production yet because it is known that there are some
                // inconsistencies in testnet. We will enable this once we make it balanced again in testnet.
                warn!("Sui conservation consistency check failed: {}", err);
            }
        } else {
            info!("Sui conservation consistency check passed");
        }

        // check for root state hash consistency with live object set
        if expensive_safety_check_config.enable_state_consistency_check() {
            info!(
                "Performing state consistency check for epoch {}",
                cur_epoch_store.epoch()
            );
            self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
        }

        // Secondary index verification panics on inconsistency.
        if expensive_safety_check_config.enable_secondary_index_checks()
            && let Some(indexes) = self.indexes.clone()
        {
            verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
                .expect("secondary indexes are inconsistent");
        }

        // Verify all checkpointed transactions are present in transactions_seq.
        // This catches any post-processing gaps that could occur if async
        // post-processing failed to complete before persistence.
        if expensive_safety_check_config.enable_secondary_index_checks()
            && let Some(indexes) = self.indexes.clone()
        {
            let epoch = cur_epoch_store.epoch();
            // Only verify the current epoch's checkpoints. Previous epoch contents
            // may have been pruned, and we only need to verify that this epoch's
            // async post-processing completed correctly.
            let first_checkpoint = if epoch == 0 {
                0
            } else {
                self.checkpoint_store
                    .get_epoch_last_checkpoint_seq_number(epoch - 1)
                    .expect("Failed to get previous epoch's last checkpoint")
                    .expect("Previous epoch's last checkpoint missing")
                    + 1
            };
            let highest_executed = self
                .checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Failed to get highest executed checkpoint")
                .expect("No executed checkpoints");

            info!(
                "Verifying checkpointed transactions are in transactions_seq \
                 (checkpoints {first_checkpoint}..={highest_executed})"
            );
            // Walk every checkpoint in range and assert each contained
            // transaction digest has an entry in the index.
            for seq in first_checkpoint..=highest_executed {
                let checkpoint = self
                    .checkpoint_store
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("Failed to get checkpoint")
                    .expect("Checkpoint missing");
                let contents = self
                    .checkpoint_store
                    .get_checkpoint_contents(&checkpoint.content_digest)
                    .expect("Failed to get checkpoint contents")
                    .expect("Checkpoint contents missing");
                for digests in contents.iter() {
                    let tx_digest = digests.transaction;
                    assert!(
                        indexes
                            .get_transaction_seq(&tx_digest)
                            .expect("Failed to read transactions_seq")
                            .is_some(),
                        "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
                    );
                }
            }
            info!("All checkpointed transactions verified in transactions_seq");
        }
    }
4548
4549    fn expensive_check_is_consistent_state(
4550        &self,
4551        state_hasher: Arc<GlobalStateHasher>,
4552        cur_epoch_store: &AuthorityPerEpochStore,
4553    ) {
4554        let live_object_set_hash = state_hasher.digest_live_object_set(
4555            !cur_epoch_store
4556                .protocol_config()
4557                .simplified_unwrap_then_delete(),
4558        );
4559
4560        let root_state_hash: ECMHLiveObjectSetDigest = self
4561            .get_global_state_hash_store()
4562            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4563            .expect("Retrieving root state hash cannot fail")
4564            .expect("Root state hash for epoch must exist")
4565            .1
4566            .digest()
4567            .into();
4568
4569        let is_inconsistent = root_state_hash != live_object_set_hash;
4570        if is_inconsistent {
4571            debug_fatal!(
4572                "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4573                root_state_hash,
4574                live_object_set_hash
4575            );
4576        } else {
4577            info!("State consistency check passed");
4578        }
4579
4580        state_hasher.set_inconsistent_state(is_inconsistent);
4581    }
4582
4583    pub fn current_epoch_for_testing(&self) -> EpochId {
4584        self.epoch_store_for_testing().epoch()
4585    }
4586
    /// Snapshots all databases (checkpoints, perpetual store, committee/epochs,
    /// and optionally indexes) into `checkpoint_path`. Builds the snapshot in a
    /// `.tmp` directory and atomically renames it into place at the end, so a
    /// partially-written snapshot is never observed. No-op if the target path
    /// already exists.
    #[instrument(level = "error", skip_all)]
    pub fn checkpoint_all_dbs(
        &self,
        checkpoint_path: &Path,
        cur_epoch_store: &AuthorityPerEpochStore,
        checkpoint_indexes: bool,
    ) -> SuiResult {
        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
        let current_epoch = cur_epoch_store.epoch();

        // Idempotence: a snapshot for this epoch was already taken.
        if checkpoint_path.exists() {
            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
            return Ok(());
        }

        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");

        // Remove any leftover tmp dir from a previously interrupted attempt.
        if checkpoint_path_tmp.exists() {
            fs::remove_dir_all(&checkpoint_path_tmp)
                .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
        }

        fs::create_dir_all(&checkpoint_path_tmp)
            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
        fs::create_dir(&store_checkpoint_path_tmp)
            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;

        // NOTE: Do not change the order of invoking these checkpoint calls
        // We want to snapshot checkpoint db first to not race with state sync
        self.checkpoint_store
            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;

        self.get_reconfig_api()
            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;

        self.committee_store
            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;

        // Index snapshot is optional and only possible when indexing is enabled.
        if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
        }

        // Atomic publish of the completed snapshot.
        fs::rename(checkpoint_path_tmp, checkpoint_path)
            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
        Ok(())
    }
4634
4635    /// Load the current epoch store. This can change during reconfiguration. To ensure that
4636    /// we never end up accessing different epoch stores in a single task, we need to make sure
4637    /// that this is called once per task. Each call needs to be carefully audited to ensure it is
4638    /// the case. This also means we should minimize the number of call-sites. Only call it when
4639    /// there is no way to obtain it from somewhere else.
4640    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4641        self.epoch_store.load()
4642    }
4643
4644    // Load the epoch store, should be used in tests only.
4645    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4646        self.load_epoch_store_one_call_per_task()
4647    }
4648
4649    pub fn clone_committee_for_testing(&self) -> Committee {
4650        Committee::clone(self.epoch_store_for_testing().committee())
4651    }
4652
4653    #[instrument(level = "trace", skip_all)]
4654    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4655        self.get_object_store().get_object(object_id)
4656    }
4657
4658    pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4659        Ok(self
4660            .get_object(&SUI_SYSTEM_ADDRESS.into())
4661            .await
4662            .expect("framework object should always exist")
4663            .compute_object_reference())
4664    }
4665
4666    // This function is only used for testing.
4667    pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4668        self.get_object_cache_reader()
4669            .get_sui_system_state_object_unsafe()
4670    }
4671
4672    #[instrument(level = "trace", skip_all)]
4673    fn get_transaction_checkpoint_sequence(
4674        &self,
4675        digest: &TransactionDigest,
4676        epoch_store: &AuthorityPerEpochStore,
4677    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
4678        epoch_store.get_transaction_checkpoint(digest)
4679    }
4680
4681    #[instrument(level = "trace", skip_all)]
4682    pub fn get_checkpoint_by_sequence_number(
4683        &self,
4684        sequence_number: CheckpointSequenceNumber,
4685    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4686        Ok(self
4687            .checkpoint_store
4688            .get_checkpoint_by_sequence_number(sequence_number)?)
4689    }
4690
4691    #[instrument(level = "trace", skip_all)]
4692    pub fn get_transaction_checkpoint_for_tests(
4693        &self,
4694        digest: &TransactionDigest,
4695        epoch_store: &AuthorityPerEpochStore,
4696    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4697        let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4698        let Some(checkpoint) = checkpoint else {
4699            return Ok(None);
4700        };
4701        let checkpoint = self
4702            .checkpoint_store
4703            .get_checkpoint_by_sequence_number(checkpoint)?;
4704        Ok(checkpoint)
4705    }
4706
4707    #[instrument(level = "trace", skip_all)]
4708    pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4709        Ok(
4710            match self
4711                .get_object_cache_reader()
4712                .get_latest_object_or_tombstone(*object_id)
4713            {
4714                Some((_, ObjectOrTombstone::Object(object))) => {
4715                    let layout = self.get_object_layout(&object)?;
4716                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
4717                }
4718                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4719                None => ObjectRead::NotExists(*object_id),
4720            },
4721        )
4722    }
4723
4724    /// Chain Identifier is the digest of the genesis checkpoint.
4725    pub fn get_chain_identifier(&self) -> ChainIdentifier {
4726        self.chain_identifier
4727    }
4728
4729    pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
4730        self.fork_recovery_state.as_ref()
4731    }
4732
4733    #[instrument(level = "trace", skip_all)]
4734    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4735    where
4736        T: DeserializeOwned,
4737    {
4738        let o = self.get_object_read(object_id)?.into_object()?;
4739        if let Some(move_object) = o.data.try_as_move() {
4740            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4741                SuiErrorKind::ObjectDeserializationError {
4742                    error: format!("{e}"),
4743                }
4744            })?)
4745        } else {
4746            Err(SuiErrorKind::ObjectDeserializationError {
4747                error: format!("Provided object : [{object_id}] is not a Move object."),
4748            }
4749            .into())
4750        }
4751    }
4752
    /// This function aims to serve rpc reads on past objects and
    /// we don't expect it to be called for other purposes.
    /// Depending on the object pruning policies that will be enforced in the
    /// future there is no software-level guarantee/SLA to retrieve an object
    /// with an old version even if it exists/existed.
    #[instrument(level = "trace", skip_all)]
    pub fn get_past_object_read(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<PastObjectRead> {
        // Firstly we see if the object ever existed by getting its latest data
        let Some(obj_ref) = self
            .get_object_cache_reader()
            .get_latest_object_ref_or_tombstone(*object_id)
        else {
            return Ok(PastObjectRead::ObjectNotExists(*object_id));
        };

        // Requested version newer than anything we know: report the latest.
        if version > obj_ref.1 {
            return Ok(PastObjectRead::VersionTooHigh {
                object_id: *object_id,
                asked_version: version,
                latest_version: obj_ref.1,
            });
        }

        if version < obj_ref.1 {
            // Read past objects
            return Ok(match self.read_object_at_version(object_id, version)? {
                Some((object, layout)) => {
                    let obj_ref = object.compute_object_reference();
                    PastObjectRead::VersionFound(obj_ref, object, layout)
                }

                // Past version may have been pruned; no SLA here (see doc above).
                None => PastObjectRead::VersionNotFound(*object_id, version),
            });
        }

        // version == latest: a tombstoned latest ref means the object was deleted.
        if !obj_ref.2.is_alive() {
            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
        }

        match self.read_object_at_version(object_id, obj_ref.1)? {
            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
            None => {
                // Latest ref exists but object data is missing: store corruption.
                debug_fatal!(
                    "Object with in parent_entry is missing from object store, datastore is \
                     inconsistent"
                );
                Err(UserInputError::ObjectNotFound {
                    object_id: *object_id,
                    version: Some(obj_ref.1),
                }
                .into())
            }
        }
    }
4811
4812    #[instrument(level = "trace", skip_all)]
4813    fn read_object_at_version(
4814        &self,
4815        object_id: &ObjectID,
4816        version: SequenceNumber,
4817    ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4818        let Some(object) = self
4819            .get_object_cache_reader()
4820            .get_object_by_key(object_id, version)
4821        else {
4822            return Ok(None);
4823        };
4824
4825        let layout = self.get_object_layout(&object)?;
4826        Ok(Some((object, layout)))
4827    }
4828
4829    pub fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4830        let layout = object
4831            .data
4832            .try_as_move()
4833            .map(|object| {
4834                let epoch_store = self.load_epoch_store_one_call_per_task();
4835                into_struct_layout(
4836                    epoch_store
4837                        .executor()
4838                        // TODO(cache) - must read through cache
4839                        .type_layout_resolver(
4840                            epoch_store.protocol_config(),
4841                            Box::new(self.get_backing_package_store().as_ref()),
4842                        )
4843                        .get_annotated_layout(&object.type_().clone().into())?,
4844                )
4845            })
4846            .transpose()?;
4847        Ok(layout)
4848    }
4849
    /// Returns a fake ObjectRef representing an address balance, along with the balance value
    /// and the previous transaction digest. The ObjectRef can be returned to JSON-RPC clients
    /// that don't understand address balances.
    #[instrument(level = "trace", skip_all)]
    pub fn get_address_balance_coin_info(
        &self,
        owner: SuiAddress,
        balance_type: TypeTag,
    ) -> SuiResult<Option<(ObjectRef, u64, TransactionDigest)>> {
        // Locate the accumulator field object that backs this (owner, type) balance.
        let accumulator_id = AccumulatorValue::get_field_id(owner, &balance_type)?;
        let accumulator_obj = AccumulatorValue::load_object_by_id(
            self.get_child_object_resolver().as_ref(),
            None,
            *accumulator_id.inner(),
        )?;

        // No accumulator object means the address holds no such balance.
        let Some(accumulator_obj) = accumulator_obj else {
            return Ok(None);
        };

        // Extract the currency type from balance_type (e.g., SUI from Balance<SUI>).
        // get_balance expects the currency type, not the balance type.
        let currency_type =
            Balance::maybe_get_balance_type_param(&balance_type).unwrap_or(balance_type);

        let balance = crate::accumulators::balances::get_balance(
            owner,
            self.get_child_object_resolver().as_ref(),
            currency_type,
        )?;

        // A zero balance is treated the same as no balance at all.
        if balance == 0 {
            return Ok(None);
        };

        // Encode id/version/epoch/balance/chain into a synthetic ObjectRef that
        // coin_reservation can later decode.
        let object_ref = coin_reservation::encode_object_ref(
            accumulator_obj.id(),
            accumulator_obj.version(),
            self.load_epoch_store_one_call_per_task().epoch(),
            balance,
            self.get_chain_identifier(),
        );

        Ok(Some((
            object_ref,
            balance,
            accumulator_obj.previous_transaction,
        )))
    }
4899
4900    /// Returns fake ObjectRefs for all address balances of an owner, keyed by coin type string.
4901    /// Used by get_all_coins to include fake coins for each coin type.
4902    #[instrument(level = "trace", skip_all)]
4903    pub fn get_all_address_balance_coin_infos(
4904        &self,
4905        owner: SuiAddress,
4906    ) -> SuiResult<std::collections::HashMap<String, (ObjectRef, u64, TransactionDigest)>> {
4907        let indexes = self
4908            .indexes
4909            .as_ref()
4910            .ok_or(SuiErrorKind::IndexStoreNotAvailable)?;
4911
4912        let mut result = std::collections::HashMap::new();
4913        for currency_type in indexes.get_address_balance_coin_types_iter(owner) {
4914            let balance_type = sui_types::balance::Balance::type_tag(currency_type.clone());
4915            if let Some((obj_ref, balance, prev_tx)) =
4916                self.get_address_balance_coin_info(owner, balance_type)?
4917            {
4918                // Use currency_type.to_string() to match the format in CoinIndexKey2
4919                // (e.g., "0x2::sui::SUI", not "0x2::coin::Coin<0x2::sui::SUI>")
4920                result.insert(currency_type.to_string(), (obj_ref, balance, prev_tx));
4921            }
4922        }
4923        Ok(result)
4924    }
4925
4926    fn get_owner_at_version(
4927        object_store: &Arc<dyn ObjectStore + Send + Sync>,
4928        object_id: &ObjectID,
4929        version: SequenceNumber,
4930    ) -> SuiResult<Owner> {
4931        object_store
4932            .get_object_by_key(object_id, version)
4933            .ok_or_else(|| {
4934                SuiError::from(UserInputError::ObjectNotFound {
4935                    object_id: *object_id,
4936                    version: Some(version),
4937                })
4938            })
4939            .map(|o| o.owner.clone())
4940    }
4941
4942    #[instrument(level = "trace", skip_all)]
4943    pub fn get_owner_objects(
4944        &self,
4945        owner: SuiAddress,
4946        // If `Some`, the query will start from the next item after the specified cursor
4947        cursor: Option<ObjectID>,
4948        limit: usize,
4949        filter: Option<SuiObjectDataFilter>,
4950    ) -> SuiResult<Vec<ObjectInfo>> {
4951        if let Some(indexes) = &self.indexes {
4952            indexes.get_owner_objects(owner, cursor, limit, filter)
4953        } else {
4954            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4955        }
4956    }
4957
4958    #[instrument(level = "trace", skip_all)]
4959    pub fn get_owned_coins_iterator_with_cursor(
4960        &self,
4961        owner: SuiAddress,
4962        // If `Some`, the query will start from the next item after the specified cursor
4963        cursor: (String, u64, ObjectID),
4964        limit: usize,
4965        one_coin_type_only: bool,
4966    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4967        if let Some(indexes) = &self.indexes {
4968            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4969        } else {
4970            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4971        }
4972    }
4973
4974    #[instrument(level = "trace", skip_all)]
4975    pub fn get_owner_objects_iterator(
4976        &self,
4977        owner: SuiAddress,
4978        // If `Some`, the query will start from the next item after the specified cursor
4979        cursor: Option<ObjectID>,
4980        filter: Option<SuiObjectDataFilter>,
4981    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4982        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4983        if let Some(indexes) = &self.indexes {
4984            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4985        } else {
4986            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4987        }
4988    }
4989
4990    #[instrument(level = "trace", skip_all)]
4991    pub async fn get_move_objects<T>(
4992        &self,
4993        owner: SuiAddress,
4994        type_: MoveObjectType,
4995    ) -> SuiResult<Vec<T>>
4996    where
4997        T: DeserializeOwned,
4998    {
4999        let object_ids = self
5000            .get_owner_objects_iterator(owner, None, None)?
5001            .filter(|o| match &o.type_ {
5002                ObjectType::Struct(s) => &type_ == s,
5003                ObjectType::Package => false,
5004            })
5005            .map(|info| ObjectKey(info.object_id, info.version))
5006            .collect::<Vec<_>>();
5007        let mut move_objects = vec![];
5008
5009        let objects = self
5010            .get_object_store()
5011            .multi_get_objects_by_key(&object_ids);
5012
5013        for (o, id) in objects.into_iter().zip_debug_eq(object_ids) {
5014            let object = o.ok_or_else(|| {
5015                SuiError::from(UserInputError::ObjectNotFound {
5016                    object_id: id.0,
5017                    version: Some(id.1),
5018                })
5019            })?;
5020            let move_object = object.data.try_as_move().ok_or_else(|| {
5021                SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
5022            })?;
5023            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
5024                SuiErrorKind::ObjectDeserializationError {
5025                    error: format!("{e}"),
5026                }
5027            })?);
5028        }
5029        Ok(move_objects)
5030    }
5031
5032    #[instrument(level = "trace", skip_all)]
5033    pub fn get_dynamic_fields(
5034        &self,
5035        owner: ObjectID,
5036        // If `Some`, the query will start from the next item after the specified cursor
5037        cursor: Option<ObjectID>,
5038        limit: usize,
5039    ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
5040        Ok(self
5041            .get_dynamic_fields_iterator(owner, cursor)?
5042            .take(limit)
5043            .collect::<Result<Vec<_>, _>>()?)
5044    }
5045
5046    fn get_dynamic_fields_iterator(
5047        &self,
5048        owner: ObjectID,
5049        // If `Some`, the query will start from the next item after the specified cursor
5050        cursor: Option<ObjectID>,
5051    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
5052    {
5053        if let Some(indexes) = &self.indexes {
5054            indexes.get_dynamic_fields_iterator(owner, cursor)
5055        } else {
5056            Err(SuiErrorKind::IndexStoreNotAvailable.into())
5057        }
5058    }
5059
5060    #[instrument(level = "trace", skip_all)]
5061    pub fn get_dynamic_field_object_id(
5062        &self,
5063        owner: ObjectID,
5064        name_type: TypeTag,
5065        name_bcs_bytes: &[u8],
5066    ) -> SuiResult<Option<ObjectID>> {
5067        if let Some(indexes) = &self.indexes {
5068            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
5069        } else {
5070            Err(SuiErrorKind::IndexStoreNotAvailable.into())
5071        }
5072    }
5073
5074    #[instrument(level = "trace", skip_all)]
5075    pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
5076        Ok(self.get_indexes()?.next_sequence_number())
5077    }
5078
5079    #[instrument(level = "trace", skip_all)]
5080    pub async fn get_executed_transaction_and_effects(
5081        &self,
5082        digest: TransactionDigest,
5083        kv_store: Arc<TransactionKeyValueStore>,
5084    ) -> SuiResult<(Transaction, TransactionEffects)> {
5085        let transaction = kv_store.get_tx(digest).await?;
5086        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
5087        Ok((transaction, effects))
5088    }
5089
5090    #[instrument(level = "trace", skip_all)]
5091    pub fn multi_get_checkpoint_by_sequence_number(
5092        &self,
5093        sequence_numbers: &[CheckpointSequenceNumber],
5094    ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
5095        Ok(self
5096            .checkpoint_store
5097            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
5098    }
5099
5100    #[instrument(level = "trace", skip_all)]
5101    pub fn get_transaction_events(
5102        &self,
5103        digest: &TransactionDigest,
5104    ) -> SuiResult<TransactionEvents> {
5105        self.get_transaction_cache_reader()
5106            .get_events(digest)
5107            .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
5108    }
5109
5110    pub fn get_transaction_input_objects(
5111        &self,
5112        effects: &TransactionEffects,
5113    ) -> SuiResult<Vec<Object>> {
5114        sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
5115            .map_err(Into::into)
5116    }
5117
5118    pub fn get_transaction_output_objects(
5119        &self,
5120        effects: &TransactionEffects,
5121    ) -> SuiResult<Vec<Object>> {
5122        sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
5123            .map_err(Into::into)
5124    }
5125
5126    fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
5127        match &self.indexes {
5128            Some(i) => Ok(i.clone()),
5129            None => Err(SuiErrorKind::UnsupportedFeatureError {
5130                error: "extended object indexing is not enabled on this server".into(),
5131            }
5132            .into()),
5133        }
5134    }
5135
5136    pub async fn get_transactions_for_tests(
5137        self: &Arc<Self>,
5138        filter: Option<TransactionFilter>,
5139        cursor: Option<TransactionDigest>,
5140        limit: Option<usize>,
5141        reverse: bool,
5142    ) -> SuiResult<Vec<TransactionDigest>> {
5143        let metrics = KeyValueStoreMetrics::new_for_tests();
5144        let kv_store = Arc::new(TransactionKeyValueStore::new(
5145            "rocksdb",
5146            metrics,
5147            self.clone(),
5148        ));
5149        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
5150            .await
5151    }
5152
5153    #[instrument(level = "trace", skip_all)]
5154    pub async fn get_transactions(
5155        &self,
5156        kv_store: &Arc<TransactionKeyValueStore>,
5157        filter: Option<TransactionFilter>,
5158        // If `Some`, the query will start from the next item after the specified cursor
5159        cursor: Option<TransactionDigest>,
5160        limit: Option<usize>,
5161        reverse: bool,
5162    ) -> SuiResult<Vec<TransactionDigest>> {
5163        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
5164            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
5165            let iter = checkpoint_contents.iter().map(|c| c.transaction);
5166            if reverse {
5167                let iter = iter
5168                    .rev()
5169                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
5170                    .skip(usize::from(cursor.is_some()));
5171                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
5172            } else {
5173                let iter = iter
5174                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
5175                    .skip(usize::from(cursor.is_some()));
5176                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
5177            }
5178        }
5179        self.get_indexes()?
5180            .get_transactions(filter, cursor, limit, reverse)
5181    }
5182
    /// Accessor for the checkpoint store backing this authority.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
5186
5187    pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
5188        self.get_checkpoint_store()
5189            .get_highest_executed_checkpoint_seq_number()?
5190            .ok_or(
5191                SuiErrorKind::UserInputError {
5192                    error: UserInputError::LatestCheckpointSequenceNumberNotFound,
5193                }
5194                .into(),
5195            )
5196    }
5197
5198    #[cfg(msim)]
5199    pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
5200        self.database_for_testing()
5201            .perpetual_tables
5202            .get_highest_pruned_checkpoint()
5203            .map(|c| c.unwrap_or(0))
5204            .map_err(Into::into)
5205    }
5206
5207    #[instrument(level = "trace", skip_all)]
5208    pub fn get_checkpoint_summary_by_sequence_number(
5209        &self,
5210        sequence_number: CheckpointSequenceNumber,
5211    ) -> SuiResult<CheckpointSummary> {
5212        let verified_checkpoint = self
5213            .get_checkpoint_store()
5214            .get_checkpoint_by_sequence_number(sequence_number)?;
5215        match verified_checkpoint {
5216            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5217            None => Err(SuiErrorKind::UserInputError {
5218                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5219            }
5220            .into()),
5221        }
5222    }
5223
5224    #[instrument(level = "trace", skip_all)]
5225    pub fn get_checkpoint_summary_by_digest(
5226        &self,
5227        digest: CheckpointDigest,
5228    ) -> SuiResult<CheckpointSummary> {
5229        let verified_checkpoint = self
5230            .get_checkpoint_store()
5231            .get_checkpoint_by_digest(&digest)?;
5232        match verified_checkpoint {
5233            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5234            None => Err(SuiErrorKind::UserInputError {
5235                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5236            }
5237            .into()),
5238        }
5239    }
5240
5241    #[instrument(level = "trace", skip_all)]
5242    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
5243        if is_system_package(package_id) {
5244            return self.find_genesis_txn_digest();
5245        }
5246        Ok(self
5247            .get_object_read(&package_id)?
5248            .into_object()?
5249            .previous_transaction)
5250    }
5251
5252    #[instrument(level = "trace", skip_all)]
5253    pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
5254        let summary = self
5255            .get_verified_checkpoint_by_sequence_number(0)?
5256            .into_message();
5257        let content = self.get_checkpoint_contents(summary.content_digest)?;
5258        let genesis_transaction = content.enumerate_transactions(&summary).next();
5259        Ok(genesis_transaction
5260            .ok_or(SuiErrorKind::UserInputError {
5261                error: UserInputError::GenesisTransactionNotFound,
5262            })?
5263            .1
5264            .transaction)
5265    }
5266
5267    #[instrument(level = "trace", skip_all)]
5268    pub fn get_verified_checkpoint_by_sequence_number(
5269        &self,
5270        sequence_number: CheckpointSequenceNumber,
5271    ) -> SuiResult<VerifiedCheckpoint> {
5272        let verified_checkpoint = self
5273            .get_checkpoint_store()
5274            .get_checkpoint_by_sequence_number(sequence_number)?;
5275        match verified_checkpoint {
5276            Some(verified_checkpoint) => Ok(verified_checkpoint),
5277            None => Err(SuiErrorKind::UserInputError {
5278                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5279            }
5280            .into()),
5281        }
5282    }
5283
5284    #[instrument(level = "trace", skip_all)]
5285    pub fn get_verified_checkpoint_summary_by_digest(
5286        &self,
5287        digest: CheckpointDigest,
5288    ) -> SuiResult<VerifiedCheckpoint> {
5289        let verified_checkpoint = self
5290            .get_checkpoint_store()
5291            .get_checkpoint_by_digest(&digest)?;
5292        match verified_checkpoint {
5293            Some(verified_checkpoint) => Ok(verified_checkpoint),
5294            None => Err(SuiErrorKind::UserInputError {
5295                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5296            }
5297            .into()),
5298        }
5299    }
5300
5301    #[instrument(level = "trace", skip_all)]
5302    pub fn get_checkpoint_contents(
5303        &self,
5304        digest: CheckpointContentsDigest,
5305    ) -> SuiResult<CheckpointContents> {
5306        self.get_checkpoint_store()
5307            .get_checkpoint_contents(&digest)?
5308            .ok_or(
5309                SuiErrorKind::UserInputError {
5310                    error: UserInputError::CheckpointContentsNotFound(digest),
5311                }
5312                .into(),
5313            )
5314    }
5315
5316    #[instrument(level = "trace", skip_all)]
5317    pub fn get_checkpoint_contents_by_sequence_number(
5318        &self,
5319        sequence_number: CheckpointSequenceNumber,
5320    ) -> SuiResult<CheckpointContents> {
5321        let verified_checkpoint = self
5322            .get_checkpoint_store()
5323            .get_checkpoint_by_sequence_number(sequence_number)?;
5324        match verified_checkpoint {
5325            Some(verified_checkpoint) => {
5326                let content_digest = verified_checkpoint.into_inner().content_digest;
5327                self.get_checkpoint_contents(content_digest)
5328            }
5329            None => Err(SuiErrorKind::UserInputError {
5330                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5331            }
5332            .into()),
5333        }
5334    }
5335
    /// Query events matching `query`, paginated from an exclusive `cursor`,
    /// returning at most `limit` events in ascending or `descending` order.
    ///
    /// Event keys are looked up in the index store, the event payloads are
    /// batch-fetched from `kv_store` by transaction digest, and each event is
    /// converted to a `SuiEvent` using an annotated Move layout resolved
    /// against the current epoch's package store.
    ///
    /// Errors if indexing is disabled, the cursor's transaction is unknown,
    /// an `EventFilter::Any` query is used, or an indexed event's payload
    /// cannot be found.
    #[instrument(level = "trace", skip_all)]
    pub async fn query_events(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        query: EventFilter,
        // If `Some`, the query will start from the next item after the specified cursor
        cursor: Option<EventID>,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<SuiEvent>> {
        let index_store = self.get_indexes()?;

        // Translate the cursor into an index-store position: the sequence
        // number of the cursor's transaction plus the event offset within it.
        // Without a cursor, start from the far end that matches the direction.
        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
                SuiErrorKind::TransactionNotFound {
                    digest: cursor.tx_digest,
                },
            )?;
            (tx_seq, cursor.event_seq as usize)
        } else if descending {
            (u64::MAX, usize::MAX)
        } else {
            (0, 0)
        };

        // Fetch one extra key: when a cursor is given, the cursor's own entry
        // is removed below; otherwise the surplus entry is truncated away.
        let limit = limit + 1;
        let mut event_keys = match query {
            EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
            EventFilter::Transaction(digest) => {
                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveModule { package, module } => {
                let module_id = ModuleId::new(package.into(), module);
                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveEventType(struct_name) => index_store
                .events_by_move_event_struct_name(
                    &struct_name,
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            EventFilter::Sender(sender) => {
                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
            }
            EventFilter::TimeRange {
                start_time,
                end_time,
            } => index_store
                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
            EventFilter::MoveEventModule { package, module } => index_store
                .events_by_move_event_module(
                    &ModuleId::new(package.into(), module),
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            // not using "_ =>" because we want to make sure we remember to add new variants here
            EventFilter::Any(_) => {
                return Err(SuiErrorKind::UserInputError {
                    error: UserInputError::Unsupported(
                        "'Any' queries are not supported by the fullnode.".to_string(),
                    ),
                }
                .into());
            }
        };

        // skip one event if exclusive cursor is provided,
        // otherwise truncate to the original limit.
        if cursor.is_some() {
            if !event_keys.is_empty() {
                event_keys.remove(0);
            }
        } else {
            event_keys.truncate(limit - 1);
        }

        // get the unique set of digests from the event_keys
        // (each key is (event_digest, tx_digest, event_seq, timestamp))
        let transaction_digests = event_keys
            .iter()
            .map(|(_, digest, _, _)| *digest)
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();

        let events = kv_store
            .multi_get_events_by_tx_digests(&transaction_digests)
            .await?;

        // Map digest -> fetched events.
        // NOTE(review): zip_debug_eq presumably debug-asserts the two sides
        // have equal length — confirm against mysten_common.
        let events_map: HashMap<_, _> = transaction_digests
            .iter()
            .zip_debug_eq(events.into_iter())
            .collect();

        // Re-associate each indexed key with the event payload at its
        // in-transaction offset (k.1 = tx_digest, k.2 = event_seq).
        let stored_events = event_keys
            .into_iter()
            .map(|k| {
                (
                    k,
                    events_map
                        .get(&k.1)
                        .expect("fetched digest is missing")
                        .clone()
                        .and_then(|e| e.data.get(k.2).cloned()),
                )
            })
            .map(
                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
                    event
                        .map(|e| (e, tx_digest, event_seq, timestamp))
                        .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
                },
            )
            .collect::<Result<Vec<_>, _>>()?;

        // Resolve annotated Move layouts (needed by SuiEvent::try_from) against
        // the current epoch's package store.
        let epoch_store = self.load_epoch_store_one_call_per_task();
        let backing_store = self.get_backing_package_store().as_ref();
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(epoch_store.protocol_config(), Box::new(backing_store));
        let mut events = vec![];
        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
            events.push(SuiEvent::try_from(
                e.clone(),
                tx_digest,
                event_seq as u64,
                Some(timestamp),
                layout_resolver.get_annotated_layout(&e.type_)?,
            )?)
        }
        Ok(events)
    }
5472
    /// Insert a single genesis object directly into the store via the
    /// reconfiguration API (genesis/bootstrap path only).
    pub async fn insert_genesis_object(&self, object: Object) {
        self.get_reconfig_api().insert_genesis_object(object);
    }
5476
    /// Insert all genesis objects, driving the per-object insert futures
    /// together via `join_all` and waiting for every one to complete.
    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
        futures::future::join_all(
            objects
                .iter()
                .map(|o| self.insert_genesis_object(o.clone())),
        )
        .await;
    }
5485
    /// Make a status response for a transaction.
    ///
    /// Returns `Ok(None)` when this authority knows nothing about the
    /// transaction. Otherwise returns the sender-signed data together with:
    /// - `TransactionStatus::Executed` when signed effects exist (re-signed
    ///   for the current epoch if necessary), or
    /// - `TransactionStatus::Signed` when this validator has signed but not
    ///   yet executed the transaction.
    #[instrument(level = "trace", skip_all)]
    pub fn get_transaction_status(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
        // TODO: In the case of read path, we should not have to re-sign the effects.
        if let Some(effects) =
            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
        {
            if let Some(transaction) = self
                .get_transaction_cache_reader()
                .get_transaction_block(transaction_digest)
            {
                // Only load events when the effects record an events digest.
                let events = if effects.events_digest().is_some() {
                    self.get_transaction_events(effects.transaction_digest())?
                } else {
                    TransactionEvents::default()
                };
                // The cert_sig slot is permanently None: validators no longer aggregate or
                // persist per-transaction quorum signatures (the transaction_cert_signatures
                // table is deprecated and unwritten).
                return Ok(Some((
                    (*transaction).clone().into_message(),
                    TransactionStatus::Executed(None, effects.into_inner(), events),
                )));
            } else {
                // The read of effects and read of transaction are not atomic. It's possible that we reverted
                // the transaction (during epoch change) in between the above two reads, and we end up
                // having effects but not transaction. In this case, we just fall through.
                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
            }
        }
        // No executed effects: fall back to a signed-but-pending status, if any.
        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
            self.metrics.tx_already_processed.inc();
            let (transaction, sig) = signed.into_inner().into_data_and_sig();
            Ok(Some((transaction, TransactionStatus::Signed(sig))))
        } else {
            Ok(None)
        }
    }
5528
    /// Get the signed effects of the given transaction. If the effects was signed in a previous
    /// epoch, re-sign it so that the caller is able to form a cert of the effects in the current
    /// epoch.
    ///
    /// Returns `Ok(None)` when no executed effects exist for the digest.
    #[instrument(level = "trace", skip_all)]
    pub fn get_signed_effects_and_maybe_resign(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
        let effects = self
            .get_transaction_cache_reader()
            .get_executed_effects(transaction_digest);
        match effects {
            Some(effects) => {
                // If the transaction was executed in previous epochs, the validator will
                // re-sign the effects with new current epoch so that a client is always able to
                // obtain an effects certificate at the current epoch.
                //
                // Why is this necessary? Consider the following case:
                // - assume there are 4 validators
                // - Quorum driver gets 2 signed effects before reconfig halt
                // - The tx makes it into final checkpoint.
                // - 2 validators go away and are replaced in the new epoch.
                // - The new epoch begins.
                // - The quorum driver cannot complete the partial effects cert from the previous epoch,
                //   because it may not be able to reach either of the 2 former validators.
                // - But, if the 2 validators that stayed are willing to re-sign the effects in the new
                //   epoch, the QD can make a new effects cert and return it to the client.
                //
                // This is a considered a short-term workaround. Eventually, Quorum Driver should be able
                // to return either an effects certificate, -or- a proof of inclusion in a checkpoint. In
                // the case above, the Quorum Driver would return a proof of inclusion in the final
                // checkpoint, and this code would no longer be necessary.
                if effects.executed_epoch() != epoch_store.epoch() {
                    // Log only; the actual (re-)signing happens unconditionally in
                    // sign_effects below, which signs with the current epoch's key.
                    debug!(
                        tx_digest=?transaction_digest,
                        effects_epoch=?effects.executed_epoch(),
                        epoch=?epoch_store.epoch(),
                        "Re-signing the effects with the current epoch"
                    );
                }
                Ok(Some(self.sign_effects(effects, epoch_store)?))
            }
            None => Ok(None),
        }
    }
5575
5576    #[instrument(level = "trace", skip_all)]
5577    pub(crate) fn sign_effects(
5578        &self,
5579        effects: TransactionEffects,
5580        epoch_store: &Arc<AuthorityPerEpochStore>,
5581    ) -> SuiResult<VerifiedSignedTransactionEffects> {
5582        let tx_digest = *effects.transaction_digest();
5583        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5584            Some(sig) => {
5585                debug_assert!(sig.epoch == epoch_store.epoch());
5586                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5587            }
5588            _ => {
5589                let sig = AuthoritySignInfo::new(
5590                    epoch_store.epoch(),
5591                    &effects,
5592                    Intent::sui_app(IntentScope::TransactionEffects),
5593                    self.name,
5594                    &*self.secret,
5595                );
5596
5597                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5598
5599                epoch_store.insert_effects_digest_and_signature(
5600                    &tx_digest,
5601                    effects.digest(),
5602                    &sig,
5603                )?;
5604
5605                effects
5606            }
5607        };
5608
5609        Ok(VerifiedSignedTransactionEffects::new_unchecked(
5610            signed_effects,
5611        ))
5612    }
5613
5614    // Returns coin objects for indexing for fullnode if indexing is enabled.
5615    #[instrument(level = "trace", skip_all)]
5616    fn fullnode_only_get_tx_coins_for_indexing(
5617        name: AuthorityName,
5618        object_store: &Arc<dyn ObjectStore + Send + Sync>,
5619        effects: &TransactionEffects,
5620        inner_temporary_store: &InnerTemporaryStore,
5621        epoch_store: &Arc<AuthorityPerEpochStore>,
5622    ) -> Option<TxCoins> {
5623        if epoch_store.committee().authority_exists(&name) {
5624            return None;
5625        }
5626        let written_coin_objects = inner_temporary_store
5627            .written
5628            .iter()
5629            .filter_map(|(k, v)| {
5630                if v.is_coin() {
5631                    Some((*k, v.clone()))
5632                } else {
5633                    None
5634                }
5635            })
5636            .collect();
5637        let mut input_coin_objects = inner_temporary_store
5638            .input_objects
5639            .iter()
5640            .filter_map(|(k, v)| {
5641                if v.is_coin() {
5642                    Some((*k, v.clone()))
5643                } else {
5644                    None
5645                }
5646            })
5647            .collect::<ObjectMap>();
5648
5649        // Check for receiving objects that were actually used and modified during execution. Their
5650        // updated version will already showup in "written_coins" but their input isn't included in
5651        // the set of input objects in a inner_temporary_store.
5652        for (object_id, version) in effects.modified_at_versions() {
5653            if inner_temporary_store
5654                .loaded_runtime_objects
5655                .contains_key(&object_id)
5656                && let Some(object) = object_store.get_object_by_key(&object_id, version)
5657                && object.is_coin()
5658            {
5659                input_coin_objects.insert(object_id, object);
5660            }
5661        }
5662
5663        Some((input_coin_objects, written_coin_objects))
5664    }
5665
5666    /// Get the TransactionEnvelope that currently locks the given object, if any.
5667    /// Since object locks are only valid for one epoch, we also need the epoch_id in the query.
5668    /// Returns UserInputError::ObjectNotFound if no lock records for the given object can be found.
5669    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the object record is at a different version.
5670    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a certain transaction.
5671    /// Returns None if a lock record is initialized for the given ObjectRef but not yet locked by any transaction,
5672    ///     or cannot find the transaction in transaction table, because of data race etc.
5673    #[instrument(level = "trace", skip_all)]
5674    pub async fn get_transaction_lock(
5675        &self,
5676        object_ref: &ObjectRef,
5677        epoch_store: &AuthorityPerEpochStore,
5678    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5679        let lock_info = self
5680            .get_object_cache_reader()
5681            .get_lock(*object_ref, epoch_store)?;
5682        let lock_info = match lock_info {
5683            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5684                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5685                    provided_obj_ref: *object_ref,
5686                    current_version: locked_ref.1,
5687                }
5688                .into());
5689            }
5690            ObjectLockStatus::Initialized => {
5691                return Ok(None);
5692            }
5693            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5694        };
5695
5696        epoch_store.get_signed_transaction(&lock_info.tx_digest)
5697    }
5698
    /// Batch-fetch the latest versions of the given objects from the object
    /// cache; each missing object yields `None` in the corresponding slot.
    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
        self.get_object_cache_reader().get_objects(objects)
    }
5702
    /// Latest object reference for `object_id`, including tombstones for
    /// deleted/wrapped objects; `None` if the object was never seen.
    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
        self.get_object_cache_reader()
            .get_latest_object_ref_or_tombstone(object_id)
    }
5707
5708    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
5709    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the upgrade.
5710    ///
5711    /// This method can be used to dynamic adjust the amount of buffer. If set to 0, the upgrade
5712    /// will go through with only 2f+1 votes.
5713    ///
5714    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all should have the same
5715    /// value), or you risk halting the chain.
5716    pub fn set_override_protocol_upgrade_buffer_stake(
5717        &self,
5718        expected_epoch: EpochId,
5719        buffer_stake_bps: u64,
5720    ) -> SuiResult {
5721        let epoch_store = self.load_epoch_store_one_call_per_task();
5722        let actual_epoch = epoch_store.epoch();
5723        if actual_epoch != expected_epoch {
5724            return Err(SuiErrorKind::WrongEpoch {
5725                expected_epoch,
5726                actual_epoch,
5727            }
5728            .into());
5729        }
5730
5731        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5732    }
5733
5734    pub fn clear_override_protocol_upgrade_buffer_stake(
5735        &self,
5736        expected_epoch: EpochId,
5737    ) -> SuiResult {
5738        let epoch_store = self.load_epoch_store_one_call_per_task();
5739        let actual_epoch = epoch_store.epoch();
5740        if actual_epoch != expected_epoch {
5741            return Err(SuiErrorKind::WrongEpoch {
5742                expected_epoch,
5743                actual_epoch,
5744            }
5745            .into());
5746        }
5747
5748        epoch_store.clear_override_protocol_upgrade_buffer_stake()
5749    }
5750
    /// Get the set of system packages that are compiled in to this build, if those packages are
    /// compatible with the current versions of those packages on-chain.
    ///
    /// Returns one object ref per package; returns an empty vector as soon as
    /// any package fails the compatibility check (a veto for the whole set —
    /// this authority then cannot vote for the upgrade).
    pub async fn get_available_system_packages(
        &self,
        binary_config: &BinaryConfig,
    ) -> Vec<ObjectRef> {
        let mut results = vec![];

        let system_packages = BuiltInFramework::iter_system_packages();

        // Add extra framework packages during simtest
        #[cfg(msim)]
        let extra_packages = framework_injection::get_extra_packages(self.name);
        #[cfg(msim)]
        // NOTE(review): the identity `.map(|p| p)` appears to only adapt the
        // iterator type so it can be chained — confirm before simplifying.
        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

        for system_package in system_packages {
            let modules = system_package.modules().to_vec();
            // In simtests, we could override the current built-in framework packages.
            #[cfg(msim)]
            let modules =
                match framework_injection::get_override_modules(&system_package.id, self.name) {
                    // An explicitly empty override means: skip this package entirely.
                    Some(overrides) if overrides.is_empty() => continue,
                    Some(overrides) => overrides,
                    None => modules,
                };

            // `None` from compare_system_package means the compiled-in package
            // is incompatible with the on-chain version: abort with no votes.
            let Some(obj_ref) = sui_framework::compare_system_package(
                &self.get_object_store(),
                &system_package.id,
                &modules,
                system_package.dependencies.to_vec(),
                binary_config,
            )
            .await
            else {
                return vec![];
            };
            results.push(obj_ref);
        }

        results
    }
5794
    /// Return the new versions, module bytes, and dependencies for the packages that have been
    /// committed to for a framework upgrade, in `system_packages`.  Loads the module contents from
    /// the binary, and performs the following checks:
    ///
    /// - Whether its contents matches what is on-chain already, in which case no upgrade is
    ///   required, and its contents are omitted from the output.
    /// - Whether the contents in the binary can form a package whose digest matches the input,
    ///   meaning the framework will be upgraded, and this authority can satisfy that upgrade, in
    ///   which case the contents are included in the output.
    ///
    /// If the current version of the framework can't be loaded, the binary does not contain the
    /// bytes for that framework ID, or the resulting package fails the digest check, `None` is
    /// returned indicating that this authority cannot run the upgrade that the network voted on.
    async fn get_system_package_bytes(
        &self,
        system_packages: Vec<ObjectRef>,
        binary_config: &BinaryConfig,
    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
        // Load the current on-chain object for each package in the committed upgrade set.
        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
        let objects = self.get_objects(&ids).await;

        let mut res = Vec::with_capacity(system_packages.len());
        for (system_package_ref, object) in system_packages.into_iter().zip_debug_eq(objects.iter())
        {
            // The upgraded package object must record a `previous_transaction`: the current
            // object's last transaction when it exists on-chain, or the genesis marker for a
            // package being created for the first time.
            let prev_transaction = match object {
                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                    // Skip this one because it doesn't need to be upgraded.
                    info!("Framework {} does not need updating", system_package_ref.0);
                    continue;
                }

                Some(cur_object) => cur_object.previous_transaction,
                None => TransactionDigest::genesis_marker(),
            };

            // In simulation builds, a test may inject per-validator override bytes for a
            // framework package; otherwise fall back to the bytes compiled into this binary.
            #[cfg(msim)]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
                .unwrap_or_else(|| {
                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
                });

            #[cfg(not(msim))]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

            // Bundled framework modules are expected to deserialize under the current epoch's
            // binary config; failure here would be a bug in the binary itself.
            let modules: Vec<_> = bytes
                .iter()
                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
                .collect();

            // Build the candidate package object so its reference (including digest) can be
            // compared against the reference the network voted on.
            let new_object = Object::new_system_package(
                &modules,
                system_package_ref.1,
                dependencies.clone(),
                prev_transaction,
            );

            let new_ref = new_object.compute_object_reference();
            if new_ref != system_package_ref {
                if cfg!(msim) {
                    // debug_fatal required here for test_framework_upgrade_conflicting_versions to pass
                    debug_fatal!(
                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
                    );
                } else {
                    error!(
                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
                    );
                }
                return None;
            }

            res.push((system_package_ref.1, bytes, dependencies));
        }

        Some(res)
    }
5879
    /// Decide whether the network can upgrade from `current_protocol_version` to
    /// `proposed_protocol_version`, based on the capabilities each validator advertised.
    ///
    /// Returns the proposed version together with the system package set agreed upon, but only
    /// if the supporting stake reaches the quorum threshold plus an extra buffer derived from
    /// `buffer_stake_bps`; otherwise returns `None`.
    fn is_protocol_version_supported_v2(
        current_protocol_version: ProtocolVersion,
        proposed_protocol_version: ProtocolVersion,
        protocol_config: &ProtocolConfig,
        committee: &Committee,
        capabilities: Vec<AuthorityCapabilitiesV2>,
        mut buffer_stake_bps: u64,
    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
        // Only a single version step is allowed, unless the config opts into jumping
        // straight to the highest supported protocol version.
        if proposed_protocol_version > current_protocol_version + 1
            && !protocol_config.advance_to_highest_supported_protocol_version()
        {
            return None;
        }

        // buffer_stake_bps is expressed in basis points, so it cannot meaningfully exceed 100%.
        if buffer_stake_bps > 10000 {
            warn!("clamping buffer_stake_bps to 10000");
            buffer_stake_bps = 10000;
        }

        // For each validator, gather the protocol version and system packages that it would like
        // to upgrade to in the next epoch.
        let mut desired_upgrades: Vec<_> = capabilities
            .into_iter()
            .filter_map(|mut cap| {
                // A validator that lists no packages is voting against any change at all.
                if cap.available_system_packages.is_empty() {
                    return None;
                }

                // Sort so that identical package sets compare equal when grouping below.
                cap.available_system_packages.sort();

                info!(
                    "validator {:?} supports {:?} with system packages: {:?}",
                    cap.authority.concise(),
                    cap.supported_protocol_versions,
                    cap.available_system_packages,
                );

                // A validator that only supports the current protocol version is also voting
                // against any change, because framework upgrades always require a protocol version
                // bump.
                cap.supported_protocol_versions
                    .get_version_digest(proposed_protocol_version)
                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
            })
            .collect();

        // There can only be one set of votes that have a majority, find one if it exists.
        desired_upgrades.sort();
        desired_upgrades
            .into_iter()
            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
            .into_iter()
            .find_map(|((digest, packages), group)| {
                // should have been filtered out earlier.
                assert!(!packages.is_empty());

                // Tally the stake of every validator voting for this exact
                // (digest, package set) combination.
                let mut stake_aggregator: StakeAggregator<(), true> =
                    StakeAggregator::new(Arc::new(committee.clone()));

                for (_, _, authority) in group {
                    stake_aggregator.insert_generic(authority, ());
                }

                let total_votes = stake_aggregator.total_votes();
                let quorum_threshold = committee.quorum_threshold();
                // Stake beyond quorum — nominally f, the maximum tolerated faulty stake.
                let f = committee.total_votes() - committee.quorum_threshold();

                // multiply by buffer_stake_bps / 10000, rounded up.
                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
                let effective_threshold = quorum_threshold + buffer_stake;

                info!(
                    protocol_config_digest = ?digest,
                    ?total_votes,
                    ?quorum_threshold,
                    ?buffer_stake_bps,
                    ?effective_threshold,
                    ?proposed_protocol_version,
                    ?packages,
                    "support for upgrade"
                );

                let has_support = total_votes >= effective_threshold;
                has_support.then_some((proposed_protocol_version, packages))
            })
    }
5967
5968    fn choose_protocol_version_and_system_packages_v2(
5969        current_protocol_version: ProtocolVersion,
5970        protocol_config: &ProtocolConfig,
5971        committee: &Committee,
5972        capabilities: Vec<AuthorityCapabilitiesV2>,
5973        buffer_stake_bps: u64,
5974    ) -> (ProtocolVersion, Vec<ObjectRef>) {
5975        assert!(protocol_config.authority_capabilities_v2());
5976        let mut next_protocol_version = current_protocol_version;
5977        let mut system_packages = vec![];
5978
5979        while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5980            current_protocol_version,
5981            next_protocol_version + 1,
5982            protocol_config,
5983            committee,
5984            capabilities.clone(),
5985            buffer_stake_bps,
5986        ) {
5987            next_protocol_version = version;
5988            system_packages = packages;
5989        }
5990
5991        (next_protocol_version, system_packages)
5992    }
5993
5994    #[instrument(level = "debug", skip_all)]
5995    fn create_authenticator_state_tx(
5996        &self,
5997        epoch_store: &Arc<AuthorityPerEpochStore>,
5998    ) -> Option<EndOfEpochTransactionKind> {
5999        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
6000            info!("authenticator state transactions not enabled");
6001            return None;
6002        }
6003
6004        let authenticator_state_exists = epoch_store.authenticator_state_exists();
6005        let tx = if authenticator_state_exists {
6006            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
6007            let min_epoch =
6008                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
6009            let authenticator_obj_initial_shared_version = epoch_store
6010                .epoch_start_config()
6011                .authenticator_obj_initial_shared_version()
6012                .expect("initial version must exist");
6013
6014            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
6015                min_epoch,
6016                authenticator_obj_initial_shared_version,
6017            );
6018
6019            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
6020
6021            tx
6022        } else {
6023            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
6024            info!("Creating AuthenticatorStateCreate tx");
6025            tx
6026        };
6027        Some(tx)
6028    }
6029
6030    #[instrument(level = "debug", skip_all)]
6031    fn create_randomness_state_tx(
6032        &self,
6033        epoch_store: &Arc<AuthorityPerEpochStore>,
6034    ) -> Option<EndOfEpochTransactionKind> {
6035        if !epoch_store.protocol_config().random_beacon() {
6036            info!("randomness state transactions not enabled");
6037            return None;
6038        }
6039
6040        if epoch_store.randomness_state_exists() {
6041            return None;
6042        }
6043
6044        let tx = EndOfEpochTransactionKind::new_randomness_state_create();
6045        info!("Creating RandomnessStateCreate tx");
6046        Some(tx)
6047    }
6048
6049    #[instrument(level = "debug", skip_all)]
6050    fn create_accumulator_root_tx(
6051        &self,
6052        epoch_store: &Arc<AuthorityPerEpochStore>,
6053    ) -> Option<EndOfEpochTransactionKind> {
6054        if !epoch_store
6055            .protocol_config()
6056            .create_root_accumulator_object()
6057        {
6058            info!("accumulator root creation not enabled");
6059            return None;
6060        }
6061
6062        if epoch_store.accumulator_root_exists() {
6063            return None;
6064        }
6065
6066        let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
6067        info!("Creating AccumulatorRootCreate tx");
6068        Some(tx)
6069    }
6070
6071    #[instrument(level = "debug", skip_all)]
6072    fn create_write_accumulator_storage_cost_tx(
6073        &self,
6074        epoch_store: &Arc<AuthorityPerEpochStore>,
6075    ) -> Option<EndOfEpochTransactionKind> {
6076        if !epoch_store.accumulator_root_exists() {
6077            info!("accumulator root does not exist yet");
6078            return None;
6079        }
6080        if !epoch_store.protocol_config().enable_accumulators() {
6081            info!("accumulators not enabled");
6082            return None;
6083        }
6084
6085        let object_store = self.get_object_store();
6086        let object_count =
6087            match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
6088                Ok(Some(count)) => count,
6089                Ok(None) => return None,
6090                Err(e) => {
6091                    fatal!("failed to read accumulator object count: {e}");
6092                }
6093            };
6094
6095        let storage_cost = object_count.saturating_mul(
6096            epoch_store
6097                .protocol_config()
6098                .accumulator_object_storage_cost(),
6099        );
6100
6101        let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
6102        info!(
6103            object_count,
6104            storage_cost, "Creating WriteAccumulatorStorageCost tx"
6105        );
6106        Some(tx)
6107    }
6108
6109    #[instrument(level = "debug", skip_all)]
6110    fn create_coin_registry_tx(
6111        &self,
6112        epoch_store: &Arc<AuthorityPerEpochStore>,
6113    ) -> Option<EndOfEpochTransactionKind> {
6114        if !epoch_store.protocol_config().enable_coin_registry() {
6115            info!("coin registry not enabled");
6116            return None;
6117        }
6118
6119        if epoch_store.coin_registry_exists() {
6120            return None;
6121        }
6122
6123        let tx = EndOfEpochTransactionKind::new_coin_registry_create();
6124        info!("Creating CoinRegistryCreate tx");
6125        Some(tx)
6126    }
6127
6128    #[instrument(level = "debug", skip_all)]
6129    fn create_display_registry_tx(
6130        &self,
6131        epoch_store: &Arc<AuthorityPerEpochStore>,
6132    ) -> Option<EndOfEpochTransactionKind> {
6133        if !epoch_store.protocol_config().enable_display_registry() {
6134            info!("display registry not enabled");
6135            return None;
6136        }
6137
6138        if epoch_store.display_registry_exists() {
6139            return None;
6140        }
6141
6142        let tx = EndOfEpochTransactionKind::new_display_registry_create();
6143        info!("Creating DisplayRegistryCreate tx");
6144        Some(tx)
6145    }
6146
6147    #[instrument(level = "debug", skip_all)]
6148    fn create_bridge_tx(
6149        &self,
6150        epoch_store: &Arc<AuthorityPerEpochStore>,
6151    ) -> Option<EndOfEpochTransactionKind> {
6152        if !epoch_store.protocol_config().enable_bridge() {
6153            info!("bridge not enabled");
6154            return None;
6155        }
6156        if epoch_store.bridge_exists() {
6157            return None;
6158        }
6159        let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
6160        info!("Creating Bridge Create tx");
6161        Some(tx)
6162    }
6163
6164    #[instrument(level = "debug", skip_all)]
6165    fn init_bridge_committee_tx(
6166        &self,
6167        epoch_store: &Arc<AuthorityPerEpochStore>,
6168    ) -> Option<EndOfEpochTransactionKind> {
6169        if !epoch_store.protocol_config().enable_bridge() {
6170            info!("bridge not enabled");
6171            return None;
6172        }
6173        if !epoch_store
6174            .protocol_config()
6175            .should_try_to_finalize_bridge_committee()
6176        {
6177            info!("should not try to finalize bridge committee yet");
6178            return None;
6179        }
6180        // Only create this transaction if bridge exists
6181        if !epoch_store.bridge_exists() {
6182            return None;
6183        }
6184
6185        if epoch_store.bridge_committee_initiated() {
6186            return None;
6187        }
6188
6189        let bridge_initial_shared_version = epoch_store
6190            .epoch_start_config()
6191            .bridge_obj_initial_shared_version()
6192            .expect("initial version must exist");
6193        let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
6194        info!("Init Bridge committee tx");
6195        Some(tx)
6196    }
6197
6198    #[instrument(level = "debug", skip_all)]
6199    fn create_deny_list_state_tx(
6200        &self,
6201        epoch_store: &Arc<AuthorityPerEpochStore>,
6202    ) -> Option<EndOfEpochTransactionKind> {
6203        if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
6204            return None;
6205        }
6206
6207        if epoch_store.coin_deny_list_state_exists() {
6208            return None;
6209        }
6210
6211        let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
6212        info!("Creating DenyListStateCreate tx");
6213        Some(tx)
6214    }
6215
6216    #[instrument(level = "debug", skip_all)]
6217    fn create_address_alias_state_tx(
6218        &self,
6219        epoch_store: &Arc<AuthorityPerEpochStore>,
6220    ) -> Option<EndOfEpochTransactionKind> {
6221        if !epoch_store.protocol_config().address_aliases() {
6222            info!("address aliases not enabled");
6223            return None;
6224        }
6225
6226        if epoch_store.address_alias_state_exists() {
6227            return None;
6228        }
6229
6230        let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
6231        info!("Creating AddressAliasStateCreate tx");
6232        Some(tx)
6233    }
6234
    /// Build the end-of-epoch `StoreExecutionTimeObservations` transaction, which persists
    /// execution time observations for commands that appeared in the last N checkpoints of
    /// the epoch, plus the caller-supplied `end_of_epoch_observation_keys`. Returns `None`
    /// unless per-object congestion control is in `ExecutionTimeEstimate` mode.
    #[instrument(level = "debug", skip_all)]
    fn create_execution_time_observations_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
    ) -> Option<EndOfEpochTransactionKind> {
        // `params` carries the configuration of the execution-time-estimate mode
        // (window size and storage limit used below).
        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
            .protocol_config()
            .per_object_congestion_control_mode()
        else {
            return None;
        };

        // Load tx in the last N checkpoints before end-of-epoch, and save only the
        // execution time observations for commands in these checkpoints.
        let start_checkpoint = std::cmp::max(
            last_checkpoint_before_end_of_epoch
                .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
            // If we have <N checkpoints in the epoch, use all of them.
            epoch_store
                .epoch()
                .checked_sub(1)
                .map(|prev_epoch| {
                    self.checkpoint_store
                        .get_epoch_last_checkpoint_seq_number(prev_epoch)
                        .expect("typed store must not fail")
                        .expect(
                            "sequence number of last checkpoint of preceding epoch must be saved",
                        )
                        + 1
                })
                .unwrap_or(1),
        );
        info!(
            "reading checkpoint range {:?}..={:?}",
            start_checkpoint, last_checkpoint_before_end_of_epoch
        );
        // Resolve each checkpoint sequence number in the window to its contents digest.
        let sequence_numbers =
            (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
        let contents_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_locally_computed_checkpoints(&sequence_numbers)
            .expect("typed store must not fail")
            .into_iter()
            .zip_debug_eq(sequence_numbers)
            .map(|(maybe_checkpoint, sequence_number)| {
                if let Some(checkpoint) = maybe_checkpoint {
                    checkpoint.content_digest
                } else {
                    // If locally computed checkpoint summary was already pruned, load the
                    // certified checkpoint.
                    self.checkpoint_store
                        .get_checkpoint_by_sequence_number(sequence_number)
                        .expect("typed store must not fail")
                        .unwrap_or_else(|| {
                            fatal!("preceding checkpoints must exist by end of epoch")
                        })
                        .data()
                        .content_digest
                }
            })
            .collect();
        // Flatten checkpoint contents into the transaction digests they contain.
        let tx_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_checkpoint_content(&contents_digests)
            .expect("typed store must not fail")
            .into_iter()
            .flat_map(|maybe_contents| {
                maybe_contents
                    .expect("preceding checkpoint contents must exist by end of epoch")
                    .into_inner()
                    .into_iter()
                    .map(|digests| digests.transaction)
            })
            .collect();
        // Collect the observation key of every command in those transactions, plus the
        // keys the caller wants included unconditionally.
        let included_execution_time_observations: HashSet<_> = self
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(&tx_digests)
            .into_iter()
            .flat_map(|maybe_tx| {
                if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
                    .expect("preceding transaction must exist by end of epoch")
                    .transaction_data()
                    .kind()
                {
                    #[allow(clippy::unnecessary_to_owned)]
                    itertools::Either::Left(
                        ptb.commands
                            .to_owned()
                            .into_iter()
                            .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
                    )
                } else {
                    itertools::Either::Right(std::iter::empty())
                }
            })
            .chain(end_of_epoch_observation_keys.into_iter())
            .collect();

        // Persist only observations whose keys were seen above, capped at the configured limit.
        let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
            epoch_store
                .get_end_of_epoch_execution_time_observations()
                .filter_and_sort_v1(
                    |(key, _)| included_execution_time_observations.contains(key),
                    params.stored_observations_limit.try_into().unwrap(),
                ),
        );
        info!("Creating StoreExecutionTimeObservations tx");
        Some(tx)
    }
6346
    /// Creates and executes the advance epoch transaction to effects without committing it to the database.
    /// The effects of the change epoch tx are only written to the database after a certified checkpoint has been
    /// formed and executed by CheckpointExecutor.
    ///
    /// When a framework upgrade has been decided on, but the validator does not have the new
    /// versions of the packages locally, the validator cannot form the ChangeEpochTx. In this case
    /// it returns Err, indicating that the checkpoint builder should give up trying to make the
    /// final checkpoint. As long as the network is able to create a certified checkpoint (which
    /// should be ensured by the capabilities vote), it will arrive via state sync and be executed
    /// by CheckpointExecutor.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
        // >1 checkpoint.
        last_checkpoint: CheckpointSequenceNumber,
    ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
        // Gather the end-of-epoch system transactions; each helper returns None when its
        // feature is disabled or its one-time setup has already happened.
        let mut txns = Vec::new();

        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_bridge_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_execution_time_observations_tx(
            epoch_store,
            end_of_epoch_observation_keys,
            last_checkpoint,
        ) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
            txns.push(tx);
        }

        if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_display_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();

        // Determine the protocol version and framework packages the committee has
        // agreed to move to for the next epoch.
        let (next_epoch_protocol_version, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v2(
                epoch_store.protocol_version(),
                epoch_store.protocol_config(),
                epoch_store.committee(),
                epoch_store
                    .get_capabilities_v2()
                    .expect("read capabilities from db cannot fail"),
                buffer_stake_bps,
            );

        // since system packages are created during the current epoch, they should abide by the
        // rules of the current epoch, including the current epoch's max Move binary format version
        let config = epoch_store.protocol_config();
        let binary_config = config.binary_config(None);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            if next_epoch_protocol_version <= ProtocolVersion::MAX {
                // This case should only be hit if the validator supports the new protocol version,
                // but carries a different framework. The validator should still be able to
                // reconfigure, as the correct framework will be installed by the change epoch txn.
                debug_fatal!(
                    "upgraded system packages {:?} are not locally available, cannot create \
                    ChangeEpochTx. validator binary must be upgraded to the correct version!",
                    next_epoch_system_packages
                );
            } else {
                error!(
                    "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
                    next_epoch_protocol_version
                );
            }
            // the checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages locally,
            //   and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive it via
            //   state sync, and execute it. This will upgrade the framework packages, reconfigure,
            //   and most likely shut down in the new epoch (this validator likely doesn't support
            //   the new protocol version, or else it should have had the packages.)
            return Err(CheckpointBuilderError::SystemPackagesMissing);
        };

        // Wrap everything in a single EndOfEpochTransaction when supported; otherwise fall
        // back to the legacy standalone ChangeEpoch transaction.
        let tx = if epoch_store
            .protocol_config()
            .end_of_epoch_transaction_supported()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));

            VerifiedTransaction::new_end_of_epoch_transaction(txns)
        } else {
            VerifiedTransaction::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            )
        };

        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // The tx could have been executed by state sync already - if so simply return an error.
        // The checkpoint builder will shortly be terminated by reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .is_tx_already_executed(tx_digest)
        {
            warn!("change epoch tx has already been executed via state sync");
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        }

        let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
        else {
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        };

        // We must manually assign the shared object versions to the transaction before executing it.
        // This is because we do not sequence end-of-epoch transactions through consensus.
        let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::iter::once(&Schedulable::Transaction(&executable_tx)),
        )?;

        // Exactly one transaction was scheduled, so exactly one version assignment comes back.
        assert_eq!(assigned_versions.0.len(), 1);
        let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;

        let input_objects = self.read_objects_for_execution(
            &tx_lock,
            &executable_tx,
            &assigned_versions,
            epoch_store,
        )?;

        // Execute the transaction directly (outside consensus) to produce its effects.
        let (transaction_outputs, _timings, _execution_error_opt) = self
            .execute_certificate(
                &execution_guard,
                &executable_tx,
                input_objects,
                None,
                ExecutionEnv::default(),
                epoch_store,
            )
            .unwrap();
        let system_obj = get_sui_system_state(&transaction_outputs.written)
            .expect("change epoch tx must write to system object");

        let effects = transaction_outputs.effects;
        // We must write tx and effects to the state sync tables so that state sync is able to
        // deliver to the transaction to CheckpointExecutor after it is included in a certified
        // checkpoint.
        self.get_state_sync_store()
            .insert_transaction_and_effects(&tx, &effects);

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_ok());
        Ok((system_obj, effects))
    }
6572
    /// Opens the `AuthorityPerEpochStore` for the next epoch and installs it.
    ///
    /// Called during reconfiguration: derives the new per-epoch store from
    /// `cur_epoch_store` via `new_at_next_epoch`, then swaps it into
    /// `self.epoch_store` so subsequent loads observe the new epoch.
    ///
    /// Returns the newly created per-epoch store.
    #[instrument(level = "error", skip_all)]
    async fn reopen_epoch_db(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        let new_epoch = new_committee.epoch;
        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
        // The epoch-start configuration must describe the same epoch as the
        // new committee; a mismatch here is a reconfiguration bug.
        assert_eq!(
            epoch_start_configuration.epoch_start_state().epoch(),
            new_committee.epoch
        );
        fail_point!("before-open-new-epoch-store");
        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
            self.name,
            new_committee,
            epoch_start_configuration,
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            expensive_safety_check_config,
            epoch_last_checkpoint,
            self.config.fullnode_sync_mode,
        )?;
        // Publish the new store only after it is fully constructed; readers
        // load it through `self.epoch_store` from here on.
        self.epoch_store.store(new_epoch_store.clone());
        Ok(new_epoch_store)
    }
6602
6603    #[cfg(test)]
6604    pub(crate) fn iter_live_object_set_for_testing(
6605        &self,
6606    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6607        let include_wrapped_object = !self
6608            .epoch_store_for_testing()
6609            .protocol_config()
6610            .simplified_unwrap_then_delete();
6611        self.get_global_state_hash_store()
6612            .iter_cached_live_object_set_for_testing(include_wrapped_object)
6613    }
6614
6615    #[cfg(test)]
6616    pub(crate) fn shutdown_execution_for_test(&self) {
6617        self.tx_execution_shutdown
6618            .lock()
6619            .take()
6620            .unwrap()
6621            .send(())
6622            .unwrap();
6623    }
6624
6625    /// NOTE: this function is only to be used for fuzzing and testing. Never use in prod
6626    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6627        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6628        self.get_object_cache_reader()
6629            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6630        self.get_reconfig_api()
6631            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6632        Ok(())
6633    }
6634}
6635
/// Receives generated randomness rounds and turns each one into a
/// `RandomnessStateUpdate` system transaction on the authority.
pub struct RandomnessRoundReceiver {
    /// Authority on which the randomness state update transactions are created
    /// and executed.
    authority_state: Arc<AuthorityState>,
    /// Channel of `(epoch, round, bytes)` tuples from randomness generation;
    /// per the handler below, `bytes` carries the round's full bls signature.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
6640
impl RandomnessRoundReceiver {
    /// Spawns the receiver's event loop on a monitored task and returns its
    /// join handle.
    pub fn spawn(
        authority_state: Arc<AuthorityState>,
        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    ) -> JoinHandle<()> {
        let rrr = RandomnessRoundReceiver {
            authority_state,
            randomness_rx,
        };
        spawn_monitored_task!(rrr.run())
    }

    /// Event loop: processes randomness rounds from `randomness_rx` until the
    /// channel is closed (sender dropped).
    async fn run(mut self) {
        info!("RandomnessRoundReceiver event loop started");

        loop {
            tokio::select! {
                maybe_recv = self.randomness_rx.recv() => {
                    if let Some((epoch, round, bytes)) = maybe_recv {
                        self.handle_new_randomness(epoch, round, bytes).await;
                    } else {
                        // Channel closed: stop the loop.
                        break;
                    }
                },
            }
        }

        info!("RandomnessRoundReceiver event loop ended");
    }

    /// Builds the `RandomnessStateUpdate` transaction for `round`, persists it,
    /// registers its digest with the scheduler, and waits for execution in a
    /// detached task (so out-of-order rounds cannot deadlock this loop).
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
        fail_point_async!("randomness-delay");

        // Randomness from a different (stale) epoch is no longer actionable.
        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
        if epoch_store.epoch() != epoch {
            warn!(
                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
                epoch_store.epoch()
            );
            return;
        }
        let key = TransactionKey::RandomnessRound(epoch, round);
        let transaction = VerifiedTransaction::new_randomness_state_update(
            epoch,
            round,
            bytes,
            epoch_store
                .epoch_start_config()
                .randomness_obj_initial_shared_version()
                .expect("randomness state obj must exist"),
        );
        debug!(
            "created randomness state update transaction with digest: {:?}",
            transaction.digest()
        );
        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
        let digest = *transaction.digest();

        // Randomness state updates contain the full bls signature for the random round,
        // which cannot necessarily be reconstructed again later. Therefore we must immediately
        // persist this transaction. If we crash before its outputs are committed, this
        // ensures we will be able to re-execute it.
        self.authority_state
            .get_cache_commit()
            .persist_transaction(&transaction);

        // Notify the scheduler that the transaction key now has a known digest
        if epoch_store.insert_tx_key(key, digest).is_err() {
            warn!("epoch ended while handling new randomness");
        }

        let authority_state = self.authority_state.clone();
        spawn_monitored_task!(async move {
            // Wait for transaction execution in a separate task, to avoid deadlock in case of
            // out-of-order randomness generation. (Each RandomnessStateUpdate depends on the
            // output of the RandomnessStateUpdate from the previous round.)
            //
            // We set a very long timeout so that in case this gets stuck for some reason, the
            // validator will eventually crash rather than continuing in a zombie mode.
            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
            let result = tokio::time::timeout(
                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
                authority_state
                    .get_transaction_cache_reader()
                    .notify_read_executed_effects(
                        "RandomnessRoundReceiver::notify_read_executed_effects_first",
                        &[digest],
                    ),
            )
            .await;
            let mut effects = match result {
                Ok(result) => result,
                Err(_) => {
                    // Crash on randomness update execution timeout in debug builds.
                    debug_fatal_no_invariant!(
                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                    );
                    // Continue waiting as long as necessary in non-debug builds.
                    authority_state
                        .get_transaction_cache_reader()
                        .notify_read_executed_effects(
                            "RandomnessRoundReceiver::notify_read_executed_effects_second",
                            &[digest],
                        )
                        .await
                }
            };

            // We waited on a single digest, so exactly one effects entry is expected.
            let effects = effects.pop().expect("should return effects");
            if *effects.status() != ExecutionStatus::Success {
                fatal!(
                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
                );
            }
            debug!(
                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
            );
        });
    }
}
6762
6763#[async_trait]
6764impl TransactionKeyValueStoreTrait for AuthorityState {
6765    #[instrument(skip(self))]
6766    async fn multi_get(
6767        &self,
6768        transactions: &[TransactionDigest],
6769        effects: &[TransactionDigest],
6770    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6771        let txns = if !transactions.is_empty() {
6772            self.get_transaction_cache_reader()
6773                .multi_get_transaction_blocks(transactions)
6774                .into_iter()
6775                .map(|t| t.map(|t| (*t).clone().into_inner()))
6776                .collect()
6777        } else {
6778            vec![]
6779        };
6780
6781        let fx = if !effects.is_empty() {
6782            self.get_transaction_cache_reader()
6783                .multi_get_executed_effects(effects)
6784        } else {
6785            vec![]
6786        };
6787
6788        Ok((txns, fx))
6789    }
6790
6791    #[instrument(skip(self))]
6792    async fn multi_get_checkpoints(
6793        &self,
6794        checkpoint_summaries: &[CheckpointSequenceNumber],
6795        checkpoint_contents: &[CheckpointSequenceNumber],
6796        checkpoint_summaries_by_digest: &[CheckpointDigest],
6797    ) -> SuiResult<(
6798        Vec<Option<CertifiedCheckpointSummary>>,
6799        Vec<Option<CheckpointContents>>,
6800        Vec<Option<CertifiedCheckpointSummary>>,
6801    )> {
6802        // TODO: use multi-get methods if it ever becomes important (unlikely)
6803        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6804        let store = self.get_checkpoint_store();
6805        for seq in checkpoint_summaries {
6806            let checkpoint = store
6807                .get_checkpoint_by_sequence_number(*seq)?
6808                .map(|c| c.into_inner());
6809
6810            summaries.push(checkpoint);
6811        }
6812
6813        let mut contents = Vec::with_capacity(checkpoint_contents.len());
6814        for seq in checkpoint_contents {
6815            let checkpoint = store
6816                .get_checkpoint_by_sequence_number(*seq)?
6817                .and_then(|summary| {
6818                    store
6819                        .get_checkpoint_contents(&summary.content_digest)
6820                        .expect("db read cannot fail")
6821                });
6822            contents.push(checkpoint);
6823        }
6824
6825        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6826        for digest in checkpoint_summaries_by_digest {
6827            let checkpoint = store
6828                .get_checkpoint_by_digest(digest)?
6829                .map(|c| c.into_inner());
6830            summaries_by_digest.push(checkpoint);
6831        }
6832        Ok((summaries, contents, summaries_by_digest))
6833    }
6834
6835    #[instrument(skip(self))]
6836    async fn deprecated_get_transaction_checkpoint(
6837        &self,
6838        digest: TransactionDigest,
6839    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6840        Ok(self
6841            .get_checkpoint_cache()
6842            .deprecated_get_transaction_checkpoint(&digest)
6843            .map(|(_epoch, checkpoint)| checkpoint))
6844    }
6845
6846    #[instrument(skip(self))]
6847    async fn get_object(
6848        &self,
6849        object_id: ObjectID,
6850        version: VersionNumber,
6851    ) -> SuiResult<Option<Object>> {
6852        Ok(self
6853            .get_object_cache_reader()
6854            .get_object_by_key(&object_id, version))
6855    }
6856
6857    #[instrument(skip_all)]
6858    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6859        Ok(self
6860            .get_object_cache_reader()
6861            .multi_get_objects_by_key(object_keys))
6862    }
6863
6864    #[instrument(skip(self))]
6865    async fn multi_get_transaction_checkpoint(
6866        &self,
6867        digests: &[TransactionDigest],
6868    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6869        let res = self
6870            .get_checkpoint_cache()
6871            .deprecated_multi_get_transaction_checkpoint(digests);
6872
6873        Ok(res
6874            .into_iter()
6875            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6876            .collect())
6877    }
6878
6879    #[instrument(skip(self))]
6880    async fn multi_get_events_by_tx_digests(
6881        &self,
6882        digests: &[TransactionDigest],
6883    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6884        if digests.is_empty() {
6885            return Ok(vec![]);
6886        }
6887
6888        Ok(self
6889            .get_transaction_cache_reader()
6890            .multi_get_events(digests))
6891    }
6892}
6893
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::collections::BTreeMap;
    use std::{cell::RefCell, collections::BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a single package is overridden: either the same modules everywhere,
    /// or a per-validator callback that may decline to override.
    enum PackageOverrideConfig {
        Global(Framework),
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module at its own bytecode version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        modules
            .iter()
            .map(|module| {
                let mut serialized = Vec::new();
                module
                    .serialize_with_version(module.version, &mut serialized)
                    .unwrap();
                serialized
            })
            .collect()
    }

    /// Overrides `package_id` with the same modules on every validator.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|overrides| {
            overrides
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules))
        });
    }

    /// Overrides `package_id` with a per-validator callback.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|overrides| {
            overrides
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func))
        });
    }

    /// Installs `packages` as the system-package set. Built-in packages not in
    /// `packages` are overridden with an empty module list.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        OVERRIDE.with(|overrides| {
            let mut overrides = overrides.borrow_mut();
            let mut not_included: BTreeSet<_> =
                BuiltInFramework::all_package_ids().into_iter().collect();
            for pkg in &packages {
                not_included.remove(&pkg.id);
            }
            for pkg in packages {
                overrides.insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
            }
            for empty_pkg in not_included {
                overrides.insert(empty_pkg, PackageOverrideConfig::Global(vec![]));
            }
        });
    }

    /// Returns the serialized module bytes overriding `package_id` for
    /// validator `name`, if any.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let cfg = cfg.borrow();
            match cfg.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            }
        })
    }

    /// Returns the compiled modules overriding `package_id` for validator
    /// `name`, if any.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let cfg = cfg.borrow();
            match cfg.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            }
        })
    }

    /// Builds a `SystemPackage` from the override for `package_id`, if any.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if is_system_package(*package_id) {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system packages.
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns overridden packages that are NOT built-in framework packages.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        // Collect the extra ids first so the OVERRIDE borrow is released
        // before `get_override_bytes` borrows it again below.
        let extra_ids: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .copied()
                .filter(|package| !built_in.contains(package))
                .collect()
        });

        extra_ids
            .into_iter()
            .map(|package| SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            })
            .collect()
    }
}
7027
/// Serializable snapshot of an object together with the components of its
/// computed object reference (id, version, digest); used in node state dumps.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    pub id: ObjectID,
    pub version: VersionNumber,
    pub digest: ObjectDigest,
    pub object: Object,
}
7035
7036impl ObjDumpFormat {
7037    fn new(object: Object) -> Self {
7038        let oref = object.compute_object_reference();
7039        Self {
7040            id: oref.0,
7041            version: oref.1,
7042            digest: oref.2,
7043            object,
7044        }
7045    }
7046}
7047
/// Serializable dump of the state a node used to execute a single transaction
/// (epoch info, input/shared/child/modified objects, system packages, and both
/// the computed and expected effects), written to disk for offline inspection.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the dumped transaction.
    pub tx_digest: TransactionDigest,
    /// The sender-signed transaction data, extracted from the certificate.
    pub sender_signed_data: SenderSignedData,
    /// Epoch in which the transaction was executed.
    pub executed_epoch: u64,
    /// Reference gas price of that epoch.
    pub reference_gas_price: u64,
    /// Protocol version of that epoch.
    pub protocol_version: u64,
    /// Epoch start timestamp in milliseconds.
    pub epoch_start_timestamp_ms: u64,
    /// Effects this node computed when executing the transaction.
    pub computed_effects: TransactionEffects,
    /// Effects digest the node expected (e.g. from a certified checkpoint).
    pub expected_effects_digest: TransactionEffectsDigest,
    /// System packages present at their versions at execution time.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Shared objects read or mutated by the transaction.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Child objects loaded at runtime (tracked here even if only read).
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions the transaction modified them from.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages loaded from the DB at runtime (not part of input objects).
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// The transaction's input objects from the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
7065
7066impl NodeStateDump {
7067    pub fn new(
7068        tx_digest: &TransactionDigest,
7069        effects: &TransactionEffects,
7070        expected_effects_digest: TransactionEffectsDigest,
7071        object_store: &dyn ObjectStore,
7072        epoch_store: &Arc<AuthorityPerEpochStore>,
7073        inner_temporary_store: &InnerTemporaryStore,
7074        certificate: &VerifiedExecutableTransaction,
7075    ) -> SuiResult<Self> {
7076        // Epoch info
7077        let executed_epoch = epoch_store.epoch();
7078        let reference_gas_price = epoch_store.reference_gas_price();
7079        let epoch_start_config = epoch_store.epoch_start_config();
7080        let protocol_version = epoch_store.protocol_version().as_u64();
7081        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
7082
7083        // Record all system packages at this version
7084        let mut relevant_system_packages = Vec::new();
7085        for sys_package_id in BuiltInFramework::all_package_ids() {
7086            if let Some(w) = object_store.get_object(&sys_package_id) {
7087                relevant_system_packages.push(ObjDumpFormat::new(w))
7088            }
7089        }
7090
7091        // Record all the shared objects
7092        let mut shared_objects = Vec::new();
7093        for kind in effects.input_consensus_objects() {
7094            match kind {
7095                InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
7096                    if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
7097                        shared_objects.push(ObjDumpFormat::new(w))
7098                    }
7099                }
7100                InputConsensusObject::ReadConsensusStreamEnded(..)
7101                | InputConsensusObject::MutateConsensusStreamEnded(..)
7102                | InputConsensusObject::Cancelled(..) => (), // TODO: consider record congested objects.
7103            }
7104        }
7105
7106        // Record all loaded child objects
7107        // Child objects which are read but not mutated are not tracked anywhere else
7108        let mut loaded_child_objects = Vec::new();
7109        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
7110            if let Some(w) = object_store.get_object_by_key(id, meta.version) {
7111                loaded_child_objects.push(ObjDumpFormat::new(w))
7112            }
7113        }
7114
7115        // Record all modified objects
7116        let mut modified_at_versions = Vec::new();
7117        for (id, ver) in effects.modified_at_versions() {
7118            if let Some(w) = object_store.get_object_by_key(&id, ver) {
7119                modified_at_versions.push(ObjDumpFormat::new(w))
7120            }
7121        }
7122
7123        // Packages read at runtime, which were not previously loaded into the temoorary store
7124        // Some packages may be fetched at runtime and wont show up in input objects
7125        let mut runtime_reads = Vec::new();
7126        for obj in inner_temporary_store
7127            .runtime_packages_loaded_from_db
7128            .values()
7129        {
7130            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
7131        }
7132
7133        // All other input objects should already be in `inner_temporary_store.objects`
7134
7135        Ok(Self {
7136            tx_digest: *tx_digest,
7137            executed_epoch,
7138            reference_gas_price,
7139            epoch_start_timestamp_ms,
7140            protocol_version,
7141            relevant_system_packages,
7142            shared_objects,
7143            loaded_child_objects,
7144            modified_at_versions,
7145            runtime_reads,
7146            sender_signed_data: certificate.clone().into_message(),
7147            input_objects: inner_temporary_store
7148                .input_objects
7149                .values()
7150                .map(|o| ObjDumpFormat::new(o.clone()))
7151                .collect(),
7152            computed_effects: effects.clone(),
7153            expected_effects_digest,
7154        })
7155    }
7156
7157    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
7158        let mut objects = Vec::new();
7159        objects.extend(self.relevant_system_packages.clone());
7160        objects.extend(self.shared_objects.clone());
7161        objects.extend(self.loaded_child_objects.clone());
7162        objects.extend(self.modified_at_versions.clone());
7163        objects.extend(self.runtime_reads.clone());
7164        objects.extend(self.input_objects.clone());
7165        objects
7166    }
7167
7168    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
7169        let file_name = format!(
7170            "{}_{}_NODE_DUMP.json",
7171            self.tx_digest,
7172            AuthorityState::unixtime_now_ms()
7173        );
7174        let mut path = path.to_path_buf();
7175        path.push(&file_name);
7176        let mut file = File::create(path.clone())?;
7177        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
7178        Ok(path)
7179    }
7180
7181    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
7182        let file = File::open(path)?;
7183        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
7184    }
7185}