sui_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::accumulators::coin_reservations::CachingCoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::transaction_rewriting::rewrite_transaction_for_coin_reservations;
10use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
11use crate::checkpoints::CheckpointBuilderError;
12use crate::checkpoints::CheckpointBuilderResult;
13use crate::congestion_tracker::CongestionTracker;
14use crate::execution_cache::ExecutionCacheTraitPointers;
15use crate::execution_cache::TransactionCacheRead;
16use crate::execution_cache::writeback_cache::WritebackCache;
17use crate::execution_scheduler::ExecutionScheduler;
18use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
19use crate::gasless_rate_limiter::ConsensusGaslessCounter;
20use crate::jsonrpc_index::CoinIndexKey2;
21use crate::rpc_index::RpcIndexStore;
22use crate::traffic_controller::TrafficController;
23use crate::traffic_controller::metrics::TrafficControllerMetrics;
24use crate::transaction_outputs::TransactionOutputs;
25use arc_swap::{ArcSwap, ArcSwapOption, Guard};
26use async_trait::async_trait;
27use authority_per_epoch_store::CertLockGuard;
28use dashmap::DashMap;
29use fastcrypto::encoding::Base58;
30use fastcrypto::encoding::Encoding;
31use fastcrypto::hash::MultisetHash;
32use itertools::Itertools;
33use move_binary_format::CompiledModule;
34use move_binary_format::binary_config::BinaryConfig;
35use move_core_types::annotated_value::MoveStructLayout;
36use move_core_types::language_storage::ModuleId;
37use mysten_common::ZipDebugEqIteratorExt;
38use mysten_common::{assert_reachable, fatal};
39use nonempty::NonEmpty;
40use parking_lot::Mutex;
41use prometheus::{
42    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
43    register_histogram_vec_with_registry, register_histogram_with_registry,
44    register_int_counter_vec_with_registry, register_int_counter_with_registry,
45    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
46};
47use serde::de::DeserializeOwned;
48use serde::{Deserialize, Serialize};
49use shared_object_version_manager::AssignedVersions;
50use shared_object_version_manager::Schedulable;
51use std::collections::BTreeMap;
52use std::collections::BTreeSet;
53use std::fs::File;
54use std::io::Write;
55use std::path::{Path, PathBuf};
56use std::sync::atomic::Ordering;
57use std::time::Duration;
58use std::time::Instant;
59use std::time::SystemTime;
60use std::time::UNIX_EPOCH;
61use std::{
62    collections::{HashMap, HashSet},
63    fs,
64    pin::Pin,
65    str::FromStr,
66    sync::Arc,
67    vec,
68};
69use sui_config::NodeConfig;
70use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
71use sui_execution::Executor;
72use sui_protocol_config::Chain;
73use sui_protocol_config::PerObjectCongestionControlMode;
74use sui_types::accumulator_root::AccumulatorObjId;
75use sui_types::dynamic_field::visitor as DFV;
76use sui_types::execution::ExecutionOutput;
77use sui_types::execution::ExecutionTimeObservationKey;
78use sui_types::execution::ExecutionTiming;
79use sui_types::execution_params::ExecutionOrEarlyError;
80use sui_types::execution_params::FundsWithdrawStatus;
81use sui_types::execution_params::get_early_execution_error;
82use sui_types::inner_temporary_store::PackageStoreWithFallback;
83use sui_types::layout_resolver::LayoutResolver;
84use sui_types::layout_resolver::into_struct_layout;
85use sui_types::messages_consensus::AuthorityCapabilitiesV2;
86use sui_types::node_role::NodeRole;
87use sui_types::object::bounded_visitor::BoundedVisitor;
88use sui_types::storage::ChildObjectResolver;
89use sui_types::storage::InputKey;
90use sui_types::storage::OverlayBackingPackageStore;
91use sui_types::storage::TrackingBackingStore;
92use sui_types::traffic_control::{
93    PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
94};
95use sui_types::transaction_executor::SimulateTransactionResult;
96use sui_types::transaction_executor::TransactionChecks;
97use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
98use tap::TapFallible;
99use tokio::sync::RwLock;
100use tokio::sync::mpsc::unbounded_channel;
101use tokio::sync::oneshot;
102use tokio::sync::watch::error::RecvError;
103use tokio::time::timeout;
104use tracing::{debug, error, info, instrument, warn};
105
106use self::authority_store::ExecutionLockWriteGuard;
107use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
108pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
109use mysten_metrics::{monitored_scope, spawn_monitored_task};
110
111use crate::jsonrpc_index::IndexStore;
112use crate::jsonrpc_index::{
113    CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
114};
115use mysten_common::debug_fatal;
116use shared_crypto::intent::{Intent, IntentScope};
117use sui_config::genesis::Genesis;
118use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
119use sui_framework::{BuiltInFramework, SystemPackage};
120use sui_json_rpc_types::{
121    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
122    SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
123    SuiTransactionBlockEvents, TransactionFilter,
124};
125use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
126use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
127use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
128use sui_types::accumulator_root::AccumulatorValue;
129use sui_types::authenticator_state::get_authenticator_state;
130use sui_types::balance::Balance;
131use sui_types::coin_reservation;
132use sui_types::committee::{EpochId, ProtocolVersion};
133use sui_types::crypto::{AuthoritySignInfo, Signer};
134use sui_types::deny_list_v1::check_coin_deny_list_v1;
135use sui_types::digests::ChainIdentifier;
136use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
137use sui_types::effects::{
138    InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
139    TransactionEvents, VerifiedSignedTransactionEffects,
140};
141use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
142use sui_types::event::EventID;
143use sui_types::executable_transaction::VerifiedExecutableTransaction;
144use sui_types::execution_status::ExecutionErrorKind;
145use sui_types::gas::{GasCostSummary, SuiGasStatus};
146use sui_types::inner_temporary_store::{InnerTemporaryStore, ObjectMap, TxCoins, WrittenObjects};
147use sui_types::message_envelope::Message;
148use sui_types::messages_checkpoint::{
149    CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
150    CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
151    CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
152    CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
153};
154use sui_types::messages_grpc::{
155    LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
156    TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
157};
158use sui_types::metrics::{BytecodeVerifierMetrics, ExecutionMetrics};
159use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
160use sui_types::signature::GenericSignature;
161use sui_types::storage::{
162    BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
163};
164use sui_types::sui_system_state::SuiSystemStateTrait;
165use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
166use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
167use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
168use sui_types::{
169    SUI_SYSTEM_ADDRESS,
170    base_types::*,
171    committee::Committee,
172    crypto::AuthoritySignature,
173    error::{SuiError, SuiResult},
174    object::{Object, ObjectRead},
175    transaction::*,
176};
177use sui_types::{TypeTag, is_system_package};
178use typed_store::TypedStoreError;
179use typed_store::rocks::StagedBatch;
180
181use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
182use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
183use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
184use crate::authority::authority_store_pruner::{
185    AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
186};
187use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
188use crate::authority::epoch_start_configuration::EpochStartConfiguration;
189use crate::checkpoints::CheckpointStore;
190use crate::epoch::committee_store::CommitteeStore;
191use crate::execution_cache::{
192    CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
193    ObjectCacheRead, StateSyncAPI,
194};
195use crate::execution_driver::execution_process;
196use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
197use crate::metrics::LatencyObserver;
198use crate::metrics::RateTracker;
199use crate::module_cache_metrics::ResolverMetrics;
200use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
201use crate::stake_aggregator::StakeAggregator;
202use crate::subscription_handler::SubscriptionHandler;
203use crate::transaction_input_loader::TransactionInputLoader;
204
205#[cfg(msim)]
206pub use crate::checkpoints::checkpoint_executor::utils::{
207    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
208};
209
210#[cfg(msim)]
211use sui_types::committee::CommitteeTrait;
212use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
213
214#[cfg(test)]
215#[path = "unit_tests/authority_tests.rs"]
216pub mod authority_tests;
217
218#[cfg(test)]
219#[path = "unit_tests/transaction_tests.rs"]
220pub mod transaction_tests;
221
222#[cfg(test)]
223#[path = "unit_tests/batch_transaction_tests.rs"]
224mod batch_transaction_tests;
225
226#[cfg(test)]
227#[path = "unit_tests/move_integration_tests.rs"]
228pub mod move_integration_tests;
229
230#[cfg(test)]
231#[path = "unit_tests/gas_tests.rs"]
232mod gas_tests;
233
234#[cfg(test)]
235#[path = "unit_tests/gas_data_tests.rs"]
236mod gas_data_tests;
237
238#[cfg(test)]
239#[path = "unit_tests/batch_verification_tests.rs"]
240mod batch_verification_tests;
241
242#[cfg(test)]
243#[path = "unit_tests/coin_deny_list_tests.rs"]
244mod coin_deny_list_tests;
245
246#[cfg(test)]
247#[path = "unit_tests/auth_unit_test_utils.rs"]
248pub mod auth_unit_test_utils;
249
250pub mod authority_test_utils;
251
252pub mod authority_per_epoch_store;
253pub mod authority_per_epoch_store_pruner;
254
255pub mod authority_store_pruner;
256pub mod authority_store_tables;
257pub mod authority_store_types;
258pub mod congestion_log;
259pub mod consensus_tx_status_cache;
260pub(crate) mod epoch_marker_key;
261pub mod epoch_start_configuration;
262pub mod execution_time_estimator;
263pub mod shared_object_congestion_tracker;
264pub mod shared_object_version_manager;
265pub mod submitted_transaction_cache;
266pub mod test_authority_builder;
267pub mod transaction_deferral;
268pub mod transaction_reject_reason_cache;
269mod weighted_moving_average;
270
271pub(crate) mod authority_store;
272pub mod backpressure;
273
274/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
275pub struct AuthorityMetrics {
276    tx_orders: IntCounter,
277    total_certs: IntCounter,
278    total_effects: IntCounter,
279    // TODO: this tracks consensus object tx, not just shared. Consider renaming.
280    pub shared_obj_tx: IntCounter,
281    sponsored_tx: IntCounter,
282    tx_already_processed: IntCounter,
283    num_input_objs: Histogram,
284    // TODO: this tracks consensus object count, not just shared. Consider renaming.
285    num_shared_objects: Histogram,
286    batch_size: Histogram,
287
288    authority_state_handle_vote_transaction_latency: Histogram,
289
290    internal_execution_latency: Histogram,
291    execution_load_input_objects_latency: Histogram,
292    prepare_certificate_latency: Histogram,
293    commit_certificate_latency: Histogram,
294    db_checkpoint_latency: Histogram,
295
296    // TODO: Rename these metrics.
297    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
298    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
299    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
300    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
301
302    pub(crate) execution_driver_executed_transactions: IntCounter,
303    pub(crate) execution_driver_paused_transactions: IntCounter,
304    pub(crate) execution_driver_dispatch_queue: IntGauge,
305    pub(crate) execution_queueing_delay_s: Histogram,
306    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
307    pub(crate) execution_gas_latency_ratio: Histogram,
308
309    pub(crate) skipped_consensus_txns: IntCounter,
310    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
311    pub(crate) consensus_handler_duplicate_tx_count: Histogram,
312
313    pub(crate) authority_overload_status: IntGauge,
314    pub(crate) authority_load_shedding_percentage: IntGauge,
315
316    pub(crate) transaction_overload_sources: IntCounterVec,
317
318    /// Post processing metrics
319    post_processing_total_events_emitted: IntCounter,
320    post_processing_total_tx_indexed: IntCounter,
321    post_processing_total_tx_had_event_processed: IntCounter,
322    post_processing_total_failures: IntCounter,
323
324    /// Consensus commit and transaction handler metrics
325    pub consensus_handler_processed: IntCounterVec,
326    pub consensus_handler_transaction_sizes: HistogramVec,
327    pub consensus_handler_deferred_transactions: IntCounter,
328    pub consensus_handler_congested_transactions: IntCounter,
329    pub consensus_handler_unpaid_amplification_deferrals: IntCounter,
330    pub consensus_handler_cancelled_transactions: IntCounter,
331    pub consensus_handler_max_object_costs: IntGaugeVec,
332    pub consensus_committed_subdags: IntCounterVec,
333    pub accumulator_deposits: IntCounter,
334    pub accumulator_withdrawals: IntCounter,
335    pub consensus_committed_messages: IntGaugeVec,
336    pub consensus_committed_user_transactions: IntGaugeVec,
337    pub consensus_finalized_user_transactions: IntGaugeVec,
338    pub consensus_rejected_user_transactions: IntGaugeVec,
339    pub consensus_calculated_throughput: IntGauge,
340    pub consensus_calculated_throughput_profile: IntGauge,
341    pub consensus_block_handler_block_processed: IntCounter,
342    pub consensus_block_handler_txn_processed: IntCounterVec,
343    pub consensus_block_handler_fastpath_executions: IntCounter,
344    pub consensus_timestamp_bias: Histogram,
345
346    pub execution_metrics: Arc<ExecutionMetrics>,
347
348    /// bytecode verifier metrics for tracking timeouts
349    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
350
351    /// Count of zklogin signatures
352    pub zklogin_sig_count: IntCounter,
353    /// Count of multisig signatures
354    pub multisig_sig_count: IntCounter,
355
356    // Tracks recent average txn queueing delay between when it is ready for execution
357    // until it starts executing.
358    pub execution_queueing_latency: LatencyObserver,
359
360    // Tracks the rate of transactions become ready for execution in transaction manager.
361    // The need for the Mutex is that the tracker is updated in transaction manager and read
362    // in the overload_monitor. There should be low mutex contention because
363    // transaction manager is single threaded and the read rate in overload_monitor is
364    // low. In the case where transaction manager becomes multi-threaded, we can
365    // create one rate tracker per thread.
366    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
367
368    // Tracks the rate of transactions starts execution in execution driver.
369    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
370    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
371}
372
/// Histogram buckets for positive integer-valued samples, overriding the
/// default Prometheus buckets.
///
/// Boundaries follow a 1-2-5-7 progression per decade, from 1 up to 10,000,000,
/// and are listed in strictly increasing order.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];
379
/// Histogram buckets, in seconds, shared by the general latency histograms.
///
/// Boundaries run from 0.5 ms up to 90 s in strictly increasing order, with
/// one-second resolution between 1 s and 10 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];
384
/// Histogram buckets, in seconds, for low latency samples.
///
/// Boundaries start from 10us and follow a 1-2-5 progression per decade up to
/// 100 s, in strictly increasing order.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];
390
/// Histogram buckets for consensus timestamp bias, in seconds.
///
/// We expect 200-300ms, so the buckets are clustered more tightly in that
/// range (20 ms steps from 0.2 s to 0.4 s). The list begins with a negative
/// boundary (-2.0) and is strictly increasing up to 30 s.
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    -2.0, 0.0, 0.2, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40, 0.45, 0.50, 0.80,
    2.0, 10.0, 30.0,
];
397
/// Histogram buckets for the gas-to-latency ratio histograms (computation gas
/// divided by a latency).
///
/// Boundaries run from 10 up to 1,000,000 in strictly increasing order, with
/// steps of 100 between 100 and 1,000 and steps of 1,000 between 1,000 and
/// 10,000.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
402
/// Fixed gas coin value of 10^18.
///
/// NOTE(review): judging by the name, this is presumably the balance given to
/// the gas coin used for dev-inspect — confirm against the callers.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;
404
/// Timeout for waiting on fastpath input objects (2 seconds).
///
/// The transaction author should have observed the input objects as finalized
/// output, so usually the wait does not need to be long. When submitted by
/// TransactionDriver, it will retry quickly if there is no return from this
/// validator too.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_secs(2);
409
410impl AuthorityMetrics {
411    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
412        Self {
413            tx_orders: register_int_counter_with_registry!(
414                "total_transaction_orders",
415                "Total number of transaction orders",
416                registry,
417            )
418            .unwrap(),
419            total_certs: register_int_counter_with_registry!(
420                "total_transaction_certificates",
421                "Total number of transaction certificates handled",
422                registry,
423            )
424            .unwrap(),
425            // total_effects == total transactions finished
426            total_effects: register_int_counter_with_registry!(
427                "total_transaction_effects",
428                "Total number of transaction effects produced",
429                registry,
430            )
431            .unwrap(),
432
433            shared_obj_tx: register_int_counter_with_registry!(
434                "num_shared_obj_tx",
435                "Number of transactions involving shared objects",
436                registry,
437            )
438            .unwrap(),
439
440            sponsored_tx: register_int_counter_with_registry!(
441                "num_sponsored_tx",
442                "Number of sponsored transactions",
443                registry,
444            )
445            .unwrap(),
446
447            tx_already_processed: register_int_counter_with_registry!(
448                "num_tx_already_processed",
449                "Number of transaction orders already processed previously",
450                registry,
451            )
452            .unwrap(),
453            num_input_objs: register_histogram_with_registry!(
454                "num_input_objects",
455                "Distribution of number of input TX objects per TX",
456                POSITIVE_INT_BUCKETS.to_vec(),
457                registry,
458            )
459            .unwrap(),
460            num_shared_objects: register_histogram_with_registry!(
461                "num_shared_objects",
462                "Number of shared input objects per TX",
463                POSITIVE_INT_BUCKETS.to_vec(),
464                registry,
465            )
466            .unwrap(),
467            batch_size: register_histogram_with_registry!(
468                "batch_size",
469                "Distribution of size of transaction batch",
470                POSITIVE_INT_BUCKETS.to_vec(),
471                registry,
472            )
473            .unwrap(),
474            authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
475                "authority_state_handle_vote_transaction_latency",
476                "Latency of voting on transactions without signing",
477                LATENCY_SEC_BUCKETS.to_vec(),
478                registry,
479            )
480            .unwrap(),
481            internal_execution_latency: register_histogram_with_registry!(
482                "authority_state_internal_execution_latency",
483                "Latency of actual certificate executions",
484                LATENCY_SEC_BUCKETS.to_vec(),
485                registry,
486            )
487            .unwrap(),
488            execution_load_input_objects_latency: register_histogram_with_registry!(
489                "authority_state_execution_load_input_objects_latency",
490                "Latency of loading input objects for execution",
491                LOW_LATENCY_SEC_BUCKETS.to_vec(),
492                registry,
493            )
494            .unwrap(),
495            prepare_certificate_latency: register_histogram_with_registry!(
496                "authority_state_prepare_certificate_latency",
497                "Latency of executing certificates, before committing the results",
498                LATENCY_SEC_BUCKETS.to_vec(),
499                registry,
500            )
501            .unwrap(),
502            commit_certificate_latency: register_histogram_with_registry!(
503                "authority_state_commit_certificate_latency",
504                "Latency of committing certificate execution results",
505                LATENCY_SEC_BUCKETS.to_vec(),
506                registry,
507            )
508            .unwrap(),
509            db_checkpoint_latency: register_histogram_with_registry!(
510                "db_checkpoint_latency",
511                "Latency of checkpointing dbs",
512                LATENCY_SEC_BUCKETS.to_vec(),
513                registry,
514            ).unwrap(),
515            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
516                "transaction_manager_num_enqueued_certificates",
517                "Current number of certificates enqueued to ExecutionScheduler",
518                &["result"],
519                registry,
520            )
521            .unwrap(),
522            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
523                "transaction_manager_num_pending_certificates",
524                "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
525                registry,
526            )
527            .unwrap(),
528            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
529                "transaction_manager_num_executing_certificates",
530                "Number of executing certificates, including queued and actually running certificates",
531                registry,
532            )
533            .unwrap(),
534            authority_overload_status: register_int_gauge_with_registry!(
535                "authority_overload_status",
536                "Whether authority is current experiencing overload and enters load shedding mode.",
537                registry)
538            .unwrap(),
539            authority_load_shedding_percentage: register_int_gauge_with_registry!(
540                "authority_load_shedding_percentage",
541                "The percentage of transactions is shed when the authority is in load shedding mode.",
542                registry)
543            .unwrap(),
544            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
545                "transaction_manager_transaction_queue_age_s",
546                "Time spent in waiting for transaction in the queue",
547                LATENCY_SEC_BUCKETS.to_vec(),
548                registry,
549            )
550            .unwrap(),
551            transaction_overload_sources: register_int_counter_vec_with_registry!(
552                "transaction_overload_sources",
553                "Number of times each source indicates transaction overload.",
554                &["source"],
555                registry)
556            .unwrap(),
557            execution_driver_executed_transactions: register_int_counter_with_registry!(
558                "execution_driver_executed_transactions",
559                "Cumulative number of transaction executed by execution driver",
560                registry,
561            )
562            .unwrap(),
563            execution_driver_paused_transactions: register_int_counter_with_registry!(
564                "execution_driver_paused_transactions",
565                "Cumulative number of transactions paused by execution driver",
566                registry,
567            )
568            .unwrap(),
569            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
570                "execution_driver_dispatch_queue",
571                "Number of transaction pending in execution driver dispatch queue",
572                registry,
573            )
574            .unwrap(),
575            execution_queueing_delay_s: register_histogram_with_registry!(
576                "execution_queueing_delay_s",
577                "Queueing delay between a transaction is ready for execution until it starts executing.",
578                LATENCY_SEC_BUCKETS.to_vec(),
579                registry
580            )
581            .unwrap(),
582            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
583                "prepare_cert_gas_latency_ratio",
584                "The ratio of computation gas divided by VM execution latency.",
585                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
586                registry
587            )
588            .unwrap(),
589            execution_gas_latency_ratio: register_histogram_with_registry!(
590                "execution_gas_latency_ratio",
591                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
592                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
593                registry
594            )
595            .unwrap(),
596            skipped_consensus_txns: register_int_counter_with_registry!(
597                "skipped_consensus_txns",
598                "Total number of consensus transactions skipped",
599                registry,
600            )
601            .unwrap(),
602            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
603                "skipped_consensus_txns_cache_hit",
604                "Total number of consensus transactions skipped because of local cache hit",
605                registry,
606            )
607            .unwrap(),
608            consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
609                "consensus_handler_duplicate_tx_count",
610                "Number of times each transaction appears in its first consensus commit",
611                POSITIVE_INT_BUCKETS.to_vec(),
612                registry,
613            )
614            .unwrap(),
615            post_processing_total_events_emitted: register_int_counter_with_registry!(
616                "post_processing_total_events_emitted",
617                "Total number of events emitted in post processing",
618                registry,
619            )
620            .unwrap(),
621            post_processing_total_tx_indexed: register_int_counter_with_registry!(
622                "post_processing_total_tx_indexed",
623                "Total number of txes indexed in post processing",
624                registry,
625            )
626            .unwrap(),
627            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
628                "post_processing_total_tx_had_event_processed",
629                "Total number of txes finished event processing in post processing",
630                registry,
631            )
632            .unwrap(),
633            post_processing_total_failures: register_int_counter_with_registry!(
634                "post_processing_total_failures",
635                "Total number of failure in post processing",
636                registry,
637            )
638            .unwrap(),
639            consensus_handler_processed: register_int_counter_vec_with_registry!(
640                "consensus_handler_processed",
641                "Number of transactions processed by consensus handler",
642                &["class"],
643                registry
644            ).unwrap(),
645            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
646                "consensus_handler_transaction_sizes",
647                "Sizes of each type of transactions processed by consensus handler",
648                &["class"],
649                POSITIVE_INT_BUCKETS.to_vec(),
650                registry
651            ).unwrap(),
652            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
653                "consensus_handler_deferred_transactions",
654                "Number of transactions deferred by consensus handler",
655                registry,
656            ).unwrap(),
657            consensus_handler_congested_transactions: register_int_counter_with_registry!(
658                "consensus_handler_congested_transactions",
659                "Number of transactions deferred by consensus handler due to congestion",
660                registry,
661            ).unwrap(),
662            consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
663                "consensus_handler_unpaid_amplification_deferrals",
664                "Number of transactions deferred due to unpaid consensus amplification",
665                registry,
666            ).unwrap(),
667            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
668                "consensus_handler_cancelled_transactions",
669                "Number of transactions cancelled by consensus handler",
670                registry,
671            ).unwrap(),
672            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
673                "consensus_handler_max_congestion_control_object_costs",
674                "Max object costs for congestion control in the current consensus commit",
675                &["commit_type"],
676                registry,
677            ).unwrap(),
678            consensus_committed_subdags: register_int_counter_vec_with_registry!(
679                "consensus_committed_subdags",
680                "Number of committed subdags, sliced by author",
681                &["authority"],
682                registry,
683            ).unwrap(),
684            accumulator_deposits: register_int_counter_with_registry!(
685                "accumulator_deposits",
686                "Total accumulator deposit (merge) events processed in settlement",
687                registry,
688            ).unwrap(),
689            accumulator_withdrawals: register_int_counter_with_registry!(
690                "accumulator_withdrawals",
691                "Total accumulator withdrawal (split) events processed in settlement",
692                registry,
693            ).unwrap(),
694            consensus_committed_messages: register_int_gauge_vec_with_registry!(
695                "consensus_committed_messages",
696                "Total number of committed consensus messages, sliced by author",
697                &["authority"],
698                registry,
699            ).unwrap(),
700            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
701                "consensus_committed_user_transactions",
702                "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
703                &["authority"],
704                registry,
705            ).unwrap(),
706            consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
707                "consensus_finalized_user_transactions",
708                "Number of user transactions finalized, sliced by submitter",
709                &["authority"],
710                registry,
711            ).unwrap(),
712            consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
713                "consensus_rejected_user_transactions",
714                "Number of user transactions rejected, sliced by submitter",
715                &["authority"],
716                registry,
717            ).unwrap(),
718            execution_metrics: Arc::new(ExecutionMetrics::new(registry)),
719            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
720            zklogin_sig_count: register_int_counter_with_registry!(
721                "zklogin_sig_count",
722                "Count of zkLogin signatures",
723                registry,
724            )
725            .unwrap(),
726            multisig_sig_count: register_int_counter_with_registry!(
727                "multisig_sig_count",
728                "Count of zkLogin signatures",
729                registry,
730            )
731            .unwrap(),
732            consensus_calculated_throughput: register_int_gauge_with_registry!(
733                "consensus_calculated_throughput",
734                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
735                registry,
736            ).unwrap(),
737            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
738                "consensus_calculated_throughput_profile",
739                "The current active calculated throughput profile",
740                registry
741            ).unwrap(),
742            consensus_block_handler_block_processed: register_int_counter_with_registry!(
743                "consensus_block_handler_block_processed",
744                "Number of blocks processed by consensus block handler.",
745                registry
746            ).unwrap(),
747            consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
748                "consensus_block_handler_txn_processed",
749                "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
750                &["outcome"],
751                registry
752            ).unwrap(),
753            consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
754                "consensus_block_handler_fastpath_executions",
755                "Number of fastpath transactions sent for execution by consensus transaction handler",
756                registry,
757            ).unwrap(),
758            consensus_timestamp_bias: register_histogram_with_registry!(
759                "consensus_timestamp_bias",
760                "Bias/delay from consensus timestamp to system time",
761                TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
762                registry
763            ).unwrap(),
764            execution_queueing_latency: LatencyObserver::new(),
765            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
766            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
767        }
768    }
769}
770
/// A trait object for `Signer` that is:
/// - Pinned, i.e. confined to one place in memory (we don't want to copy private keys).
/// - `Send + Sync`, i.e. can be safely shared between threads.
///
/// NOTE(review): the alias wraps an `Arc`, so it is presumably instantiated with
/// `Arc::pin(keypair)` (rather than `Box::pin(keypair)`), where `keypair` is a `KeyPair`
/// — confirm against the call sites.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
778
/// Execution env contains the "environment" for the transaction to be executed in, that is,
/// all the information necessary for execution that is not specified by the transaction itself.
///
/// Values are usually built from `ExecutionEnv::new()` and refined with the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
pub struct ExecutionEnv {
    /// The assigned version of each shared object for the transaction.
    pub assigned_versions: AssignedVersions,
    /// The expected digest of the effects of the transaction, if executing from checkpoint or
    /// other sources where the effects are known in advance. `None` when nothing is expected.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    /// Status of the funds withdraw scheduling of the transaction,
    /// covering both address and object funds withdraws.
    /// The `Default` implementation uses `FundsWithdrawStatus::MaybeSufficient`.
    pub funds_withdraw_status: FundsWithdrawStatus,
    /// Transactions that must finish before this transaction can be executed.
    /// Used to schedule barrier transactions after non-exclusive writes.
    pub barrier_dependencies: Vec<TransactionDigest>,
}
795
796impl Default for ExecutionEnv {
797    fn default() -> Self {
798        Self {
799            assigned_versions: Default::default(),
800            expected_effects_digest: None,
801            funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
802            barrier_dependencies: Default::default(),
803        }
804    }
805}
806
807impl ExecutionEnv {
808    pub fn new() -> Self {
809        Default::default()
810    }
811
812    pub fn with_expected_effects_digest(
813        mut self,
814        expected_effects_digest: TransactionEffectsDigest,
815    ) -> Self {
816        self.expected_effects_digest = Some(expected_effects_digest);
817        self
818    }
819
820    pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
821        self.assigned_versions = assigned_versions;
822        self
823    }
824
825    pub fn with_insufficient_funds(mut self) -> Self {
826        self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
827        self
828    }
829
830    pub fn with_barrier_dependencies(
831        mut self,
832        barrier_dependencies: BTreeSet<TransactionDigest>,
833    ) -> Self {
834        self.barrier_dependencies = barrier_dependencies.into_iter().collect();
835        self
836    }
837}
838
/// Holds per-transaction effects-digest overrides used for fork recovery.
///
/// The map is wrapped in a `parking_lot::RwLock`, so lookups only need `&self`.
#[derive(Debug)]
pub struct ForkRecoveryState {
    /// Transaction digest to effects digest overrides
    transaction_overrides:
        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
}
845
846impl Default for ForkRecoveryState {
847    fn default() -> Self {
848        Self {
849            transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
850        }
851    }
852}
853
854impl ForkRecoveryState {
855    pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
856        let Some(config) = config else {
857            return Ok(Self::default());
858        };
859
860        let mut transaction_overrides = HashMap::new();
861        for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
862            let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
863                SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
864            })?;
865            let effects_digest =
866                TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
867                    SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
868                })?;
869            transaction_overrides.insert(tx_digest, effects_digest);
870        }
871
872        Ok(Self {
873            transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
874        })
875    }
876
877    pub fn get_transaction_override(
878        &self,
879        tx_digest: &TransactionDigest,
880    ) -> Option<TransactionEffectsDigest> {
881        self.transaction_overrides.read().get(tx_digest).copied()
882    }
883}
884
/// Pair of a `StagedBatch` and `IndexStoreCacheUpdates`; this is the payload received over
/// the oneshot channels stored in `AuthorityState::pending_post_processing`.
/// NOTE(review): presumably the result of a transaction's post-processing (indexing) task
/// — confirm against the producer.
pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
886
/// State held by an authority node: its identity, storage and cache handles, the current
/// epoch store, the execution scheduler, metrics, and configuration.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads the input and receiving objects of a transaction
    /// (e.g. via `read_objects_for_signing`).
    input_loader: TransactionInputLoader,
    /// Bundle of execution cache trait handles (e.g. `account_funds_read`).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    /// Resolver handed to `process_funds_withdrawals_for_signing` when processing
    /// declared funds withdrawals.
    coin_reservation_resolver: Arc<CachingCoinReservationResolver>,

    // NOTE(review): presumably the store of the current epoch, swapped on reconfiguration
    // — confirm against the reconfiguration code.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes the current 'execution epoch'.
    /// Execution acquires the read lock, checks the certificate epoch and holds the lock
    /// until all writes are complete.
    /// Reconfiguration acquires the write lock, changes the epoch and reverts all transactions
    /// from the previous epoch that were executed but did not make it into a checkpoint.
    execution_lock: RwLock<EpochId>,

    pub indexes: Option<Arc<IndexStore>>,
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    pub checkpoint_store: Arc<CheckpointStore>,

    committee_store: Arc<CommitteeStore>,

    /// Schedules transaction execution.
    execution_scheduler: Arc<ExecutionScheduler>,

    /// Shuts down the execution task. Used only in testing.
    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Configuration for taking db checkpoints of the different dbs.
    db_checkpoint_config: DBCheckpointConfig,

    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// The chain identifier is derived from the digest of the genesis checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Consumed by the gasless transaction rate limiter.
    pub(crate) consensus_gasless_counter: Arc<ConsensusGaslessCounter>,

    /// Traffic controller for Sui core servers (json-rpc, validator service).
    pub traffic_controller: Option<Arc<TrafficController>>,

    /// Fork recovery state for handling equivocation after forks.
    fork_recovery_state: Option<ForkRecoveryState>,

    /// Notification channel for reconfiguration.
    notify_epoch: tokio::sync::watch::Sender<EpochId>,

    pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
    object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,

    /// Tracks transactions whose post-processing (indexing/events) is still in flight.
    /// CheckpointExecutor removes entries and collects the index batches before committing
    /// them atomically at checkpoint boundaries.
    pending_post_processing:
        Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,

    /// Limits the number of concurrent post-processing tasks to avoid overwhelming
    /// the blocking thread pool. Defaults to the number of available CPUs.
    post_processing_semaphore: Arc<tokio::sync::Semaphore>,
}
964
965/// The authority state encapsulates all state, drives execution, and ensures safety.
966///
967/// Note the authority operations can be accessed through a read ref (&) and do not
968/// require &mut. Internally a database is synchronized through a mutex lock.
969///
970/// Repeating valid commands should produce no changes and return no error.
971impl AuthorityState {
    /// Returns the node role reported by `epoch_store`.
    pub fn node_role(&self, epoch_store: &AuthorityPerEpochStore) -> NodeRole {
        epoch_store.node_role()
    }
975
976    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
977        epoch_store.node_role().is_validator()
978    }
979
980    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
981        epoch_store.node_role().is_fullnode()
982    }
983
    /// Borrows the shared handle to the committee store.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
987
988    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
989        self.committee_store.clone()
990    }
991
    /// Borrows the authority overload section of the node configuration.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
995
    /// Looks up the state commitments recorded for `epoch` by delegating to the checkpoint
    /// store; the result (including `None` and errors) is returned unchanged.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
1002
1003    /// Runs deny list checks and processes funds withdrawals. Called before loading input
1004    /// objects, since these checks don't depend on object state.
1005    fn pre_object_load_checks(
1006        &self,
1007        tx_data: &TransactionData,
1008        tx_signatures: &[GenericSignature],
1009        input_object_kinds: &[InputObjectKind],
1010        receiving_objects_refs: &[ObjectRef],
1011        protocol_config: &ProtocolConfig,
1012    ) -> SuiResult<BTreeMap<AccumulatorObjId, (u64, TypeTag)>> {
1013        // Note: the deny checks may do redundant package loads but:
1014        // - they only load packages when there is an active package deny map
1015        // - the loads are cached anyway
1016        sui_transaction_checks::deny::check_transaction_for_signing(
1017            tx_data,
1018            tx_signatures,
1019            input_object_kinds,
1020            receiving_objects_refs,
1021            &self.config.transaction_deny_config,
1022            self.get_backing_package_store().as_ref(),
1023        )?;
1024
1025        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1026            self.chain_identifier,
1027            self.coin_reservation_resolver.as_ref(),
1028        )?;
1029
1030        self.execution_cache_trait_pointers
1031            .account_funds_read
1032            .check_amounts_available(&declared_withdrawals)?;
1033
1034        if protocol_config.gasless_verify_remaining_balance() && tx_data.is_gasless_transaction() {
1035            let min_amounts =
1036                sui_types::transaction::get_gasless_allowed_token_types(protocol_config);
1037            self.execution_cache_trait_pointers
1038                .account_funds_read
1039                .check_remaining_amounts_after_withdrawal(&declared_withdrawals, &min_amounts)?;
1040        }
1041
1042        Ok(declared_withdrawals)
1043    }
1044
1045    fn handle_transaction_deny_checks(
1046        &self,
1047        transaction: &VerifiedTransaction,
1048        epoch_store: &Arc<AuthorityPerEpochStore>,
1049    ) -> SuiResult<CheckedInputObjects> {
1050        let tx_digest = transaction.digest();
1051        let tx_data = transaction.data().transaction_data();
1052
1053        let input_object_kinds = tx_data.input_objects()?;
1054        let receiving_objects_refs = tx_data.receiving_objects();
1055
1056        self.pre_object_load_checks(
1057            tx_data,
1058            transaction.tx_signatures(),
1059            &input_object_kinds,
1060            &receiving_objects_refs,
1061            epoch_store.protocol_config(),
1062        )?;
1063
1064        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1065            Some(tx_digest),
1066            &input_object_kinds,
1067            &receiving_objects_refs,
1068            epoch_store.epoch(),
1069        )?;
1070
1071        let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1072            epoch_store.protocol_config(),
1073            epoch_store.reference_gas_price(),
1074            tx_data,
1075            input_objects,
1076            &receiving_objects,
1077            &self.metrics.bytecode_verifier_metrics,
1078            &self.config.verifier_signing_config,
1079        )?;
1080
1081        self.handle_coin_deny_list_checks(
1082            tx_data,
1083            &checked_input_objects,
1084            &receiving_objects,
1085            epoch_store,
1086        )?;
1087
1088        Ok(checked_input_objects)
1089    }
1090
1091    fn handle_coin_deny_list_checks(
1092        &self,
1093        tx_data: &TransactionData,
1094        checked_input_objects: &CheckedInputObjects,
1095        receiving_objects: &ReceivingObjects,
1096        epoch_store: &Arc<AuthorityPerEpochStore>,
1097    ) -> SuiResult<()> {
1098        use sui_types::balance::Balance;
1099
1100        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1101            self.chain_identifier,
1102            self.coin_reservation_resolver.as_ref(),
1103        )?;
1104
1105        let funds_withdraw_types = declared_withdrawals
1106            .values()
1107            .filter_map(|(_, type_tag)| {
1108                Balance::maybe_get_balance_type_param(type_tag)
1109                    .map(|ty| ty.to_canonical_string(false))
1110            })
1111            .collect::<BTreeSet<_>>();
1112
1113        if epoch_store.coin_deny_list_v1_enabled() {
1114            check_coin_deny_list_v1(
1115                tx_data.sender(),
1116                checked_input_objects,
1117                receiving_objects,
1118                funds_withdraw_types.clone(),
1119                &self.get_object_store(),
1120            )?;
1121        }
1122
1123        if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1124            check_coin_deny_list_v2_during_signing(
1125                tx_data.sender(),
1126                checked_input_objects,
1127                receiving_objects,
1128                funds_withdraw_types.clone(),
1129                &self.get_object_store(),
1130            )?;
1131        }
1132
1133        Ok(())
1134    }
1135
1136    /// Vote for a transaction, either when validator receives a submit_transaction request,
1137    /// or sees a transaction from consensus. Performs the same types of checks as
1138    /// transaction signing, but does not explicitly sign the transaction.
1139    /// Note that if the transaction has been executed, we still go through the
1140    /// same checks. If the transaction has only been executed through mysticeti fastpath,
1141    /// but not yet finalized, the signing will still work since the objects are still available.
1142    /// But if the transaction has been finalized, the signing will fail.
1143    /// TODO(mysticeti-fastpath): Assess whether we want to optimize the case when the transaction
1144    /// has already been finalized executed.
1145    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1146    pub fn handle_vote_transaction(
1147        &self,
1148        epoch_store: &Arc<AuthorityPerEpochStore>,
1149        transaction: VerifiedTransaction,
1150    ) -> SuiResult<()> {
1151        debug!("handle_vote_transaction");
1152
1153        let _metrics_guard = self
1154            .metrics
1155            .authority_state_handle_vote_transaction_latency
1156            .start_timer();
1157        self.metrics.tx_orders.inc();
1158
1159        // The should_accept_user_certs check here is best effort, because
1160        // between a validator signs a tx and a cert is formed, the validator
1161        // could close the window.
1162        if !epoch_store
1163            .get_reconfig_state_read_lock_guard()
1164            .should_accept_user_certs()
1165        {
1166            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1167        }
1168
1169        // Accept finalized transactions, instead of voting to reject them.
1170        // Checking executed transactions is limited to the current epoch.
1171        // Otherwise there can be a race where the transaction is accepted because it has executed,
1172        // but the executed effects are pruned post consensus, leading to failures.
1173        let tx_digest = *transaction.digest();
1174        if epoch_store.is_recently_finalized(&tx_digest)
1175            || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
1176        {
1177            assert_reachable!("transaction recently executed");
1178            return Ok(());
1179        }
1180
1181        if self
1182            .get_transaction_cache_reader()
1183            .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
1184        {
1185            return Err(SuiErrorKind::TransactionAlreadyExecuted {
1186                digest: (*transaction.digest()),
1187            }
1188            .into());
1189        }
1190
1191        // Ensure that validator cannot reconfigure while we are validating the transaction.
1192        let _execution_lock = self.execution_lock_for_validation()?;
1193
1194        let checked_input_objects =
1195            self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1196
1197        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1198
1199        // Validate owned object versions without acquiring locks. Locking happens post-consensus
1200        // in the consensus handler. Validation still runs to prevent spam transactions with
1201        // invalid object versions, and is necessary to handle recently created objects.
1202        self.get_cache_writer()
1203            .validate_owned_object_versions(&owned_objects)
1204    }
1205
1206    /// Used for early client validation check for transactions before submission to server.
1207    /// Performs the same validation checks as handle_vote_transaction without acquiring locks.
1208    /// This allows for fast failure feedback to clients for non-retriable errors.
1209    ///
1210    /// The key addition is checking that owned object versions match live object versions.
1211    /// This is necessary because handle_transaction_deny_checks fetches objects at their
1212    /// requested version (which may exist in storage as historical versions), whereas
1213    /// validators check against the live/current version during locking (verify_live_object).
1214    pub fn check_transaction_validity(
1215        &self,
1216        epoch_store: &Arc<AuthorityPerEpochStore>,
1217        transaction: &VerifiedTransaction,
1218        enforce_live_input_objects: bool,
1219    ) -> SuiResult<()> {
1220        if !epoch_store
1221            .get_reconfig_state_read_lock_guard()
1222            .should_accept_user_certs()
1223        {
1224            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1225        }
1226
1227        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1228
1229        let checked_input_objects =
1230            self.handle_transaction_deny_checks(transaction, epoch_store)?;
1231
1232        // Check that owned object versions match live objects
1233        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1234        let cache_reader = self.get_object_cache_reader();
1235
1236        for obj_ref in &owned_objects {
1237            if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1238                // Only reject if transaction references an old version. Allow newer
1239                // versions that may exist on validators but not yet synced to fullnode.
1240                if obj_ref.1 < live_object.version() {
1241                    return Err(SuiErrorKind::UserInputError {
1242                        error: UserInputError::ObjectVersionUnavailableForConsumption {
1243                            provided_obj_ref: *obj_ref,
1244                            current_version: live_object.version(),
1245                        },
1246                    }
1247                    .into());
1248                }
1249
1250                if enforce_live_input_objects && live_object.version() < obj_ref.1 {
1251                    // Returns a non-retriable error when the input object version is required to be live.
1252                    return Err(SuiErrorKind::UserInputError {
1253                        error: UserInputError::ObjectVersionUnavailableForConsumption {
1254                            provided_obj_ref: *obj_ref,
1255                            current_version: live_object.version(),
1256                        },
1257                    }
1258                    .into());
1259                }
1260
1261                // If version matches, verify digest also matches
1262                if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1263                    return Err(SuiErrorKind::UserInputError {
1264                        error: UserInputError::InvalidObjectDigest {
1265                            object_id: obj_ref.0,
1266                            expected_digest: live_object.digest(),
1267                        },
1268                    }
1269                    .into());
1270                }
1271            }
1272        }
1273
1274        Ok(())
1275    }
1276
1277    pub fn check_system_overload_at_signing(&self) -> bool {
1278        self.config
1279            .authority_overload_config
1280            .check_system_overload_at_signing
1281    }
1282
1283    pub fn check_system_overload_at_execution(&self) -> bool {
1284        self.config
1285            .authority_overload_config
1286            .check_system_overload_at_execution
1287    }
1288
1289    pub(crate) fn check_system_overload(
1290        &self,
1291        tx_data: &SenderSignedData,
1292        do_authority_overload_check: bool,
1293    ) -> SuiResult {
1294        if do_authority_overload_check {
1295            self.check_authority_overload(tx_data).tap_err(|_| {
1296                self.update_overload_metrics("execution_queue");
1297            })?;
1298        }
1299        self.execution_scheduler
1300            .check_execution_overload(self.overload_config(), tx_data)
1301            .tap_err(|_| {
1302                self.update_overload_metrics("execution_pending");
1303            })?;
1304        // Consensus overload is handled by the admission queue in authority_server.rs.
1305
1306        let pending_tx_count = self
1307            .get_cache_commit()
1308            .approximate_pending_transaction_count();
1309        if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1310            return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1311                retry_after_secs: 10,
1312            }
1313            .into());
1314        }
1315
1316        Ok(())
1317    }
1318
1319    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1320        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1321            return Ok(());
1322        }
1323
1324        let load_shedding_percentage = self
1325            .overload_info
1326            .load_shedding_percentage
1327            .load(Ordering::Relaxed);
1328        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1329    }
1330
1331    pub(crate) fn update_overload_metrics(&self, source: &str) {
1332        self.metrics
1333            .transaction_overload_sources
1334            .with_label_values(&[source])
1335            .inc();
1336    }
1337
1338    /// Waits for fastpath (owned, package) dependency objects to become available.
1339    /// Returns true if a chosen set of fastpath dependency objects are available,
1340    /// returns false otherwise after an internal timeout.
1341    pub(crate) async fn wait_for_fastpath_dependency_objects(
1342        &self,
1343        transaction: &VerifiedTransaction,
1344        epoch: EpochId,
1345    ) -> SuiResult<bool> {
1346        let txn_data = transaction.data().transaction_data();
1347        let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1348
1349        // Gather and filter input objects to wait for.
1350        let fastpath_dependency_objects: Vec<_> = move_objects
1351            .into_iter()
1352            .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1353            .chain(
1354                packages
1355                    .into_iter()
1356                    .map(|package_id| InputKey::Package { id: package_id }),
1357            )
1358            .collect();
1359        let receiving_keys: HashSet<_> = receiving_objects
1360            .into_iter()
1361            .filter_map(|receiving_obj_ref| {
1362                self.should_wait_for_dependency_object(receiving_obj_ref)
1363            })
1364            .collect();
1365        if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1366            return Ok(true);
1367        }
1368
1369        // Use shorter wait timeout in simtests to exercise server-side error paths and
1370        // client-side retry logic.
1371        let max_wait = if cfg!(msim) {
1372            Duration::from_millis(200)
1373        } else {
1374            WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1375        };
1376
1377        match timeout(
1378            max_wait,
1379            self.get_object_cache_reader().notify_read_input_objects(
1380                &fastpath_dependency_objects,
1381                &receiving_keys,
1382                epoch,
1383            ),
1384        )
1385        .await
1386        {
1387            Ok(()) => Ok(true),
1388            // Maybe return an error for unavailable input objects,
1389            // and allow the caller to skip the rest of input checks?
1390            Err(_) => Ok(false),
1391        }
1392    }
1393
1394    /// Returns Some(inputKey) if the object reference should be waited on until it is
1395    /// finalized, before proceeding to input checks.
1396    ///
1397    /// Incorrect decisions here should only affect user experience, not safety:
1398    /// - Waiting unnecessarily adds latency to transaction signing and submission.
1399    /// - Not waiting when needed may cause the transaction to be rejected because input object is unavailable.
1400    fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1401        let (obj_id, cur_version, _digest) = obj_ref;
1402        let Some(latest_obj_ref) = self
1403            .get_object_cache_reader()
1404            .get_latest_object_ref_or_tombstone(obj_id)
1405        else {
1406            // Object might not have been created.
1407            return Some(InputKey::VersionedObject {
1408                id: FullObjectID::new(obj_id, None),
1409                version: cur_version,
1410            });
1411        };
1412        let latest_digest = latest_obj_ref.2;
1413        if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1414            // Do not wait for deleted object and rely on input check instead.
1415            return None;
1416        }
1417        let latest_version = latest_obj_ref.1;
1418        if cur_version <= latest_version {
1419            // Do not wait for version that already exists or has been consumed.
1420            // Let the input check to handle them and return the proper error.
1421            return None;
1422        }
1423        // Wait for the object version to become available.
1424        Some(InputKey::VersionedObject {
1425            id: FullObjectID::new(obj_id, None),
1426            version: cur_version,
1427        })
1428    }
1429
1430    /// Wait for a transaction to be executed. Transactions need to be sequenced by consensus
1431    /// before being enqueued for execution.
1432    ///
1433    /// Only use this in tests.
1434    #[instrument(level = "trace", skip_all)]
1435    pub async fn wait_for_transaction_execution_for_testing(
1436        &self,
1437        transaction: &VerifiedExecutableTransaction,
1438    ) -> TransactionEffects {
1439        self.notify_read_effects_for_testing(
1440            "AuthorityState::wait_for_transaction_execution_for_testing",
1441            *transaction.digest(),
1442        )
1443        .await
1444    }
1445
1446    /// Internal logic to execute a certificate.
1447    ///
1448    /// Guarantees that
1449    /// - If input objects are available, return no permanent failure.
1450    /// - Execution and output commit are atomic. i.e. outputs are only written to storage,
1451    /// on successful execution; crashed execution has no observable effect and can be retried.
1452    ///
1453    /// It is caller's responsibility to ensure input objects are available and locks are set.
1454    /// If this cannot be satisfied by the caller, execute_certificate() should be called instead.
1455    ///
1456    /// Should only be called within sui-core.
1457    #[instrument(level = "trace", skip_all)]
1458    pub fn try_execute_immediately(
1459        &self,
1460        certificate: &VerifiedExecutableTransaction,
1461        mut execution_env: ExecutionEnv,
1462        epoch_store: &Arc<AuthorityPerEpochStore>,
1463    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
1464        let _scope = monitored_scope("Execution::try_execute_immediately");
1465        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1466
1467        let tx_digest = certificate.digest();
1468
1469        if let Some(fork_recovery) = &self.fork_recovery_state
1470            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
1471        {
1472            warn!(
1473                ?tx_digest,
1474                original_digest = ?execution_env.expected_effects_digest,
1475                override_digest = ?override_digest,
1476                "Applying fork recovery override for transaction effects digest"
1477            );
1478            execution_env.expected_effects_digest = Some(override_digest);
1479        }
1480
1481        // prevent concurrent executions of the same tx.
1482        let tx_guard = epoch_store.acquire_tx_guard(certificate);
1483
1484        let tx_cache_reader = self.get_transaction_cache_reader();
1485        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
1486            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
1487                assert_eq!(
1488                    effects.digest(),
1489                    expected_effects_digest,
1490                    "Unexpected effects digest for transaction {:?}",
1491                    tx_digest
1492                );
1493            }
1494            tx_guard.release();
1495            return ExecutionOutput::Success((effects, None));
1496        }
1497
1498        let execution_start_time = Instant::now();
1499
1500        // Any caller that verifies the signatures on the certificate will have already checked the
1501        // epoch. But paths that don't verify sigs (e.g. execution from checkpoint, reading from db)
1502        // present the possibility of an epoch mismatch. If this cert is not finalzied in previous
1503        // epoch, then it's invalid.
1504        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
1505        else {
1506            tx_guard.release();
1507            return ExecutionOutput::EpochEnded;
1508        };
1509        // Since we obtain a reference to the epoch store before taking the execution lock, it's
1510        // possible that reconfiguration has happened and they no longer match.
1511        // TODO: We may not need the following check anymore since the scheduler
1512        // should have checked that the certificate is from the same epoch as epoch_store.
1513        if *execution_guard != epoch_store.epoch() {
1514            tx_guard.release();
1515            info!("The epoch of the execution_guard doesn't match the epoch store");
1516            return ExecutionOutput::EpochEnded;
1517        }
1518
1519        let accumulator_version = execution_env.assigned_versions.accumulator_version;
1520
1521        let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
1522            &tx_guard,
1523            &execution_guard,
1524            certificate,
1525            execution_env,
1526            epoch_store,
1527        ) {
1528            ExecutionOutput::Success(result) => result,
1529            output => return output.unwrap_err(),
1530        };
1531        let transaction_outputs = Arc::new(transaction_outputs);
1532
1533        fail_point!("crash");
1534
1535        let effects = transaction_outputs.effects.clone();
1536        debug!(
1537            ?tx_digest,
1538            fx_digest=?effects.digest(),
1539            "process_certificate succeeded in {:.3}ms",
1540            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
1541        );
1542
1543        let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
1544        if let Err(err) = commit_result {
1545            error!(?tx_digest, "Error committing transaction: {err}");
1546            tx_guard.release();
1547            return ExecutionOutput::Fatal(err);
1548        }
1549
1550        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
1551            certificate.data().transaction_data().kind()
1552        {
1553            if let Some(err) = &execution_error_opt {
1554                debug_fatal!("Authenticator state update failed: {:?}", err);
1555            }
1556            epoch_store.update_authenticator_state(auth_state);
1557
1558            // double check that the signature verifier always matches the authenticator state
1559            if cfg!(debug_assertions) {
1560                let authenticator_state = get_authenticator_state(self.get_object_store())
1561                    .expect("Read cannot fail")
1562                    .expect("Authenticator state must exist");
1563
1564                let mut sys_jwks: Vec<_> = authenticator_state
1565                    .active_jwks
1566                    .into_iter()
1567                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1568                    .collect();
1569                let mut active_jwks: Vec<_> = epoch_store
1570                    .signature_verifier
1571                    .get_jwks()
1572                    .into_iter()
1573                    .collect();
1574                sys_jwks.sort();
1575                active_jwks.sort();
1576
1577                assert_eq!(sys_jwks, active_jwks);
1578            }
1579        }
1580
1581        if certificate
1582            .transaction_data()
1583            .kind()
1584            .is_accumulator_barrier_settle_tx()
1585        {
1586            let object_funds_checker = self.object_funds_checker.load();
1587            if let Some(object_funds_checker) = object_funds_checker.as_ref() {
1588                // unwrap safe because we assign accumulator version for every transaction
1589                // when accumulator is enabled.
1590                let next_accumulator_version = accumulator_version.unwrap().next();
1591                object_funds_checker.settle_accumulator_version(next_accumulator_version);
1592            }
1593        }
1594
1595        tx_guard.commit_tx();
1596
1597        let elapsed = execution_start_time.elapsed();
1598        epoch_store.record_local_execution_time(
1599            certificate.data().transaction_data(),
1600            &effects,
1601            timings,
1602            elapsed,
1603        );
1604
1605        let elapsed_us = elapsed.as_micros() as f64;
1606        if elapsed_us > 0.0 {
1607            self.metrics
1608                .execution_gas_latency_ratio
1609                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
1610        };
1611        ExecutionOutput::Success((effects, execution_error_opt))
1612    }
1613
1614    pub fn read_objects_for_execution(
1615        &self,
1616        tx_lock: &CertLockGuard,
1617        certificate: &VerifiedExecutableTransaction,
1618        assigned_shared_object_versions: &AssignedVersions,
1619        epoch_store: &Arc<AuthorityPerEpochStore>,
1620    ) -> SuiResult<InputObjects> {
1621        let _scope = monitored_scope("Execution::load_input_objects");
1622        let _metrics_guard = self
1623            .metrics
1624            .execution_load_input_objects_latency
1625            .start_timer();
1626        let input_objects = &certificate.data().transaction_data().input_objects()?;
1627        self.input_loader.read_objects_for_execution(
1628            &certificate.key(),
1629            tx_lock,
1630            input_objects,
1631            assigned_shared_object_versions,
1632            epoch_store.epoch(),
1633        )
1634    }
1635
1636    /// Test only wrapper for `try_execute_immediately()` above, useful for executing change epoch
1637    /// transactions. Assumes execution will always succeed.
1638    pub async fn try_execute_for_test(
1639        &self,
1640        certificate: &VerifiedCertificate,
1641        execution_env: ExecutionEnv,
1642    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1643        let epoch_store = self.epoch_store_for_testing();
1644        let (effects, execution_error_opt) = self
1645            .try_execute_immediately(
1646                &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1647                execution_env,
1648                &epoch_store,
1649            )
1650            .unwrap();
1651        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1652        (signed_effects, execution_error_opt)
1653    }
1654
1655    pub async fn try_execute_executable_for_test(
1656        &self,
1657        executable: &VerifiedExecutableTransaction,
1658        execution_env: ExecutionEnv,
1659    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1660        let epoch_store = self.epoch_store_for_testing();
1661        let (effects, execution_error_opt) = self
1662            .try_execute_immediately(executable, execution_env, &epoch_store)
1663            .unwrap();
1664        self.flush_post_processing(executable.digest()).await;
1665        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1666        (signed_effects, execution_error_opt)
1667    }
1668
1669    /// Wait until the effects of the given transaction are available and return them.
1670    /// Panics if the effects are not found.
1671    ///
1672    /// Only use this in tests where effects are expected to exist.
1673    pub async fn notify_read_effects_for_testing(
1674        &self,
1675        task_name: &'static str,
1676        digest: TransactionDigest,
1677    ) -> TransactionEffects {
1678        self.get_transaction_cache_reader()
1679            .notify_read_executed_effects(task_name, &[digest])
1680            .await
1681            .pop()
1682            .expect("must return correct number of effects")
1683    }
1684
1685    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1686        self.get_object_cache_reader()
1687            .check_owned_objects_are_live(owned_object_refs)
1688    }
1689
1690    /// This function captures the required state to debug a forked transaction.
1691    /// The dump is written to a file in dir `path`, with name prefixed by the transaction digest.
1692    /// NOTE: Since this info escapes the validator context,
1693    /// make sure not to leak any private info here
1694    pub(crate) fn debug_dump_transaction_state(
1695        &self,
1696        tx_digest: &TransactionDigest,
1697        effects: &TransactionEffects,
1698        expected_effects_digest: TransactionEffectsDigest,
1699        inner_temporary_store: &InnerTemporaryStore,
1700        certificate: &VerifiedExecutableTransaction,
1701        debug_dump_config: &StateDebugDumpConfig,
1702    ) -> SuiResult<PathBuf> {
1703        let dump_dir = debug_dump_config
1704            .dump_file_directory
1705            .as_ref()
1706            .cloned()
1707            .unwrap_or(std::env::temp_dir());
1708        let epoch_store = self.load_epoch_store_one_call_per_task();
1709
1710        NodeStateDump::new(
1711            tx_digest,
1712            effects,
1713            expected_effects_digest,
1714            self.get_object_store().as_ref(),
1715            &epoch_store,
1716            inner_temporary_store,
1717            certificate,
1718        )?
1719        .write_to_file(&dump_dir)
1720        .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1721    }
1722
1723    #[instrument(level = "trace", skip_all)]
1724    pub(crate) fn process_certificate(
1725        &self,
1726        tx_guard: &CertTxGuard,
1727        execution_guard: &ExecutionLockReadGuard<'_>,
1728        certificate: &VerifiedExecutableTransaction,
1729        execution_env: ExecutionEnv,
1730        epoch_store: &Arc<AuthorityPerEpochStore>,
1731    ) -> ExecutionOutput<(
1732        TransactionOutputs,
1733        Vec<ExecutionTiming>,
1734        Option<ExecutionError>,
1735    )> {
1736        let _scope = monitored_scope("Execution::process_certificate");
1737        let tx_digest = *certificate.digest();
1738
1739        let input_objects = match self.read_objects_for_execution(
1740            tx_guard.as_lock_guard(),
1741            certificate,
1742            &execution_env.assigned_versions,
1743            epoch_store,
1744        ) {
1745            Ok(objects) => objects,
1746            Err(e) => return ExecutionOutput::Fatal(e),
1747        };
1748
1749        let expected_effects_digest = match execution_env.expected_effects_digest {
1750            Some(expected_effects_digest) => Some(expected_effects_digest),
1751            None => {
1752                // We could be re-executing a previously executed but uncommitted transaction, perhaps after
1753                // restarting with a new binary. In this situation, if we have published an effects signature,
1754                // we must be sure not to equivocate.
1755                match epoch_store.get_signed_effects_digest(&tx_digest) {
1756                    Ok(digest) => digest,
1757                    Err(e) => return ExecutionOutput::Fatal(e),
1758                }
1759            }
1760        };
1761
1762        fail_point_if!("correlated-crash-process-certificate", || {
1763            if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1764                sui_simulator::task::kill_current_node(None);
1765            }
1766        });
1767
1768        // Errors originating from prepare_certificate may be transient (failure to read locks) or
1769        // non-transient (transaction input is invalid, move vm errors). However, all errors from
1770        // this function occur before we have written anything to the db, so we commit the tx
1771        // guard and rely on the client to retry the tx (if it was transient).
1772        self.execute_certificate(
1773            execution_guard,
1774            certificate,
1775            input_objects,
1776            expected_effects_digest,
1777            execution_env,
1778            epoch_store,
1779        )
1780    }
1781
1782    pub async fn reconfigure_traffic_control(
1783        &self,
1784        params: TrafficControlReconfigParams,
1785    ) -> Result<TrafficControlReconfigParams, SuiError> {
1786        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1787            traffic_controller.admin_reconfigure(params).await
1788        } else {
1789            Err(SuiErrorKind::InvalidAdminRequest(
1790                "Traffic controller is not configured on this node".to_string(),
1791            )
1792            .into())
1793        }
1794    }
1795
1796    #[instrument(level = "trace", skip_all)]
1797    fn commit_certificate(
1798        &self,
1799        certificate: &VerifiedExecutableTransaction,
1800        transaction_outputs: Arc<TransactionOutputs>,
1801        epoch_store: &Arc<AuthorityPerEpochStore>,
1802    ) -> SuiResult {
1803        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
1804            monitored_scope("Execution::commit_certificate");
1805        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1806
1807        let tx_digest = certificate.digest();
1808
1809        // The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
1810        // we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
1811        epoch_store.insert_executed_in_epoch(tx_digest);
1812        let key = certificate.key();
1813        if !matches!(key, TransactionKey::Digest(_)) {
1814            epoch_store.insert_tx_key(key, *tx_digest)?;
1815        }
1816
1817        // Allow testing what happens if we crash here.
1818        fail_point!("crash");
1819
1820        self.get_cache_writer()
1821            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);
1822
1823        // Fire an in-memory-only notification so the scheduler can detect that this
1824        // barrier transaction has been executed (e.g. by the checkpoint executor) and
1825        // stop waiting for the checkpoint builder.  We intentionally do NOT persist
1826        // this to the DB to avoid stale entries surviving a crash while effects may not.
1827        if let Some(settlement_key) = certificate
1828            .transaction_data()
1829            .kind()
1830            .accumulator_barrier_settlement_key()
1831        {
1832            epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
1833        }
1834
1835        if certificate.transaction_data().is_end_of_epoch_tx() {
1836            // At the end of epoch, since system packages may have been upgraded, force
1837            // reload them in the cache.
1838            self.get_object_cache_reader()
1839                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1840        }
1841
1842        Ok(())
1843    }
1844
1845    fn update_metrics(
1846        &self,
1847        certificate: &VerifiedExecutableTransaction,
1848        inner_temporary_store: &InnerTemporaryStore,
1849        effects: &TransactionEffects,
1850    ) {
1851        // count signature by scheme, for zklogin and multisig
1852        if certificate.has_zklogin_sig() {
1853            self.metrics.zklogin_sig_count.inc();
1854        } else if certificate.has_upgraded_multisig() {
1855            self.metrics.multisig_sig_count.inc();
1856        }
1857
1858        self.metrics.total_effects.inc();
1859        self.metrics.total_certs.inc();
1860
1861        let consensus_object_count = effects.input_consensus_objects().len();
1862        if consensus_object_count > 0 {
1863            self.metrics.shared_obj_tx.inc();
1864        }
1865
1866        if certificate.is_sponsored_tx() {
1867            self.metrics.sponsored_tx.inc();
1868        }
1869
1870        let input_object_count = inner_temporary_store.input_objects.len();
1871        self.metrics
1872            .num_input_objs
1873            .observe(input_object_count as f64);
1874        self.metrics
1875            .num_shared_objects
1876            .observe(consensus_object_count as f64);
1877        self.metrics.batch_size.observe(
1878            certificate
1879                .data()
1880                .intent_message()
1881                .value
1882                .kind()
1883                .num_commands() as f64,
1884        );
1885    }
1886
1887    /// Runs the executor on an already-prepared transaction; the caller is responsible for any
1888    /// coin reservation rewriting.
1889    fn execute_transaction_to_effects(
1890        &self,
1891        executor: &dyn Executor,
1892        store: &dyn BackingStore,
1893        protocol_config: &ProtocolConfig,
1894        enable_expensive_checks: bool,
1895        execution_params: ExecutionOrEarlyError,
1896        epoch_id: &EpochId,
1897        epoch_timestamp_ms: u64,
1898        input_objects: CheckedInputObjects,
1899        gas_data: GasData,
1900        gas_status: SuiGasStatus,
1901        kind: TransactionKind,
1902        rewritten_inputs: Option<Vec<bool>>,
1903        signer: SuiAddress,
1904        tx_digest: TransactionDigest,
1905    ) -> (
1906        InnerTemporaryStore,
1907        SuiGasStatus,
1908        TransactionEffects,
1909        Vec<ExecutionTiming>,
1910        Result<(), ExecutionError>,
1911    ) {
1912        let (inner_temp_store, gas_status, effects, timings, execution_error) = executor
1913            // TODO only run this function on FullNodes, use `execute_transaction_to_effects` on validators.
1914            .execute_transaction_to_effects_and_execution_error(
1915                store,
1916                protocol_config,
1917                self.metrics.execution_metrics.clone(),
1918                enable_expensive_checks,
1919                execution_params,
1920                epoch_id,
1921                epoch_timestamp_ms,
1922                input_objects,
1923                gas_data,
1924                gas_status,
1925                kind,
1926                rewritten_inputs,
1927                signer,
1928                tx_digest,
1929                &mut None,
1930            );
1931
1932        (
1933            inner_temp_store,
1934            gas_status,
1935            effects,
1936            timings,
1937            execution_error,
1938        )
1939    }
1940
1941    /// execute_certificate validates the transaction input, and executes the certificate,
1942    /// returning transaction outputs.
1943    ///
1944    /// It reads state from the db (both owned and shared locks), but it has no side effects.
1945    ///
1946    /// Executes a certificate and returns an ExecutionOutput.
1947    /// The function can fail with Fatal errors (e.g., the transaction input is invalid,
1948    /// locks are not held correctly, etc.) or transient errors (e.g., db read errors).
1949    #[instrument(level = "trace", skip_all)]
1950    fn execute_certificate(
1951        &self,
1952        _execution_guard: &ExecutionLockReadGuard<'_>,
1953        certificate: &VerifiedExecutableTransaction,
1954        input_objects: InputObjects,
1955        expected_effects_digest: Option<TransactionEffectsDigest>,
1956        execution_env: ExecutionEnv,
1957        epoch_store: &Arc<AuthorityPerEpochStore>,
1958    ) -> ExecutionOutput<(
1959        TransactionOutputs,
1960        Vec<ExecutionTiming>,
1961        Option<ExecutionError>,
1962    )> {
1963        let _scope = monitored_scope("Execution::prepare_certificate");
1964        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1965        let prepare_certificate_start_time = tokio::time::Instant::now();
1966
1967        // TODO: We need to move this to a more appropriate place to avoid redundant checks.
1968        let tx_data = certificate.data().transaction_data();
1969
1970        if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
1971            return ExecutionOutput::Fatal(e);
1972        }
1973
1974        // The cost of partially re-auditing a transaction before execution is tolerated.
1975        // This step is required for correctness because, for example, ConsensusAddressOwner
1976        // object owner may have changed between signing and execution.
1977        let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
1978            certificate,
1979            input_objects,
1980            epoch_store.protocol_config(),
1981            epoch_store.reference_gas_price(),
1982        ) {
1983            Ok(result) => result,
1984            Err(e) => return ExecutionOutput::Fatal(e),
1985        };
1986
1987        let owned_object_refs = input_objects.inner().filter_owned_objects();
1988        if let Err(e) = self.check_owned_locks(&owned_object_refs) {
1989            return ExecutionOutput::Fatal(e);
1990        }
1991        let tx_digest = *certificate.digest();
1992        let protocol_config = epoch_store.protocol_config();
1993        let transaction_data = &certificate.data().intent_message().value;
1994        let sender = transaction_data.sender();
1995        let (mut kind, signer, gas_data) = transaction_data.execution_parts();
1996        let early_execution_error = get_early_execution_error(
1997            &tx_digest,
1998            &input_objects,
1999            self.config.certificate_deny_config.certificate_deny_set(),
2000            &execution_env.funds_withdraw_status,
2001        );
2002        // Mainnet-only: feed the accumulator (settlement) version so the address-balance gas-smash
2003        // short-circuit activates at its rollout version and replays bit-for-bit. Other chains pass
2004        // `None`, where the short-circuit applies unconditionally.
2005        let accumulator_version = if self.chain_identifier.chain() == Chain::Mainnet {
2006            execution_env.assigned_versions.accumulator_version
2007        } else {
2008            None
2009        };
2010        let execution_params = match early_execution_error {
2011            None => ExecutionOrEarlyError::ok(accumulator_version),
2012            Some(errors) => ExecutionOrEarlyError::failed(errors, accumulator_version),
2013        };
2014
2015        // Skip on early error: the tx will fail anyway and rewriting may fail if the accumulator
2016        // was deleted.
2017        let rewritten_inputs = if execution_params.is_ok() {
2018            rewrite_transaction_for_coin_reservations(
2019                self.chain_identifier,
2020                &*self.coin_reservation_resolver,
2021                sender,
2022                &mut kind,
2023                execution_env.assigned_versions.accumulator_version,
2024            )
2025            .expect("rewriting must succeed for a certified transaction")
2026        } else {
2027            None
2028        };
2029
2030        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2031
2032        #[allow(unused_mut)]
2033        let (inner_temp_store, _, mut effects, timings, execution_error_opt) = self
2034            .execute_transaction_to_effects(
2035                &**epoch_store.executor(),
2036                &tracking_store,
2037                protocol_config,
2038                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
2039                // cyclic dependency w/ sui-adapter
2040                self.config
2041                    .expensive_safety_check_config
2042                    .enable_deep_per_tx_sui_conservation_check(),
2043                execution_params,
2044                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2045                epoch_store
2046                    .epoch_start_config()
2047                    .epoch_data()
2048                    .epoch_start_timestamp(),
2049                input_objects,
2050                gas_data,
2051                gas_status,
2052                kind,
2053                rewritten_inputs,
2054                signer,
2055                tx_digest,
2056            );
2057
2058        let object_funds_checker = self.object_funds_checker.load();
2059        if let Some(object_funds_checker) = object_funds_checker.as_ref()
2060            && !object_funds_checker.should_commit_object_funds_withdraws(
2061                certificate,
2062                effects.status(),
2063                &inner_temp_store.accumulator_running_max_withdraws,
2064                &execution_env,
2065                self.get_account_funds_read(),
2066                &self.execution_scheduler,
2067                epoch_store,
2068            )
2069        {
2070            assert_reachable!("retry object withdraw later");
2071            return ExecutionOutput::RetryLater;
2072        }
2073
2074        if let Some(expected_effects_digest) = expected_effects_digest
2075            && effects.digest() != expected_effects_digest
2076        {
2077            // We dont want to mask the original error, so we log it and continue.
2078            match self.debug_dump_transaction_state(
2079                &tx_digest,
2080                &effects,
2081                expected_effects_digest,
2082                &inner_temp_store,
2083                certificate,
2084                &self.config.state_debug_dump_config,
2085            ) {
2086                Ok(out_path) => {
2087                    info!(
2088                        "Dumped node state for transaction {} to {}",
2089                        tx_digest,
2090                        out_path.as_path().display().to_string()
2091                    );
2092                }
2093                Err(e) => {
2094                    error!("Error dumping state for transaction {}: {e}", tx_digest);
2095                }
2096            }
2097            let expected_effects = self
2098                .get_transaction_cache_reader()
2099                .get_effects(&expected_effects_digest);
2100            error!(
2101                ?tx_digest,
2102                ?expected_effects_digest,
2103                actual_effects = ?effects,
2104                expected_effects = ?expected_effects,
2105                "fork detected!"
2106            );
2107            if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2108                tx_digest,
2109                expected_effects_digest,
2110                effects.digest(),
2111            ) {
2112                error!("Failed to record transaction fork: {e}");
2113            }
2114
2115            fail_point_if!("kill_transaction_fork_node", || {
2116                #[cfg(msim)]
2117                {
2118                    tracing::error!(
2119                        fatal = true,
2120                        "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2121                        tx_digest
2122                    );
2123                    sui_simulator::task::shutdown_current_node();
2124                }
2125            });
2126
2127            fatal!(
2128                "Transaction {} is expected to have effects digest {}, but got {}!",
2129                tx_digest,
2130                expected_effects_digest,
2131                effects.digest()
2132            );
2133        }
2134
2135        fail_point_arg!("simulate_fork_during_execution", |(
2136            forked_validators,
2137            full_halt,
2138            effects_overrides,
2139            fork_probability,
2140        ): (
2141            std::sync::Arc<
2142                std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2143            >,
2144            bool,
2145            std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2146            f32,
2147        )| {
2148            #[cfg(msim)]
2149            self.simulate_fork_during_execution(
2150                certificate,
2151                epoch_store,
2152                &mut effects,
2153                forked_validators,
2154                full_halt,
2155                effects_overrides,
2156                fork_probability,
2157            );
2158        });
2159
2160        let unchanged_loaded_runtime_objects =
2161            crate::transaction_outputs::unchanged_loaded_runtime_objects(
2162                certificate.transaction_data(),
2163                &effects,
2164                &tracking_store.into_read_objects(),
2165            );
2166
2167        // index certificate
2168        let _ = self
2169            .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2170            .tap_err(|e| {
2171                self.metrics.post_processing_total_failures.inc();
2172                error!(?tx_digest, "tx post processing failed: {e}");
2173            });
2174
2175        self.update_metrics(certificate, &inner_temp_store, &effects);
2176
2177        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2178            certificate.clone().into_unsigned(),
2179            effects,
2180            inner_temp_store,
2181            unchanged_loaded_runtime_objects,
2182        );
2183
2184        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2185        if elapsed > 0.0 {
2186            self.metrics.prepare_cert_gas_latency_ratio.observe(
2187                transaction_outputs
2188                    .effects
2189                    .gas_cost_summary()
2190                    .computation_cost as f64
2191                    / elapsed,
2192            );
2193        }
2194
2195        ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2196    }
2197
2198    pub fn prepare_certificate_for_benchmark(
2199        &self,
2200        certificate: &VerifiedExecutableTransaction,
2201        input_objects: InputObjects,
2202        epoch_store: &Arc<AuthorityPerEpochStore>,
2203    ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2204        let lock = RwLock::new(epoch_store.epoch());
2205        let execution_guard = lock.try_read().unwrap();
2206
2207        let (transaction_outputs, _timings, execution_error_opt) = self
2208            .execute_certificate(
2209                &execution_guard,
2210                certificate,
2211                input_objects,
2212                None,
2213                ExecutionEnv::default(),
2214                epoch_store,
2215            )
2216            .unwrap();
2217        Ok((transaction_outputs, execution_error_opt))
2218    }
2219
2220    #[instrument(skip_all)]
2221    #[allow(clippy::type_complexity)]
2222    pub async fn dry_exec_transaction(
2223        &self,
2224        transaction: TransactionData,
2225    ) -> SuiResult<(
2226        DryRunTransactionBlockResponse,
2227        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2228        TransactionEffects,
2229        Option<ObjectID>,
2230    )> {
2231        let epoch_store = self.load_epoch_store_one_call_per_task();
2232        if !self.is_fullnode(&epoch_store) {
2233            return Err(SuiErrorKind::UnsupportedFeatureError {
2234                error: "dry-exec is only supported on fullnodes".to_string(),
2235            }
2236            .into());
2237        }
2238
2239        if transaction.kind().is_system_tx() {
2240            return Err(SuiErrorKind::UnsupportedFeatureError {
2241                error: "dry-exec does not support system transactions".to_string(),
2242            }
2243            .into());
2244        }
2245
2246        self.dry_exec_transaction_impl(&epoch_store, transaction)
2247    }
2248
2249    #[allow(clippy::type_complexity)]
2250    fn dry_exec_transaction_impl(
2251        &self,
2252        epoch_store: &AuthorityPerEpochStore,
2253        transaction: TransactionData,
2254    ) -> SuiResult<(
2255        DryRunTransactionBlockResponse,
2256        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2257        TransactionEffects,
2258        Option<ObjectID>,
2259    )> {
2260        // Route through `simulate_transaction` -- `dry-exec` matches
2261        // `simulate_transaction(_, TransactionChecks::Enabled, _)`
2262        let sim = self.simulate_transaction(
2263            transaction.clone(),
2264            TransactionChecks::Enabled,
2265            /* allow_mock_gas_coin */ true,
2266        )?;
2267
2268        self.build_dry_run_response(epoch_store, transaction, sim)
2269    }
2270
2271    /// Adapt a `SimulateTransactionResult` into the
2272    /// `(DryRunTransactionBlockResponse, written_objects_with_kind, effects, mock_gas_id)`
2273    /// tuple that the JSON-RPC dry-run layer consumes. Shared between
2274    /// `dry_exec_transaction_impl` and `dry_exec_transaction_for_benchmark`'s
2275    /// callers; pulls together:
2276    ///   - layout resolution over the simulate-produced `ObjectSet`,
2277    ///   - `(ObjectRef, Object, WriteKind)` derivation by walking
2278    ///     `effects.created / unwrapped / mutated` against that `ObjectSet`,
2279    ///   - `execution_error_source` from `sim.execution_result`,
2280    ///   - the `SuiTransactionBlockData` / `SuiTransactionBlockEffects` /
2281    ///     `SuiTransactionBlockEvents` conversions.
2282    #[allow(clippy::type_complexity)]
2283    fn build_dry_run_response(
2284        &self,
2285        epoch_store: &AuthorityPerEpochStore,
2286        transaction: TransactionData,
2287        sim: SimulateTransactionResult,
2288    ) -> SuiResult<(
2289        DryRunTransactionBlockResponse,
2290        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2291        TransactionEffects,
2292        Option<ObjectID>,
2293    )> {
2294        let SimulateTransactionResult {
2295            effects,
2296            events,
2297            objects,
2298            execution_result,
2299            mock_gas_id,
2300            suggested_gas_price,
2301            ..
2302        } = sim;
2303
2304        let tx_digest = *effects.transaction_digest();
2305
2306        // Walk effects' created / unwrapped / mutated lists against the
2307        // simulate-produced ObjectSet (which carries both input and written
2308        // objects). Refs missing from `objects` are dropped silently — that
2309        // would indicate a simulate-vs-effects inconsistency.
2310        let written_with_kind: BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)> = effects
2311            .created()
2312            .into_iter()
2313            .map(|(oref, _)| (oref, WriteKind::Create))
2314            .chain(
2315                effects
2316                    .unwrapped()
2317                    .into_iter()
2318                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2319            )
2320            .chain(
2321                effects
2322                    .mutated()
2323                    .into_iter()
2324                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
2325            )
2326            .filter_map(|(oref, kind)| {
2327                objects
2328                    .get(&ObjectKey(oref.0, oref.1))
2329                    .map(|obj| (oref.0, (oref, obj.clone(), kind)))
2330            })
2331            .collect();
2332
2333        // Resolve event/object layouts against the simulate-produced ObjectSet
2334        // (newly-published packages from the simulation appear there), with the
2335        // node's backing package store as fallback for already-on-chain packages.
2336        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2337            epoch_store.protocol_config(),
2338            Box::new(OverlayBackingPackageStore::new(
2339                &objects,
2340                self.get_backing_package_store(),
2341            )),
2342        );
2343
2344        let execution_error_source = execution_result
2345            .as_ref()
2346            .err()
2347            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2348
2349        let response = DryRunTransactionBlockResponse {
2350            suggested_gas_price,
2351            input: SuiTransactionBlockData::try_from_with_module_cache(
2352                transaction,
2353                &epoch_store.module_cache().clone(),
2354            )
2355            .map_err(|e| SuiErrorKind::TransactionSerializationError {
2356                error: format!("Failed to convert transaction to SuiTransactionBlockData: {e}"),
2357            })?,
2358            effects: effects.clone().try_into()?,
2359            events: SuiTransactionBlockEvents::try_from(
2360                events.unwrap_or_default(),
2361                tx_digest,
2362                None,
2363                layout_resolver.as_mut(),
2364            )?,
2365            // The RPC layer recalculates object_changes / balance_changes from
2366            // the written_objects map and effects.
2367            object_changes: Vec::new(),
2368            balance_changes: Vec::new(),
2369            execution_error_source,
2370        };
2371
2372        Ok((response, written_with_kind, effects, mock_gas_id))
2373    }
2374
    /// Simulates `transaction` without committing anything and returns the
    /// resulting effects, events, object set and execution result.
    ///
    /// * `transaction` - the transaction to simulate. Its gas payment may be
    ///   rewritten in place when a mock gas coin is injected (see below).
    /// * `checks` - when `checks.disabled()` is true the simulation runs in
    ///   dev-inspect mode and inputs are checked with
    ///   `check_dev_inspect_input`; otherwise `check_transaction_input` is used.
    /// * `allow_mock_gas_coin` - when true, and the transaction has no gas
    ///   payment and is not gasless, a mock gas coin with id `ObjectID::MAX`
    ///   and value `DEV_INSPECT_GAS_COIN_VALUE` is created and used as payment.
    ///
    /// If the first execution succeeds but an accumulator withdrawal that was
    /// not declared up front exceeds the account's latest amount, the
    /// transaction is executed a second time with an
    /// `InsufficientFundsForWithdraw` early error, and that second result is
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns `UnsupportedFeatureError` for system transactions, on nodes
    /// that are not fullnodes, when checks are disabled but
    /// `config.dev_inspect_disabled` is set, and when the gas payment contains
    /// coin reservations while the protocol config does not enable them.
    /// Errors from validity checks, input loading, input checking, coin
    /// reservation rewriting and gas status construction are propagated.
    pub fn simulate_transaction(
        &self,
        mut transaction: TransactionData,
        checks: TransactionChecks,
        allow_mock_gas_coin: bool,
    ) -> SuiResult<SimulateTransactionResult> {
        if transaction.kind().is_system_tx() {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate does not support system transactions".to_string(),
            }
            .into());
        }

        let epoch_store = self.load_epoch_store_one_call_per_task();
        if !self.is_fullnode(&epoch_store) {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate is only supported on fullnodes".to_string(),
            }
            .into());
        }

        // Disabled checks select dev-inspect mode, which a node can turn off
        // through its config.
        let dev_inspect = checks.disabled();
        if dev_inspect && self.config.dev_inspect_disabled {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate with checks disabled is not allowed on this node".to_string(),
            }
            .into());
        }

        // Reject coin reservations in gas payment when the execution engine
        // doesn't support them.
        let protocol_config = epoch_store.protocol_config();
        if !protocol_config.enable_coin_reservation_obj_refs()
            && transaction.gas().iter().any(|obj_ref| {
                sui_types::coin_reservation::ParsedDigest::is_coin_reservation_digest(&obj_ref.2)
            })
        {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error:
                    "coin reservations in gas payment are not supported at this protocol version"
                        .to_string(),
            }
            .into());
        }

        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        // Collected before any mock gas coin is injected, so the mock coin is
        // not part of the kinds that get loaded from the store below.
        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Create and inject mock gas coin before pre_object_load_checks so that
        // funds withdrawal processing sees non-empty payment and doesn't incorrectly
        // create an address balance withdrawal for gas.
        // Skip mock gas for gasless transactions — they don't use gas coins.
        let is_gasless = protocol_config.enable_gasless() && transaction.is_gasless_transaction();
        let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() && !is_gasless
        {
            let obj = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    ObjectID::MAX,
                    DEV_INSPECT_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(transaction.gas_data().owner),
                TransactionDigest::genesis_marker(),
            );
            transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
            Some(obj)
        } else {
            None
        };

        let declared_withdrawals = self.pre_object_load_checks(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.protocol_config(),
        )?;
        // Accounts with a declared withdrawal; these are excluded from the
        // post-execution funds check further down.
        let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a simulation.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Add mock gas to input objects after loading (it doesn't exist in the store).
        let mock_gas_id = mock_gas_object.map(|obj| {
            let id = obj.id();
            input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
            id
        });

        let protocol_config = epoch_store.protocol_config();

        // Input checking differs between dev-inspect and regular simulation.
        let (gas_status, checked_input_objects) = if dev_inspect {
            sui_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction,
                input_objects,
                receiving_objects,
                epoch_store.reference_gas_price(),
            )?
        } else {
            sui_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?
        };

        // TODO see if we can spin up a VM once and reuse it
        let executor = sui_execution::executor(
            protocol_config,
            true, // silent
        )
        .expect("Creating an executor should not fail here");

        let (mut kind, signer, gas_data) = transaction.execution_parts();
        let rewritten_inputs = rewrite_transaction_for_coin_reservations(
            self.chain_identifier,
            &*self.coin_reservation_resolver,
            signer,
            &mut kind,
            None,
        )?;
        let early_execution_error = get_early_execution_error(
            &transaction.digest(),
            &checked_input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            &FundsWithdrawStatus::MaybeSufficient,
        );
        // Dev-inspect/simulation path (not committed): no assigned accumulator version here, so the
        // IFFW short-circuit applies unconditionally (`None`), matching non-mainnet execution.
        let execution_params = match early_execution_error {
            None => ExecutionOrEarlyError::ok(None),
            Some(errors) => ExecutionOrEarlyError::failed(errors, None),
        };

        // Wraps the backing store; the objects read through it are collected
        // after execution via `into_read_objects`.
        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());

        // Clone inputs for potential retry if object funds check fails post-execution.
        let cloned_input_objects = checked_input_objects.clone();
        let cloned_gas = gas_data.clone();
        let cloned_kind = kind.clone();
        let tx_digest = transaction.digest();
        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_timestamp_ms = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            &tracking_store,
            protocol_config,
            self.metrics.execution_metrics.clone(),
            false, // expensive_checks
            execution_params,
            &epoch_id,
            epoch_timestamp_ms,
            checked_input_objects,
            gas_data,
            gas_status,
            kind,
            rewritten_inputs.clone(),
            signer,
            tx_digest,
            dev_inspect,
        );

        // Post-execution: check object funds (non-address withdrawals discovered during execution).
        let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
            // True when any account outside `address_funds` would withdraw more
            // than its latest recorded amount.
            let has_insufficient_object_funds = inner_temp_store
                .accumulator_running_max_withdraws
                .iter()
                .filter(|(id, _)| !address_funds.contains(id))
                .any(|(id, max_withdraw)| {
                    let (balance, _) = self.get_account_funds_read().get_latest_account_amount(id);
                    balance < *max_withdraw
                });

            if has_insufficient_object_funds {
                // Re-run with a fresh gas status and a forced
                // `InsufficientFundsForWithdraw` early error; the outputs of this
                // second run replace those of the first.
                let retry_gas_status = SuiGasStatus::new(
                    cloned_gas.budget,
                    cloned_gas.price,
                    epoch_store.reference_gas_price(),
                    protocol_config,
                )?;
                let (store, _, effects, result) = executor.dev_inspect_transaction(
                    &tracking_store,
                    protocol_config,
                    self.metrics.execution_metrics.clone(),
                    false,
                    ExecutionOrEarlyError::failed(
                        NonEmpty::new(ExecutionErrorKind::InsufficientFundsForWithdraw),
                        None,
                    ),
                    &epoch_id,
                    epoch_timestamp_ms,
                    cloned_input_objects,
                    cloned_gas,
                    retry_gas_status,
                    cloned_kind,
                    rewritten_inputs,
                    signer,
                    tx_digest,
                    dev_inspect,
                );
                (store, effects, result)
            } else {
                (inner_temp_store, effects, execution_result)
            }
        } else {
            (inner_temp_store, effects, execution_result)
        };

        let loaded_runtime_objects = tracking_store.into_read_objects();
        let unchanged_loaded_runtime_objects =
            crate::transaction_outputs::unchanged_loaded_runtime_objects(
                &transaction,
                &effects,
                &loaded_runtime_objects,
            );

        // Build the returned object set: start from every object seen during
        // the simulation, then keep only those named by the transaction's
        // object keys.
        let object_set = {
            let objects = {
                let mut objects = loaded_runtime_objects;

                // Add the temp store's input objects and written objects on top
                // of the runtime-loaded ones.
                for o in inner_temp_store
                    .input_objects
                    .into_values()
                    .chain(inner_temp_store.written.into_values())
                {
                    objects.insert(o);
                }

                objects
            };

            let object_keys = sui_types::storage::get_transaction_object_set(
                &transaction,
                &effects,
                &unchanged_loaded_runtime_objects,
            );

            // Keys with no matching object are skipped.
            let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
            for k in object_keys {
                if let Some(o) = objects.get(&k) {
                    set.insert(o.clone());
                }
            }

            set
        };

        Ok(SimulateTransactionResult {
            objects: object_set,
            // Events are reported only when the effects carry an events digest.
            events: effects.events_digest().map(|_| inner_temp_store.events),
            effects,
            execution_result,
            mock_gas_id,
            unchanged_loaded_runtime_objects,
            suggested_gas_price: self
                .congestion_tracker
                .get_suggested_gas_prices(&transaction),
        })
    }
2649
2650    /// The object ID for gas can be any object ID, even for an uncreated object
2651    #[instrument(skip_all)]
2652    pub async fn dev_inspect_transaction_block(
2653        &self,
2654        sender: SuiAddress,
2655        transaction_kind: TransactionKind,
2656        gas_price: Option<u64>,
2657        gas_budget: Option<u64>,
2658        gas_sponsor: Option<SuiAddress>,
2659        gas_objects: Option<Vec<ObjectRef>>,
2660        show_raw_txn_data_and_effects: Option<bool>,
2661        skip_checks: Option<bool>,
2662    ) -> SuiResult<DevInspectResults> {
2663        let epoch_store = self.load_epoch_store_one_call_per_task();
2664        let protocol_config = epoch_store.protocol_config();
2665        let reference_gas_price = epoch_store.reference_gas_price();
2666
2667        let skip_checks = skip_checks.unwrap_or(true);
2668        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2669
2670        // Synthesize the full TransactionData the caller would have signed.
2671        let price = gas_price.unwrap_or(reference_gas_price);
2672        let budget = gas_budget.unwrap_or(protocol_config.max_tx_gas());
2673        let owner = gas_sponsor.unwrap_or(sender);
2674        let payment = gas_objects.unwrap_or_default();
2675        let transaction = TransactionData::V1(TransactionDataV1 {
2676            kind: transaction_kind,
2677            sender,
2678            gas_data: GasData {
2679                payment,
2680                owner,
2681                price,
2682                budget,
2683            },
2684            expiration: TransactionExpiration::None,
2685        });
2686
2687        // Capture raw bytes before simulate (which may mutate gas_data for mock
2688        // gas injection).
2689        let raw_txn_data = if show_raw_txn_data_and_effects {
2690            bcs::to_bytes(&transaction).map_err(|_| {
2691                SuiErrorKind::TransactionSerializationError {
2692                    error: "Failed to serialize transaction during dev inspect".to_string(),
2693                }
2694            })?
2695        } else {
2696            vec![]
2697        };
2698
2699        // Route through `simulate_transaction`:
2700        //   skip_checks = true  → TransactionChecks::Disabled
2701        //   skip_checks = false → TransactionChecks::Enabled
2702        let checks = if skip_checks {
2703            TransactionChecks::Disabled
2704        } else {
2705            TransactionChecks::Enabled
2706        };
2707        let sim =
2708            self.simulate_transaction(transaction, checks, /* allow_mock_gas_coin */ true)?;
2709
2710        let raw_effects = if show_raw_txn_data_and_effects {
2711            bcs::to_bytes(&sim.effects).map_err(|_| {
2712                SuiErrorKind::TransactionSerializationError {
2713                    error: "Failed to serialize transaction effects during dev inspect".to_string(),
2714                }
2715            })?
2716        } else {
2717            vec![]
2718        };
2719
2720        // Resolve event/object layouts against the simulate-produced ObjectSet
2721        // (newly-published packages from the simulation appear there), with the
2722        // node's backing package store as fallback for already-on-chain packages.
2723        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2724            epoch_store.protocol_config(),
2725            Box::new(OverlayBackingPackageStore::new(
2726                &sim.objects,
2727                self.get_backing_package_store(),
2728            )),
2729        );
2730
2731        DevInspectResults::new(
2732            sim.effects,
2733            sim.events.unwrap_or_default(),
2734            sim.execution_result,
2735            raw_txn_data,
2736            raw_effects,
2737            layout_resolver.as_mut(),
2738        )
2739    }
2740
2741    // Only used for testing because of how epoch store is loaded.
2742    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2743        let epoch_store = self.epoch_store_for_testing();
2744        Ok(epoch_store.reference_gas_price())
2745    }
2746
2747    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2748        self.get_transaction_cache_reader()
2749            .is_tx_already_executed(digest)
2750    }
2751
2752    #[instrument(level = "debug", skip_all, err(level = "debug"))]
2753    fn index_tx(
2754        sequence: u64,
2755        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2756        object_store: &Arc<dyn ObjectStore + Send + Sync>,
2757        indexes: &IndexStore,
2758        digest: &TransactionDigest,
2759        // TODO: index_tx really just need the transaction data here.
2760        cert: &VerifiedExecutableTransaction,
2761        effects: &TransactionEffects,
2762        events: &TransactionEvents,
2763        timestamp_ms: u64,
2764        tx_coins: Option<TxCoins>,
2765        written: &WrittenObjects,
2766        inner_temporary_store: &InnerTemporaryStore,
2767        epoch_store: &Arc<AuthorityPerEpochStore>,
2768        acquire_locks: bool,
2769    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
2770        let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
2771            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2772
2773        indexes.index_tx(
2774            sequence,
2775            cert.data().intent_message().value.sender(),
2776            cert.data()
2777                .intent_message()
2778                .value
2779                .input_objects()?
2780                .iter()
2781                .map(|o| o.object_id()),
2782            effects
2783                .all_changed_objects()
2784                .into_iter()
2785                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2786            cert.data()
2787                .intent_message()
2788                .value
2789                .move_calls()
2790                .into_iter()
2791                .map(|(_cmd_idx, package, module, function)| {
2792                    (*package, module.to_owned(), function.to_owned())
2793                }),
2794            events,
2795            changes,
2796            digest,
2797            timestamp_ms,
2798            tx_coins,
2799            effects.accumulator_events(),
2800            acquire_locks,
2801        )
2802    }
2803
2804    #[cfg(msim)]
2805    fn simulate_fork_during_execution(
2806        &self,
2807        certificate: &VerifiedExecutableTransaction,
2808        epoch_store: &Arc<AuthorityPerEpochStore>,
2809        effects: &mut TransactionEffects,
2810        forked_validators: std::sync::Arc<
2811            std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2812        >,
2813        full_halt: bool,
2814        effects_overrides: std::sync::Arc<
2815            std::sync::Mutex<std::collections::BTreeMap<String, String>>,
2816        >,
2817        fork_probability: f32,
2818    ) {
2819        use std::cell::RefCell;
2820        thread_local! {
2821            static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
2822        }
2823        if !certificate.data().intent_message().value.is_system_tx() {
2824            let committee = epoch_store.committee();
2825            let cur_stake = (**committee).weight(&self.name);
2826            if cur_stake > 0 {
2827                TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
2828                    let already_forked = forked_validators
2829                        .lock()
2830                        .ok()
2831                        .map(|set| set.contains(&self.name))
2832                        .unwrap_or(false);
2833
2834                    if !already_forked {
2835                        let should_fork = if full_halt {
2836                            // For full halt, fork enough nodes to reach validity threshold
2837                            *total_stake <= committee.validity_threshold()
2838                        } else {
2839                            // For partial fork, stay strictly below validity threshold
2840                            *total_stake + cur_stake < committee.validity_threshold()
2841                        };
2842
2843                        if should_fork {
2844                            *total_stake += cur_stake;
2845
2846                            if let Ok(mut external_set) = forked_validators.lock() {
2847                                external_set.insert(self.name);
2848                                info!("forked_validators: {:?}", external_set);
2849                            }
2850                        }
2851                    }
2852
2853                    if let Ok(external_set) = forked_validators.lock() {
2854                        if external_set.contains(&self.name) {
2855                            // If effects_overrides is empty, deterministically select a tx_digest to fork with 1/100 probability.
2856                            // Fork this transaction and record the digest and the original effects to original_effects.
2857                            // If original_effects is nonempty and contains a key matching this transaction digest (i.e.
2858                            // the transaction was forked on a different validator), fork this txn as well.
2859
2860                            let tx_digest = certificate.digest().to_string();
2861                            if let Ok(mut overrides) = effects_overrides.lock() {
2862                                if overrides.contains_key(&tx_digest)
2863                                    || overrides.is_empty()
2864                                        && sui_simulator::random::deterministic_probability(
2865                                            &tx_digest,
2866                                            fork_probability,
2867                                        )
2868                                {
2869                                    let original_effects_digest = effects.digest().to_string();
2870                                    overrides
2871                                        .insert(tx_digest.clone(), original_effects_digest.clone());
2872                                    info!(
2873                                        ?tx_digest,
2874                                        ?original_effects_digest,
2875                                        "Captured forked effects digest for transaction"
2876                                    );
2877                                    effects.gas_cost_summary_mut_for_testing().computation_cost +=
2878                                        1;
2879                                }
2880                            }
2881                        }
2882                    }
2883                });
2884            }
2885        }
2886    }
2887
2888    fn process_object_index(
2889        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2890        object_store: &Arc<dyn ObjectStore + Send + Sync>,
2891        effects: &TransactionEffects,
2892        written: &WrittenObjects,
2893        inner_temporary_store: &InnerTemporaryStore,
2894        epoch_store: &Arc<AuthorityPerEpochStore>,
2895    ) -> SuiResult<ObjectIndexChanges> {
2896        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2897            epoch_store.protocol_config(),
2898            Box::new(PackageStoreWithFallback::new(
2899                inner_temporary_store,
2900                backing_package_store,
2901            )),
2902        );
2903
2904        let modified_at_version = effects
2905            .modified_at_versions()
2906            .into_iter()
2907            .collect::<HashMap<_, _>>();
2908
2909        let tx_digest = effects.transaction_digest();
2910        let mut deleted_owners = vec![];
2911        let mut deleted_dynamic_fields = vec![];
2912        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2913            let old_version = modified_at_version.get(&id).unwrap();
2914            // When we process the index, the latest object hasn't been written yet so
2915            // the old object must be present.
2916            match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
2917                |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
2918            ) {
2919                Owner::AddressOwner(addr)
2920                | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
2921                Owner::ObjectOwner(object_id) => {
2922                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2923                }
2924                _ => {}
2925            }
2926        }
2927
2928        let mut new_owners = vec![];
2929        let mut new_dynamic_fields = vec![];
2930
2931        for (oref, owner, kind) in effects.all_changed_objects() {
2932            let id = &oref.0;
2933            // For mutated objects, retrieve old owner and delete old index if there is a owner change.
2934            if let WriteKind::Mutate = kind {
2935                let Some(old_version) = modified_at_version.get(id) else {
2936                    panic!(
2937                        "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
2938                        tx_digest
2939                    );
2940                };
2941                // When we process the index, the latest object hasn't been written yet so
2942                // the old object must be present.
2943                let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
2944                    panic!(
2945                        "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
2946                        tx_digest, id, old_version
2947                    );
2948                };
2949                if old_object.owner != owner {
2950                    match old_object.owner {
2951                        Owner::AddressOwner(addr)
2952                        | Owner::ConsensusAddressOwner { owner: addr, .. } => {
2953                            deleted_owners.push((addr, *id));
2954                        }
2955                        Owner::ObjectOwner(object_id) => {
2956                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2957                        }
2958                        _ => {}
2959                    }
2960                }
2961            }
2962
2963            match owner {
2964                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
2965                    // TODO: We can remove the object fetching after we added ObjectType to TransactionEffects
2966                    let new_object = written.get(id).unwrap_or_else(
2967                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2968                    );
2969                    assert_eq!(
2970                        new_object.version(),
2971                        oref.1,
2972                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2973                        tx_digest,
2974                        id,
2975                        new_object.version(),
2976                        oref.1
2977                    );
2978
2979                    let type_ = new_object
2980                        .type_()
2981                        .map(|type_| ObjectType::Struct(type_.clone()))
2982                        .unwrap_or(ObjectType::Package);
2983
2984                    new_owners.push((
2985                        (addr, *id),
2986                        ObjectInfo {
2987                            object_id: *id,
2988                            version: oref.1,
2989                            digest: oref.2,
2990                            type_,
2991                            owner,
2992                            previous_transaction: *effects.transaction_digest(),
2993                        },
2994                    ));
2995                }
2996                Owner::ObjectOwner(owner) => {
2997                    let new_object = written.get(id).unwrap_or_else(
2998                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2999                    );
3000                    assert_eq!(
3001                        new_object.version(),
3002                        oref.1,
3003                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3004                        tx_digest,
3005                        id,
3006                        new_object.version(),
3007                        oref.1
3008                    );
3009
3010                    let Some(df_info) = Self::try_create_dynamic_field_info(
3011                        object_store,
3012                        new_object,
3013                        written,
3014                        layout_resolver.as_mut(),
3015                    )
3016                    .unwrap_or_else(|e| {
3017                        error!(
3018                            "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
3019                            e, new_object
3020                        );
3021                        None
3022                    }) else {
3023                        // Skip indexing for non dynamic field objects.
3024                        continue;
3025                    };
3026                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3027                }
3028                _ => {}
3029            }
3030        }
3031
3032        Ok(ObjectIndexChanges {
3033            deleted_owners,
3034            deleted_dynamic_fields,
3035            new_owners,
3036            new_dynamic_fields,
3037        })
3038    }
3039
3040    fn try_create_dynamic_field_info(
3041        object_store: &Arc<dyn ObjectStore + Send + Sync>,
3042        o: &Object,
3043        written: &WrittenObjects,
3044        resolver: &mut dyn LayoutResolver,
3045    ) -> SuiResult<Option<DynamicFieldInfo>> {
3046        // Skip if not a move object
3047        let Some(move_object) = o.data.try_as_move().cloned() else {
3048            return Ok(None);
3049        };
3050
3051        // We only index dynamic field objects
3052        if !move_object.type_().is_dynamic_field() {
3053            return Ok(None);
3054        }
3055
3056        let layout = resolver
3057            .get_annotated_layout(&move_object.type_().clone().into())?
3058            .into_layout();
3059
3060        let field =
3061            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3062                SuiErrorKind::ObjectDeserializationError {
3063                    error: e.to_string(),
3064                }
3065            })?;
3066
3067        let type_ = field.kind;
3068        let name_type: TypeTag = field.name_layout.into();
3069        let bcs_name = field.name_bytes.to_owned();
3070
3071        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3072            .map_err(|e| {
3073                warn!("{e}");
3074                SuiErrorKind::ObjectDeserializationError {
3075                    error: e.to_string(),
3076                }
3077            })?;
3078
3079        let name = DynamicFieldName {
3080            type_: name_type,
3081            value: SuiMoveValue::from(name_value).to_json_value(),
3082        };
3083
3084        let value_metadata = field.value_metadata().map_err(|e| {
3085            warn!("{e}");
3086            SuiErrorKind::ObjectDeserializationError {
3087                error: e.to_string(),
3088            }
3089        })?;
3090
3091        Ok(Some(match value_metadata {
3092            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3093                name,
3094                bcs_name,
3095                type_,
3096                object_type: object_type.to_canonical_string(/* with_prefix */ true),
3097                object_id: o.id(),
3098                version: o.version(),
3099                digest: o.digest(),
3100            },
3101
3102            DFV::ValueMetadata::DynamicObjectField(object_id) => {
3103                // Find the actual object from storage using the object id obtained from the wrapper.
3104
3105                // Try to find the object in the written objects first.
3106                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3107                    let version = object.version();
3108                    let digest = object.digest();
3109                    let object_type = object.data.type_().unwrap().clone();
3110                    (version, digest, object_type)
3111                } else {
3112                    // If not found, try to find it in the database.
3113                    let object = object_store
3114                        .get_object_by_key(&object_id, o.version())
3115                        .ok_or_else(|| UserInputError::ObjectNotFound {
3116                            object_id,
3117                            version: Some(o.version()),
3118                        })?;
3119                    let version = object.version();
3120                    let digest = object.digest();
3121                    let object_type = object.data.type_().unwrap().clone();
3122                    (version, digest, object_type)
3123                };
3124
3125                DynamicFieldInfo {
3126                    name,
3127                    bcs_name,
3128                    type_,
3129                    object_type: object_type.to_string(),
3130                    object_id,
3131                    version,
3132                    digest,
3133                }
3134            }
3135        }))
3136    }
3137
    /// Runs index/event post-processing for one executed transaction.
    ///
    /// Does nothing and returns `Ok(())` when no index store (`self.indexes`) is configured.
    /// Otherwise an index sequence number is allocated on the calling thread and one of two
    /// modes is used, selected by `self.config.sync_post_process_one_tx`:
    ///
    /// * **Synchronous**: `post_process_one_tx_impl` runs inline with `acquire_locks = true`
    ///   and the resulting batch is committed to the index store before returning. A
    ///   post-processing error is counted, logged and returned to the caller.
    /// * **Asynchronous**: a oneshot receiver is registered in `self.pending_post_processing`
    ///   under the transaction digest, the inputs are cloned, and the work is spawned in the
    ///   background (gated by `self.post_processing_semaphore`) with `acquire_locks = false`.
    ///   On success the output is sent through the oneshot channel; on failure the error is
    ///   only counted and logged, and this function has already returned `Ok(())`.
    ///
    /// # Panics
    /// In synchronous mode, panics if the raw batch cannot be concatenated into the DB batch
    /// or if committing the index batch fails.
    #[instrument(level = "trace", skip_all, err(level = "debug"))]
    fn post_process_one_tx(
        &self,
        certificate: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let Some(indexes) = &self.indexes else {
            return Ok(());
        };

        let tx_digest = *certificate.digest();

        // Allocate sequence number on the calling thread to preserve execution order.
        let sequence = indexes.allocate_sequence_number();

        if self.config.sync_post_process_one_tx {
            // Synchronous mode: run post-processing inline on the calling thread
            // and commit the index batch immediately with locks held.
            // Used as a rollback mechanism and for testing correctness against async mode.
            // TODO: delete this branch once async mode has shipped
            let result = Self::post_process_one_tx_impl(
                sequence,
                indexes,
                &self.subscription_handler,
                &self.metrics,
                self.name,
                self.get_backing_package_store(),
                self.get_object_store(),
                certificate,
                effects,
                inner_temporary_store,
                epoch_store,
                true, // acquire_locks
            );

            match result {
                Ok((raw_batch, cache_updates_with_locks)) => {
                    let mut db_batch = indexes.new_db_batch();
                    db_batch
                        .concat(vec![raw_batch])
                        .expect("failed to absorb raw index batch");
                    // Destructure to keep _locks alive through commit_index_batch.
                    let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
                        cache_updates_with_locks;
                    indexes
                        .commit_index_batch(db_batch, vec![inner])
                        .expect("failed to commit index batch");
                }
                Err(e) => {
                    self.metrics.post_processing_total_failures.inc();
                    error!(?tx_digest, "tx post processing failed: {e}");
                    return Err(e);
                }
            }

            return Ok(());
        }

        // Asynchronous mode: register the receiving half under the tx digest before spawning,
        // so the pending entry exists by the time this function returns.
        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
        self.pending_post_processing.insert(tx_digest, done_rx);

        // The spawned task is `'static`, so it needs owned copies of everything it touches.
        let indexes = indexes.clone();
        let subscription_handler = self.subscription_handler.clone();
        let metrics = self.metrics.clone();
        let name = self.name;
        let backing_package_store = self.get_backing_package_store().clone();
        let object_store = self.get_object_store().clone();
        let semaphore = self.post_processing_semaphore.clone();

        let certificate = certificate.clone();
        let effects = effects.clone();
        let inner_temporary_store = inner_temporary_store.clone();
        let epoch_store = epoch_store.clone();

        // Spawn an async task that first waits for a semaphore permit and then runs the
        // post-processing on a blocking thread.
        tokio::spawn(async move {
            let permit = {
                let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
                semaphore
                    .acquire_owned()
                    .await
                    .expect("post-processing semaphore should not be closed")
            };

            // The join result of the blocking task is deliberately ignored.
            let _ = tokio::task::spawn_blocking(move || {
                // Hold the permit until the blocking work has finished.
                let _permit = permit;

                let result = Self::post_process_one_tx_impl(
                    sequence,
                    &indexes,
                    &subscription_handler,
                    &metrics,
                    name,
                    &backing_package_store,
                    &object_store,
                    &certificate,
                    &effects,
                    &inner_temporary_store,
                    &epoch_store,
                    false, // acquire_locks
                );

                match result {
                    Ok((raw_batch, cache_updates_with_locks)) => {
                        fail_point!("crash-after-post-process-one-tx");
                        let output = (raw_batch, cache_updates_with_locks.into_inner());
                        // A send error (receiver already dropped) is ignored.
                        let _ = done_tx.send(output);
                    }
                    Err(e) => {
                        // `done_tx` is dropped without sending on this path.
                        metrics.post_processing_total_failures.inc();
                        error!(?tx_digest, "tx post processing failed: {e}");
                    }
                }
            })
            .await;
        });

        Ok(())
    }
3259
3260    fn post_process_one_tx_impl(
3261        sequence: u64,
3262        indexes: &Arc<IndexStore>,
3263        subscription_handler: &Arc<SubscriptionHandler>,
3264        metrics: &Arc<AuthorityMetrics>,
3265        name: AuthorityName,
3266        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3267        object_store: &Arc<dyn ObjectStore + Send + Sync>,
3268        certificate: &VerifiedExecutableTransaction,
3269        effects: &TransactionEffects,
3270        inner_temporary_store: &InnerTemporaryStore,
3271        epoch_store: &Arc<AuthorityPerEpochStore>,
3272        acquire_locks: bool,
3273    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
3274        let _scope = monitored_scope("Execution::post_process_one_tx");
3275
3276        let tx_digest = certificate.digest();
3277        let timestamp_ms = Self::unixtime_now_ms();
3278        let events = &inner_temporary_store.events;
3279        let written = &inner_temporary_store.written;
3280        let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
3281            name,
3282            object_store,
3283            effects,
3284            inner_temporary_store,
3285            epoch_store,
3286        );
3287
3288        let (raw_batch, cache_updates) = Self::index_tx(
3289            sequence,
3290            backing_package_store,
3291            object_store,
3292            indexes,
3293            tx_digest,
3294            certificate,
3295            effects,
3296            events,
3297            timestamp_ms,
3298            tx_coins,
3299            written,
3300            inner_temporary_store,
3301            epoch_store,
3302            acquire_locks,
3303        )
3304        .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
3305        .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3306        .expect("Indexing tx should not fail");
3307
3308        let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3309        let events = Self::make_transaction_block_events(
3310            backing_package_store,
3311            events.clone(),
3312            *tx_digest,
3313            timestamp_ms,
3314            epoch_store,
3315            inner_temporary_store,
3316        )?;
3317        // Emit events
3318        subscription_handler
3319            .process_tx(certificate.data().transaction_data(), &effects, &events)
3320            .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
3321            .tap_err(|e| {
3322                warn!(
3323                    ?tx_digest,
3324                    "Post processing - Couldn't process events for tx: {}", e
3325                )
3326            })?;
3327
3328        metrics
3329            .post_processing_total_events_emitted
3330            .inc_by(events.data.len() as u64);
3331
3332        Ok((raw_batch, cache_updates))
3333    }
3334
3335    fn make_transaction_block_events(
3336        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3337        transaction_events: TransactionEvents,
3338        digest: TransactionDigest,
3339        timestamp_ms: u64,
3340        epoch_store: &Arc<AuthorityPerEpochStore>,
3341        inner_temporary_store: &InnerTemporaryStore,
3342    ) -> SuiResult<SuiTransactionBlockEvents> {
3343        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3344            epoch_store.protocol_config(),
3345            Box::new(PackageStoreWithFallback::new(
3346                inner_temporary_store,
3347                backing_package_store,
3348            )),
3349        );
3350        SuiTransactionBlockEvents::try_from(
3351            transaction_events,
3352            digest,
3353            Some(timestamp_ms),
3354            layout_resolver.as_mut(),
3355        )
3356    }
3357
3358    pub fn unixtime_now_ms() -> u64 {
3359        let now = SystemTime::now()
3360            .duration_since(UNIX_EPOCH)
3361            .expect("Time went backwards")
3362            .as_millis();
3363        u64::try_from(now).expect("Travelling in time machine")
3364    }
3365
3366    // TODO(fastpath): update this handler for Mysticeti fastpath.
3367    // There will no longer be validator quorum signed transactions or effects.
3368    // The proof of finality needs to come from checkpoints.
3369    #[instrument(level = "trace", skip_all)]
3370    pub async fn handle_transaction_info_request(
3371        &self,
3372        request: TransactionInfoRequest,
3373    ) -> SuiResult<TransactionInfoResponse> {
3374        let epoch_store = self.load_epoch_store_one_call_per_task();
3375        let (transaction, status) = self
3376            .get_transaction_status(&request.transaction_digest, &epoch_store)?
3377            .ok_or(SuiErrorKind::TransactionNotFound {
3378                digest: request.transaction_digest,
3379            })?;
3380        Ok(TransactionInfoResponse {
3381            transaction,
3382            status,
3383        })
3384    }
3385
3386    #[instrument(level = "trace", skip_all)]
3387    pub async fn handle_object_info_request(
3388        &self,
3389        request: ObjectInfoRequest,
3390    ) -> SuiResult<ObjectInfoResponse> {
3391        let epoch_store = self.load_epoch_store_one_call_per_task();
3392
3393        let requested_object_seq = match request.request_kind {
3394            ObjectInfoRequestKind::LatestObjectInfo => {
3395                let (_, seq, _) =
3396                    self.get_object_or_tombstone(request.object_id)
3397                        .ok_or_else(|| {
3398                            SuiError::from(UserInputError::ObjectNotFound {
3399                                object_id: request.object_id,
3400                                version: None,
3401                            })
3402                        })?;
3403                seq
3404            }
3405            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3406        };
3407
3408        let object = self
3409            .get_object_store()
3410            .get_object_by_key(&request.object_id, requested_object_seq)
3411            .ok_or_else(|| {
3412                SuiError::from(UserInputError::ObjectNotFound {
3413                    object_id: request.object_id,
3414                    version: Some(requested_object_seq),
3415                })
3416            })?;
3417
3418        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3419            (request.generate_layout, object.data.try_as_move())
3420        {
3421            let epoch_store = self.load_epoch_store_one_call_per_task();
3422            Some(into_struct_layout(
3423                epoch_store
3424                    .executor()
3425                    .type_layout_resolver(
3426                        epoch_store.protocol_config(),
3427                        Box::new(self.get_backing_package_store().as_ref()),
3428                    )
3429                    .get_annotated_layout(&move_obj.type_().clone().into())?,
3430            )?)
3431        } else {
3432            None
3433        };
3434
3435        let lock = if !object.is_address_owned() {
3436            // Only address owned objects have locks.
3437            None
3438        } else {
3439            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)?
3440                .map(|s| s.into_inner())
3441        };
3442
3443        Ok(ObjectInfoResponse {
3444            object,
3445            layout,
3446            lock_for_debugging: lock,
3447        })
3448    }
3449
3450    #[instrument(level = "trace", skip_all)]
3451    pub fn handle_checkpoint_request(
3452        &self,
3453        request: &CheckpointRequest,
3454    ) -> SuiResult<CheckpointResponse> {
3455        let summary = match request.sequence_number {
3456            Some(seq) => self
3457                .checkpoint_store
3458                .get_checkpoint_by_sequence_number(seq)?,
3459            None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3460        }
3461        .map(|v| v.into_inner());
3462        let contents = match &summary {
3463            Some(s) => self
3464                .checkpoint_store
3465                .get_checkpoint_contents(&s.content_digest)?,
3466            None => None,
3467        };
3468        Ok(CheckpointResponse {
3469            checkpoint: summary,
3470            contents,
3471        })
3472    }
3473
3474    #[instrument(level = "trace", skip_all)]
3475    pub fn handle_checkpoint_request_v2(
3476        &self,
3477        request: &CheckpointRequestV2,
3478    ) -> SuiResult<CheckpointResponseV2> {
3479        let summary = if request.certified {
3480            let summary = match request.sequence_number {
3481                Some(seq) => self
3482                    .checkpoint_store
3483                    .get_checkpoint_by_sequence_number(seq)?,
3484                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3485            }
3486            .map(|v| v.into_inner());
3487            summary.map(CheckpointSummaryResponse::Certified)
3488        } else {
3489            let summary = match request.sequence_number {
3490                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3491                None => self
3492                    .checkpoint_store
3493                    .get_latest_locally_computed_checkpoint()?,
3494            };
3495            summary.map(CheckpointSummaryResponse::Pending)
3496        };
3497        let contents = match &summary {
3498            Some(s) => self
3499                .checkpoint_store
3500                .get_checkpoint_contents(&s.content_digest())?,
3501            None => None,
3502        };
3503        Ok(CheckpointResponseV2 {
3504            checkpoint: summary,
3505            contents,
3506        })
3507    }
3508
3509    fn check_protocol_version(
3510        supported_protocol_versions: SupportedProtocolVersions,
3511        current_version: ProtocolVersion,
3512    ) {
3513        info!("current protocol version is now {:?}", current_version);
3514        info!("supported versions are: {:?}", supported_protocol_versions);
3515        if !supported_protocol_versions.is_version_supported(current_version) {
3516            let msg = format!(
3517                "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3518                current_version, supported_protocol_versions,
3519            );
3520
3521            error!("{}", msg);
3522            eprintln!("{}", msg);
3523
3524            #[cfg(not(msim))]
3525            std::process::exit(1);
3526
3527            #[cfg(msim)]
3528            sui_simulator::task::shutdown_current_node();
3529        }
3530    }
3531
3532    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
3533    #[allow(clippy::too_many_arguments)]
3534    pub async fn new(
3535        name: AuthorityName,
3536        secret: StableSyncAuthoritySigner,
3537        supported_protocol_versions: SupportedProtocolVersions,
3538        store: Arc<AuthorityStore>,
3539        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3540        epoch_store: Arc<AuthorityPerEpochStore>,
3541        committee_store: Arc<CommitteeStore>,
3542        indexes: Option<Arc<IndexStore>>,
3543        rpc_index: Option<Arc<RpcIndexStore>>,
3544        checkpoint_store: Arc<CheckpointStore>,
3545        prometheus_registry: &Registry,
3546        genesis_objects: &[Object],
3547        db_checkpoint_config: &DBCheckpointConfig,
3548        config: NodeConfig,
3549        chain_identifier: ChainIdentifier,
3550        policy_config: Option<PolicyConfig>,
3551        firewall_config: Option<RemoteFirewallConfig>,
3552        pruner_watermarks: Arc<PrunerWatermarks>,
3553    ) -> Arc<Self> {
3554        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3555
3556        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3557        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3558        let execution_scheduler = Arc::new(ExecutionScheduler::new(
3559            execution_cache_trait_pointers.object_cache_reader.clone(),
3560            execution_cache_trait_pointers.account_funds_read.clone(),
3561            execution_cache_trait_pointers
3562                .transaction_cache_reader
3563                .clone(),
3564            tx_ready_certificates,
3565            &epoch_store,
3566            config.funds_withdraw_scheduler_type,
3567            metrics.clone(),
3568            prometheus_registry,
3569        ));
3570        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3571
3572        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3573            epoch_store.get_parent_path(),
3574            &config.authority_store_pruning_config,
3575        );
3576        let _pruner = AuthorityStorePruner::new(
3577            store.perpetual_tables.clone(),
3578            checkpoint_store.clone(),
3579            rpc_index.clone(),
3580            indexes.clone(),
3581            config.authority_store_pruning_config.clone(),
3582            epoch_store.committee().authority_exists(&name),
3583            epoch_store.epoch_start_state().epoch_duration_ms(),
3584            prometheus_registry,
3585            pruner_watermarks,
3586        );
3587        let input_loader =
3588            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3589        let epoch = epoch_store.epoch();
3590        let traffic_controller_metrics =
3591            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3592        let traffic_controller = if let Some(policy_config) = policy_config {
3593            Some(Arc::new(
3594                TrafficController::init(
3595                    policy_config,
3596                    traffic_controller_metrics,
3597                    firewall_config.clone(),
3598                )
3599                .await,
3600            ))
3601        } else {
3602            None
3603        };
3604
3605        let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3606            ForkRecoveryState::new(Some(fork_config))
3607                .expect("Failed to initialize fork recovery state")
3608        });
3609
3610        let coin_reservation_resolver = Arc::new(CachingCoinReservationResolver::new(
3611            execution_cache_trait_pointers.child_object_resolver.clone(),
3612        ));
3613
3614        let object_funds_checker_metrics =
3615            Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
3616        let state = Arc::new(AuthorityState {
3617            name,
3618            secret,
3619            execution_lock: RwLock::new(epoch),
3620            epoch_store: ArcSwap::new(epoch_store.clone()),
3621            input_loader,
3622            execution_cache_trait_pointers,
3623            coin_reservation_resolver,
3624            indexes,
3625            rpc_index,
3626            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3627            checkpoint_store,
3628            committee_store,
3629            execution_scheduler,
3630            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3631            metrics,
3632            _pruner,
3633            _authority_per_epoch_pruner,
3634            db_checkpoint_config: db_checkpoint_config.clone(),
3635            config,
3636            overload_info: AuthorityOverloadInfo::default(),
3637            chain_identifier,
3638            congestion_tracker: Arc::new(CongestionTracker::new()),
3639            consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
3640            traffic_controller,
3641            fork_recovery_state,
3642            notify_epoch: tokio::sync::watch::channel(epoch).0,
3643            object_funds_checker: ArcSwapOption::empty(),
3644            object_funds_checker_metrics,
3645            pending_post_processing: Arc::new(DashMap::new()),
3646            post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
3647        });
3648        state.init_object_funds_checker().await;
3649
3650        // Start a task to execute ready certificates.
3651        let authority_state = Arc::downgrade(&state);
3652        spawn_monitored_task!(execution_process(
3653            authority_state,
3654            rx_ready_certificates,
3655            rx_execution_shutdown,
3656        ));
3657        // TODO: This doesn't belong to the constructor of AuthorityState.
3658        state
3659            .create_owner_index_if_empty(genesis_objects, &epoch_store)
3660            .expect("Error indexing genesis objects.");
3661
3662        if epoch_store
3663            .protocol_config()
3664            .enable_multi_epoch_transaction_expiration()
3665            && epoch_store.epoch() > 0
3666        {
3667            use typed_store::Map;
3668            let previous_epoch = epoch_store.epoch() - 1;
3669            let start_key = (previous_epoch, TransactionDigest::ZERO);
3670            let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
3671            let has_previous_epoch_data = store
3672                .perpetual_tables
3673                .executed_transaction_digests
3674                .safe_range_iter(start_key..end_key)
3675                .next()
3676                .is_some();
3677
3678            if !has_previous_epoch_data {
3679                panic!(
3680                    "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
3681                    This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
3682                    Please restore from a snapshot using the latest version of sui-tool.",
3683                    previous_epoch
3684                );
3685            }
3686        }
3687
3688        state
3689    }
3690
3691    async fn init_object_funds_checker(&self) {
3692        let epoch_store = self.epoch_store.load();
3693        if self.is_validator(&epoch_store)
3694            && epoch_store.protocol_config().enable_object_funds_withdraw()
3695        {
3696            if self.object_funds_checker.load().is_none() {
3697                let inner = self.get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID).map(|o| {
3698                    Arc::new(ObjectFundsChecker::new(
3699                        o.version(),
3700                        self.object_funds_checker_metrics.clone(),
3701                    ))
3702                });
3703                self.object_funds_checker.store(inner);
3704            }
3705        } else {
3706            self.object_funds_checker.store(None);
3707        }
3708    }
3709
3710    // TODO: Consolidate our traits to reduce the number of methods here.
3711    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3712        &self.execution_cache_trait_pointers.object_cache_reader
3713    }
3714
3715    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3716        &self.execution_cache_trait_pointers.transaction_cache_reader
3717    }
3718
3719    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3720        &self.execution_cache_trait_pointers.cache_writer
3721    }
3722
3723    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3724        &self.execution_cache_trait_pointers.backing_store
3725    }
3726
3727    pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3728        &self.execution_cache_trait_pointers.child_object_resolver
3729    }
3730
3731    pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
3732        &self.execution_cache_trait_pointers.account_funds_read
3733    }
3734
3735    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3736        &self.execution_cache_trait_pointers.backing_package_store
3737    }
3738
3739    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3740        &self.execution_cache_trait_pointers.object_store
3741    }
3742
3743    pub fn pending_post_processing(
3744        &self,
3745    ) -> &Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>> {
3746        &self.pending_post_processing
3747    }
3748
3749    pub async fn await_post_processing(
3750        &self,
3751        tx_digest: &TransactionDigest,
3752    ) -> Option<PostProcessingOutput> {
3753        if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
3754            // Tx was executed and post-processing is in flight.
3755            rx.await.ok()
3756        } else {
3757            // Tx was already persisted or post-processing already completed.
3758            None
3759        }
3760    }
3761
3762    /// Await post-processing for a transaction and commit the index batch immediately.
3763    /// Used in test helpers where there is no CheckpointExecutor to collect and commit
3764    /// index batches at checkpoint boundaries.
3765    pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
3766        if let Some(indexes) = &self.indexes
3767            && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
3768        {
3769            let mut db_batch = indexes.new_db_batch();
3770            db_batch
3771                .concat(vec![raw_batch])
3772                .expect("failed to build index batch");
3773            indexes
3774                .commit_index_batch(db_batch, vec![cache_updates])
3775                .expect("failed to commit index batch");
3776        }
3777    }
3778
3779    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3780        &self.execution_cache_trait_pointers.reconfig_api
3781    }
3782
3783    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3784        &self.execution_cache_trait_pointers.global_state_hash_store
3785    }
3786
3787    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3788        &self.execution_cache_trait_pointers.checkpoint_cache
3789    }
3790
3791    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3792        &self.execution_cache_trait_pointers.state_sync_store
3793    }
3794
3795    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3796        &self.execution_cache_trait_pointers.cache_commit
3797    }
3798
3799    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3800        self.execution_cache_trait_pointers
3801            .testing_api
3802            .database_for_testing()
3803    }
3804
3805    pub fn cache_for_testing(&self) -> &WritebackCache {
3806        self.execution_cache_trait_pointers
3807            .testing_api
3808            .cache_for_testing()
3809    }
3810
3811    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3812        &self,
3813        config: NodeConfig,
3814        metrics: Arc<AuthorityStorePruningMetrics>,
3815    ) -> anyhow::Result<()> {
3816        use crate::authority::authority_store_pruner::PrunerWatermarks;
3817        let watermarks = Arc::new(PrunerWatermarks::default());
3818        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3819            &self.database_for_testing().perpetual_tables,
3820            &self.checkpoint_store,
3821            self.rpc_index.as_deref(),
3822            config.authority_store_pruning_config,
3823            metrics,
3824            EPOCH_DURATION_MS_FOR_TESTING,
3825            &watermarks,
3826        )
3827        .await
3828    }
3829
3830    pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
3831        &self.execution_scheduler
3832    }
3833
3834    fn create_owner_index_if_empty(
3835        &self,
3836        genesis_objects: &[Object],
3837        epoch_store: &Arc<AuthorityPerEpochStore>,
3838    ) -> SuiResult {
3839        let Some(index_store) = &self.indexes else {
3840            return Ok(());
3841        };
3842        if !index_store.is_empty() {
3843            return Ok(());
3844        }
3845
3846        let mut new_owners = vec![];
3847        let mut new_dynamic_fields = vec![];
3848        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3849            epoch_store.protocol_config(),
3850            Box::new(self.get_backing_package_store().as_ref()),
3851        );
3852        for o in genesis_objects.iter() {
3853            match o.owner {
3854                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3855                    new_owners.push((
3856                        (addr, o.id()),
3857                        ObjectInfo::new(&o.compute_object_reference(), o),
3858                    ))
3859                }
3860                Owner::ObjectOwner(object_id) => {
3861                    let id = o.id();
3862                    let Some(info) = Self::try_create_dynamic_field_info(
3863                        self.get_object_store(),
3864                        o,
3865                        &BTreeMap::new(),
3866                        layout_resolver.as_mut(),
3867                    )?
3868                    else {
3869                        continue;
3870                    };
3871                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3872                }
3873                _ => {}
3874            }
3875        }
3876
3877        index_store.insert_genesis_objects(ObjectIndexChanges {
3878            deleted_owners: vec![],
3879            deleted_dynamic_fields: vec![],
3880            new_owners,
3881            new_dynamic_fields,
3882        })
3883    }
3884
3885    /// Attempts to acquire execution lock for an executable transaction.
3886    /// Returns Some(lock) if the transaction is matching current executed epoch.
3887    /// Returns None if validator is halted at epoch end or epoch mismatch.
3888    pub fn execution_lock_for_executable_transaction(
3889        &self,
3890        transaction: &VerifiedExecutableTransaction,
3891    ) -> Option<ExecutionLockReadGuard<'_>> {
3892        let lock = self.execution_lock.try_read().ok()?;
3893        if *lock == transaction.auth_sig().epoch() {
3894            Some(lock)
3895        } else {
3896            // TODO: Can this still happen?
3897            None
3898        }
3899    }
3900
3901    /// Acquires the execution lock for the duration of transaction validation.
3902    /// This prevents reconfiguration from starting until we are finished validating the transaction.
3903    fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
3904        self.execution_lock
3905            .try_read()
3906            .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
3907    }
3908
3909    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3910        self.execution_lock.write().await
3911    }
3912
    /// Transitions this authority from `cur_epoch_store`'s epoch to the epoch of
    /// `new_committee` and returns the epoch store for the new epoch.
    ///
    /// Steps, in order:
    /// 1. Check the new protocol version against `supported_protocol_versions` and
    ///    record `new_committee` in the committee store.
    /// 2. Take the execution write lock and terminate the current epoch's tasks.
    /// 3. Close user certs on the current epoch store if they are still accepted.
    /// 4. Clear end-of-epoch cache state, run the consistency checks, optionally
    ///    re-accumulate the state hash, and store the new epoch start configuration.
    /// 5. Optionally take a DB checkpoint (per `db_checkpoint_config`).
    /// 6. Reconfigure the cache, reopen the epoch DB, reconfigure the execution
    ///    scheduler, re-initialize the object funds checker, update the epoch held by
    ///    the execution lock, and notify epoch watchers.
    ///
    /// # Errors
    /// Propagates failures from `insert_new_committee`, `checkpoint_all_dbs` and
    /// `reopen_epoch_db`.
    ///
    /// # Panics
    /// Panics if the reopened epoch store's epoch differs from `new_committee.epoch`.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );

        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // Safe to begin reconfiguration now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        {
            let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                // Need to change this so that the consensus adapter does not accept
                // certificates from users.
                // This can happen if our local validator did not initiate epoch change locally,
                // but 2f+1 nodes already concluded the epoch.
                //
                // This lock is essentially a barrier for the
                // `epoch_store.pending_consensus_certificates` table we are reading on the line
                // after this block.
                cur_epoch_store.close_user_certs(state);
            }
            // lock is dropped here
        }

        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
        self.maybe_reaccumulate_state_hash(
            cur_epoch_store,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.get_reconfig_api()
            .set_epoch_start_configuration(&epoch_start_configuration);
        // Take an end-of-epoch DB checkpoint only when a checkpoint path is configured
        // and end-of-epoch checkpoints are enabled.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
            && self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
        {
            // Index DB checkpointing is opt-in; an unset option means "off".
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
            self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        // Capture the epoch number before `new_committee` is moved into `reopen_epoch_db`.
        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        self.init_object_funds_checker().await;
        *execution_lock = new_epoch;

        self.notify_epoch(new_epoch);
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
4007
4008    fn notify_epoch(&self, new_epoch: EpochId) {
4009        self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
4010    }
4011
4012    pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4013        let mut rx = self.notify_epoch.subscribe();
4014        loop {
4015            let epoch = *rx.borrow();
4016            if epoch >= target_epoch {
4017                return Ok(epoch);
4018            }
4019            rx.changed().await?;
4020        }
4021    }
4022
4023    /// Executes accumulator settlement for testing purposes.
4024    /// Returns a list of (transaction, execution_env) pairs that can be replayed on another
4025    /// AuthorityState (e.g., a fullnode) using `replay_settlement_for_testing`.
4026    pub async fn settle_accumulator_for_testing(
4027        &self,
4028        effects: &[TransactionEffects],
4029        checkpoint_seq: Option<u64>,
4030    ) -> Vec<(VerifiedExecutableTransaction, ExecutionEnv)> {
4031        let accumulator_version = self
4032            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
4033            .unwrap()
4034            .version();
4035        // Use provided checkpoint sequence, or fall back to accumulator version.
4036        let ckpt_seq = checkpoint_seq.unwrap_or_else(|| accumulator_version.value());
4037        let builder = AccumulatorSettlementTxBuilder::new(
4038            Some(self.get_transaction_cache_reader().as_ref()),
4039            effects,
4040            ckpt_seq,
4041            0,
4042        );
4043        let balance_changes = builder.collect_funds_changes();
4044        let epoch_store = self.epoch_store_for_testing();
4045        let epoch = epoch_store.epoch();
4046        let accumulator_root_obj_initial_shared_version = epoch_store
4047            .epoch_start_config()
4048            .accumulator_root_obj_initial_shared_version()
4049            .unwrap();
4050        let settlements = builder.build_tx(
4051            epoch_store.protocol_config(),
4052            epoch,
4053            accumulator_root_obj_initial_shared_version,
4054            ckpt_seq,
4055            ckpt_seq,
4056        );
4057
4058        let settlements: Vec<_> = settlements
4059            .into_iter()
4060            .map(|tx| {
4061                VerifiedExecutableTransaction::new_system(
4062                    VerifiedTransaction::new_system_transaction(tx),
4063                    epoch,
4064                )
4065            })
4066            .collect();
4067
4068        let assigned_versions = epoch_store
4069            .assign_shared_object_versions_for_tests(
4070                self.get_object_cache_reader().as_ref(),
4071                &settlements,
4072            )
4073            .unwrap();
4074        let version_map = assigned_versions.into_map();
4075
4076        let mut replay_txns = Vec::new();
4077        let mut settlement_effects = Vec::with_capacity(settlements.len());
4078        for tx in settlements {
4079            let assigned = version_map.get(&tx.key()).unwrap().clone();
4080            let env = ExecutionEnv::new().with_assigned_versions(assigned);
4081            let (effects, _) = self
4082                .try_execute_immediately(&tx.clone(), env.clone(), &epoch_store)
4083                .unwrap();
4084            assert!(effects.status().is_ok());
4085            replay_txns.push((tx, env));
4086            settlement_effects.push(effects);
4087        }
4088
4089        let barrier = accumulators::build_accumulator_barrier_tx(
4090            epoch,
4091            accumulator_root_obj_initial_shared_version,
4092            ckpt_seq,
4093            &settlement_effects,
4094        );
4095        let barrier = VerifiedExecutableTransaction::new_system(
4096            VerifiedTransaction::new_system_transaction(barrier),
4097            epoch,
4098        );
4099
4100        let assigned_versions = epoch_store
4101            .assign_shared_object_versions_for_tests(
4102                self.get_object_cache_reader().as_ref(),
4103                std::slice::from_ref(&barrier),
4104            )
4105            .unwrap();
4106        let version_map = assigned_versions.into_map();
4107
4108        let barrier_assigned = version_map.get(&barrier.key()).unwrap().clone();
4109        let env = ExecutionEnv::new().with_assigned_versions(barrier_assigned);
4110        let (effects, _) = self
4111            .try_execute_immediately(&barrier.clone(), env.clone(), &epoch_store)
4112            .unwrap();
4113        assert!(effects.status().is_ok());
4114        replay_txns.push((barrier, env));
4115
4116        let next_accumulator_version = accumulator_version.next();
4117        self.execution_scheduler
4118            .settle_address_funds(FundsSettlement {
4119                funds_changes: balance_changes,
4120                next_accumulator_version,
4121            });
4122        // object funds are settled while executing the barrier transaction
4123
4124        replay_txns
4125    }
4126
4127    /// Replays settlement transactions on this AuthorityState.
4128    /// Used to sync a fullnode with settlement transactions executed on a validator.
4129    pub async fn replay_settlement_for_testing(
4130        &self,
4131        txns: &[(VerifiedExecutableTransaction, ExecutionEnv)],
4132    ) {
4133        let epoch_store = self.epoch_store_for_testing();
4134        for (tx, env) in txns {
4135            let (effects, _) = self
4136                .try_execute_immediately(tx, env.clone(), &epoch_store)
4137                .unwrap();
4138            assert!(effects.status().is_ok());
4139        }
4140    }
4141
4142    /// Advance the epoch store to the next epoch for testing only.
4143    /// This only manually sets all the places where we have the epoch number.
4144    /// It doesn't properly reconfigure the node, hence should be only used for testing.
4145    pub async fn reconfigure_for_testing(&self) {
4146        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
4147        let epoch_store = self.epoch_store_for_testing().clone();
4148        let protocol_config = epoch_store.protocol_config().clone();
4149        // The current protocol config used in the epoch store may have been overridden and diverged from
4150        // the protocol config definitions. That override may have now been dropped when the initial guard was dropped.
4151        // We reapply the override before creating the new epoch store, to make sure that
4152        // the new epoch store has the same protocol config as the current one.
4153        // Since this is for testing only, we mostly like to keep the protocol config the same
4154        // across epochs.
4155        let _guard =
4156            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
4157        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
4158            self.get_backing_package_store().clone(),
4159            self.get_object_store().clone(),
4160            &self.config.expensive_safety_check_config,
4161            self.checkpoint_store
4162                .get_epoch_last_checkpoint(epoch_store.epoch())
4163                .unwrap()
4164                .map(|c| *c.sequence_number())
4165                .unwrap_or_default(),
4166        );
4167        self.execution_scheduler
4168            .reconfigure(&new_epoch_store, self.get_account_funds_read());
4169        let new_epoch = new_epoch_store.epoch();
4170        self.epoch_store.store(new_epoch_store);
4171        epoch_store.epoch_terminated().await;
4172
4173        *execution_lock = new_epoch;
4174    }
4175
4176    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
4177    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
4178    #[instrument(level = "error", skip_all)]
4179    fn maybe_reaccumulate_state_hash(
4180        &self,
4181        cur_epoch_store: &AuthorityPerEpochStore,
4182        new_protocol_version: ProtocolVersion,
4183    ) {
4184        self.get_reconfig_api()
4185            .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4186    }
4187
4188    #[instrument(level = "error", skip_all)]
4189    fn check_system_consistency(
4190        &self,
4191        cur_epoch_store: &AuthorityPerEpochStore,
4192        state_hasher: Arc<GlobalStateHasher>,
4193        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4194    ) {
4195        info!(
4196            "Performing sui conservation consistency check for epoch {}",
4197            cur_epoch_store.epoch()
4198        );
4199
4200        if let Err(err) = self
4201            .get_reconfig_api()
4202            .expensive_check_sui_conservation(cur_epoch_store)
4203        {
4204            if cfg!(debug_assertions) {
4205                panic!("{}", err);
4206            } else {
4207                // We cannot panic in production yet because it is known that there are some
4208                // inconsistencies in testnet. We will enable this once we make it balanced again in testnet.
4209                warn!("Sui conservation consistency check failed: {}", err);
4210            }
4211        } else {
4212            info!("Sui conservation consistency check passed");
4213        }
4214
4215        // check for root state hash consistency with live object set
4216        if expensive_safety_check_config.enable_state_consistency_check() {
4217            info!(
4218                "Performing state consistency check for epoch {}",
4219                cur_epoch_store.epoch()
4220            );
4221            self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4222        }
4223
4224        // Verify all checkpointed transactions are present in transactions_seq.
4225        // This catches any post-processing gaps that could occur if async
4226        // post-processing failed to complete before persistence.
4227        if expensive_safety_check_config.enable_secondary_index_checks()
4228            && let Some(indexes) = self.indexes.clone()
4229        {
4230            let epoch = cur_epoch_store.epoch();
4231            // Only verify the current epoch's checkpoints. Previous epoch contents
4232            // may have been pruned, and we only need to verify that this epoch's
4233            // async post-processing completed correctly.
4234            let first_checkpoint = if epoch == 0 {
4235                0
4236            } else {
4237                self.checkpoint_store
4238                    .get_epoch_last_checkpoint_seq_number(epoch - 1)
4239                    .expect("Failed to get previous epoch's last checkpoint")
4240                    .expect("Previous epoch's last checkpoint missing")
4241                    + 1
4242            };
4243            let highest_executed = self
4244                .checkpoint_store
4245                .get_highest_executed_checkpoint_seq_number()
4246                .expect("Failed to get highest executed checkpoint")
4247                .expect("No executed checkpoints");
4248
4249            info!(
4250                "Verifying checkpointed transactions are in transactions_seq \
4251                 (checkpoints {first_checkpoint}..={highest_executed})"
4252            );
4253            for seq in first_checkpoint..=highest_executed {
4254                let checkpoint = self
4255                    .checkpoint_store
4256                    .get_checkpoint_by_sequence_number(seq)
4257                    .expect("Failed to get checkpoint")
4258                    .expect("Checkpoint missing");
4259                let contents = self
4260                    .checkpoint_store
4261                    .get_checkpoint_contents(&checkpoint.content_digest)
4262                    .expect("Failed to get checkpoint contents")
4263                    .expect("Checkpoint contents missing");
4264                for digests in contents.iter() {
4265                    let tx_digest = digests.transaction;
4266                    assert!(
4267                        indexes
4268                            .get_transaction_seq(&tx_digest)
4269                            .expect("Failed to read transactions_seq")
4270                            .is_some(),
4271                        "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
4272                    );
4273                }
4274            }
4275            info!("All checkpointed transactions verified in transactions_seq");
4276        }
4277    }
4278
4279    fn expensive_check_is_consistent_state(
4280        &self,
4281        state_hasher: Arc<GlobalStateHasher>,
4282        cur_epoch_store: &AuthorityPerEpochStore,
4283    ) {
4284        let live_object_set_hash = state_hasher.digest_live_object_set(
4285            !cur_epoch_store
4286                .protocol_config()
4287                .simplified_unwrap_then_delete(),
4288        );
4289
4290        let root_state_hash: ECMHLiveObjectSetDigest = self
4291            .get_global_state_hash_store()
4292            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4293            .expect("Retrieving root state hash cannot fail")
4294            .expect("Root state hash for epoch must exist")
4295            .1
4296            .digest()
4297            .into();
4298
4299        let is_inconsistent = root_state_hash != live_object_set_hash;
4300        if is_inconsistent {
4301            debug_fatal!(
4302                "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4303                root_state_hash,
4304                live_object_set_hash
4305            );
4306        } else {
4307            info!("State consistency check passed");
4308        }
4309
4310        state_hasher.set_inconsistent_state(is_inconsistent);
4311    }
4312
4313    pub fn current_epoch_for_testing(&self) -> EpochId {
4314        self.epoch_store_for_testing().epoch()
4315    }
4316
4317    #[instrument(level = "error", skip_all)]
4318    pub fn checkpoint_all_dbs(
4319        &self,
4320        checkpoint_path: &Path,
4321        cur_epoch_store: &AuthorityPerEpochStore,
4322        checkpoint_indexes: bool,
4323    ) -> SuiResult {
4324        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4325        let current_epoch = cur_epoch_store.epoch();
4326
4327        if checkpoint_path.exists() {
4328            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4329            return Ok(());
4330        }
4331
4332        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4333        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4334
4335        if checkpoint_path_tmp.exists() {
4336            fs::remove_dir_all(&checkpoint_path_tmp)
4337                .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4338        }
4339
4340        fs::create_dir_all(&checkpoint_path_tmp)
4341            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4342        fs::create_dir(&store_checkpoint_path_tmp)
4343            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4344
4345        // NOTE: Do not change the order of invoking these checkpoint calls
4346        // We want to snapshot checkpoint db first to not race with state sync
4347        self.checkpoint_store
4348            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4349
4350        self.get_reconfig_api()
4351            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4352
4353        self.committee_store
4354            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4355
4356        if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4357            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4358        }
4359
4360        fs::rename(checkpoint_path_tmp, checkpoint_path)
4361            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4362        Ok(())
4363    }
4364
    /// Load the current epoch store. This can change during reconfiguration. To ensure that
    /// we never end up accessing different epoch stores in a single task, we need to make sure
    /// that this is called once per task. Each call needs to be carefully audited to ensure it is
    /// the case. This also means we should minimize the number of call-sites. Only call it when
    /// there is no way to obtain it from somewhere else.
    ///
    /// Returns the guard produced by `self.epoch_store.load()`.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
4373
    /// Load the epoch store, should be used in tests only.
    ///
    /// Thin wrapper around [`Self::load_epoch_store_one_call_per_task`].
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
4378
4379    pub fn clone_committee_for_testing(&self) -> Committee {
4380        Committee::clone(self.epoch_store_for_testing().committee())
4381    }
4382
4383    #[instrument(level = "trace", skip_all)]
4384    pub fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4385        self.get_object_store().get_object(object_id)
4386    }
4387
4388    pub fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4389        Ok(self
4390            .get_object(&SUI_SYSTEM_ADDRESS.into())
4391            .expect("framework object should always exist")
4392            .compute_object_reference())
4393    }
4394
4395    // This function is only used for testing.
4396    pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4397        self.get_object_cache_reader()
4398            .get_sui_system_state_object_unsafe()
4399    }
4400
    /// Returns the checkpoint sequence number that `epoch_store` records for the
    /// transaction `digest`, or `None` when it has no entry for it.
    ///
    /// Delegates directly to `AuthorityPerEpochStore::get_transaction_checkpoint`.
    #[instrument(level = "trace", skip_all)]
    fn get_transaction_checkpoint_sequence(
        &self,
        digest: &TransactionDigest,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
        epoch_store.get_transaction_checkpoint(digest)
    }
4409
4410    #[instrument(level = "trace", skip_all)]
4411    pub fn get_checkpoint_by_sequence_number(
4412        &self,
4413        sequence_number: CheckpointSequenceNumber,
4414    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4415        Ok(self
4416            .checkpoint_store
4417            .get_checkpoint_by_sequence_number(sequence_number)?)
4418    }
4419
4420    #[instrument(level = "trace", skip_all)]
4421    pub fn get_transaction_checkpoint_for_tests(
4422        &self,
4423        digest: &TransactionDigest,
4424        epoch_store: &AuthorityPerEpochStore,
4425    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4426        let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4427        let Some(checkpoint) = checkpoint else {
4428            return Ok(None);
4429        };
4430        let checkpoint = self
4431            .checkpoint_store
4432            .get_checkpoint_by_sequence_number(checkpoint)?;
4433        Ok(checkpoint)
4434    }
4435
4436    #[instrument(level = "trace", skip_all)]
4437    pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4438        Ok(
4439            match self
4440                .get_object_cache_reader()
4441                .get_latest_object_or_tombstone(*object_id)
4442            {
4443                Some((_, ObjectOrTombstone::Object(object))) => {
4444                    let layout = self.get_object_layout(&object)?;
4445                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
4446                }
4447                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4448                None => ObjectRead::NotExists(*object_id),
4449            },
4450        )
4451    }
4452
    /// Chain Identifier is the digest of the genesis checkpoint.
    ///
    /// Returns the value stored in `self.chain_identifier`.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
4457
    /// Returns a reference to the fork recovery state, or `None` when
    /// `self.fork_recovery_state` is unset.
    pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
        self.fork_recovery_state.as_ref()
    }
4461
4462    #[instrument(level = "trace", skip_all)]
4463    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4464    where
4465        T: DeserializeOwned,
4466    {
4467        let o = self.get_object_read(object_id)?.into_object()?;
4468        if let Some(move_object) = o.data.try_as_move() {
4469            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4470                SuiErrorKind::ObjectDeserializationError {
4471                    error: format!("{e}"),
4472                }
4473            })?)
4474        } else {
4475            Err(SuiErrorKind::ObjectDeserializationError {
4476                error: format!("Provided object : [{object_id}] is not a Move object."),
4477            }
4478            .into())
4479        }
4480    }
4481
4482    /// This function aims to serve rpc reads on past objects and
4483    /// we don't expect it to be called for other purposes.
4484    /// Depending on the object pruning policies that will be enforced in the
4485    /// future there is no software-level guarantee/SLA to retrieve an object
4486    /// with an old version even if it exists/existed.
4487    #[instrument(level = "trace", skip_all)]
4488    pub fn get_past_object_read(
4489        &self,
4490        object_id: &ObjectID,
4491        version: SequenceNumber,
4492    ) -> SuiResult<PastObjectRead> {
4493        // Firstly we see if the object ever existed by getting its latest data
4494        let Some(obj_ref) = self
4495            .get_object_cache_reader()
4496            .get_latest_object_ref_or_tombstone(*object_id)
4497        else {
4498            return Ok(PastObjectRead::ObjectNotExists(*object_id));
4499        };
4500
4501        if version > obj_ref.1 {
4502            return Ok(PastObjectRead::VersionTooHigh {
4503                object_id: *object_id,
4504                asked_version: version,
4505                latest_version: obj_ref.1,
4506            });
4507        }
4508
4509        if version < obj_ref.1 {
4510            // Read past objects
4511            return Ok(match self.read_object_at_version(object_id, version)? {
4512                Some((object, layout)) => {
4513                    let obj_ref = object.compute_object_reference();
4514                    PastObjectRead::VersionFound(obj_ref, object, layout)
4515                }
4516
4517                None => PastObjectRead::VersionNotFound(*object_id, version),
4518            });
4519        }
4520
4521        if !obj_ref.2.is_alive() {
4522            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4523        }
4524
4525        match self.read_object_at_version(object_id, obj_ref.1)? {
4526            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4527            None => {
4528                debug_fatal!(
4529                    "Object with in parent_entry is missing from object store, datastore is \
4530                     inconsistent"
4531                );
4532                Err(UserInputError::ObjectNotFound {
4533                    object_id: *object_id,
4534                    version: Some(obj_ref.1),
4535                }
4536                .into())
4537            }
4538        }
4539    }
4540
4541    #[instrument(level = "trace", skip_all)]
4542    fn read_object_at_version(
4543        &self,
4544        object_id: &ObjectID,
4545        version: SequenceNumber,
4546    ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4547        let Some(object) = self
4548            .get_object_cache_reader()
4549            .get_object_by_key(object_id, version)
4550        else {
4551            return Ok(None);
4552        };
4553
4554        let layout = self.get_object_layout(&object)?;
4555        Ok(Some((object, layout)))
4556    }
4557
4558    pub fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4559        let layout = object
4560            .data
4561            .try_as_move()
4562            .map(|object| {
4563                let epoch_store = self.load_epoch_store_one_call_per_task();
4564                into_struct_layout(
4565                    epoch_store
4566                        .executor()
4567                        // TODO(cache) - must read through cache
4568                        .type_layout_resolver(
4569                            epoch_store.protocol_config(),
4570                            Box::new(self.get_backing_package_store().as_ref()),
4571                        )
4572                        .get_annotated_layout(&object.type_().clone().into())?,
4573                )
4574            })
4575            .transpose()?;
4576        Ok(layout)
4577    }
4578
4579    /// Returns a fake ObjectRef representing an address balance, along with the balance value
4580    /// and the previous transaction digest. The ObjectRef can be returned to JSON-RPC clients
4581    /// that don't understand address balances.
4582    #[instrument(level = "trace", skip_all)]
4583    pub fn get_address_balance_coin_info(
4584        &self,
4585        owner: SuiAddress,
4586        balance_type: TypeTag,
4587    ) -> SuiResult<Option<(ObjectRef, u64, TransactionDigest)>> {
4588        let accumulator_id = AccumulatorValue::get_field_id(owner, &balance_type)?;
4589        let accumulator_obj = AccumulatorValue::load_object_by_id(
4590            self.get_child_object_resolver().as_ref(),
4591            None,
4592            *accumulator_id.inner(),
4593        )?;
4594
4595        let Some(accumulator_obj) = accumulator_obj else {
4596            return Ok(None);
4597        };
4598
4599        // Extract the currency type from balance_type (e.g., SUI from Balance<SUI>).
4600        // get_balance expects the currency type, not the balance type.
4601        let currency_type =
4602            Balance::maybe_get_balance_type_param(&balance_type).unwrap_or(balance_type);
4603
4604        let balance = crate::accumulators::balances::get_balance(
4605            owner,
4606            self.get_child_object_resolver().as_ref(),
4607            currency_type,
4608        )?;
4609
4610        if balance == 0 {
4611            return Ok(None);
4612        };
4613
4614        let object_ref = coin_reservation::encode_object_ref(
4615            accumulator_obj.id(),
4616            accumulator_obj.version(),
4617            self.load_epoch_store_one_call_per_task().epoch(),
4618            balance,
4619            self.get_chain_identifier(),
4620        );
4621
4622        Ok(Some((
4623            object_ref,
4624            balance,
4625            accumulator_obj.previous_transaction,
4626        )))
4627    }
4628
4629    /// Returns fake ObjectRefs for all address balances of an owner, keyed by coin type string.
4630    /// Used by get_all_coins to include fake coins for each coin type.
4631    #[instrument(level = "trace", skip_all)]
4632    pub fn get_all_address_balance_coin_infos(
4633        &self,
4634        owner: SuiAddress,
4635    ) -> SuiResult<std::collections::HashMap<String, (ObjectRef, u64, TransactionDigest)>> {
4636        let indexes = self
4637            .indexes
4638            .as_ref()
4639            .ok_or(SuiErrorKind::IndexStoreNotAvailable)?;
4640
4641        let mut result = std::collections::HashMap::new();
4642        for currency_type in indexes.get_address_balance_coin_types_iter(owner) {
4643            let balance_type = sui_types::balance::Balance::type_tag(currency_type.clone());
4644            if let Some((obj_ref, balance, prev_tx)) =
4645                self.get_address_balance_coin_info(owner, balance_type)?
4646            {
4647                // Use currency_type.to_string() to match the format in CoinIndexKey2
4648                // (e.g., "0x2::sui::SUI", not "0x2::coin::Coin<0x2::sui::SUI>")
4649                result.insert(currency_type.to_string(), (obj_ref, balance, prev_tx));
4650            }
4651        }
4652        Ok(result)
4653    }
4654
4655    fn get_owner_at_version(
4656        object_store: &Arc<dyn ObjectStore + Send + Sync>,
4657        object_id: &ObjectID,
4658        version: SequenceNumber,
4659    ) -> SuiResult<Owner> {
4660        object_store
4661            .get_object_by_key(object_id, version)
4662            .ok_or_else(|| {
4663                SuiError::from(UserInputError::ObjectNotFound {
4664                    object_id: *object_id,
4665                    version: Some(version),
4666                })
4667            })
4668            .map(|o| o.owner.clone())
4669    }
4670
4671    #[instrument(level = "trace", skip_all)]
4672    pub fn get_owner_objects(
4673        &self,
4674        owner: SuiAddress,
4675        // If `Some`, the query will start from the next item after the specified cursor
4676        cursor: Option<ObjectID>,
4677        limit: usize,
4678        filter: Option<SuiObjectDataFilter>,
4679    ) -> SuiResult<Vec<ObjectInfo>> {
4680        if let Some(indexes) = &self.indexes {
4681            indexes.get_owner_objects(owner, cursor, limit, filter)
4682        } else {
4683            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4684        }
4685    }
4686
4687    #[instrument(level = "trace", skip_all)]
4688    pub fn get_owned_coins_iterator_with_cursor(
4689        &self,
4690        owner: SuiAddress,
4691        // If `Some`, the query will start from the next item after the specified cursor
4692        cursor: (String, u64, ObjectID),
4693        limit: usize,
4694        one_coin_type_only: bool,
4695    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4696        if let Some(indexes) = &self.indexes {
4697            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4698        } else {
4699            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4700        }
4701    }
4702
4703    #[instrument(level = "trace", skip_all)]
4704    pub fn get_owner_objects_iterator(
4705        &self,
4706        owner: SuiAddress,
4707        // If `Some`, the query will start from the next item after the specified cursor
4708        cursor: Option<ObjectID>,
4709        filter: Option<SuiObjectDataFilter>,
4710    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4711        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4712        if let Some(indexes) = &self.indexes {
4713            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4714        } else {
4715            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4716        }
4717    }
4718
4719    #[instrument(level = "trace", skip_all)]
4720    pub fn get_move_objects<T>(&self, owner: SuiAddress, type_: MoveObjectType) -> SuiResult<Vec<T>>
4721    where
4722        T: DeserializeOwned,
4723    {
4724        let object_ids = self
4725            .get_owner_objects_iterator(owner, None, None)?
4726            .filter(|o| match &o.type_ {
4727                ObjectType::Struct(s) => &type_ == s,
4728                ObjectType::Package => false,
4729            })
4730            .map(|info| ObjectKey(info.object_id, info.version))
4731            .collect::<Vec<_>>();
4732        let mut move_objects = vec![];
4733
4734        let objects = self
4735            .get_object_store()
4736            .multi_get_objects_by_key(&object_ids);
4737
4738        for (o, id) in objects.into_iter().zip_debug_eq(object_ids) {
4739            let object = o.ok_or_else(|| {
4740                SuiError::from(UserInputError::ObjectNotFound {
4741                    object_id: id.0,
4742                    version: Some(id.1),
4743                })
4744            })?;
4745            let move_object = object.data.try_as_move().ok_or_else(|| {
4746                SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4747            })?;
4748            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4749                SuiErrorKind::ObjectDeserializationError {
4750                    error: format!("{e}"),
4751                }
4752            })?);
4753        }
4754        Ok(move_objects)
4755    }
4756
4757    #[instrument(level = "trace", skip_all)]
4758    pub fn get_dynamic_fields(
4759        &self,
4760        owner: ObjectID,
4761        // If `Some`, the query will start from the next item after the specified cursor
4762        cursor: Option<ObjectID>,
4763        limit: usize,
4764    ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4765        Ok(self
4766            .get_dynamic_fields_iterator(owner, cursor)?
4767            .take(limit)
4768            .collect::<Result<Vec<_>, _>>()?)
4769    }
4770
4771    fn get_dynamic_fields_iterator(
4772        &self,
4773        owner: ObjectID,
4774        // If `Some`, the query will start from the next item after the specified cursor
4775        cursor: Option<ObjectID>,
4776    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4777    {
4778        if let Some(indexes) = &self.indexes {
4779            indexes.get_dynamic_fields_iterator(owner, cursor)
4780        } else {
4781            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4782        }
4783    }
4784
4785    #[instrument(level = "trace", skip_all)]
4786    pub fn get_dynamic_field_object_id(
4787        &self,
4788        owner: ObjectID,
4789        name_type: TypeTag,
4790        name_bcs_bytes: &[u8],
4791    ) -> SuiResult<Option<ObjectID>> {
4792        if let Some(indexes) = &self.indexes {
4793            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4794        } else {
4795            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4796        }
4797    }
4798
4799    #[instrument(level = "trace", skip_all)]
4800    pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
4801        Ok(self.get_indexes()?.next_sequence_number())
4802    }
4803
4804    #[instrument(level = "trace", skip_all)]
4805    pub async fn get_executed_transaction_and_effects(
4806        &self,
4807        digest: TransactionDigest,
4808        kv_store: Arc<TransactionKeyValueStore>,
4809    ) -> SuiResult<(Transaction, TransactionEffects)> {
4810        let transaction = kv_store.get_tx(digest).await?;
4811        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4812        Ok((transaction, effects))
4813    }
4814
4815    #[instrument(level = "trace", skip_all)]
4816    pub fn multi_get_checkpoint_by_sequence_number(
4817        &self,
4818        sequence_numbers: &[CheckpointSequenceNumber],
4819    ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
4820        Ok(self
4821            .checkpoint_store
4822            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4823    }
4824
4825    #[instrument(level = "trace", skip_all)]
4826    pub fn get_transaction_events(
4827        &self,
4828        digest: &TransactionDigest,
4829    ) -> SuiResult<TransactionEvents> {
4830        self.get_transaction_cache_reader()
4831            .get_events(digest)
4832            .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4833    }
4834
4835    pub fn get_transaction_input_objects(
4836        &self,
4837        effects: &TransactionEffects,
4838    ) -> SuiResult<Vec<Object>> {
4839        sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4840            .map_err(Into::into)
4841    }
4842
4843    pub fn get_transaction_output_objects(
4844        &self,
4845        effects: &TransactionEffects,
4846    ) -> SuiResult<Vec<Object>> {
4847        sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4848            .map_err(Into::into)
4849    }
4850
4851    fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4852        match &self.indexes {
4853            Some(i) => Ok(i.clone()),
4854            None => Err(SuiErrorKind::UnsupportedFeatureError {
4855                error: "extended object indexing is not enabled on this server".into(),
4856            }
4857            .into()),
4858        }
4859    }
4860
4861    pub async fn get_transactions_for_tests(
4862        self: &Arc<Self>,
4863        filter: Option<TransactionFilter>,
4864        cursor: Option<TransactionDigest>,
4865        limit: Option<usize>,
4866        reverse: bool,
4867    ) -> SuiResult<Vec<TransactionDigest>> {
4868        let metrics = KeyValueStoreMetrics::new_for_tests();
4869        let kv_store = Arc::new(TransactionKeyValueStore::new(
4870            "rocksdb",
4871            metrics,
4872            self.clone(),
4873        ));
4874        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4875            .await
4876    }
4877
4878    #[instrument(level = "trace", skip_all)]
4879    pub async fn get_transactions(
4880        &self,
4881        kv_store: &Arc<TransactionKeyValueStore>,
4882        filter: Option<TransactionFilter>,
4883        // If `Some`, the query will start from the next item after the specified cursor
4884        cursor: Option<TransactionDigest>,
4885        limit: Option<usize>,
4886        reverse: bool,
4887    ) -> SuiResult<Vec<TransactionDigest>> {
4888        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4889            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4890            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4891            if reverse {
4892                let iter = iter
4893                    .rev()
4894                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4895                    .skip(usize::from(cursor.is_some()));
4896                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4897            } else {
4898                let iter = iter
4899                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4900                    .skip(usize::from(cursor.is_some()));
4901                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4902            }
4903        }
4904        self.get_indexes()?
4905            .get_transactions(filter, cursor, limit, reverse)
4906    }
4907
    /// Returns a reference to the shared checkpoint store held by this state.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
4911
4912    pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4913        self.get_checkpoint_store()
4914            .get_highest_executed_checkpoint_seq_number()?
4915            .ok_or(
4916                SuiErrorKind::UserInputError {
4917                    error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4918                }
4919                .into(),
4920            )
4921    }
4922
4923    #[cfg(msim)]
4924    pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
4925        self.database_for_testing()
4926            .perpetual_tables
4927            .get_highest_pruned_checkpoint()
4928            .map(|c| c.unwrap_or(0))
4929            .map_err(Into::into)
4930    }
4931
4932    #[instrument(level = "trace", skip_all)]
4933    pub fn get_checkpoint_summary_by_sequence_number(
4934        &self,
4935        sequence_number: CheckpointSequenceNumber,
4936    ) -> SuiResult<CheckpointSummary> {
4937        let verified_checkpoint = self
4938            .get_checkpoint_store()
4939            .get_checkpoint_by_sequence_number(sequence_number)?;
4940        match verified_checkpoint {
4941            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4942            None => Err(SuiErrorKind::UserInputError {
4943                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4944            }
4945            .into()),
4946        }
4947    }
4948
4949    #[instrument(level = "trace", skip_all)]
4950    pub fn get_checkpoint_summary_by_digest(
4951        &self,
4952        digest: CheckpointDigest,
4953    ) -> SuiResult<CheckpointSummary> {
4954        let verified_checkpoint = self
4955            .get_checkpoint_store()
4956            .get_checkpoint_by_digest(&digest)?;
4957        match verified_checkpoint {
4958            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4959            None => Err(SuiErrorKind::UserInputError {
4960                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4961            }
4962            .into()),
4963        }
4964    }
4965
4966    #[instrument(level = "trace", skip_all)]
4967    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
4968        if is_system_package(package_id) {
4969            return self.find_genesis_txn_digest();
4970        }
4971        Ok(self
4972            .get_object_read(&package_id)?
4973            .into_object()?
4974            .previous_transaction)
4975    }
4976
4977    #[instrument(level = "trace", skip_all)]
4978    pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
4979        let summary = self
4980            .get_verified_checkpoint_by_sequence_number(0)?
4981            .into_message();
4982        let content = self.get_checkpoint_contents(summary.content_digest)?;
4983        let genesis_transaction = content.enumerate_transactions(&summary).next();
4984        Ok(genesis_transaction
4985            .ok_or(SuiErrorKind::UserInputError {
4986                error: UserInputError::GenesisTransactionNotFound,
4987            })?
4988            .1
4989            .transaction)
4990    }
4991
4992    #[instrument(level = "trace", skip_all)]
4993    pub fn get_verified_checkpoint_by_sequence_number(
4994        &self,
4995        sequence_number: CheckpointSequenceNumber,
4996    ) -> SuiResult<VerifiedCheckpoint> {
4997        let verified_checkpoint = self
4998            .get_checkpoint_store()
4999            .get_checkpoint_by_sequence_number(sequence_number)?;
5000        match verified_checkpoint {
5001            Some(verified_checkpoint) => Ok(verified_checkpoint),
5002            None => Err(SuiErrorKind::UserInputError {
5003                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5004            }
5005            .into()),
5006        }
5007    }
5008
5009    #[instrument(level = "trace", skip_all)]
5010    pub fn get_verified_checkpoint_summary_by_digest(
5011        &self,
5012        digest: CheckpointDigest,
5013    ) -> SuiResult<VerifiedCheckpoint> {
5014        let verified_checkpoint = self
5015            .get_checkpoint_store()
5016            .get_checkpoint_by_digest(&digest)?;
5017        match verified_checkpoint {
5018            Some(verified_checkpoint) => Ok(verified_checkpoint),
5019            None => Err(SuiErrorKind::UserInputError {
5020                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5021            }
5022            .into()),
5023        }
5024    }
5025
5026    #[instrument(level = "trace", skip_all)]
5027    pub fn get_checkpoint_contents(
5028        &self,
5029        digest: CheckpointContentsDigest,
5030    ) -> SuiResult<CheckpointContents> {
5031        self.get_checkpoint_store()
5032            .get_checkpoint_contents(&digest)?
5033            .ok_or(
5034                SuiErrorKind::UserInputError {
5035                    error: UserInputError::CheckpointContentsNotFound(digest),
5036                }
5037                .into(),
5038            )
5039    }
5040
5041    #[instrument(level = "trace", skip_all)]
5042    pub fn get_checkpoint_contents_by_sequence_number(
5043        &self,
5044        sequence_number: CheckpointSequenceNumber,
5045    ) -> SuiResult<CheckpointContents> {
5046        let verified_checkpoint = self
5047            .get_checkpoint_store()
5048            .get_checkpoint_by_sequence_number(sequence_number)?;
5049        match verified_checkpoint {
5050            Some(verified_checkpoint) => {
5051                let content_digest = verified_checkpoint.into_inner().content_digest;
5052                self.get_checkpoint_contents(content_digest)
5053            }
5054            None => Err(SuiErrorKind::UserInputError {
5055                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5056            }
5057            .into()),
5058        }
5059    }
5060
5061    #[instrument(level = "trace", skip_all)]
5062    pub async fn query_events(
5063        &self,
5064        kv_store: &Arc<TransactionKeyValueStore>,
5065        query: EventFilter,
5066        // If `Some`, the query will start from the next item after the specified cursor
5067        cursor: Option<EventID>,
5068        limit: usize,
5069        descending: bool,
5070    ) -> SuiResult<Vec<SuiEvent>> {
5071        let index_store = self.get_indexes()?;
5072
5073        //Get the tx_num from tx_digest
5074        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
5075            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
5076                SuiErrorKind::TransactionNotFound {
5077                    digest: cursor.tx_digest,
5078                },
5079            )?;
5080            (tx_seq, cursor.event_seq as usize)
5081        } else if descending {
5082            (u64::MAX, usize::MAX)
5083        } else {
5084            (0, 0)
5085        };
5086
5087        let limit = limit + 1;
5088        let mut event_keys = match query {
5089            EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
5090            EventFilter::Transaction(digest) => {
5091                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
5092            }
5093            EventFilter::MoveModule { package, module } => {
5094                let module_id = ModuleId::new(package.into(), module);
5095                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
5096            }
5097            EventFilter::MoveEventType(struct_name) => index_store
5098                .events_by_move_event_struct_name(
5099                    &struct_name,
5100                    tx_num,
5101                    event_num,
5102                    limit,
5103                    descending,
5104                )?,
5105            EventFilter::Sender(sender) => {
5106                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
5107            }
5108            EventFilter::TimeRange {
5109                start_time,
5110                end_time,
5111            } => index_store
5112                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
5113            EventFilter::MoveEventModule { package, module } => index_store
5114                .events_by_move_event_module(
5115                    &ModuleId::new(package.into(), module),
5116                    tx_num,
5117                    event_num,
5118                    limit,
5119                    descending,
5120                )?,
5121            // not using "_ =>" because we want to make sure we remember to add new variants here
5122            EventFilter::Any(_) => {
5123                return Err(SuiErrorKind::UserInputError {
5124                    error: UserInputError::Unsupported(
5125                        "'Any' queries are not supported by the fullnode.".to_string(),
5126                    ),
5127                }
5128                .into());
5129            }
5130        };
5131
5132        // skip one event if exclusive cursor is provided,
5133        // otherwise truncate to the original limit.
5134        if cursor.is_some() {
5135            if !event_keys.is_empty() {
5136                event_keys.remove(0);
5137            }
5138        } else {
5139            event_keys.truncate(limit - 1);
5140        }
5141
5142        // get the unique set of digests from the event_keys
5143        let transaction_digests = event_keys
5144            .iter()
5145            .map(|(_, digest, _, _)| *digest)
5146            .collect::<HashSet<_>>()
5147            .into_iter()
5148            .collect::<Vec<_>>();
5149
5150        let events = kv_store
5151            .multi_get_events_by_tx_digests(&transaction_digests)
5152            .await?;
5153
5154        let events_map: HashMap<_, _> = transaction_digests
5155            .iter()
5156            .zip_debug_eq(events.into_iter())
5157            .collect();
5158
5159        let stored_events = event_keys
5160            .into_iter()
5161            .map(|k| {
5162                (
5163                    k,
5164                    events_map
5165                        .get(&k.1)
5166                        .expect("fetched digest is missing")
5167                        .clone()
5168                        .and_then(|e| e.data.get(k.2).cloned()),
5169                )
5170            })
5171            .map(
5172                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
5173                    event
5174                        .map(|e| (e, tx_digest, event_seq, timestamp))
5175                        .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
5176                },
5177            )
5178            .collect::<Result<Vec<_>, _>>()?;
5179
5180        let epoch_store = self.load_epoch_store_one_call_per_task();
5181        let backing_store = self.get_backing_package_store().as_ref();
5182        let mut layout_resolver = epoch_store
5183            .executor()
5184            .type_layout_resolver(epoch_store.protocol_config(), Box::new(backing_store));
5185        let mut events = vec![];
5186        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
5187            events.push(SuiEvent::try_from(
5188                e.clone(),
5189                tx_digest,
5190                event_seq as u64,
5191                Some(timestamp),
5192                layout_resolver.get_annotated_layout(&e.type_)?,
5193            )?)
5194        }
5195        Ok(events)
5196    }
5197
5198    pub fn insert_genesis_object(&self, object: Object) {
5199        self.get_reconfig_api().insert_genesis_object(object);
5200    }
5201
5202    pub fn insert_genesis_objects(&self, objects: &[Object]) {
5203        for o in objects {
5204            self.insert_genesis_object(o.clone());
5205        }
5206    }
5207
5208    /// Make a status response for a transaction
5209    #[instrument(level = "trace", skip_all)]
5210    pub fn get_transaction_status(
5211        &self,
5212        transaction_digest: &TransactionDigest,
5213        epoch_store: &Arc<AuthorityPerEpochStore>,
5214    ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
5215        // TODO: In the case of read path, we should not have to re-sign the effects.
5216        if let Some(effects) =
5217            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
5218        {
5219            if let Some(transaction) = self
5220                .get_transaction_cache_reader()
5221                .get_transaction_block(transaction_digest)
5222            {
5223                let events = if effects.events_digest().is_some() {
5224                    self.get_transaction_events(effects.transaction_digest())?
5225                } else {
5226                    TransactionEvents::default()
5227                };
5228                // The cert_sig slot is permanently None: validators no longer aggregate or
5229                // persist per-transaction quorum signatures (the transaction_cert_signatures
5230                // table is deprecated and unwritten).
5231                return Ok(Some((
5232                    (*transaction).clone().into_message(),
5233                    TransactionStatus::Executed(None, effects.into_inner(), events),
5234                )));
5235            } else {
5236                // The read of effects and read of transaction are not atomic. It's possible that we reverted
5237                // the transaction (during epoch change) in between the above two reads, and we end up
5238                // having effects but not transaction. In this case, we just fall through.
5239                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
5240            }
5241        }
5242        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
5243            self.metrics.tx_already_processed.inc();
5244            let (transaction, sig) = signed.into_inner().into_data_and_sig();
5245            Ok(Some((transaction, TransactionStatus::Signed(sig))))
5246        } else {
5247            Ok(None)
5248        }
5249    }
5250
5251    /// Get the signed effects of the given transaction. If the effects was signed in a previous
5252    /// epoch, re-sign it so that the caller is able to form a cert of the effects in the current
5253    /// epoch.
5254    #[instrument(level = "trace", skip_all)]
5255    pub fn get_signed_effects_and_maybe_resign(
5256        &self,
5257        transaction_digest: &TransactionDigest,
5258        epoch_store: &Arc<AuthorityPerEpochStore>,
5259    ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
5260        let effects = self
5261            .get_transaction_cache_reader()
5262            .get_executed_effects(transaction_digest);
5263        match effects {
5264            Some(effects) => {
5265                // If the transaction was executed in previous epochs, the validator will
5266                // re-sign the effects with new current epoch so that a client is always able to
5267                // obtain an effects certificate at the current epoch.
5268                //
5269                // Why is this necessary? Consider the following case:
5270                // - assume there are 4 validators
5271                // - Quorum driver gets 2 signed effects before reconfig halt
5272                // - The tx makes it into final checkpoint.
5273                // - 2 validators go away and are replaced in the new epoch.
5274                // - The new epoch begins.
5275                // - The quorum driver cannot complete the partial effects cert from the previous epoch,
5276                //   because it may not be able to reach either of the 2 former validators.
5277                // - But, if the 2 validators that stayed are willing to re-sign the effects in the new
5278                //   epoch, the QD can make a new effects cert and return it to the client.
5279                //
5280                // This is a considered a short-term workaround. Eventually, Quorum Driver should be able
5281                // to return either an effects certificate, -or- a proof of inclusion in a checkpoint. In
5282                // the case above, the Quorum Driver would return a proof of inclusion in the final
5283                // checkpoint, and this code would no longer be necessary.
5284                if effects.executed_epoch() != epoch_store.epoch() {
5285                    debug!(
5286                        tx_digest=?transaction_digest,
5287                        effects_epoch=?effects.executed_epoch(),
5288                        epoch=?epoch_store.epoch(),
5289                        "Re-signing the effects with the current epoch"
5290                    );
5291                }
5292                Ok(Some(self.sign_effects(effects, epoch_store)?))
5293            }
5294            None => Ok(None),
5295        }
5296    }
5297
5298    #[instrument(level = "trace", skip_all)]
5299    pub(crate) fn sign_effects(
5300        &self,
5301        effects: TransactionEffects,
5302        epoch_store: &Arc<AuthorityPerEpochStore>,
5303    ) -> SuiResult<VerifiedSignedTransactionEffects> {
5304        let tx_digest = *effects.transaction_digest();
5305        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5306            Some(sig) => {
5307                debug_assert!(sig.epoch == epoch_store.epoch());
5308                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5309            }
5310            _ => {
5311                let sig = AuthoritySignInfo::new(
5312                    epoch_store.epoch(),
5313                    &effects,
5314                    Intent::sui_app(IntentScope::TransactionEffects),
5315                    self.name,
5316                    &*self.secret,
5317                );
5318
5319                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5320
5321                epoch_store.insert_effects_digest_and_signature(
5322                    &tx_digest,
5323                    effects.digest(),
5324                    &sig,
5325                )?;
5326
5327                effects
5328            }
5329        };
5330
5331        Ok(VerifiedSignedTransactionEffects::new_unchecked(
5332            signed_effects,
5333        ))
5334    }
5335
5336    // Returns coin objects for indexing for fullnode if indexing is enabled.
5337    #[instrument(level = "trace", skip_all)]
5338    fn fullnode_only_get_tx_coins_for_indexing(
5339        name: AuthorityName,
5340        object_store: &Arc<dyn ObjectStore + Send + Sync>,
5341        effects: &TransactionEffects,
5342        inner_temporary_store: &InnerTemporaryStore,
5343        epoch_store: &Arc<AuthorityPerEpochStore>,
5344    ) -> Option<TxCoins> {
5345        if epoch_store.committee().authority_exists(&name) {
5346            return None;
5347        }
5348        let written_coin_objects = inner_temporary_store
5349            .written
5350            .iter()
5351            .filter_map(|(k, v)| {
5352                if v.is_coin() {
5353                    Some((*k, v.clone()))
5354                } else {
5355                    None
5356                }
5357            })
5358            .collect();
5359        let mut input_coin_objects = inner_temporary_store
5360            .input_objects
5361            .iter()
5362            .filter_map(|(k, v)| {
5363                if v.is_coin() {
5364                    Some((*k, v.clone()))
5365                } else {
5366                    None
5367                }
5368            })
5369            .collect::<ObjectMap>();
5370
5371        // Check for receiving objects that were actually used and modified during execution. Their
5372        // updated version will already showup in "written_coins" but their input isn't included in
5373        // the set of input objects in a inner_temporary_store.
5374        for (object_id, version) in effects.modified_at_versions() {
5375            if inner_temporary_store
5376                .loaded_runtime_objects
5377                .contains_key(&object_id)
5378                && let Some(object) = object_store.get_object_by_key(&object_id, version)
5379                && object.is_coin()
5380            {
5381                input_coin_objects.insert(object_id, object);
5382            }
5383        }
5384
5385        Some((input_coin_objects, written_coin_objects))
5386    }
5387
5388    /// Get the TransactionEnvelope that currently locks the given object, if any.
5389    /// Since object locks are only valid for one epoch, we also need the epoch_id in the query.
5390    /// Returns UserInputError::ObjectNotFound if no lock records for the given object can be found.
5391    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the object record is at a different version.
5392    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a certain transaction.
5393    /// Returns None if a lock record is initialized for the given ObjectRef but not yet locked by any transaction,
5394    ///     or cannot find the transaction in transaction table, because of data race etc.
5395    #[instrument(level = "trace", skip_all)]
5396    pub fn get_transaction_lock(
5397        &self,
5398        object_ref: &ObjectRef,
5399        epoch_store: &AuthorityPerEpochStore,
5400    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5401        let lock_info = self
5402            .get_object_cache_reader()
5403            .get_lock(*object_ref, epoch_store)?;
5404        let lock_info = match lock_info {
5405            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5406                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5407                    provided_obj_ref: *object_ref,
5408                    current_version: locked_ref.1,
5409                }
5410                .into());
5411            }
5412            ObjectLockStatus::Initialized => {
5413                return Ok(None);
5414            }
5415            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5416        };
5417
5418        epoch_store.get_signed_transaction(&lock_info.tx_digest)
5419    }
5420
5421    pub fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5422        self.get_object_cache_reader().get_objects(objects)
5423    }
5424
5425    pub fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5426        self.get_object_cache_reader()
5427            .get_latest_object_ref_or_tombstone(object_id)
5428    }
5429
5430    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
5431    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the upgrade.
5432    ///
5433    /// This method can be used to dynamic adjust the amount of buffer. If set to 0, the upgrade
5434    /// will go through with only 2f+1 votes.
5435    ///
5436    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all should have the same
5437    /// value), or you risk halting the chain.
5438    pub fn set_override_protocol_upgrade_buffer_stake(
5439        &self,
5440        expected_epoch: EpochId,
5441        buffer_stake_bps: u64,
5442    ) -> SuiResult {
5443        let epoch_store = self.load_epoch_store_one_call_per_task();
5444        let actual_epoch = epoch_store.epoch();
5445        if actual_epoch != expected_epoch {
5446            return Err(SuiErrorKind::WrongEpoch {
5447                expected_epoch,
5448                actual_epoch,
5449            }
5450            .into());
5451        }
5452
5453        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5454    }
5455
5456    pub fn clear_override_protocol_upgrade_buffer_stake(
5457        &self,
5458        expected_epoch: EpochId,
5459    ) -> SuiResult {
5460        let epoch_store = self.load_epoch_store_one_call_per_task();
5461        let actual_epoch = epoch_store.epoch();
5462        if actual_epoch != expected_epoch {
5463            return Err(SuiErrorKind::WrongEpoch {
5464                expected_epoch,
5465                actual_epoch,
5466            }
5467            .into());
5468        }
5469
5470        epoch_store.clear_override_protocol_upgrade_buffer_stake()
5471    }
5472
5473    /// Get the set of system packages that are compiled in to this build, if those packages are
5474    /// compatible with the current versions of those packages on-chain.
5475    pub async fn get_available_system_packages(
5476        &self,
5477        binary_config: &BinaryConfig,
5478    ) -> Vec<ObjectRef> {
5479        let mut results = vec![];
5480
5481        let system_packages = BuiltInFramework::iter_system_packages();
5482
5483        // Add extra framework packages during simtest
5484        #[cfg(msim)]
5485        let extra_packages = framework_injection::get_extra_packages(self.name);
5486        #[cfg(msim)]
5487        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
5488
5489        for system_package in system_packages {
5490            let modules = system_package.modules().to_vec();
5491            // In simtests, we could override the current built-in framework packages.
5492            #[cfg(msim)]
5493            let modules =
5494                match framework_injection::get_override_modules(&system_package.id, self.name) {
5495                    Some(overrides) if overrides.is_empty() => continue,
5496                    Some(overrides) => overrides,
5497                    None => modules,
5498                };
5499
5500            let Some(obj_ref) = sui_framework::compare_system_package(
5501                &self.get_object_store(),
5502                &system_package.id,
5503                &modules,
5504                system_package.dependencies.to_vec(),
5505                binary_config,
5506            )
5507            .await
5508            else {
5509                return vec![];
5510            };
5511            results.push(obj_ref);
5512        }
5513
5514        results
5515    }
5516
5517    /// Return the new versions, module bytes, and dependencies for the packages that have been
5518    /// committed to for a framework upgrade, in `system_packages`.  Loads the module contents from
5519    /// the binary, and performs the following checks:
5520    ///
5521    /// - Whether its contents matches what is on-chain already, in which case no upgrade is
5522    ///   required, and its contents are omitted from the output.
5523    /// - Whether the contents in the binary can form a package whose digest matches the input,
5524    ///   meaning the framework will be upgraded, and this authority can satisfy that upgrade, in
5525    ///   which case the contents are included in the output.
5526    ///
5527    /// If the current version of the framework can't be loaded, the binary does not contain the
5528    /// bytes for that framework ID, or the resulting package fails the digest check, `None` is
5529    /// returned indicating that this authority cannot run the upgrade that the network voted on.
5530    async fn get_system_package_bytes(
5531        &self,
5532        system_packages: Vec<ObjectRef>,
5533        binary_config: &BinaryConfig,
5534    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5535        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5536        let objects = self.get_objects(&ids);
5537
5538        let mut res = Vec::with_capacity(system_packages.len());
5539        for (system_package_ref, object) in system_packages.into_iter().zip_debug_eq(objects.iter())
5540        {
5541            let prev_transaction = match object {
5542                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5543                    // Skip this one because it doesn't need to be upgraded.
5544                    info!("Framework {} does not need updating", system_package_ref.0);
5545                    continue;
5546                }
5547
5548                Some(cur_object) => cur_object.previous_transaction,
5549                None => TransactionDigest::genesis_marker(),
5550            };
5551
5552            #[cfg(msim)]
5553            let SystemPackage {
5554                id: _,
5555                bytes,
5556                dependencies,
5557            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5558                .unwrap_or_else(|| {
5559                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5560                });
5561
5562            #[cfg(not(msim))]
5563            let SystemPackage {
5564                id: _,
5565                bytes,
5566                dependencies,
5567            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5568
5569            let modules: Vec<_> = bytes
5570                .iter()
5571                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5572                .collect();
5573
5574            let new_object = Object::new_system_package(
5575                &modules,
5576                system_package_ref.1,
5577                dependencies.clone(),
5578                prev_transaction,
5579            );
5580
5581            let new_ref = new_object.compute_object_reference();
5582            if new_ref != system_package_ref {
5583                if cfg!(msim) {
5584                    // debug_fatal required here for test_framework_upgrade_conflicting_versions to pass
5585                    debug_fatal!(
5586                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5587                    );
5588                } else {
5589                    error!(
5590                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5591                    );
5592                }
5593                return None;
5594            }
5595
5596            res.push((system_package_ref.1, bytes, dependencies));
5597        }
5598
5599        Some(res)
5600    }
5601
5602    fn is_protocol_version_supported_v2(
5603        current_protocol_version: ProtocolVersion,
5604        proposed_protocol_version: ProtocolVersion,
5605        protocol_config: &ProtocolConfig,
5606        committee: &Committee,
5607        capabilities: Vec<AuthorityCapabilitiesV2>,
5608        mut buffer_stake_bps: u64,
5609    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5610        if proposed_protocol_version > current_protocol_version + 1
5611            && !protocol_config.advance_to_highest_supported_protocol_version()
5612        {
5613            return None;
5614        }
5615
5616        if buffer_stake_bps > 10000 {
5617            warn!("clamping buffer_stake_bps to 10000");
5618            buffer_stake_bps = 10000;
5619        }
5620
5621        // For each validator, gather the protocol version and system packages that it would like
5622        // to upgrade to in the next epoch.
5623        let mut desired_upgrades: Vec<_> = capabilities
5624            .into_iter()
5625            .filter_map(|mut cap| {
5626                // A validator that lists no packages is voting against any change at all.
5627                if cap.available_system_packages.is_empty() {
5628                    return None;
5629                }
5630
5631                cap.available_system_packages.sort();
5632
5633                info!(
5634                    "validator {:?} supports {:?} with system packages: {:?}",
5635                    cap.authority.concise(),
5636                    cap.supported_protocol_versions,
5637                    cap.available_system_packages,
5638                );
5639
5640                // A validator that only supports the current protocol version is also voting
5641                // against any change, because framework upgrades always require a protocol version
5642                // bump.
5643                cap.supported_protocol_versions
5644                    .get_version_digest(proposed_protocol_version)
5645                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
5646            })
5647            .collect();
5648
5649        // There can only be one set of votes that have a majority, find one if it exists.
5650        desired_upgrades.sort();
5651        desired_upgrades
5652            .into_iter()
5653            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5654            .into_iter()
5655            .find_map(|((digest, packages), group)| {
5656                // should have been filtered out earlier.
5657                assert!(!packages.is_empty());
5658
5659                let mut stake_aggregator: StakeAggregator<(), true> =
5660                    StakeAggregator::new(Arc::new(committee.clone()));
5661
5662                for (_, _, authority) in group {
5663                    stake_aggregator.insert_generic(authority, ());
5664                }
5665
5666                let total_votes = stake_aggregator.total_votes();
5667                let quorum_threshold = committee.quorum_threshold();
5668                let f = committee.total_votes() - committee.quorum_threshold();
5669
5670                // multiple by buffer_stake_bps / 10000, rounded up.
5671                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5672                let effective_threshold = quorum_threshold + buffer_stake;
5673
5674                info!(
5675                    protocol_config_digest = ?digest,
5676                    ?total_votes,
5677                    ?quorum_threshold,
5678                    ?buffer_stake_bps,
5679                    ?effective_threshold,
5680                    ?proposed_protocol_version,
5681                    ?packages,
5682                    "support for upgrade"
5683                );
5684
5685                let has_support = total_votes >= effective_threshold;
5686                has_support.then_some((proposed_protocol_version, packages))
5687            })
5688    }
5689
5690    fn choose_protocol_version_and_system_packages_v2(
5691        current_protocol_version: ProtocolVersion,
5692        protocol_config: &ProtocolConfig,
5693        committee: &Committee,
5694        capabilities: Vec<AuthorityCapabilitiesV2>,
5695        buffer_stake_bps: u64,
5696    ) -> (ProtocolVersion, Vec<ObjectRef>) {
5697        assert!(protocol_config.authority_capabilities_v2());
5698        let mut next_protocol_version = current_protocol_version;
5699        let mut system_packages = vec![];
5700
5701        while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5702            current_protocol_version,
5703            next_protocol_version + 1,
5704            protocol_config,
5705            committee,
5706            capabilities.clone(),
5707            buffer_stake_bps,
5708        ) {
5709            next_protocol_version = version;
5710            system_packages = packages;
5711        }
5712
5713        (next_protocol_version, system_packages)
5714    }
5715
5716    #[instrument(level = "debug", skip_all)]
5717    fn create_authenticator_state_tx(
5718        &self,
5719        epoch_store: &Arc<AuthorityPerEpochStore>,
5720    ) -> Option<EndOfEpochTransactionKind> {
5721        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5722            info!("authenticator state transactions not enabled");
5723            return None;
5724        }
5725
5726        let authenticator_state_exists = epoch_store.authenticator_state_exists();
5727        let tx = if authenticator_state_exists {
5728            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5729            let min_epoch =
5730                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5731            let authenticator_obj_initial_shared_version = epoch_store
5732                .epoch_start_config()
5733                .authenticator_obj_initial_shared_version()
5734                .expect("initial version must exist");
5735
5736            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5737                min_epoch,
5738                authenticator_obj_initial_shared_version,
5739            );
5740
5741            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5742
5743            tx
5744        } else {
5745            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5746            info!("Creating AuthenticatorStateCreate tx");
5747            tx
5748        };
5749        Some(tx)
5750    }
5751
5752    #[instrument(level = "debug", skip_all)]
5753    fn create_randomness_state_tx(
5754        &self,
5755        epoch_store: &Arc<AuthorityPerEpochStore>,
5756    ) -> Option<EndOfEpochTransactionKind> {
5757        if !epoch_store.protocol_config().random_beacon() {
5758            info!("randomness state transactions not enabled");
5759            return None;
5760        }
5761
5762        if epoch_store.randomness_state_exists() {
5763            return None;
5764        }
5765
5766        let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5767        info!("Creating RandomnessStateCreate tx");
5768        Some(tx)
5769    }
5770
5771    #[instrument(level = "debug", skip_all)]
5772    fn create_accumulator_root_tx(
5773        &self,
5774        epoch_store: &Arc<AuthorityPerEpochStore>,
5775    ) -> Option<EndOfEpochTransactionKind> {
5776        if !epoch_store
5777            .protocol_config()
5778            .create_root_accumulator_object()
5779        {
5780            info!("accumulator root creation not enabled");
5781            return None;
5782        }
5783
5784        if epoch_store.accumulator_root_exists() {
5785            return None;
5786        }
5787
5788        let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5789        info!("Creating AccumulatorRootCreate tx");
5790        Some(tx)
5791    }
5792
5793    #[instrument(level = "debug", skip_all)]
5794    fn create_write_accumulator_storage_cost_tx(
5795        &self,
5796        epoch_store: &Arc<AuthorityPerEpochStore>,
5797    ) -> Option<EndOfEpochTransactionKind> {
5798        if !epoch_store.accumulator_root_exists() {
5799            info!("accumulator root does not exist yet");
5800            return None;
5801        }
5802        if !epoch_store.protocol_config().enable_accumulators() {
5803            info!("accumulators not enabled");
5804            return None;
5805        }
5806
5807        let object_store = self.get_object_store();
5808        let object_count =
5809            match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
5810                Ok(Some(count)) => count,
5811                Ok(None) => return None,
5812                Err(e) => {
5813                    fatal!("failed to read accumulator object count: {e}");
5814                }
5815            };
5816
5817        let storage_cost = object_count.saturating_mul(
5818            epoch_store
5819                .protocol_config()
5820                .accumulator_object_storage_cost(),
5821        );
5822
5823        let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
5824        info!(
5825            object_count,
5826            storage_cost, "Creating WriteAccumulatorStorageCost tx"
5827        );
5828        Some(tx)
5829    }
5830
5831    #[instrument(level = "debug", skip_all)]
5832    fn create_coin_registry_tx(
5833        &self,
5834        epoch_store: &Arc<AuthorityPerEpochStore>,
5835    ) -> Option<EndOfEpochTransactionKind> {
5836        if !epoch_store.protocol_config().enable_coin_registry() {
5837            info!("coin registry not enabled");
5838            return None;
5839        }
5840
5841        if epoch_store.coin_registry_exists() {
5842            return None;
5843        }
5844
5845        let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5846        info!("Creating CoinRegistryCreate tx");
5847        Some(tx)
5848    }
5849
5850    #[instrument(level = "debug", skip_all)]
5851    fn create_display_registry_tx(
5852        &self,
5853        epoch_store: &Arc<AuthorityPerEpochStore>,
5854    ) -> Option<EndOfEpochTransactionKind> {
5855        if !epoch_store.protocol_config().enable_display_registry() {
5856            info!("display registry not enabled");
5857            return None;
5858        }
5859
5860        if epoch_store.display_registry_exists() {
5861            return None;
5862        }
5863
5864        let tx = EndOfEpochTransactionKind::new_display_registry_create();
5865        info!("Creating DisplayRegistryCreate tx");
5866        Some(tx)
5867    }
5868
5869    #[instrument(level = "debug", skip_all)]
5870    fn create_bridge_tx(
5871        &self,
5872        epoch_store: &Arc<AuthorityPerEpochStore>,
5873    ) -> Option<EndOfEpochTransactionKind> {
5874        if !epoch_store.protocol_config().enable_bridge() {
5875            info!("bridge not enabled");
5876            return None;
5877        }
5878        if epoch_store.bridge_exists() {
5879            return None;
5880        }
5881        let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5882        info!("Creating Bridge Create tx");
5883        Some(tx)
5884    }
5885
5886    #[instrument(level = "debug", skip_all)]
5887    fn init_bridge_committee_tx(
5888        &self,
5889        epoch_store: &Arc<AuthorityPerEpochStore>,
5890    ) -> Option<EndOfEpochTransactionKind> {
5891        if !epoch_store.protocol_config().enable_bridge() {
5892            info!("bridge not enabled");
5893            return None;
5894        }
5895        if !epoch_store
5896            .protocol_config()
5897            .should_try_to_finalize_bridge_committee()
5898        {
5899            info!("should not try to finalize bridge committee yet");
5900            return None;
5901        }
5902        // Only create this transaction if bridge exists
5903        if !epoch_store.bridge_exists() {
5904            return None;
5905        }
5906
5907        if epoch_store.bridge_committee_initiated() {
5908            return None;
5909        }
5910
5911        let bridge_initial_shared_version = epoch_store
5912            .epoch_start_config()
5913            .bridge_obj_initial_shared_version()
5914            .expect("initial version must exist");
5915        let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5916        info!("Init Bridge committee tx");
5917        Some(tx)
5918    }
5919
5920    #[instrument(level = "debug", skip_all)]
5921    fn create_deny_list_state_tx(
5922        &self,
5923        epoch_store: &Arc<AuthorityPerEpochStore>,
5924    ) -> Option<EndOfEpochTransactionKind> {
5925        if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5926            return None;
5927        }
5928
5929        if epoch_store.coin_deny_list_state_exists() {
5930            return None;
5931        }
5932
5933        let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5934        info!("Creating DenyListStateCreate tx");
5935        Some(tx)
5936    }
5937
5938    #[instrument(level = "debug", skip_all)]
5939    fn create_address_alias_state_tx(
5940        &self,
5941        epoch_store: &Arc<AuthorityPerEpochStore>,
5942    ) -> Option<EndOfEpochTransactionKind> {
5943        if !epoch_store.protocol_config().address_aliases() {
5944            info!("address aliases not enabled");
5945            return None;
5946        }
5947
5948        if epoch_store.address_alias_state_exists() {
5949            return None;
5950        }
5951
5952        let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
5953        info!("Creating AddressAliasStateCreate tx");
5954        Some(tx)
5955    }
5956
5957    #[instrument(level = "debug", skip_all)]
5958    fn create_execution_time_observations_tx(
5959        &self,
5960        epoch_store: &Arc<AuthorityPerEpochStore>,
5961        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5962        last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
5963    ) -> Option<EndOfEpochTransactionKind> {
5964        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
5965            .protocol_config()
5966            .per_object_congestion_control_mode()
5967        else {
5968            return None;
5969        };
5970
5971        // Load tx in the last N checkpoints before end-of-epoch, and save only the
5972        // execution time observations for commands in these checkpoints.
5973        let start_checkpoint = std::cmp::max(
5974            last_checkpoint_before_end_of_epoch
5975                .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
5976            // If we have <N checkpoints in the epoch, use all of them.
5977            epoch_store
5978                .epoch()
5979                .checked_sub(1)
5980                .map(|prev_epoch| {
5981                    self.checkpoint_store
5982                        .get_epoch_last_checkpoint_seq_number(prev_epoch)
5983                        .expect("typed store must not fail")
5984                        .expect(
5985                            "sequence number of last checkpoint of preceding epoch must be saved",
5986                        )
5987                        + 1
5988                })
5989                .unwrap_or(1),
5990        );
5991        info!(
5992            "reading checkpoint range {:?}..={:?}",
5993            start_checkpoint, last_checkpoint_before_end_of_epoch
5994        );
5995        let sequence_numbers =
5996            (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
5997        let contents_digests: Vec<_> = self
5998            .checkpoint_store
5999            .multi_get_locally_computed_checkpoints(&sequence_numbers)
6000            .expect("typed store must not fail")
6001            .into_iter()
6002            .zip_debug_eq(sequence_numbers)
6003            .map(|(maybe_checkpoint, sequence_number)| {
6004                if let Some(checkpoint) = maybe_checkpoint {
6005                    checkpoint.content_digest
6006                } else {
6007                    // If locally computed checkpoint summary was already pruned, load the
6008                    // certified checkpoint.
6009                    self.checkpoint_store
6010                        .get_checkpoint_by_sequence_number(sequence_number)
6011                        .expect("typed store must not fail")
6012                        .unwrap_or_else(|| {
6013                            fatal!("preceding checkpoints must exist by end of epoch")
6014                        })
6015                        .data()
6016                        .content_digest
6017                }
6018            })
6019            .collect();
6020        let tx_digests: Vec<_> = self
6021            .checkpoint_store
6022            .multi_get_checkpoint_content(&contents_digests)
6023            .expect("typed store must not fail")
6024            .into_iter()
6025            .flat_map(|maybe_contents| {
6026                maybe_contents
6027                    .expect("preceding checkpoint contents must exist by end of epoch")
6028                    .into_inner()
6029                    .into_iter()
6030                    .map(|digests| digests.transaction)
6031            })
6032            .collect();
6033        let included_execution_time_observations: HashSet<_> = self
6034            .get_transaction_cache_reader()
6035            .multi_get_transaction_blocks(&tx_digests)
6036            .into_iter()
6037            .flat_map(|maybe_tx| {
6038                if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
6039                    .expect("preceding transaction must exist by end of epoch")
6040                    .transaction_data()
6041                    .kind()
6042                {
6043                    #[allow(clippy::unnecessary_to_owned)]
6044                    itertools::Either::Left(
6045                        ptb.commands
6046                            .to_owned()
6047                            .into_iter()
6048                            .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
6049                    )
6050                } else {
6051                    itertools::Either::Right(std::iter::empty())
6052                }
6053            })
6054            .chain(end_of_epoch_observation_keys.into_iter())
6055            .collect();
6056
6057        let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
6058            epoch_store
6059                .get_end_of_epoch_execution_time_observations()
6060                .filter_and_sort_v1(
6061                    |(key, _)| included_execution_time_observations.contains(key),
6062                    params.stored_observations_limit.try_into().unwrap(),
6063                ),
6064        );
6065        info!("Creating StoreExecutionTimeObservations tx");
6066        Some(tx)
6067    }
6068
    /// Creates and executes the advance epoch transaction to effects without committing it to the database.
    /// The effects of the change epoch tx are only written to the database after a certified checkpoint has been
    /// formed and executed by CheckpointExecutor.
    ///
    /// When a framework upgrade has been decided on, but the validator does not have the new
    /// versions of the packages locally, the validator cannot form the ChangeEpochTx. In this case
    /// it returns Err, indicating that the checkpoint builder should give up trying to make the
    /// final checkpoint. As long as the network is able to create a certified checkpoint (which
    /// should be ensured by the capabilities vote), it will arrive via state sync and be executed
    /// by CheckpointExecutor.
    ///
    /// On success, returns the system state object written by the transaction together with the
    /// transaction's effects.
    ///
    /// # Errors
    ///
    /// - `CheckpointBuilderError::SystemPackagesMissing` when the bytes of the next epoch's
    ///   system packages are not available locally.
    /// - `CheckpointBuilderError::ChangeEpochTxAlreadyExecuted` when the transaction is already
    ///   marked as executed, or when no execution lock can be obtained for it.
    /// - Any error propagated from assigning shared object versions or from reading the input
    ///   objects.
    ///
    /// # Panics
    ///
    /// Panics if reading capabilities fails, if executing the transaction returns an error, if
    /// the outputs do not contain the system state object, or if the resulting effects status is
    /// not ok.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
        // >1 checkpoint.
        last_checkpoint: CheckpointSequenceNumber,
    ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
        // Optional end-of-epoch transaction kinds, collected in the order of the calls below.
        // When end-of-epoch transactions are supported, the change-epoch kind is appended last.
        let mut txns = Vec::new();

        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_bridge_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_execution_time_observations_tx(
            epoch_store,
            end_of_epoch_observation_keys,
            last_checkpoint,
        ) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
            txns.push(tx);
        }

        if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_display_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();

        // Pick the protocol version and system packages for the next epoch from the
        // capabilities recorded in the epoch store.
        let (next_epoch_protocol_version, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v2(
                epoch_store.protocol_version(),
                epoch_store.protocol_config(),
                epoch_store.committee(),
                epoch_store
                    .get_capabilities_v2()
                    .expect("read capabilities from db cannot fail"),
                buffer_stake_bps,
            );

        // since system packages are created during the current epoch, they should abide by the
        // rules of the current epoch, including the current epoch's max Move binary format version
        let config = epoch_store.protocol_config();
        let binary_config = config.binary_config(None);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            if next_epoch_protocol_version <= ProtocolVersion::MAX {
                // This case should only be hit if the validator supports the new protocol version,
                // but carries a different framework. The validator should still be able to
                // reconfigure, as the correct framework will be installed by the change epoch txn.
                debug_fatal!(
                    "upgraded system packages {:?} are not locally available, cannot create \
                    ChangeEpochTx. validator binary must be upgraded to the correct version!",
                    next_epoch_system_packages
                );
            } else {
                error!(
                    "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
                    next_epoch_protocol_version
                );
            }
            // the checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages locally,
            //   and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive it via
            //   state sync, and execute it. This will upgrade the framework packages, reconfigure,
            //   and most likely shut down in the new epoch (this validator likely doesn't support
            //   the new protocol version, or else it should have had the packages.)
            return Err(CheckpointBuilderError::SystemPackagesMissing);
        };

        // With end-of-epoch transaction support, the change-epoch kind is bundled with the
        // kinds collected above; otherwise only a standalone change-epoch transaction is built
        // and `txns` is not used.
        let tx = if epoch_store
            .protocol_config()
            .end_of_epoch_transaction_supported()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));

            VerifiedTransaction::new_end_of_epoch_transaction(txns)
        } else {
            VerifiedTransaction::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            )
        };

        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // The tx could have been executed by state sync already - if so simply return an error.
        // The checkpoint builder will shortly be terminated by reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .is_tx_already_executed(tx_digest)
        {
            warn!("change epoch tx has already been executed via state sync");
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        }

        // Failing to obtain the execution lock is reported the same way as an
        // already-executed transaction.
        let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
        else {
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        };

        // We must manually assign the shared object versions to the transaction before executing it.
        // This is because we do not sequence end-of-epoch transactions through consensus.
        let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::iter::once(&Schedulable::Transaction(&executable_tx)),
        )?;

        // Exactly one schedulable was passed in, so exactly one assignment is expected back.
        assert_eq!(assigned_versions.0.len(), 1);
        let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;

        let input_objects = self.read_objects_for_execution(
            &tx_lock,
            &executable_tx,
            &assigned_versions,
            epoch_store,
        )?;

        let (transaction_outputs, _timings, _execution_error_opt) = self
            .execute_certificate(
                &execution_guard,
                &executable_tx,
                input_objects,
                None,
                ExecutionEnv::default(),
                epoch_store,
            )
            .unwrap();
        let system_obj = get_sui_system_state(&transaction_outputs.written)
            .expect("change epoch tx must write to system object");

        let effects = transaction_outputs.effects;
        // We must write tx and effects to the state sync tables so that state sync is able to
        // deliver the transaction to CheckpointExecutor after it is included in a certified
        // checkpoint.
        self.get_state_sync_store()
            .insert_transaction_and_effects(&tx, &effects);

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_ok());
        Ok((system_obj, effects))
    }
6294
6295    #[instrument(level = "error", skip_all)]
6296    async fn reopen_epoch_db(
6297        &self,
6298        cur_epoch_store: &AuthorityPerEpochStore,
6299        new_committee: Committee,
6300        epoch_start_configuration: EpochStartConfiguration,
6301        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6302        epoch_last_checkpoint: CheckpointSequenceNumber,
6303    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6304        let new_epoch = new_committee.epoch;
6305        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6306        assert_eq!(
6307            epoch_start_configuration.epoch_start_state().epoch(),
6308            new_committee.epoch
6309        );
6310        fail_point!("before-open-new-epoch-store");
6311        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6312            self.name,
6313            new_committee,
6314            epoch_start_configuration,
6315            self.get_backing_package_store().clone(),
6316            self.get_object_store().clone(),
6317            expensive_safety_check_config,
6318            epoch_last_checkpoint,
6319            self.config.fullnode_sync_mode,
6320        )?;
6321        self.epoch_store.store(new_epoch_store.clone());
6322        Ok(new_epoch_store)
6323    }
6324
6325    #[cfg(test)]
6326    pub(crate) fn iter_live_object_set_for_testing(
6327        &self,
6328    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6329        let include_wrapped_object = !self
6330            .epoch_store_for_testing()
6331            .protocol_config()
6332            .simplified_unwrap_then_delete();
6333        self.get_global_state_hash_store()
6334            .iter_cached_live_object_set_for_testing(include_wrapped_object)
6335    }
6336
6337    #[cfg(test)]
6338    pub(crate) fn shutdown_execution_for_test(&self) {
6339        self.tx_execution_shutdown
6340            .lock()
6341            .take()
6342            .unwrap()
6343            .send(())
6344            .unwrap();
6345    }
6346
6347    /// NOTE: this function is only to be used for fuzzing and testing. Never use in prod
6348    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6349        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6350        self.get_object_cache_reader()
6351            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6352        self.get_reconfig_api()
6353            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6354        Ok(())
6355    }
6356}
6357
6358#[async_trait]
6359impl TransactionKeyValueStoreTrait for AuthorityState {
6360    #[instrument(skip(self))]
6361    async fn multi_get(
6362        &self,
6363        transactions: &[TransactionDigest],
6364        effects: &[TransactionDigest],
6365    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6366        let txns = if !transactions.is_empty() {
6367            self.get_transaction_cache_reader()
6368                .multi_get_transaction_blocks(transactions)
6369                .into_iter()
6370                .map(|t| t.map(|t| (*t).clone().into_inner()))
6371                .collect()
6372        } else {
6373            vec![]
6374        };
6375
6376        let fx = if !effects.is_empty() {
6377            self.get_transaction_cache_reader()
6378                .multi_get_executed_effects(effects)
6379        } else {
6380            vec![]
6381        };
6382
6383        Ok((txns, fx))
6384    }
6385
6386    #[instrument(skip(self))]
6387    async fn multi_get_checkpoints(
6388        &self,
6389        checkpoint_summaries: &[CheckpointSequenceNumber],
6390        checkpoint_contents: &[CheckpointSequenceNumber],
6391        checkpoint_summaries_by_digest: &[CheckpointDigest],
6392    ) -> SuiResult<(
6393        Vec<Option<CertifiedCheckpointSummary>>,
6394        Vec<Option<CheckpointContents>>,
6395        Vec<Option<CertifiedCheckpointSummary>>,
6396    )> {
6397        // TODO: use multi-get methods if it ever becomes important (unlikely)
6398        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6399        let store = self.get_checkpoint_store();
6400        for seq in checkpoint_summaries {
6401            let checkpoint = store
6402                .get_checkpoint_by_sequence_number(*seq)?
6403                .map(|c| c.into_inner());
6404
6405            summaries.push(checkpoint);
6406        }
6407
6408        let mut contents = Vec::with_capacity(checkpoint_contents.len());
6409        for seq in checkpoint_contents {
6410            let checkpoint = store
6411                .get_checkpoint_by_sequence_number(*seq)?
6412                .and_then(|summary| {
6413                    store
6414                        .get_checkpoint_contents(&summary.content_digest)
6415                        .expect("db read cannot fail")
6416                });
6417            contents.push(checkpoint);
6418        }
6419
6420        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6421        for digest in checkpoint_summaries_by_digest {
6422            let checkpoint = store
6423                .get_checkpoint_by_digest(digest)?
6424                .map(|c| c.into_inner());
6425            summaries_by_digest.push(checkpoint);
6426        }
6427        Ok((summaries, contents, summaries_by_digest))
6428    }
6429
6430    #[instrument(skip(self))]
6431    async fn deprecated_get_transaction_checkpoint(
6432        &self,
6433        digest: TransactionDigest,
6434    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6435        Ok(self
6436            .get_checkpoint_cache()
6437            .deprecated_get_transaction_checkpoint(&digest)
6438            .map(|(_epoch, checkpoint)| checkpoint))
6439    }
6440
6441    #[instrument(skip(self))]
6442    async fn get_object(
6443        &self,
6444        object_id: ObjectID,
6445        version: VersionNumber,
6446    ) -> SuiResult<Option<Object>> {
6447        Ok(self
6448            .get_object_cache_reader()
6449            .get_object_by_key(&object_id, version))
6450    }
6451
6452    #[instrument(skip_all)]
6453    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6454        Ok(self
6455            .get_object_cache_reader()
6456            .multi_get_objects_by_key(object_keys))
6457    }
6458
6459    #[instrument(skip(self))]
6460    async fn multi_get_transaction_checkpoint(
6461        &self,
6462        digests: &[TransactionDigest],
6463    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6464        let res = self
6465            .get_checkpoint_cache()
6466            .deprecated_multi_get_transaction_checkpoint(digests);
6467
6468        Ok(res
6469            .into_iter()
6470            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6471            .collect())
6472    }
6473
6474    #[instrument(skip(self))]
6475    async fn multi_get_events_by_tx_digests(
6476        &self,
6477        digests: &[TransactionDigest],
6478    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6479        if digests.is_empty() {
6480            return Ok(vec![]);
6481        }
6482
6483        Ok(self
6484            .get_transaction_cache_reader()
6485            .multi_get_events(digests))
6486    }
6487}
6488
/// Simulator-only (`cfg(msim)`) registry of system package overrides.
///
/// Overrides are keyed by package id and are either a fixed set of modules used for every
/// validator, or a callback that decides per validator whether (and with which modules) the
/// package is overridden.
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::collections::BTreeMap;
    use std::{cell::RefCell, collections::BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Per-validator override callback; returning `None` means the validator has no override
    /// for the package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen (or declined) per validator.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module with its own binary format version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        modules
            .iter()
            .map(|m| {
                let mut buf = Vec::new();
                m.serialize_with_version(m.version, &mut buf).unwrap();
                buf
            })
            .collect()
    }

    /// Registers `modules` as the override of `package_id` for all validators, replacing any
    /// previous override of that package.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules))
        });
    }

    /// Registers a per-validator callback as the override of `package_id`, replacing any
    /// previous override of that package.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func))
        });
    }

    /// Overrides every package in `packages` with its own modules, and overrides every
    /// built-in package that is absent from `packages` with an empty module list.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        OVERRIDE.with(|bs| {
            let mut overrides = bs.borrow_mut();
            let mut new_packages_not_to_include: BTreeSet<_> =
                BuiltInFramework::all_package_ids().into_iter().collect();
            for pkg in &packages {
                new_packages_not_to_include.remove(&pkg.id);
            }
            for pkg in packages {
                overrides.insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
            }
            for empty_pkg in new_packages_not_to_include {
                overrides.insert(empty_pkg, PackageOverrideConfig::Global(vec![]));
            }
        });
    }

    /// Returns the serialized override modules of `package_id` for validator `name`, or
    /// `None` when no override is registered or the per-validator callback declines.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            })
        })
    }

    /// Same lookup as [`get_override_bytes`], but returns the compiled modules themselves.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            })
        })
    }

    /// Builds the overriding [`SystemPackage`] of `package_id` for validator `name`, or
    /// `None` when there is no override for that validator.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if is_system_package(*package_id) {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system packages.
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns the overridden packages that are not built-in packages, as seen by validator
    /// `name`. Each is assumed to depend on all built-in packages.
    ///
    /// Packages whose per-validator callback declines `name` are skipped, mirroring
    /// [`get_override_system_package`], rather than causing a panic.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        // Collect the ids first so the registry is not borrowed while the overrides are
        // resolved below.
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        extra
            .into_iter()
            .filter_map(|package| {
                let bytes = get_override_bytes(&package, name)?;
                Some(SystemPackage {
                    id: package,
                    bytes,
                    dependencies: BuiltInFramework::all_package_ids(),
                })
            })
            .collect()
    }
}
6622
6623#[derive(Debug, Serialize, Deserialize, Clone)]
6624pub struct ObjDumpFormat {
6625    pub id: ObjectID,
6626    pub version: VersionNumber,
6627    pub digest: ObjectDigest,
6628    pub object: Object,
6629}
6630
6631impl ObjDumpFormat {
6632    fn new(object: Object) -> Self {
6633        let oref = object.compute_object_reference();
6634        Self {
6635            id: oref.0,
6636            version: oref.1,
6637            digest: oref.2,
6638            object,
6639        }
6640    }
6641}
6642
6643#[derive(Debug, Serialize, Deserialize, Clone)]
6644pub struct NodeStateDump {
6645    pub tx_digest: TransactionDigest,
6646    pub sender_signed_data: SenderSignedData,
6647    pub executed_epoch: u64,
6648    pub reference_gas_price: u64,
6649    pub protocol_version: u64,
6650    pub epoch_start_timestamp_ms: u64,
6651    pub computed_effects: TransactionEffects,
6652    pub expected_effects_digest: TransactionEffectsDigest,
6653    pub relevant_system_packages: Vec<ObjDumpFormat>,
6654    pub shared_objects: Vec<ObjDumpFormat>,
6655    pub loaded_child_objects: Vec<ObjDumpFormat>,
6656    pub modified_at_versions: Vec<ObjDumpFormat>,
6657    pub runtime_reads: Vec<ObjDumpFormat>,
6658    pub input_objects: Vec<ObjDumpFormat>,
6659}
6660
6661impl NodeStateDump {
6662    pub fn new(
6663        tx_digest: &TransactionDigest,
6664        effects: &TransactionEffects,
6665        expected_effects_digest: TransactionEffectsDigest,
6666        object_store: &dyn ObjectStore,
6667        epoch_store: &Arc<AuthorityPerEpochStore>,
6668        inner_temporary_store: &InnerTemporaryStore,
6669        certificate: &VerifiedExecutableTransaction,
6670    ) -> SuiResult<Self> {
6671        // Epoch info
6672        let executed_epoch = epoch_store.epoch();
6673        let reference_gas_price = epoch_store.reference_gas_price();
6674        let epoch_start_config = epoch_store.epoch_start_config();
6675        let protocol_version = epoch_store.protocol_version().as_u64();
6676        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6677
6678        // Record all system packages at this version
6679        let mut relevant_system_packages = Vec::new();
6680        for sys_package_id in BuiltInFramework::all_package_ids() {
6681            if let Some(w) = object_store.get_object(&sys_package_id) {
6682                relevant_system_packages.push(ObjDumpFormat::new(w))
6683            }
6684        }
6685
6686        // Record all the shared objects
6687        let mut shared_objects = Vec::new();
6688        for kind in effects.input_consensus_objects() {
6689            match kind {
6690                InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6691                    if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6692                        shared_objects.push(ObjDumpFormat::new(w))
6693                    }
6694                }
6695                InputConsensusObject::ReadConsensusStreamEnded(..)
6696                | InputConsensusObject::MutateConsensusStreamEnded(..)
6697                | InputConsensusObject::Cancelled(..) => (), // TODO: consider record congested objects.
6698            }
6699        }
6700
6701        // Record all loaded child objects
6702        // Child objects which are read but not mutated are not tracked anywhere else
6703        let mut loaded_child_objects = Vec::new();
6704        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6705            if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6706                loaded_child_objects.push(ObjDumpFormat::new(w))
6707            }
6708        }
6709
6710        // Record all modified objects
6711        let mut modified_at_versions = Vec::new();
6712        for (id, ver) in effects.modified_at_versions() {
6713            if let Some(w) = object_store.get_object_by_key(&id, ver) {
6714                modified_at_versions.push(ObjDumpFormat::new(w))
6715            }
6716        }
6717
6718        // Packages read at runtime, which were not previously loaded into the temoorary store
6719        // Some packages may be fetched at runtime and wont show up in input objects
6720        let mut runtime_reads = Vec::new();
6721        for obj in inner_temporary_store
6722            .runtime_packages_loaded_from_db
6723            .values()
6724        {
6725            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6726        }
6727
6728        // All other input objects should already be in `inner_temporary_store.objects`
6729
6730        Ok(Self {
6731            tx_digest: *tx_digest,
6732            executed_epoch,
6733            reference_gas_price,
6734            epoch_start_timestamp_ms,
6735            protocol_version,
6736            relevant_system_packages,
6737            shared_objects,
6738            loaded_child_objects,
6739            modified_at_versions,
6740            runtime_reads,
6741            sender_signed_data: certificate.clone().into_message(),
6742            input_objects: inner_temporary_store
6743                .input_objects
6744                .values()
6745                .map(|o| ObjDumpFormat::new(o.clone()))
6746                .collect(),
6747            computed_effects: effects.clone(),
6748            expected_effects_digest,
6749        })
6750    }
6751
6752    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6753        let mut objects = Vec::new();
6754        objects.extend(self.relevant_system_packages.clone());
6755        objects.extend(self.shared_objects.clone());
6756        objects.extend(self.loaded_child_objects.clone());
6757        objects.extend(self.modified_at_versions.clone());
6758        objects.extend(self.runtime_reads.clone());
6759        objects.extend(self.input_objects.clone());
6760        objects
6761    }
6762
6763    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6764        let file_name = format!(
6765            "{}_{}_NODE_DUMP.json",
6766            self.tx_digest,
6767            AuthorityState::unixtime_now_ms()
6768        );
6769        let mut path = path.to_path_buf();
6770        path.push(&file_name);
6771        let mut file = File::create(path.clone())?;
6772        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6773        Ok(path)
6774    }
6775
6776    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6777        let file = File::open(path)?;
6778        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6779    }
6780}