// sui_core/authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::accumulators::coin_reservations::CachingCoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::transaction_rewriting::rewrite_transaction_for_coin_reservations;
10use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
11use crate::checkpoints::CheckpointBuilderError;
12use crate::checkpoints::CheckpointBuilderResult;
13use crate::congestion_tracker::CongestionTracker;
14use crate::execution_cache::ExecutionCacheTraitPointers;
15use crate::execution_cache::TransactionCacheRead;
16use crate::execution_cache::writeback_cache::WritebackCache;
17use crate::execution_scheduler::ExecutionScheduler;
18use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
19use crate::gasless_rate_limiter::ConsensusGaslessCounter;
20use crate::jsonrpc_index::CoinIndexKey2;
21use crate::rpc_index::RpcIndexStore;
22use crate::traffic_controller::TrafficController;
23use crate::traffic_controller::metrics::TrafficControllerMetrics;
24use crate::transaction_outputs::TransactionOutputs;
25use crate::verify_indexes::{fix_indexes, verify_indexes};
26use arc_swap::{ArcSwap, ArcSwapOption, Guard};
27use async_trait::async_trait;
28use authority_per_epoch_store::CertLockGuard;
29use dashmap::DashMap;
30use fastcrypto::encoding::Base58;
31use fastcrypto::encoding::Encoding;
32use fastcrypto::hash::MultisetHash;
33use itertools::Itertools;
34use move_binary_format::CompiledModule;
35use move_binary_format::binary_config::BinaryConfig;
36use move_core_types::annotated_value::MoveStructLayout;
37use move_core_types::language_storage::ModuleId;
38use mysten_common::ZipDebugEqIteratorExt;
39use mysten_common::{assert_reachable, fatal};
40use parking_lot::Mutex;
41use prometheus::{
42    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
43    register_histogram_vec_with_registry, register_histogram_with_registry,
44    register_int_counter_vec_with_registry, register_int_counter_with_registry,
45    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
46};
47use serde::de::DeserializeOwned;
48use serde::{Deserialize, Serialize};
49use shared_object_version_manager::AssignedVersions;
50use shared_object_version_manager::Schedulable;
51use std::collections::BTreeMap;
52use std::collections::BTreeSet;
53use std::fs::File;
54use std::io::Write;
55use std::path::{Path, PathBuf};
56use std::sync::atomic::Ordering;
57use std::time::Duration;
58use std::time::Instant;
59use std::time::SystemTime;
60use std::time::UNIX_EPOCH;
61use std::{
62    collections::{HashMap, HashSet},
63    fs,
64    pin::Pin,
65    str::FromStr,
66    sync::Arc,
67    vec,
68};
69use sui_config::NodeConfig;
70use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
71use sui_execution::Executor;
72use sui_protocol_config::PerObjectCongestionControlMode;
73use sui_types::accumulator_root::AccumulatorObjId;
74use sui_types::crypto::RandomnessRound;
75use sui_types::dynamic_field::visitor as DFV;
76use sui_types::execution::ExecutionOutput;
77use sui_types::execution::ExecutionTimeObservationKey;
78use sui_types::execution::ExecutionTiming;
79use sui_types::execution_params::ExecutionOrEarlyError;
80use sui_types::execution_params::FundsWithdrawStatus;
81use sui_types::execution_params::get_early_execution_error;
82use sui_types::execution_status::ExecutionStatus;
83use sui_types::inner_temporary_store::PackageStoreWithFallback;
84use sui_types::layout_resolver::LayoutResolver;
85use sui_types::layout_resolver::into_struct_layout;
86use sui_types::messages_consensus::AuthorityCapabilitiesV2;
87use sui_types::object::bounded_visitor::BoundedVisitor;
88use sui_types::storage::ChildObjectResolver;
89use sui_types::storage::InputKey;
90use sui_types::storage::TrackingBackingStore;
91use sui_types::traffic_control::{
92    PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
93};
94use sui_types::transaction_executor::SimulateTransactionResult;
95use sui_types::transaction_executor::TransactionChecks;
96use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
97use tap::TapFallible;
98use tokio::sync::RwLock;
99use tokio::sync::mpsc::unbounded_channel;
100use tokio::sync::watch::error::RecvError;
101use tokio::sync::{mpsc, oneshot};
102use tokio::task::JoinHandle;
103use tokio::time::timeout;
104use tracing::{debug, error, info, instrument, warn};
105
106use self::authority_store::ExecutionLockWriteGuard;
107use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
108pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
109use mysten_metrics::{monitored_scope, spawn_monitored_task};
110
111use crate::jsonrpc_index::IndexStore;
112use crate::jsonrpc_index::{
113    CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
114};
115use mysten_common::{debug_fatal, debug_fatal_no_invariant};
116use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
117use sui_config::genesis::Genesis;
118use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
119use sui_framework::{BuiltInFramework, SystemPackage};
120use sui_json_rpc_types::{
121    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
122    SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
123    SuiTransactionBlockEvents, TransactionFilter,
124};
125use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
126use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
127use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
128use sui_types::accumulator_root::AccumulatorValue;
129use sui_types::authenticator_state::get_authenticator_state;
130use sui_types::balance::Balance;
131use sui_types::coin_reservation;
132use sui_types::committee::{EpochId, ProtocolVersion};
133use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
134use sui_types::deny_list_v1::check_coin_deny_list_v1;
135use sui_types::digests::ChainIdentifier;
136use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
137use sui_types::effects::{
138    InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
139    TransactionEvents, VerifiedSignedTransactionEffects,
140};
141use sui_types::error::{ExecutionError, ExecutionErrorTrait, SuiErrorKind, UserInputError};
142use sui_types::event::EventID;
143use sui_types::executable_transaction::VerifiedExecutableTransaction;
144use sui_types::execution_status::ExecutionErrorKind;
145use sui_types::gas::{GasCostSummary, SuiGasStatus};
146use sui_types::inner_temporary_store::{
147    InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
148};
149use sui_types::message_envelope::Message;
150use sui_types::messages_checkpoint::{
151    CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
152    CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
153    CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
154    CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
155};
156use sui_types::messages_grpc::{
157    LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
158    TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
159};
160use sui_types::metrics::{BytecodeVerifierMetrics, ExecutionMetrics};
161use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
162use sui_types::signature::GenericSignature;
163use sui_types::storage::{
164    BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
165};
166use sui_types::sui_system_state::SuiSystemStateTrait;
167use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
168use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
169use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
170use sui_types::{
171    SUI_SYSTEM_ADDRESS,
172    base_types::*,
173    committee::Committee,
174    crypto::AuthoritySignature,
175    error::{SuiError, SuiResult},
176    object::{Object, ObjectRead},
177    transaction::*,
178};
179use sui_types::{TypeTag, is_system_package};
180use typed_store::TypedStoreError;
181use typed_store::rocks::StagedBatch;
182
183use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
184use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
185use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
186use crate::authority::authority_store_pruner::{
187    AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
188};
189use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
190use crate::authority::epoch_start_configuration::EpochStartConfiguration;
191use crate::checkpoints::CheckpointStore;
192use crate::epoch::committee_store::CommitteeStore;
193use crate::execution_cache::{
194    CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
195    ObjectCacheRead, StateSyncAPI,
196};
197use crate::execution_driver::execution_process;
198use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
199use crate::metrics::LatencyObserver;
200use crate::metrics::RateTracker;
201use crate::module_cache_metrics::ResolverMetrics;
202use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
203use crate::stake_aggregator::StakeAggregator;
204use crate::subscription_handler::SubscriptionHandler;
205use crate::transaction_input_loader::TransactionInputLoader;
206
207#[cfg(msim)]
208pub use crate::checkpoints::checkpoint_executor::utils::{
209    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
210};
211
212#[cfg(msim)]
213use sui_types::committee::CommitteeTrait;
214use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
215
216#[cfg(test)]
217#[path = "unit_tests/authority_tests.rs"]
218pub mod authority_tests;
219
220#[cfg(test)]
221#[path = "unit_tests/transaction_tests.rs"]
222pub mod transaction_tests;
223
224#[cfg(test)]
225#[path = "unit_tests/batch_transaction_tests.rs"]
226mod batch_transaction_tests;
227
228#[cfg(test)]
229#[path = "unit_tests/move_integration_tests.rs"]
230pub mod move_integration_tests;
231
232#[cfg(test)]
233#[path = "unit_tests/gas_tests.rs"]
234mod gas_tests;
235
236#[cfg(test)]
237#[path = "unit_tests/gas_data_tests.rs"]
238mod gas_data_tests;
239
240#[cfg(test)]
241#[path = "unit_tests/batch_verification_tests.rs"]
242mod batch_verification_tests;
243
244#[cfg(test)]
245#[path = "unit_tests/coin_deny_list_tests.rs"]
246mod coin_deny_list_tests;
247
248#[cfg(test)]
249#[path = "unit_tests/auth_unit_test_utils.rs"]
250pub mod auth_unit_test_utils;
251
252pub mod authority_test_utils;
253
254pub mod authority_per_epoch_store;
255pub mod authority_per_epoch_store_pruner;
256
257pub mod authority_store_pruner;
258pub mod authority_store_tables;
259pub mod authority_store_types;
260pub mod congestion_log;
261pub mod consensus_tx_status_cache;
262pub(crate) mod epoch_marker_key;
263pub mod epoch_start_configuration;
264pub mod execution_time_estimator;
265pub mod shared_object_congestion_tracker;
266pub mod shared_object_version_manager;
267pub mod submitted_transaction_cache;
268pub mod test_authority_builder;
269pub mod transaction_deferral;
270pub mod transaction_reject_reason_cache;
271mod weighted_moving_average;
272
273pub(crate) mod authority_store;
274pub mod backpressure;
275
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
pub struct AuthorityMetrics {
    /// Total number of transaction orders received.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Total number of transaction effects produced; equals the number of
    /// transactions that finished execution.
    total_effects: IntCounter,
    // TODO: this tracks consensus object tx, not just shared. Consider renaming.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders that had already been processed previously.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    // TODO: this tracks consensus object count, not just shared. Consider renaming.
    num_shared_objects: Histogram,
    /// Distribution of transaction batch sizes.
    batch_size: Histogram,

    /// Latency of voting on transactions without signing them.
    authority_state_handle_vote_transaction_latency: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing the databases.
    db_checkpoint_latency: Histogram,

    // TODO: Rename these metrics — their help strings say they now track the
    // ExecutionScheduler rather than a "transaction manager".
    /// Certificates enqueued to the ExecutionScheduler, labeled by result.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Certificates pending with at least one missing input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Executing certificates, including queued and actually running ones.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Time a transaction spends waiting in the queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Cumulative number of transactions paused by the execution driver.
    pub(crate) execution_driver_paused_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Queueing delay between a transaction becoming ready for execution and
    /// the start of its execution.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas to VM execution latency during prepare.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas to full certificate execution latency,
    /// including committing the certificate.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Consensus transactions skipped because of a local cache hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
    /// Number of times each transaction appears in its first consensus commit.
    pub(crate) consensus_handler_duplicate_tx_count: Histogram,

    /// Whether the authority is currently overloaded and in load-shedding mode.
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while in load-shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each source indicated transaction overload, by source.
    pub(crate) transaction_overload_sources: IntCounterVec,

    /// Post processing metrics
    post_processing_total_events_emitted: IntCounter,
    post_processing_total_tx_indexed: IntCounter,
    post_processing_total_tx_had_event_processed: IntCounter,
    post_processing_total_failures: IntCounter,

    /// Consensus commit and transaction handler metrics
    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of each class of transaction processed by the consensus handler.
    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Transactions deferred by the consensus handler.
    pub consensus_handler_deferred_transactions: IntCounter,
    /// Transactions deferred by the consensus handler due to congestion.
    pub consensus_handler_congested_transactions: IntCounter,
    /// Transactions deferred due to unpaid consensus amplification.
    pub consensus_handler_unpaid_amplification_deferrals: IntCounter,
    /// Transactions cancelled by the consensus handler.
    pub consensus_handler_cancelled_transactions: IntCounter,
    /// Max object costs for congestion control in the current consensus
    /// commit, labeled by commit type.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Committed subdags, sliced by author.
    pub consensus_committed_subdags: IntCounterVec,
    /// Total accumulator deposit (merge) events processed in settlement.
    pub accumulator_deposits: IntCounter,
    /// Total accumulator withdrawal (split) events processed in settlement.
    pub accumulator_withdrawals: IntCounter,
    /// Total committed consensus messages, sliced by author.
    pub consensus_committed_messages: IntGaugeVec,
    /// Certified user transactions committed, sliced by submitter; persisted
    /// across restarts within each epoch.
    pub consensus_committed_user_transactions: IntGaugeVec,
    /// User transactions finalized, sliced by submitter.
    pub consensus_finalized_user_transactions: IntGaugeVec,
    /// User transactions rejected, sliced by submitter.
    pub consensus_rejected_user_transactions: IntGaugeVec,
    // NOTE(review): registrations for the five fields below are not visible in
    // this chunk; the names suggest throughput and block-handler counters —
    // confirm against their registration help strings.
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,
    pub consensus_block_handler_block_processed: IntCounter,
    pub consensus_block_handler_txn_processed: IntCounterVec,
    pub consensus_block_handler_fastpath_executions: IntCounter,
    /// Consensus timestamp bias, in seconds (see TIMESTAMP_BIAS_SEC_BUCKETS).
    pub consensus_timestamp_bias: Histogram,

    /// Shared execution-layer metrics.
    pub execution_metrics: Arc<ExecutionMetrics>,

    /// bytecode verifier metrics for tracking timeouts
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zklogin signatures
    pub zklogin_sig_count: IntCounter,
    /// Count of multisig signatures
    pub multisig_sig_count: IntCounter,

    // Tracks recent average txn queueing delay between when it is ready for
    // execution until it starts executing.
    pub execution_queueing_latency: LatencyObserver,

    // Tracks the rate of transactions becoming ready for execution in the
    // transaction manager. The Mutex is needed because the tracker is updated
    // in the transaction manager and read in the overload_monitor. Contention
    // should be low: the transaction manager is single threaded and the read
    // rate in the overload_monitor is low. If the transaction manager ever
    // becomes multi-threaded, we can create one rate tracker per thread.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    // Tracks the rate at which transactions start execution in the execution
    // driver. Uses a Mutex for the same reason as `txn_ready_rate_tracker`.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
374
/// Histogram buckets for positive integer-valued samples in the 0–10M range,
/// overriding the default Prometheus buckets. Roughly a 1-2-5-7 progression
/// per decade, strictly increasing.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7.,
    10., 20., 50., 70.,
    100., 200., 500., 700.,
    1000., 2000., 5000., 7000.,
    10000., 20000., 50000., 70000.,
    100000., 200000., 500000., 700000.,
    1000000., 2000000., 5000000., 7000000.,
    10000000.,
];
381
/// Histogram buckets (seconds) for general operation latencies, spanning
/// 0.5 ms up to 90 s; strictly increasing.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
    1., 2., 3., 4., 5., 6., 7., 8., 9., 10.,
    20., 30., 60., 90.,
];
386
/// Histogram buckets (seconds) for low-latency samples. Starts from 10 µs and
/// runs up to 100 s; strictly increasing.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005,
    0.0001, 0.0002, 0.0005,
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1., 2., 5.,
    10., 20., 50.,
    100.,
];
392
/// Histogram buckets (seconds) for consensus timestamp bias.
/// The expected bias is 200–300 ms, so the buckets are clustered more tightly
/// in that range; strictly increasing.
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    -2.0, 0.0,
    0.2, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40,
    0.45, 0.50, 0.80,
    2.0, 10.0, 30.0,
];
399
/// Histogram buckets for the ratio of computation gas to execution latency;
/// strictly increasing, from 10 up to 1M.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0,
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0,
    1000.0, 2000.0, 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0,
    10000.0, 50000.0, 100000.0, 1000000.0,
];
404
// Value (10^18) of the synthetic gas coin used for dev-inspect execution —
// presumably large enough that dev-inspect runs never fail on gas.
// NOTE(review): confirm against dev-inspect gas handling.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;
406
// How long to wait for fastpath input objects to become available.
// The transaction author should have observed the input objects as finalized
// output before submitting, so the wait usually does not need to be long.
// When the transaction is submitted by TransactionDriver, the driver will also
// retry quickly if there is no return from this validator in time.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_secs(2);
411
412impl AuthorityMetrics {
413    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
414        Self {
415            tx_orders: register_int_counter_with_registry!(
416                "total_transaction_orders",
417                "Total number of transaction orders",
418                registry,
419            )
420            .unwrap(),
421            total_certs: register_int_counter_with_registry!(
422                "total_transaction_certificates",
423                "Total number of transaction certificates handled",
424                registry,
425            )
426            .unwrap(),
427            // total_effects == total transactions finished
428            total_effects: register_int_counter_with_registry!(
429                "total_transaction_effects",
430                "Total number of transaction effects produced",
431                registry,
432            )
433            .unwrap(),
434
435            shared_obj_tx: register_int_counter_with_registry!(
436                "num_shared_obj_tx",
437                "Number of transactions involving shared objects",
438                registry,
439            )
440            .unwrap(),
441
442            sponsored_tx: register_int_counter_with_registry!(
443                "num_sponsored_tx",
444                "Number of sponsored transactions",
445                registry,
446            )
447            .unwrap(),
448
449            tx_already_processed: register_int_counter_with_registry!(
450                "num_tx_already_processed",
451                "Number of transaction orders already processed previously",
452                registry,
453            )
454            .unwrap(),
455            num_input_objs: register_histogram_with_registry!(
456                "num_input_objects",
457                "Distribution of number of input TX objects per TX",
458                POSITIVE_INT_BUCKETS.to_vec(),
459                registry,
460            )
461            .unwrap(),
462            num_shared_objects: register_histogram_with_registry!(
463                "num_shared_objects",
464                "Number of shared input objects per TX",
465                POSITIVE_INT_BUCKETS.to_vec(),
466                registry,
467            )
468            .unwrap(),
469            batch_size: register_histogram_with_registry!(
470                "batch_size",
471                "Distribution of size of transaction batch",
472                POSITIVE_INT_BUCKETS.to_vec(),
473                registry,
474            )
475            .unwrap(),
476            authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
477                "authority_state_handle_vote_transaction_latency",
478                "Latency of voting on transactions without signing",
479                LATENCY_SEC_BUCKETS.to_vec(),
480                registry,
481            )
482            .unwrap(),
483            internal_execution_latency: register_histogram_with_registry!(
484                "authority_state_internal_execution_latency",
485                "Latency of actual certificate executions",
486                LATENCY_SEC_BUCKETS.to_vec(),
487                registry,
488            )
489            .unwrap(),
490            execution_load_input_objects_latency: register_histogram_with_registry!(
491                "authority_state_execution_load_input_objects_latency",
492                "Latency of loading input objects for execution",
493                LOW_LATENCY_SEC_BUCKETS.to_vec(),
494                registry,
495            )
496            .unwrap(),
497            prepare_certificate_latency: register_histogram_with_registry!(
498                "authority_state_prepare_certificate_latency",
499                "Latency of executing certificates, before committing the results",
500                LATENCY_SEC_BUCKETS.to_vec(),
501                registry,
502            )
503            .unwrap(),
504            commit_certificate_latency: register_histogram_with_registry!(
505                "authority_state_commit_certificate_latency",
506                "Latency of committing certificate execution results",
507                LATENCY_SEC_BUCKETS.to_vec(),
508                registry,
509            )
510            .unwrap(),
511            db_checkpoint_latency: register_histogram_with_registry!(
512                "db_checkpoint_latency",
513                "Latency of checkpointing dbs",
514                LATENCY_SEC_BUCKETS.to_vec(),
515                registry,
516            ).unwrap(),
517            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
518                "transaction_manager_num_enqueued_certificates",
519                "Current number of certificates enqueued to ExecutionScheduler",
520                &["result"],
521                registry,
522            )
523            .unwrap(),
524            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
525                "transaction_manager_num_pending_certificates",
526                "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
527                registry,
528            )
529            .unwrap(),
530            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
531                "transaction_manager_num_executing_certificates",
532                "Number of executing certificates, including queued and actually running certificates",
533                registry,
534            )
535            .unwrap(),
536            authority_overload_status: register_int_gauge_with_registry!(
537                "authority_overload_status",
538                "Whether authority is current experiencing overload and enters load shedding mode.",
539                registry)
540            .unwrap(),
541            authority_load_shedding_percentage: register_int_gauge_with_registry!(
542                "authority_load_shedding_percentage",
543                "The percentage of transactions is shed when the authority is in load shedding mode.",
544                registry)
545            .unwrap(),
546            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
547                "transaction_manager_transaction_queue_age_s",
548                "Time spent in waiting for transaction in the queue",
549                LATENCY_SEC_BUCKETS.to_vec(),
550                registry,
551            )
552            .unwrap(),
553            transaction_overload_sources: register_int_counter_vec_with_registry!(
554                "transaction_overload_sources",
555                "Number of times each source indicates transaction overload.",
556                &["source"],
557                registry)
558            .unwrap(),
559            execution_driver_executed_transactions: register_int_counter_with_registry!(
560                "execution_driver_executed_transactions",
561                "Cumulative number of transaction executed by execution driver",
562                registry,
563            )
564            .unwrap(),
565            execution_driver_paused_transactions: register_int_counter_with_registry!(
566                "execution_driver_paused_transactions",
567                "Cumulative number of transactions paused by execution driver",
568                registry,
569            )
570            .unwrap(),
571            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
572                "execution_driver_dispatch_queue",
573                "Number of transaction pending in execution driver dispatch queue",
574                registry,
575            )
576            .unwrap(),
577            execution_queueing_delay_s: register_histogram_with_registry!(
578                "execution_queueing_delay_s",
579                "Queueing delay between a transaction is ready for execution until it starts executing.",
580                LATENCY_SEC_BUCKETS.to_vec(),
581                registry
582            )
583            .unwrap(),
584            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
585                "prepare_cert_gas_latency_ratio",
586                "The ratio of computation gas divided by VM execution latency.",
587                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
588                registry
589            )
590            .unwrap(),
591            execution_gas_latency_ratio: register_histogram_with_registry!(
592                "execution_gas_latency_ratio",
593                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
594                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
595                registry
596            )
597            .unwrap(),
598            skipped_consensus_txns: register_int_counter_with_registry!(
599                "skipped_consensus_txns",
600                "Total number of consensus transactions skipped",
601                registry,
602            )
603            .unwrap(),
604            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
605                "skipped_consensus_txns_cache_hit",
606                "Total number of consensus transactions skipped because of local cache hit",
607                registry,
608            )
609            .unwrap(),
610            consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
611                "consensus_handler_duplicate_tx_count",
612                "Number of times each transaction appears in its first consensus commit",
613                POSITIVE_INT_BUCKETS.to_vec(),
614                registry,
615            )
616            .unwrap(),
617            post_processing_total_events_emitted: register_int_counter_with_registry!(
618                "post_processing_total_events_emitted",
619                "Total number of events emitted in post processing",
620                registry,
621            )
622            .unwrap(),
623            post_processing_total_tx_indexed: register_int_counter_with_registry!(
624                "post_processing_total_tx_indexed",
625                "Total number of txes indexed in post processing",
626                registry,
627            )
628            .unwrap(),
629            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
630                "post_processing_total_tx_had_event_processed",
631                "Total number of txes finished event processing in post processing",
632                registry,
633            )
634            .unwrap(),
635            post_processing_total_failures: register_int_counter_with_registry!(
636                "post_processing_total_failures",
637                "Total number of failure in post processing",
638                registry,
639            )
640            .unwrap(),
641            consensus_handler_processed: register_int_counter_vec_with_registry!(
642                "consensus_handler_processed",
643                "Number of transactions processed by consensus handler",
644                &["class"],
645                registry
646            ).unwrap(),
647            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
648                "consensus_handler_transaction_sizes",
649                "Sizes of each type of transactions processed by consensus handler",
650                &["class"],
651                POSITIVE_INT_BUCKETS.to_vec(),
652                registry
653            ).unwrap(),
654            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
655                "consensus_handler_deferred_transactions",
656                "Number of transactions deferred by consensus handler",
657                registry,
658            ).unwrap(),
659            consensus_handler_congested_transactions: register_int_counter_with_registry!(
660                "consensus_handler_congested_transactions",
661                "Number of transactions deferred by consensus handler due to congestion",
662                registry,
663            ).unwrap(),
664            consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
665                "consensus_handler_unpaid_amplification_deferrals",
666                "Number of transactions deferred due to unpaid consensus amplification",
667                registry,
668            ).unwrap(),
669            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
670                "consensus_handler_cancelled_transactions",
671                "Number of transactions cancelled by consensus handler",
672                registry,
673            ).unwrap(),
674            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
675                "consensus_handler_max_congestion_control_object_costs",
676                "Max object costs for congestion control in the current consensus commit",
677                &["commit_type"],
678                registry,
679            ).unwrap(),
680            consensus_committed_subdags: register_int_counter_vec_with_registry!(
681                "consensus_committed_subdags",
682                "Number of committed subdags, sliced by author",
683                &["authority"],
684                registry,
685            ).unwrap(),
686            accumulator_deposits: register_int_counter_with_registry!(
687                "accumulator_deposits",
688                "Total accumulator deposit (merge) events processed in settlement",
689                registry,
690            ).unwrap(),
691            accumulator_withdrawals: register_int_counter_with_registry!(
692                "accumulator_withdrawals",
693                "Total accumulator withdrawal (split) events processed in settlement",
694                registry,
695            ).unwrap(),
696            consensus_committed_messages: register_int_gauge_vec_with_registry!(
697                "consensus_committed_messages",
698                "Total number of committed consensus messages, sliced by author",
699                &["authority"],
700                registry,
701            ).unwrap(),
702            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
703                "consensus_committed_user_transactions",
704                "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
705                &["authority"],
706                registry,
707            ).unwrap(),
708            consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
709                "consensus_finalized_user_transactions",
710                "Number of user transactions finalized, sliced by submitter",
711                &["authority"],
712                registry,
713            ).unwrap(),
714            consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
715                "consensus_rejected_user_transactions",
716                "Number of user transactions rejected, sliced by submitter",
717                &["authority"],
718                registry,
719            ).unwrap(),
720            execution_metrics: Arc::new(ExecutionMetrics::new(registry)),
721            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
722            zklogin_sig_count: register_int_counter_with_registry!(
723                "zklogin_sig_count",
724                "Count of zkLogin signatures",
725                registry,
726            )
727            .unwrap(),
728            multisig_sig_count: register_int_counter_with_registry!(
729                "multisig_sig_count",
730                "Count of zkLogin signatures",
731                registry,
732            )
733            .unwrap(),
734            consensus_calculated_throughput: register_int_gauge_with_registry!(
735                "consensus_calculated_throughput",
736                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
737                registry,
738            ).unwrap(),
739            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
740                "consensus_calculated_throughput_profile",
741                "The current active calculated throughput profile",
742                registry
743            ).unwrap(),
744            consensus_block_handler_block_processed: register_int_counter_with_registry!(
745                "consensus_block_handler_block_processed",
746                "Number of blocks processed by consensus block handler.",
747                registry
748            ).unwrap(),
749            consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
750                "consensus_block_handler_txn_processed",
751                "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
752                &["outcome"],
753                registry
754            ).unwrap(),
755            consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
756                "consensus_block_handler_fastpath_executions",
757                "Number of fastpath transactions sent for execution by consensus transaction handler",
758                registry,
759            ).unwrap(),
760            consensus_timestamp_bias: register_histogram_with_registry!(
761                "consensus_timestamp_bias",
762                "Bias/delay from consensus timestamp to system time",
763                TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
764                registry
765            ).unwrap(),
766            execution_queueing_latency: LatencyObserver::new(),
767            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
768            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
769        }
770    }
771}
772
/// A trait object for `Signer` that is:
/// - `Pin`, i.e. confined to one place in memory (we don't want to copy private keys).
/// - `Sync`, i.e. can be safely shared between threads.
///
/// Typically instantiated with `Box::pin(keypair)` where `keypair` is a `KeyPair`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
780
/// Execution env contains the "environment" for the transaction to be executed in, that is,
/// all the information necessary for execution that is not specified by the transaction itself.
#[derive(Debug, Clone)]
pub struct ExecutionEnv {
    /// The assigned version of each shared object for the transaction.
    pub assigned_versions: AssignedVersions,
    /// The expected digest of the effects of the transaction, if executing from checkpoint or
    /// other sources where the effects are known in advance.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    /// Status of the address funds withdraw scheduling of the transaction,
    /// including both address and object funds withdraws.
    /// Defaults to `FundsWithdrawStatus::MaybeSufficient`.
    pub funds_withdraw_status: FundsWithdrawStatus,
    /// Transactions that must finish before this transaction can be executed.
    /// Used to schedule barrier transactions after non-exclusive writes.
    pub barrier_dependencies: Vec<TransactionDigest>,
}
797
798impl Default for ExecutionEnv {
799    fn default() -> Self {
800        Self {
801            assigned_versions: Default::default(),
802            expected_effects_digest: None,
803            funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
804            barrier_dependencies: Default::default(),
805        }
806    }
807}
808
809impl ExecutionEnv {
810    pub fn new() -> Self {
811        Default::default()
812    }
813
814    pub fn with_expected_effects_digest(
815        mut self,
816        expected_effects_digest: TransactionEffectsDigest,
817    ) -> Self {
818        self.expected_effects_digest = Some(expected_effects_digest);
819        self
820    }
821
822    pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
823        self.assigned_versions = assigned_versions;
824        self
825    }
826
827    pub fn with_insufficient_funds(mut self) -> Self {
828        self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
829        self
830    }
831
832    pub fn with_barrier_dependencies(
833        mut self,
834        barrier_dependencies: BTreeSet<TransactionDigest>,
835    ) -> Self {
836        self.barrier_dependencies = barrier_dependencies.into_iter().collect();
837        self
838    }
839}
840
/// State used to recover from forks by overriding the expected effects digest
/// of specific transactions.
#[derive(Debug)]
pub struct ForkRecoveryState {
    /// Transaction digest to effects digest overrides
    /// (populated from `ForkRecoveryConfig::transaction_overrides`).
    transaction_overrides:
        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
}
847
848impl Default for ForkRecoveryState {
849    fn default() -> Self {
850        Self {
851            transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
852        }
853    }
854}
855
856impl ForkRecoveryState {
857    pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
858        let Some(config) = config else {
859            return Ok(Self::default());
860        };
861
862        let mut transaction_overrides = HashMap::new();
863        for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
864            let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
865                SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
866            })?;
867            let effects_digest =
868                TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
869                    SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
870                })?;
871            transaction_overrides.insert(tx_digest, effects_digest);
872        }
873
874        Ok(Self {
875            transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
876        })
877    }
878
879    pub fn get_transaction_override(
880        &self,
881        tx_digest: &TransactionDigest,
882    ) -> Option<TransactionEffectsDigest> {
883        self.transaction_overrides.read().get(tx_digest).copied()
884    }
885}
886
/// Output of per-transaction post-processing: the staged index batch plus the
/// index-store cache updates to commit at checkpoint boundaries (see
/// `AuthorityState::pending_post_processing`).
pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
888
/// Core state of a Sui authority (validator or fullnode): identity, storage and
/// cache handles, per-epoch state, execution scheduling, and operational controls.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads transaction input objects from storage for signing and execution.
    input_loader: TransactionInputLoader,
    /// Trait-object handles to the execution caches (object/transaction reads, writes, commits).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    /// Resolves coin reservations declared in funds withdrawals.
    coin_reservation_resolver: Arc<CachingCoinReservationResolver>,

    /// Per-epoch store, swapped atomically at reconfiguration.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it until all writes are complete.
    /// Reconfiguration acquires write lock, changes the epoch and reverts all transactions
    /// from the previous epoch that were executed but did not make it into a checkpoint.
    execution_lock: RwLock<EpochId>,

    pub indexes: Option<Arc<IndexStore>>,
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Stores committees across epochs.
    committee_store: Arc<CommitteeStore>,

    /// Schedules transaction execution.
    execution_scheduler: Arc<ExecutionScheduler>,

    /// Shuts down the execution task. Used only in testing.
    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// The node configuration this authority was started with.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// The chain identifier is derived from the digest of the genesis checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Consumed by gasless tx rate limiter.
    pub(crate) consensus_gasless_counter: Arc<ConsensusGaslessCounter>,

    /// Traffic controller for Sui core servers (json-rpc, validator service)
    pub traffic_controller: Option<Arc<TrafficController>>,

    /// Fork recovery state for handling equivocation after forks
    fork_recovery_state: Option<ForkRecoveryState>,

    /// Notification channel for reconfiguration
    notify_epoch: tokio::sync::watch::Sender<EpochId>,

    pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
    object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,

    /// Tracks transactions whose post-processing (indexing/events) is still in flight.
    /// CheckpointExecutor removes entries and collects the index batches before committing
    /// them atomically at checkpoint boundaries.
    pending_post_processing:
        Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,

    /// Limits the number of concurrent post-processing tasks to avoid overwhelming
    /// the blocking thread pool. Defaults to the number of available CPUs.
    post_processing_semaphore: Arc<tokio::sync::Semaphore>,
}
966
967/// The authority state encapsulates all state, drives execution, and ensures safety.
968///
969/// Note the authority operations can be accessed through a read ref (&) and do not
970/// require &mut. Internally a database is synchronized through a mutex lock.
971///
972/// Repeating valid commands should produce no changes and return no error.
973impl AuthorityState {
974    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
975        epoch_store.committee().authority_exists(&self.name)
976    }
977
978    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
979        !self.is_validator(epoch_store)
980    }
981
    /// Returns a reference to the shared committee store.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
985
986    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
987        self.committee_store.clone()
988    }
989
    /// Returns the authority overload configuration from the node config.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
993
    /// Returns the state commitments recorded for `epoch` in the checkpoint store,
    /// or `None` when they are not (yet) available.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
1000
    /// Runs deny list checks and processes funds withdrawals. Called before loading input
    /// objects, since these checks don't depend on object state.
    ///
    /// Returns the withdrawals declared by the transaction, keyed by accumulator
    /// object id; values appear to be (amount, coin type) — confirm against
    /// `process_funds_withdrawals_for_signing`.
    fn pre_object_load_checks(
        &self,
        tx_data: &TransactionData,
        tx_signatures: &[GenericSignature],
        input_object_kinds: &[InputObjectKind],
        receiving_objects_refs: &[ObjectRef],
        protocol_config: &ProtocolConfig,
    ) -> SuiResult<BTreeMap<AccumulatorObjId, (u64, TypeTag)>> {
        // Note: the deny checks may do redundant package loads but:
        // - they only load packages when there is an active package deny map
        // - the loads are cached anyway
        sui_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            tx_signatures,
            input_object_kinds,
            receiving_objects_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Resolve the withdrawals declared by the transaction, then verify the
        // corresponding account balances can cover them.
        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
            self.chain_identifier,
            self.coin_reservation_resolver.as_ref(),
        )?;

        self.execution_cache_trait_pointers
            .account_funds_read
            .check_amounts_available(&declared_withdrawals)?;

        // For gasless transactions (when enabled by protocol config), additionally
        // require that minimum balances of the allowed token types remain after the withdrawal.
        if protocol_config.gasless_verify_remaining_balance() && tx_data.is_gasless_transaction() {
            let min_amounts =
                sui_types::transaction::get_gasless_allowed_token_types(protocol_config);
            self.execution_cache_trait_pointers
                .account_funds_read
                .check_remaining_amounts_after_withdrawal(&declared_withdrawals, &min_amounts)?;
        }

        Ok(declared_withdrawals)
    }
1042
    /// Signing-time validation pipeline: pre-object-load checks, input object
    /// loading, transaction input checks, and coin deny list checks.
    ///
    /// Returns the checked input objects on success so callers can reuse them
    /// (e.g. for owned-object version validation).
    fn handle_transaction_deny_checks(
        &self,
        transaction: &VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<CheckedInputObjects> {
        let tx_digest = transaction.digest();
        let tx_data = transaction.data().transaction_data();

        let input_object_kinds = tx_data.input_objects()?;
        let receiving_objects_refs = tx_data.receiving_objects();

        // Checks that do not require loading objects (deny lists, funds withdrawals).
        self.pre_object_load_checks(
            tx_data,
            transaction.tx_signatures(),
            &input_object_kinds,
            &receiving_objects_refs,
            epoch_store.protocol_config(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            Some(tx_digest),
            &input_object_kinds,
            &receiving_objects_refs,
            epoch_store.epoch(),
        )?;

        // Full input validation; the gas status is not needed on this path.
        let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
            epoch_store.protocol_config(),
            epoch_store.reference_gas_price(),
            tx_data,
            input_objects,
            &receiving_objects,
            &self.metrics.bytecode_verifier_metrics,
            &self.config.verifier_signing_config,
        )?;

        self.handle_coin_deny_list_checks(
            tx_data,
            &checked_input_objects,
            &receiving_objects,
            epoch_store,
        )?;

        Ok(checked_input_objects)
    }
1088
1089    fn handle_coin_deny_list_checks(
1090        &self,
1091        tx_data: &TransactionData,
1092        checked_input_objects: &CheckedInputObjects,
1093        receiving_objects: &ReceivingObjects,
1094        epoch_store: &Arc<AuthorityPerEpochStore>,
1095    ) -> SuiResult<()> {
1096        use sui_types::balance::Balance;
1097
1098        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1099            self.chain_identifier,
1100            self.coin_reservation_resolver.as_ref(),
1101        )?;
1102
1103        let funds_withdraw_types = declared_withdrawals
1104            .values()
1105            .filter_map(|(_, type_tag)| {
1106                Balance::maybe_get_balance_type_param(type_tag)
1107                    .map(|ty| ty.to_canonical_string(false))
1108            })
1109            .collect::<BTreeSet<_>>();
1110
1111        if epoch_store.coin_deny_list_v1_enabled() {
1112            check_coin_deny_list_v1(
1113                tx_data.sender(),
1114                checked_input_objects,
1115                receiving_objects,
1116                funds_withdraw_types.clone(),
1117                &self.get_object_store(),
1118            )?;
1119        }
1120
1121        if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1122            check_coin_deny_list_v2_during_signing(
1123                tx_data.sender(),
1124                checked_input_objects,
1125                receiving_objects,
1126                funds_withdraw_types.clone(),
1127                &self.get_object_store(),
1128            )?;
1129        }
1130
1131        Ok(())
1132    }
1133
    /// Vote for a transaction, either when validator receives a submit_transaction request,
    /// or sees a transaction from consensus. Performs the same types of checks as
    /// transaction signing, but does not explicitly sign the transaction.
    /// Note that if the transaction has been executed, we still go through the
    /// same checks. If the transaction has only been executed through mysticeti fastpath,
    /// but not yet finalized, the signing will still work since the objects are still available.
    /// But if the transaction has been finalized, the signing will fail.
    /// TODO(mysticeti-fastpath): Assess whether we want to optimize the case when the transaction
    /// has already been finalized and executed.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    pub fn handle_vote_transaction(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
    ) -> SuiResult<()> {
        debug!("handle_vote_transaction");

        let _metrics_guard = self
            .metrics
            .authority_state_handle_vote_transaction_latency
            .start_timer();
        self.metrics.tx_orders.inc();

        // The should_accept_user_certs check here is best effort, because
        // between a validator signs a tx and a cert is formed, the validator
        // could close the window.
        if !epoch_store
            .get_reconfig_state_read_lock_guard()
            .should_accept_user_certs()
        {
            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
        }

        // Accept finalized transactions, instead of voting to reject them.
        // Checking executed transactions is limited to the current epoch.
        // Otherwise there can be a race where the transaction is accepted because it has executed,
        // but the executed effects are pruned post consensus, leading to failures.
        let tx_digest = *transaction.digest();
        if epoch_store.is_recently_finalized(&tx_digest)
            || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
        {
            assert_reachable!("transaction recently executed");
            return Ok(());
        }

        // A transaction already executed in the previous epoch cannot be voted on again.
        if self
            .get_transaction_cache_reader()
            .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
        {
            assert_reachable!("transaction executed in last epoch");
            return Err(SuiErrorKind::TransactionAlreadyExecuted {
                digest: (*transaction.digest()),
            }
            .into());
        }

        // Ensure that validator cannot reconfigure while we are validating the transaction.
        let _execution_lock = self.execution_lock_for_validation()?;

        let checked_input_objects =
            self.handle_transaction_deny_checks(&transaction, epoch_store)?;

        let owned_objects = checked_input_objects.inner().filter_owned_objects();

        // Validate owned object versions without acquiring locks. Locking happens post-consensus
        // in the consensus handler. Validation still runs to prevent spam transactions with
        // invalid object versions, and is necessary to handle recently created objects.
        self.get_cache_writer()
            .validate_owned_object_versions(&owned_objects)
    }
1204
    /// Used for early client validation check for transactions before submission to server.
    /// Performs the same validation checks as handle_vote_transaction without acquiring locks.
    /// This allows for fast failure feedback to clients for non-retriable errors.
    ///
    /// The key addition is checking that owned object versions match live object versions.
    /// This is necessary because handle_transaction_deny_checks fetches objects at their
    /// requested version (which may exist in storage as historical versions), whereas
    /// validators check against the live/current version during locking (verify_live_object).
    ///
    /// `enforce_live_input_objects`: when true, also rejects (non-retriably) owned inputs
    /// whose referenced version is newer than the locally known live version.
    pub fn check_transaction_validity(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: &VerifiedTransaction,
        enforce_live_input_objects: bool,
    ) -> SuiResult<()> {
        if !epoch_store
            .get_reconfig_state_read_lock_guard()
            .should_accept_user_certs()
        {
            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
        }

        transaction.validity_check(&epoch_store.tx_validity_check_context())?;

        let checked_input_objects =
            self.handle_transaction_deny_checks(transaction, epoch_store)?;

        // Check that owned object versions match live objects
        let owned_objects = checked_input_objects.inner().filter_owned_objects();
        let cache_reader = self.get_object_cache_reader();

        // obj_ref fields: .0 = object id, .1 = version, .2 = digest.
        for obj_ref in &owned_objects {
            if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
                // Only reject if transaction references an old version. Allow newer
                // versions that may exist on validators but not yet synced to fullnode.
                if obj_ref.1 < live_object.version() {
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: live_object.version(),
                        },
                    }
                    .into());
                }

                // The opposite direction: the referenced version is ahead of the live one.
                if enforce_live_input_objects && live_object.version() < obj_ref.1 {
                    // Returns a non-retriable error when the input object version is required to be live.
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: live_object.version(),
                        },
                    }
                    .into());
                }

                // If version matches, verify digest also matches
                if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::InvalidObjectDigest {
                            object_id: obj_ref.0,
                            expected_digest: live_object.digest(),
                        },
                    }
                    .into());
                }
            }
        }

        Ok(())
    }
1275
    /// Whether system overload checks are enabled at transaction signing time
    /// (from the authority overload config).
    pub fn check_system_overload_at_signing(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_signing
    }
1281
    /// Whether system overload checks are enabled at transaction execution time
    /// (from the authority overload config).
    pub fn check_system_overload_at_execution(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_execution
    }
1287
1288    pub(crate) fn check_system_overload(
1289        &self,
1290        tx_data: &SenderSignedData,
1291        do_authority_overload_check: bool,
1292    ) -> SuiResult {
1293        if do_authority_overload_check {
1294            self.check_authority_overload(tx_data).tap_err(|_| {
1295                self.update_overload_metrics("execution_queue");
1296            })?;
1297        }
1298        self.execution_scheduler
1299            .check_execution_overload(self.overload_config(), tx_data)
1300            .tap_err(|_| {
1301                self.update_overload_metrics("execution_pending");
1302            })?;
1303        // Consensus overload is handled by the admission queue in authority_server.rs.
1304
1305        let pending_tx_count = self
1306            .get_cache_commit()
1307            .approximate_pending_transaction_count();
1308        if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1309            return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1310                retry_after_secs: 10,
1311            }
1312            .into());
1313        }
1314
1315        Ok(())
1316    }
1317
1318    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1319        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1320            return Ok(());
1321        }
1322
1323        let load_shedding_percentage = self
1324            .overload_info
1325            .load_shedding_percentage
1326            .load(Ordering::Relaxed);
1327        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1328    }
1329
1330    pub(crate) fn update_overload_metrics(&self, source: &str) {
1331        self.metrics
1332            .transaction_overload_sources
1333            .with_label_values(&[source])
1334            .inc();
1335    }
1336
    /// Waits for fastpath (owned, package) dependency objects to become available.
    /// Returns true if a chosen set of fastpath dependency objects are available,
    /// returns false otherwise after an internal timeout.
    pub(crate) async fn wait_for_fastpath_dependency_objects(
        &self,
        transaction: &VerifiedTransaction,
        epoch: EpochId,
    ) -> SuiResult<bool> {
        let txn_data = transaction.data().transaction_data();
        let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;

        // Gather and filter input objects to wait for. Packages are always waited on;
        // move and receiving objects only when `should_wait_for_dependency_object`
        // decides waiting is useful.
        let fastpath_dependency_objects: Vec<_> = move_objects
            .into_iter()
            .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
            .chain(
                packages
                    .into_iter()
                    .map(|package_id| InputKey::Package { id: package_id }),
            )
            .collect();
        let receiving_keys: HashSet<_> = receiving_objects
            .into_iter()
            .filter_map(|receiving_obj_ref| {
                self.should_wait_for_dependency_object(receiving_obj_ref)
            })
            .collect();
        // Nothing to wait for: report availability immediately.
        if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
            return Ok(true);
        }

        // Use shorter wait timeout in simtests to exercise server-side error paths and
        // client-side retry logic.
        let max_wait = if cfg!(msim) {
            Duration::from_millis(200)
        } else {
            WAIT_FOR_FASTPATH_INPUT_TIMEOUT
        };

        match timeout(
            max_wait,
            self.get_object_cache_reader().notify_read_input_objects(
                &fastpath_dependency_objects,
                &receiving_keys,
                epoch,
            ),
        )
        .await
        {
            Ok(()) => Ok(true),
            // Maybe return an error for unavailable input objects,
            // and allow the caller to skip the rest of input checks?
            Err(_) => Ok(false),
        }
    }
1392
1393    /// Returns Some(inputKey) if the object reference should be waited on until it is
1394    /// finalized, before proceeding to input checks.
1395    ///
1396    /// Incorrect decisions here should only affect user experience, not safety:
1397    /// - Waiting unnecessarily adds latency to transaction signing and submission.
1398    /// - Not waiting when needed may cause the transaction to be rejected because input object is unavailable.
1399    fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1400        let (obj_id, cur_version, _digest) = obj_ref;
1401        let Some(latest_obj_ref) = self
1402            .get_object_cache_reader()
1403            .get_latest_object_ref_or_tombstone(obj_id)
1404        else {
1405            // Object might not have been created.
1406            return Some(InputKey::VersionedObject {
1407                id: FullObjectID::new(obj_id, None),
1408                version: cur_version,
1409            });
1410        };
1411        let latest_digest = latest_obj_ref.2;
1412        if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1413            // Do not wait for deleted object and rely on input check instead.
1414            return None;
1415        }
1416        let latest_version = latest_obj_ref.1;
1417        if cur_version <= latest_version {
1418            // Do not wait for version that already exists or has been consumed.
1419            // Let the input check to handle them and return the proper error.
1420            return None;
1421        }
1422        // Wait for the object version to become available.
1423        Some(InputKey::VersionedObject {
1424            id: FullObjectID::new(obj_id, None),
1425            version: cur_version,
1426        })
1427    }
1428
1429    /// Wait for a transaction to be executed. Transactions need to be sequenced by consensus
1430    /// before being enqueued for execution.
1431    ///
1432    /// Only use this in tests.
1433    #[instrument(level = "trace", skip_all)]
1434    pub async fn wait_for_transaction_execution_for_testing(
1435        &self,
1436        transaction: &VerifiedExecutableTransaction,
1437    ) -> TransactionEffects {
1438        self.notify_read_effects_for_testing(
1439            "AuthorityState::wait_for_transaction_execution_for_testing",
1440            *transaction.digest(),
1441        )
1442        .await
1443    }
1444
    /// Internal logic to execute a certificate.
    ///
    /// Guarantees that
    /// - If input objects are available, return no permanent failure.
    /// - Execution and output commit are atomic, i.e. outputs are only written to storage
    ///   on successful execution; crashed execution has no observable effect and can be retried.
    ///
    /// It is caller's responsibility to ensure input objects are available and locks are set.
    /// If this cannot be satisfied by the caller, execute_certificate() should be called instead.
    ///
    /// Should only be called within sui-core.
    #[instrument(level = "trace", skip_all)]
    pub async fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        mut execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // If fork recovery has a digest override recorded for this transaction,
        // replace the expected effects digest so the node can make progress.
        if let Some(fork_recovery) = &self.fork_recovery_state
            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
        {
            warn!(
                ?tx_digest,
                original_digest = ?execution_env.expected_effects_digest,
                override_digest = ?override_digest,
                "Applying fork recovery override for transaction effects digest"
            );
            execution_env.expected_effects_digest = Some(override_digest);
        }

        // prevent concurrent executions of the same tx.
        let tx_guard = epoch_store.acquire_tx_guard(certificate);

        // Fast path: if effects already exist the transaction was executed before;
        // sanity-check against the expected digest (if any) and return them.
        let tx_cache_reader = self.get_transaction_cache_reader();
        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest,
                    "Unexpected effects digest for transaction {:?}",
                    tx_digest
                );
            }
            tx_guard.release();
            return ExecutionOutput::Success((effects, None));
        }

        let execution_start_time = Instant::now();

        // Any caller that verifies the signatures on the certificate will have already checked the
        // epoch. But paths that don't verify sigs (e.g. execution from checkpoint, reading from db)
        // present the possibility of an epoch mismatch. If this cert is not finalized in previous
        // epoch, then it's invalid.
        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
        else {
            tx_guard.release();
            return ExecutionOutput::EpochEnded;
        };
        // Since we obtain a reference to the epoch store before taking the execution lock, it's
        // possible that reconfiguration has happened and they no longer match.
        // TODO: We may not need the following check anymore since the scheduler
        // should have checked that the certificate is from the same epoch as epoch_store.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return ExecutionOutput::EpochEnded;
        }

        // Saved before `execution_env` is moved into process_certificate below;
        // used later when settling the accumulator barrier.
        let accumulator_version = execution_env.assigned_versions.accumulator_version;

        let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
            &tx_guard,
            &execution_guard,
            certificate,
            execution_env,
            epoch_store,
        ) {
            ExecutionOutput::Success(result) => result,
            // Propagate non-success outputs (EpochEnded / RetryLater / Fatal) unchanged.
            output => return output.unwrap_err(),
        };
        let transaction_outputs = Arc::new(transaction_outputs);

        // Allow testing what happens if we crash between execution and commit.
        fail_point!("crash");

        let effects = transaction_outputs.effects.clone();
        debug!(
            ?tx_digest,
            fx_digest=?effects.digest(),
            "process_certificate succeeded in {:.3}ms",
            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
        );

        let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
        if let Err(err) = commit_result {
            error!(?tx_digest, "Error committing transaction: {err}");
            tx_guard.release();
            return ExecutionOutput::Fatal(err);
        }

        // Keep the in-memory authenticator state in sync after an
        // AuthenticatorStateUpdate transaction has been committed.
        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // double check that the signature verifier always matches the authenticator state
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        // TODO: We should also settle address funds during execution,
        // instead of from checkpoint builder. It will improve performance
        // since we will be settling as soon as we can.
        if certificate
            .transaction_data()
            .kind()
            .is_accumulator_barrier_settle_tx()
        {
            let object_funds_checker = self.object_funds_checker.load();
            if let Some(object_funds_checker) = object_funds_checker.as_ref() {
                // unwrap safe because we assign accumulator version for every transaction
                // when accumulator is enabled.
                let next_accumulator_version = accumulator_version.unwrap().next();
                object_funds_checker.settle_accumulator_version(next_accumulator_version);
            }
        }

        tx_guard.commit_tx();

        // Record observed execution time for congestion / scheduling estimates.
        let elapsed = execution_start_time.elapsed();
        epoch_store.record_local_execution_time(
            certificate.data().transaction_data(),
            &effects,
            timings,
            elapsed,
        );

        let elapsed_us = elapsed.as_micros() as f64;
        if elapsed_us > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
        };
        ExecutionOutput::Success((effects, execution_error_opt))
    }
1615
1616    pub fn read_objects_for_execution(
1617        &self,
1618        tx_lock: &CertLockGuard,
1619        certificate: &VerifiedExecutableTransaction,
1620        assigned_shared_object_versions: &AssignedVersions,
1621        epoch_store: &Arc<AuthorityPerEpochStore>,
1622    ) -> SuiResult<InputObjects> {
1623        let _scope = monitored_scope("Execution::load_input_objects");
1624        let _metrics_guard = self
1625            .metrics
1626            .execution_load_input_objects_latency
1627            .start_timer();
1628        let input_objects = &certificate.data().transaction_data().input_objects()?;
1629        self.input_loader.read_objects_for_execution(
1630            &certificate.key(),
1631            tx_lock,
1632            input_objects,
1633            assigned_shared_object_versions,
1634            epoch_store.epoch(),
1635        )
1636    }
1637
1638    /// Test only wrapper for `try_execute_immediately()` above, useful for executing change epoch
1639    /// transactions. Assumes execution will always succeed.
1640    pub async fn try_execute_for_test(
1641        &self,
1642        certificate: &VerifiedCertificate,
1643        execution_env: ExecutionEnv,
1644    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1645        let epoch_store = self.epoch_store_for_testing();
1646        let (effects, execution_error_opt) = self
1647            .try_execute_immediately(
1648                &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1649                execution_env,
1650                &epoch_store,
1651            )
1652            .await
1653            .unwrap();
1654        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1655        (signed_effects, execution_error_opt)
1656    }
1657
1658    pub async fn try_execute_executable_for_test(
1659        &self,
1660        executable: &VerifiedExecutableTransaction,
1661        execution_env: ExecutionEnv,
1662    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1663        let epoch_store = self.epoch_store_for_testing();
1664        let (effects, execution_error_opt) = self
1665            .try_execute_immediately(executable, execution_env, &epoch_store)
1666            .await
1667            .unwrap();
1668        self.flush_post_processing(executable.digest()).await;
1669        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1670        (signed_effects, execution_error_opt)
1671    }
1672
1673    /// Wait until the effects of the given transaction are available and return them.
1674    /// Panics if the effects are not found.
1675    ///
1676    /// Only use this in tests where effects are expected to exist.
1677    pub async fn notify_read_effects_for_testing(
1678        &self,
1679        task_name: &'static str,
1680        digest: TransactionDigest,
1681    ) -> TransactionEffects {
1682        self.get_transaction_cache_reader()
1683            .notify_read_executed_effects(task_name, &[digest])
1684            .await
1685            .pop()
1686            .expect("must return correct number of effects")
1687    }
1688
1689    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1690        self.get_object_cache_reader()
1691            .check_owned_objects_are_live(owned_object_refs)
1692    }
1693
1694    /// This function captures the required state to debug a forked transaction.
1695    /// The dump is written to a file in dir `path`, with name prefixed by the transaction digest.
1696    /// NOTE: Since this info escapes the validator context,
1697    /// make sure not to leak any private info here
1698    pub(crate) fn debug_dump_transaction_state(
1699        &self,
1700        tx_digest: &TransactionDigest,
1701        effects: &TransactionEffects,
1702        expected_effects_digest: TransactionEffectsDigest,
1703        inner_temporary_store: &InnerTemporaryStore,
1704        certificate: &VerifiedExecutableTransaction,
1705        debug_dump_config: &StateDebugDumpConfig,
1706    ) -> SuiResult<PathBuf> {
1707        let dump_dir = debug_dump_config
1708            .dump_file_directory
1709            .as_ref()
1710            .cloned()
1711            .unwrap_or(std::env::temp_dir());
1712        let epoch_store = self.load_epoch_store_one_call_per_task();
1713
1714        NodeStateDump::new(
1715            tx_digest,
1716            effects,
1717            expected_effects_digest,
1718            self.get_object_store().as_ref(),
1719            &epoch_store,
1720            inner_temporary_store,
1721            certificate,
1722        )?
1723        .write_to_file(&dump_dir)
1724        .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1725    }
1726
1727    #[instrument(level = "trace", skip_all)]
1728    pub(crate) fn process_certificate(
1729        &self,
1730        tx_guard: &CertTxGuard,
1731        execution_guard: &ExecutionLockReadGuard<'_>,
1732        certificate: &VerifiedExecutableTransaction,
1733        execution_env: ExecutionEnv,
1734        epoch_store: &Arc<AuthorityPerEpochStore>,
1735    ) -> ExecutionOutput<(
1736        TransactionOutputs,
1737        Vec<ExecutionTiming>,
1738        Option<ExecutionError>,
1739    )> {
1740        let _scope = monitored_scope("Execution::process_certificate");
1741        let tx_digest = *certificate.digest();
1742
1743        let input_objects = match self.read_objects_for_execution(
1744            tx_guard.as_lock_guard(),
1745            certificate,
1746            &execution_env.assigned_versions,
1747            epoch_store,
1748        ) {
1749            Ok(objects) => objects,
1750            Err(e) => return ExecutionOutput::Fatal(e),
1751        };
1752
1753        let expected_effects_digest = match execution_env.expected_effects_digest {
1754            Some(expected_effects_digest) => Some(expected_effects_digest),
1755            None => {
1756                // We could be re-executing a previously executed but uncommitted transaction, perhaps after
1757                // restarting with a new binary. In this situation, if we have published an effects signature,
1758                // we must be sure not to equivocate.
1759                // TODO: read from cache instead of DB
1760                match epoch_store.get_signed_effects_digest(&tx_digest) {
1761                    Ok(digest) => digest,
1762                    Err(e) => return ExecutionOutput::Fatal(e),
1763                }
1764            }
1765        };
1766
1767        fail_point_if!("correlated-crash-process-certificate", || {
1768            if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1769                sui_simulator::task::kill_current_node(None);
1770            }
1771        });
1772
1773        // Errors originating from prepare_certificate may be transient (failure to read locks) or
1774        // non-transient (transaction input is invalid, move vm errors). However, all errors from
1775        // this function occur before we have written anything to the db, so we commit the tx
1776        // guard and rely on the client to retry the tx (if it was transient).
1777        self.execute_certificate(
1778            execution_guard,
1779            certificate,
1780            input_objects,
1781            expected_effects_digest,
1782            execution_env,
1783            epoch_store,
1784        )
1785    }
1786
1787    pub async fn reconfigure_traffic_control(
1788        &self,
1789        params: TrafficControlReconfigParams,
1790    ) -> Result<TrafficControlReconfigParams, SuiError> {
1791        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1792            traffic_controller.admin_reconfigure(params).await
1793        } else {
1794            Err(SuiErrorKind::InvalidAdminRequest(
1795                "Traffic controller is not configured on this node".to_string(),
1796            )
1797            .into())
1798        }
1799    }
1800
    /// Commits the outputs of an executed certificate: records the transaction as
    /// executed in the epoch store, writes the outputs through the cache writer,
    /// notifies accumulator-barrier waiters, and reloads system packages at epoch end.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        transaction_outputs: Arc<TransactionOutputs>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_digest = certificate.digest();

        // The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
        // we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_executed_in_epoch(tx_digest);
        // Non-digest keys (e.g. randomness-round keys) need an explicit key->digest mapping.
        let key = certificate.key();
        if !matches!(key, TransactionKey::Digest(_)) {
            epoch_store.insert_tx_key(key, *tx_digest)?;
        }

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        self.get_cache_writer()
            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);

        // Fire an in-memory-only notification so the scheduler can detect that this
        // barrier transaction has been executed (e.g. by the checkpoint executor) and
        // stop waiting for the checkpoint builder.  We intentionally do NOT persist
        // this to the DB to avoid stale entries surviving a crash while effects may not.
        if let Some(settlement_key) = certificate
            .transaction_data()
            .kind()
            .accumulator_barrier_settlement_key()
        {
            epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
        }

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        Ok(())
    }
1849
1850    fn update_metrics(
1851        &self,
1852        certificate: &VerifiedExecutableTransaction,
1853        inner_temporary_store: &InnerTemporaryStore,
1854        effects: &TransactionEffects,
1855    ) {
1856        // count signature by scheme, for zklogin and multisig
1857        if certificate.has_zklogin_sig() {
1858            self.metrics.zklogin_sig_count.inc();
1859        } else if certificate.has_upgraded_multisig() {
1860            self.metrics.multisig_sig_count.inc();
1861        }
1862
1863        self.metrics.total_effects.inc();
1864        self.metrics.total_certs.inc();
1865
1866        let consensus_object_count = effects.input_consensus_objects().len();
1867        if consensus_object_count > 0 {
1868            self.metrics.shared_obj_tx.inc();
1869        }
1870
1871        if certificate.is_sponsored_tx() {
1872            self.metrics.sponsored_tx.inc();
1873        }
1874
1875        let input_object_count = inner_temporary_store.input_objects.len();
1876        self.metrics
1877            .num_input_objs
1878            .observe(input_object_count as f64);
1879        self.metrics
1880            .num_shared_objects
1881            .observe(consensus_object_count as f64);
1882        self.metrics.batch_size.observe(
1883            certificate
1884                .data()
1885                .intent_message()
1886                .value
1887                .kind()
1888                .num_commands() as f64,
1889        );
1890    }
1891
1892    /// Helper function that handles transaction rewriting for coin reservations and executes
1893    /// the transaction to effects. Returns the execution results along with the command offset
1894    /// used for rewriting failure command indices.
1895    fn execute_transaction_to_effects(
1896        &self,
1897        executor: &dyn Executor,
1898        store: &dyn BackingStore,
1899        protocol_config: &ProtocolConfig,
1900        enable_expensive_checks: bool,
1901        execution_params: ExecutionOrEarlyError,
1902        epoch_id: &EpochId,
1903        epoch_timestamp_ms: u64,
1904        input_objects: CheckedInputObjects,
1905        gas_data: GasData,
1906        gas_status: SuiGasStatus,
1907        sender: SuiAddress,
1908        mut kind: TransactionKind,
1909        signer: SuiAddress,
1910        tx_digest: TransactionDigest,
1911        accumulator_version: Option<SequenceNumber>,
1912    ) -> (
1913        InnerTemporaryStore,
1914        SuiGasStatus,
1915        TransactionEffects,
1916        Vec<ExecutionTiming>,
1917        Result<(), ExecutionError>,
1918    ) {
1919        let rewritten_inputs = rewrite_transaction_for_coin_reservations(
1920            self.chain_identifier,
1921            &*self.coin_reservation_resolver,
1922            sender,
1923            &mut kind,
1924            accumulator_version,
1925        );
1926
1927        let (inner_temp_store, gas_status, effects, timings, execution_error) = executor
1928            // TODO only run this function on FullNodes, use `execute_transaction_to_effects` on validators.
1929            .execute_transaction_to_effects_and_execution_error(
1930                store,
1931                protocol_config,
1932                self.metrics.execution_metrics.clone(),
1933                enable_expensive_checks,
1934                execution_params,
1935                epoch_id,
1936                epoch_timestamp_ms,
1937                input_objects,
1938                gas_data,
1939                gas_status,
1940                kind,
1941                rewritten_inputs,
1942                signer,
1943                tx_digest,
1944                &mut None,
1945            );
1946
1947        (
1948            inner_temp_store,
1949            gas_status,
1950            effects,
1951            timings,
1952            execution_error,
1953        )
1954    }
1955
    /// execute_certificate validates the transaction input, and executes the certificate,
    /// returning transaction outputs.
    ///
    /// It reads state from the db (both owned and shared locks), but it has no side effects.
    ///
    /// Executes a certificate and returns an ExecutionOutput.
    /// The function can fail with Fatal errors (e.g., the transaction input is invalid,
    /// locks are not held correctly, etc.) or transient errors (e.g., db read errors).
    #[instrument(level = "trace", skip_all)]
    fn execute_certificate(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        input_objects: InputObjects,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(
        TransactionOutputs,
        Vec<ExecutionTiming>,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::prepare_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        let prepare_certificate_start_time = tokio::time::Instant::now();

        // TODO: We need to move this to a more appropriate place to avoid redundant checks.
        let tx_data = certificate.data().transaction_data();

        if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
            return ExecutionOutput::Fatal(e);
        }

        // The cost of partially re-auditing a transaction before execution is tolerated.
        // This step is required for correctness because, for example, ConsensusAddressOwner
        // object owner may have changed between signing and execution.
        let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
            certificate,
            input_objects,
            epoch_store.protocol_config(),
            epoch_store.reference_gas_price(),
        ) {
            Ok(result) => result,
            Err(e) => return ExecutionOutput::Fatal(e),
        };

        // Re-verify that the owned inputs are still live under the current locks.
        let owned_object_refs = input_objects.inner().filter_owned_objects();
        if let Err(e) = self.check_owned_locks(&owned_object_refs) {
            return ExecutionOutput::Fatal(e);
        }
        let tx_digest = *certificate.digest();
        let protocol_config = epoch_store.protocol_config();
        let transaction_data = &certificate.data().intent_message().value;
        let sender = transaction_data.sender();
        let (kind, signer, gas_data) = transaction_data.execution_parts();
        // Determine whether the transaction must fail up front (e.g. denied
        // certificate or failed funds withdraw) without running the VM.
        let early_execution_error = get_early_execution_error(
            &tx_digest,
            &input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            &execution_env.funds_withdraw_status,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        // Wrap the backing store so the set of objects read during execution can be
        // recovered afterwards (used for unchanged loaded runtime objects below).
        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());

        #[allow(unused_mut)]
        let (inner_temp_store, _, mut effects, timings, execution_error_opt) = self
            .execute_transaction_to_effects(
                &**epoch_store.executor(),
                &tracking_store,
                protocol_config,
                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
                // cyclic dependency w/ sui-adapter
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_sui_conservation_check(),
                execution_params,
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                input_objects,
                gas_data,
                gas_status,
                sender,
                kind,
                signer,
                tx_digest,
                execution_env.assigned_versions.accumulator_version,
            );

        // If the object funds checker declines to commit the withdraws observed during
        // execution, discard the results and retry the transaction later.
        let object_funds_checker = self.object_funds_checker.load();
        if let Some(object_funds_checker) = object_funds_checker.as_ref()
            && !object_funds_checker.should_commit_object_funds_withdraws(
                certificate,
                effects.status(),
                &inner_temp_store.accumulator_running_max_withdraws,
                &execution_env,
                self.get_account_funds_read(),
                &self.execution_scheduler,
                epoch_store,
            )
        {
            assert_reachable!("retry object withdraw later");
            return ExecutionOutput::RetryLater;
        }

        // Fork detection: the computed effects must match any previously-signed or
        // checkpoint-expected digest. A mismatch is unrecoverable for this node.
        if let Some(expected_effects_digest) = expected_effects_digest
            && effects.digest() != expected_effects_digest
        {
            // We don't want to mask the original error, so we log it and continue.
            match self.debug_dump_transaction_state(
                &tx_digest,
                &effects,
                expected_effects_digest,
                &inner_temp_store,
                certificate,
                &self.config.state_debug_dump_config,
            ) {
                Ok(out_path) => {
                    info!(
                        "Dumped node state for transaction {} to {}",
                        tx_digest,
                        out_path.as_path().display().to_string()
                    );
                }
                Err(e) => {
                    error!("Error dumping state for transaction {}: {e}", tx_digest);
                }
            }
            let expected_effects = self
                .get_transaction_cache_reader()
                .get_effects(&expected_effects_digest);
            error!(
                ?tx_digest,
                ?expected_effects_digest,
                actual_effects = ?effects,
                expected_effects = ?expected_effects,
                "fork detected!"
            );
            // Persist the fork so it can be reported / recovered after restart.
            if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
                tx_digest,
                expected_effects_digest,
                effects.digest(),
            ) {
                error!("Failed to record transaction fork: {e}");
            }

            fail_point_if!("kill_transaction_fork_node", || {
                #[cfg(msim)]
                {
                    tracing::error!(
                        fatal = true,
                        "Fork recovery test: killing node due to transaction effects fork for digest: {}",
                        tx_digest
                    );
                    sui_simulator::task::shutdown_current_node();
                }
            });

            fatal!(
                "Transaction {} is expected to have effects digest {}, but got {}!",
                tx_digest,
                expected_effects_digest,
                effects.digest()
            );
        }

        // Simtest-only hook for injecting artificial effects forks.
        fail_point_arg!("simulate_fork_during_execution", |(
            forked_validators,
            full_halt,
            effects_overrides,
            fork_probability,
        ): (
            std::sync::Arc<
                std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
            >,
            bool,
            std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
            f32,
        )| {
            #[cfg(msim)]
            self.simulate_fork_during_execution(
                certificate,
                epoch_store,
                &mut effects,
                forked_validators,
                full_halt,
                effects_overrides,
                fork_probability,
            );
        });

        let unchanged_loaded_runtime_objects =
            crate::transaction_outputs::unchanged_loaded_runtime_objects(
                certificate.transaction_data(),
                &effects,
                &tracking_store.into_read_objects(),
            );

        // index certificate
        let _ = self
            .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        self.update_metrics(certificate, &inner_temp_store, &effects);

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects,
            inner_temp_store,
            unchanged_loaded_runtime_objects,
        );

        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics.prepare_cert_gas_latency_ratio.observe(
                transaction_outputs
                    .effects
                    .gas_cost_summary()
                    .computation_cost as f64
                    / elapsed,
            );
        }

        ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
    }
2190
2191    pub fn prepare_certificate_for_benchmark(
2192        &self,
2193        certificate: &VerifiedExecutableTransaction,
2194        input_objects: InputObjects,
2195        epoch_store: &Arc<AuthorityPerEpochStore>,
2196    ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2197        let lock = RwLock::new(epoch_store.epoch());
2198        let execution_guard = lock.try_read().unwrap();
2199
2200        let (transaction_outputs, _timings, execution_error_opt) = self
2201            .execute_certificate(
2202                &execution_guard,
2203                certificate,
2204                input_objects,
2205                None,
2206                ExecutionEnv::default(),
2207                epoch_store,
2208            )
2209            .unwrap();
2210        Ok((transaction_outputs, execution_error_opt))
2211    }
2212
2213    #[instrument(skip_all)]
2214    #[allow(clippy::type_complexity)]
2215    pub async fn dry_exec_transaction(
2216        &self,
2217        transaction: TransactionData,
2218        transaction_digest: TransactionDigest,
2219    ) -> SuiResult<(
2220        DryRunTransactionBlockResponse,
2221        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2222        TransactionEffects,
2223        Option<ObjectID>,
2224    )> {
2225        let epoch_store = self.load_epoch_store_one_call_per_task();
2226        if !self.is_fullnode(&epoch_store) {
2227            return Err(SuiErrorKind::UnsupportedFeatureError {
2228                error: "dry-exec is only supported on fullnodes".to_string(),
2229            }
2230            .into());
2231        }
2232
2233        if transaction.kind().is_system_tx() {
2234            return Err(SuiErrorKind::UnsupportedFeatureError {
2235                error: "dry-exec does not support system transactions".to_string(),
2236            }
2237            .into());
2238        }
2239
2240        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2241    }
2242
2243    #[allow(clippy::type_complexity)]
2244    pub fn dry_exec_transaction_for_benchmark(
2245        &self,
2246        transaction: TransactionData,
2247        transaction_digest: TransactionDigest,
2248    ) -> SuiResult<(
2249        DryRunTransactionBlockResponse,
2250        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2251        TransactionEffects,
2252        Option<ObjectID>,
2253    )> {
2254        let epoch_store = self.load_epoch_store_one_call_per_task();
2255        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2256    }
2257
    /// Shared implementation behind `dry_exec_transaction` and
    /// `dry_exec_transaction_for_benchmark`.
    ///
    /// Validates and loads the transaction's inputs, injects a large mock gas
    /// coin when the payment list is empty (and the transaction is not
    /// gasless), executes the transaction to effects without committing
    /// anything, and assembles the dry-run response.
    ///
    /// Returns, in order: the `DryRunTransactionBlockResponse`, the written
    /// objects keyed by id together with their `WriteKind`, the raw
    /// `TransactionEffects`, and the id of the injected mock gas coin (if
    /// one was created).
    #[allow(clippy::type_complexity)]
    fn dry_exec_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> SuiResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )> {
        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Apply the node's transaction deny rules before doing any heavier work.
        sui_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dry run.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // make a gas object if one was not provided
        let mut gas_data = transaction.gas_data().clone();
        let is_gasless =
            epoch_store.protocol_config().enable_gasless() && transaction.is_gasless_transaction();
        // `mock_gas` is Some only on the branch that fabricates a gas coin;
        // it is surfaced to the caller as the last tuple element.
        let ((gas_status, checked_input_objects), mock_gas) = if is_gasless {
            // Gasless transactions don't use gas coins — skip mock gas object injection.
            (
                sui_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                None,
            )
        } else if transaction.gas().is_empty() {
            let sender = transaction.sender();
            // use a 1B sui coin
            const MIST_TO_SUI: u64 = 1_000_000_000;
            const DRY_RUN_SUI: u64 = 1_000_000_000;
            let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
            // The mock coin gets a random id and a genesis-marker previous
            // transaction; it exists only for the duration of this dry run.
            let gas_object_id = ObjectID::random();
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            gas_data.payment = vec![gas_object_ref];
            (
                sui_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            // Caller supplied real gas coins — run the standard input checks.
            (
                sui_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        let (kind, signer, _) = transaction.execution_parts();

        let silent = true;
        let executor = sui_execution::executor(protocol_config, silent)
            .expect("Creating an executor should not fail here");

        let expensive_checks = false;
        let early_execution_error = get_early_execution_error(
            &transaction_digest,
            &checked_input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            // TODO(address-balances): This does not currently support balance withdraws properly.
            // For address balance withdraws, this cannot detect insufficient balance. We need to
            // first check if the balance is sufficient, similar to how we schedule withdraws.
            // For object balance withdraws, we need to handle the case where object balance is
            // insufficient in the post-execution.
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        let (inner_temp_store, _, effects, _timings, execution_error) = self
            .execute_transaction_to_effects(
                executor.as_ref(),
                self.get_backing_store().as_ref(),
                protocol_config,
                expensive_checks,
                execution_params,
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_data,
                gas_status,
                transaction.sender(),
                kind,
                signer,
                transaction_digest,
                // Use latest accumulator version for dry run/dev inspect
                None,
            );

        let tx_digest = *effects.transaction_digest();

        // Resolve modules against the temp store first so types from freshly
        // published packages in this dry run can be rendered.
        let module_cache =
            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
            epoch_store.protocol_config(),
            Box::new(PackageStoreWithFallback::new(
                &inner_temp_store,
                self.get_backing_package_store(),
            )),
        );
        // Returning empty vector here because we recalculate changes in the rpc layer.
        let object_changes = Vec::new();

        // Returning empty vector here because we recalculate changes in the rpc layer.
        let balance_changes = Vec::new();

        // Pair every created/unwrapped/mutated object ref from the effects
        // with its WriteKind and the written object from the temp store.
        let written_with_kind = effects
            .created()
            .into_iter()
            .map(|(oref, _)| (oref, WriteKind::Create))
            .chain(
                effects
                    .unwrapped()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
            )
            .chain(
                effects
                    .mutated()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
            )
            .map(|(oref, kind)| {
                let obj = inner_temp_store.written.get(&oref.0).unwrap();
                // TODO: Avoid clones.
                (oref.0, (oref, obj.clone(), kind))
            })
            .collect();

        let execution_error_source = execution_error
            .as_ref()
            .err()
            .and_then(|e| e.source_ref().as_ref().map(|e| e.to_string()));

        Ok((
            DryRunTransactionBlockResponse {
                suggested_gas_price: self
                    .congestion_tracker
                    .get_suggested_gas_prices(&transaction),
                input: SuiTransactionBlockData::try_from_with_module_cache(
                    transaction,
                    &module_cache,
                )
                .map_err(|e| SuiErrorKind::TransactionSerializationError {
                    error: format!(
                        "Failed to convert transaction to SuiTransactionBlockData: {}",
                        e
                    ),
                })?, // TODO: replace the underlying try_from to SuiError. This one goes deep
                effects: effects.clone().try_into()?,
                events: SuiTransactionBlockEvents::try_from(
                    inner_temp_store.events.clone(),
                    tx_digest,
                    None,
                    layout_resolver.as_mut(),
                )?,
                object_changes,
                balance_changes,
                execution_error_source,
            },
            written_with_kind,
            effects,
            mock_gas,
        ))
    }
2477
    /// Simulates execution of `transaction` without committing any results.
    ///
    /// When `checks` is disabled this behaves as dev-inspect (lightweight
    /// input checks); otherwise full transaction input checks are applied.
    /// If `allow_mock_gas_coin` is true and the transaction has no gas
    /// payment (and is not gasless), a mock gas coin is injected and its id
    /// is reported in the result. After a successful first execution, object
    /// balance withdrawals discovered during execution are checked against
    /// the latest account balances; on insufficiency the transaction is
    /// re-executed with an `InsufficientFundsForWithdraw` early error so the
    /// returned effects reflect the failure.
    pub fn simulate_transaction(
        &self,
        mut transaction: TransactionData,
        checks: TransactionChecks,
        allow_mock_gas_coin: bool,
    ) -> SuiResult<SimulateTransactionResult> {
        // System transactions are never simulatable.
        if transaction.kind().is_system_tx() {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate does not support system transactions".to_string(),
            }
            .into());
        }

        // Simulation is a fullnode-only feature.
        let epoch_store = self.load_epoch_store_one_call_per_task();
        if !self.is_fullnode(&epoch_store) {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate is only supported on fullnodes".to_string(),
            }
            .into());
        }

        // Checks disabled == dev-inspect mode; the node may opt out of it.
        let dev_inspect = checks.disabled();
        if dev_inspect && self.config.dev_inspect_disabled {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate with checks disabled is not allowed on this node".to_string(),
            }
            .into());
        }

        // Reject coin reservations in gas payment when the execution engine
        // doesn't support them.
        let protocol_config = epoch_store.protocol_config();
        if !protocol_config.enable_coin_reservation_obj_refs()
            && transaction.gas().iter().any(|obj_ref| {
                sui_types::coin_reservation::ParsedDigest::is_coin_reservation_digest(&obj_ref.2)
            })
        {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error:
                    "coin reservations in gas payment are not supported at this protocol version"
                        .to_string(),
            }
            .into());
        }

        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Create and inject mock gas coin before pre_object_load_checks so that
        // funds withdrawal processing sees non-empty payment and doesn't incorrectly
        // create an address balance withdrawal for gas.
        // Skip mock gas for gasless transactions — they don't use gas coins.
        let is_gasless = protocol_config.enable_gasless() && transaction.is_gasless_transaction();
        let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() && !is_gasless
        {
            // The mock coin uses ObjectID::MAX and a genesis-marker previous
            // transaction; it never exists in the store.
            let obj = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    ObjectID::MAX,
                    DEV_INSPECT_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(transaction.gas_data().owner),
                TransactionDigest::genesis_marker(),
            );
            transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
            Some(obj)
        } else {
            None
        };

        // Collect the withdrawals the transaction declares up front; their
        // ids are treated as address funds and excluded from the
        // post-execution object-funds check below.
        let declared_withdrawals = self.pre_object_load_checks(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.protocol_config(),
        )?;
        let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a simulation.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Add mock gas to input objects after loading (it doesn't exist in the store).
        let mock_gas_id = mock_gas_object.map(|obj| {
            let id = obj.id();
            input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
            id
        });

        let protocol_config = epoch_store.protocol_config();

        // Lightweight checks for dev-inspect, full input checks otherwise.
        let (gas_status, checked_input_objects) = if dev_inspect {
            sui_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction,
                input_objects,
                receiving_objects,
                epoch_store.reference_gas_price(),
            )?
        } else {
            sui_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?
        };

        // TODO see if we can spin up a VM once and reuse it
        let executor = sui_execution::executor(
            protocol_config,
            true, // silent
        )
        .expect("Creating an executor should not fail here");

        // Rewrite coin-reservation inputs in the transaction kind in place.
        let (mut kind, signer, gas_data) = transaction.execution_parts();
        let rewritten_inputs = rewrite_transaction_for_coin_reservations(
            self.chain_identifier,
            &*self.coin_reservation_resolver,
            signer,
            &mut kind,
            None,
        );
        let early_execution_error = get_early_execution_error(
            &transaction.digest(),
            &checked_input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        // Tracking store records every object read during execution so the
        // result can report loaded runtime objects.
        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());

        // Clone inputs for potential retry if object funds check fails post-execution.
        let cloned_input_objects = checked_input_objects.clone();
        let cloned_gas = gas_data.clone();
        let cloned_kind = kind.clone();
        let tx_digest = transaction.digest();
        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_timestamp_ms = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            &tracking_store,
            protocol_config,
            self.metrics.execution_metrics.clone(),
            false, // expensive_checks
            execution_params,
            &epoch_id,
            epoch_timestamp_ms,
            checked_input_objects,
            gas_data,
            gas_status,
            kind,
            rewritten_inputs.clone(),
            signer,
            tx_digest,
            dev_inspect,
        );

        // Post-execution: check object funds (non-address withdrawals discovered during execution).
        let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
            let has_insufficient_object_funds = inner_temp_store
                .accumulator_running_max_withdraws
                .iter()
                .filter(|(id, _)| !address_funds.contains(id))
                .any(|(id, max_withdraw)| {
                    let (balance, _) = self.get_account_funds_read().get_latest_account_amount(id);
                    balance < *max_withdraw
                });

            if has_insufficient_object_funds {
                // Re-execute with a forced InsufficientFundsForWithdraw early
                // error using the cloned inputs, so the returned effects
                // reflect the failure. A fresh gas status is required since
                // the first one was consumed.
                let retry_gas_status = SuiGasStatus::new(
                    cloned_gas.budget,
                    cloned_gas.price,
                    epoch_store.reference_gas_price(),
                    protocol_config,
                )?;
                let (store, _, effects, result) = executor.dev_inspect_transaction(
                    &tracking_store,
                    protocol_config,
                    self.metrics.execution_metrics.clone(),
                    false,
                    ExecutionOrEarlyError::Err(ExecutionErrorKind::InsufficientFundsForWithdraw),
                    &epoch_id,
                    epoch_timestamp_ms,
                    cloned_input_objects,
                    cloned_gas,
                    retry_gas_status,
                    cloned_kind,
                    rewritten_inputs,
                    signer,
                    tx_digest,
                    dev_inspect,
                );
                (store, effects, result)
            } else {
                (inner_temp_store, effects, execution_result)
            }
        } else {
            (inner_temp_store, effects, execution_result)
        };

        let loaded_runtime_objects = tracking_store.into_read_objects();
        let unchanged_loaded_runtime_objects =
            crate::transaction_outputs::unchanged_loaded_runtime_objects(
                &transaction,
                &effects,
                &loaded_runtime_objects,
            );

        // Assemble the set of objects referenced by the transaction/effects,
        // drawing from runtime-loaded objects, inputs, and written objects.
        let object_set = {
            let objects = {
                let mut objects = loaded_runtime_objects;

                for o in inner_temp_store
                    .input_objects
                    .into_values()
                    .chain(inner_temp_store.written.into_values())
                {
                    objects.insert(o);
                }

                objects
            };

            let object_keys = sui_types::storage::get_transaction_object_set(
                &transaction,
                &effects,
                &unchanged_loaded_runtime_objects,
            );

            let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
            for k in object_keys {
                if let Some(o) = objects.get(&k) {
                    set.insert(o.clone());
                }
            }

            set
        };

        Ok(SimulateTransactionResult {
            objects: object_set,
            // Events are included only when the effects carry an events digest.
            events: effects.events_digest().map(|_| inner_temp_store.events),
            effects,
            execution_result,
            mock_gas_id,
            unchanged_loaded_runtime_objects,
            suggested_gas_price: self
                .congestion_tracker
                .get_suggested_gas_prices(&transaction),
        })
    }
2747
2748    /// The object ID for gas can be any object ID, even for an uncreated object
2749    #[allow(clippy::collapsible_else_if)]
2750    #[instrument(skip_all)]
2751    pub async fn dev_inspect_transaction_block(
2752        &self,
2753        sender: SuiAddress,
2754        transaction_kind: TransactionKind,
2755        gas_price: Option<u64>,
2756        gas_budget: Option<u64>,
2757        gas_sponsor: Option<SuiAddress>,
2758        gas_objects: Option<Vec<ObjectRef>>,
2759        show_raw_txn_data_and_effects: Option<bool>,
2760        skip_checks: Option<bool>,
2761    ) -> SuiResult<DevInspectResults> {
2762        let epoch_store = self.load_epoch_store_one_call_per_task();
2763
2764        if !self.is_fullnode(&epoch_store) {
2765            return Err(SuiErrorKind::UnsupportedFeatureError {
2766                error: "dev-inspect is only supported on fullnodes".to_string(),
2767            }
2768            .into());
2769        }
2770
2771        if transaction_kind.is_system_tx() {
2772            return Err(SuiErrorKind::UnsupportedFeatureError {
2773                error: "system transactions are not supported".to_string(),
2774            }
2775            .into());
2776        }
2777
2778        let skip_checks = skip_checks.unwrap_or(true);
2779        if skip_checks && self.config.dev_inspect_disabled {
2780            return Err(SuiErrorKind::UnsupportedFeatureError {
2781                error: "dev-inspect with skip_checks is not allowed on this node".to_string(),
2782            }
2783            .into());
2784        }
2785
2786        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2787        let reference_gas_price = epoch_store.reference_gas_price();
2788        let protocol_config = epoch_store.protocol_config();
2789        let max_tx_gas = protocol_config.max_tx_gas();
2790
2791        let price = gas_price.unwrap_or(reference_gas_price);
2792        let budget = gas_budget.unwrap_or(max_tx_gas);
2793        let owner = gas_sponsor.unwrap_or(sender);
2794        // Payment might be empty here, but it's fine we'll have to deal with it later after reading all the input objects.
2795        let payment = gas_objects.unwrap_or_default();
2796        let mut transaction = TransactionData::V1(TransactionDataV1 {
2797            kind: transaction_kind.clone(),
2798            sender,
2799            gas_data: GasData {
2800                payment,
2801                owner,
2802                price,
2803                budget,
2804            },
2805            expiration: TransactionExpiration::None,
2806        });
2807
2808        let raw_txn_data = if show_raw_txn_data_and_effects {
2809            bcs::to_bytes(&transaction).map_err(|_| {
2810                SuiErrorKind::TransactionSerializationError {
2811                    error: "Failed to serialize transaction during dev inspect".to_string(),
2812                }
2813            })?
2814        } else {
2815            vec![]
2816        };
2817
2818        transaction.validity_check_no_gas_check(protocol_config)?;
2819
2820        let input_object_kinds = transaction.input_objects()?;
2821        let receiving_object_refs = transaction.receiving_objects();
2822
2823        sui_transaction_checks::deny::check_transaction_for_signing(
2824            &transaction,
2825            &[],
2826            &input_object_kinds,
2827            &receiving_object_refs,
2828            &self.config.transaction_deny_config,
2829            self.get_backing_package_store().as_ref(),
2830        )?;
2831
2832        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2833            // We don't want to cache this transaction since it's a dev inspect.
2834            None,
2835            &input_object_kinds,
2836            &receiving_object_refs,
2837            epoch_store.epoch(),
2838        )?;
2839
2840        let (gas_status, checked_input_objects) = if skip_checks {
2841            // If we are skipping checks, then we call the check_dev_inspect_input function which will perform
2842            // only lightweight checks on the transaction input. And if the gas field is empty, that means we will
2843            // use the dummy gas object so we need to add it to the input objects vector.
2844            if transaction.gas().is_empty() {
2845                // Create and use a dummy gas object if there is no gas object provided.
2846                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2847                    DEV_INSPECT_GAS_COIN_VALUE,
2848                    transaction.gas_owner(),
2849                );
2850                let gas_object_ref = dummy_gas_object.compute_object_reference();
2851                transaction.gas_data_mut().payment = vec![gas_object_ref];
2852                input_objects.push(ObjectReadResult::new(
2853                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2854                    dummy_gas_object.into(),
2855                ));
2856            }
2857            sui_transaction_checks::check_dev_inspect_input(
2858                protocol_config,
2859                &transaction,
2860                input_objects,
2861                receiving_objects,
2862                reference_gas_price,
2863            )?
2864        } else {
2865            // If we are not skipping checks, then we call the check_transaction_input function and its dummy gas
2866            // variant which will perform full fledged checks just like a real transaction execution.
2867            if transaction.gas().is_empty() {
2868                // Create and use a dummy gas object if there is no gas object provided.
2869                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2870                    DEV_INSPECT_GAS_COIN_VALUE,
2871                    transaction.gas_owner(),
2872                );
2873                let gas_object_ref = dummy_gas_object.compute_object_reference();
2874                transaction.gas_data_mut().payment = vec![gas_object_ref];
2875                sui_transaction_checks::check_transaction_input_with_given_gas(
2876                    epoch_store.protocol_config(),
2877                    epoch_store.reference_gas_price(),
2878                    &transaction,
2879                    input_objects,
2880                    receiving_objects,
2881                    dummy_gas_object,
2882                    &self.metrics.bytecode_verifier_metrics,
2883                    &self.config.verifier_signing_config,
2884                )?
2885            } else {
2886                sui_transaction_checks::check_transaction_input(
2887                    epoch_store.protocol_config(),
2888                    epoch_store.reference_gas_price(),
2889                    &transaction,
2890                    input_objects,
2891                    &receiving_objects,
2892                    &self.metrics.bytecode_verifier_metrics,
2893                    &self.config.verifier_signing_config,
2894                )?
2895            }
2896        };
2897
2898        let executor = sui_execution::executor(protocol_config, /* silent */ true)
2899            .expect("Creating an executor should not fail here");
2900        let gas_data = transaction.gas_data().clone();
2901        let intent_msg = IntentMessage::new(
2902            Intent {
2903                version: IntentVersion::V0,
2904                scope: IntentScope::TransactionData,
2905                app_id: AppId::Sui,
2906            },
2907            transaction,
2908        );
2909        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2910        let early_execution_error = get_early_execution_error(
2911            &transaction_digest,
2912            &checked_input_objects,
2913            self.config.certificate_deny_config.certificate_deny_set(),
2914            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2915            &FundsWithdrawStatus::MaybeSufficient,
2916        );
2917        let execution_params = match early_execution_error {
2918            Some(error) => ExecutionOrEarlyError::Err(error),
2919            None => ExecutionOrEarlyError::Ok(()),
2920        };
2921
2922        let mut transaction_kind = transaction_kind;
2923        let rewritten_inputs = rewrite_transaction_for_coin_reservations(
2924            self.chain_identifier,
2925            &*self.coin_reservation_resolver,
2926            sender,
2927            &mut transaction_kind,
2928            None,
2929        );
2930        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2931            self.get_backing_store().as_ref(),
2932            protocol_config,
2933            self.metrics.execution_metrics.clone(),
2934            /* expensive checks */ false,
2935            execution_params,
2936            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2937            epoch_store
2938                .epoch_start_config()
2939                .epoch_data()
2940                .epoch_start_timestamp(),
2941            checked_input_objects,
2942            gas_data,
2943            gas_status,
2944            transaction_kind,
2945            rewritten_inputs,
2946            sender,
2947            transaction_digest,
2948            skip_checks,
2949        );
2950
2951        let raw_effects = if show_raw_txn_data_and_effects {
2952            bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2953                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2954            })?
2955        } else {
2956            vec![]
2957        };
2958
2959        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2960            epoch_store.protocol_config(),
2961            Box::new(PackageStoreWithFallback::new(
2962                &inner_temp_store,
2963                self.get_backing_package_store(),
2964            )),
2965        );
2966
2967        DevInspectResults::new(
2968            effects,
2969            inner_temp_store.events.clone(),
2970            execution_result,
2971            raw_txn_data,
2972            raw_effects,
2973            layout_resolver.as_mut(),
2974        )
2975    }
2976
2977    // Only used for testing because of how epoch store is loaded.
2978    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2979        let epoch_store = self.epoch_store_for_testing();
2980        Ok(epoch_store.reference_gas_price())
2981    }
2982
2983    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2984        self.get_transaction_cache_reader()
2985            .is_tx_already_executed(digest)
2986    }
2987
2988    #[instrument(level = "debug", skip_all, err(level = "debug"))]
2989    fn index_tx(
2990        sequence: u64,
2991        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2992        object_store: &Arc<dyn ObjectStore + Send + Sync>,
2993        indexes: &IndexStore,
2994        digest: &TransactionDigest,
2995        // TODO: index_tx really just need the transaction data here.
2996        cert: &VerifiedExecutableTransaction,
2997        effects: &TransactionEffects,
2998        events: &TransactionEvents,
2999        timestamp_ms: u64,
3000        tx_coins: Option<TxCoins>,
3001        written: &WrittenObjects,
3002        inner_temporary_store: &InnerTemporaryStore,
3003        epoch_store: &Arc<AuthorityPerEpochStore>,
3004        acquire_locks: bool,
3005    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
3006        let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
3007            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
3008
3009        indexes.index_tx(
3010            sequence,
3011            cert.data().intent_message().value.sender(),
3012            cert.data()
3013                .intent_message()
3014                .value
3015                .input_objects()?
3016                .iter()
3017                .map(|o| o.object_id()),
3018            effects
3019                .all_changed_objects()
3020                .into_iter()
3021                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
3022            cert.data()
3023                .intent_message()
3024                .value
3025                .move_calls()
3026                .into_iter()
3027                .map(|(_cmd_idx, package, module, function)| {
3028                    (*package, module.to_owned(), function.to_owned())
3029                }),
3030            events,
3031            changes,
3032            digest,
3033            timestamp_ms,
3034            tx_coins,
3035            effects.accumulator_events(),
3036            acquire_locks,
3037        )
3038    }
3039
    /// Simulation-only (msim) hook: deterministically perturbs the effects of
    /// selected transactions on a subset of validators so tests can exercise
    /// effects-fork handling.
    ///
    /// Validators opt themselves into the `forked_validators` set until the
    /// combined forked stake reaches the committee's validity threshold (or one
    /// step past it when `full_halt` is set). A forked validator then bumps the
    /// computation cost of a transaction's effects either when the digest was
    /// already forked elsewhere (present in `effects_overrides`) or, for the
    /// first fork, with probability `fork_probability` derived deterministically
    /// from the digest. System transactions are never forked.
    #[cfg(msim)]
    fn simulate_fork_during_execution(
        &self,
        certificate: &VerifiedExecutableTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        effects: &mut TransactionEffects,
        forked_validators: std::sync::Arc<
            std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
        >,
        full_halt: bool,
        effects_overrides: std::sync::Arc<
            std::sync::Mutex<std::collections::BTreeMap<String, String>>,
        >,
        fork_probability: f32,
    ) {
        use std::cell::RefCell;
        // Running total of stake belonging to validators that chose to fork.
        // Thread-local: each simulated node runs on its own thread in msim.
        thread_local! {
            static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
        }
        if !certificate.data().intent_message().value.is_system_tx() {
            let committee = epoch_store.committee();
            let cur_stake = (**committee).weight(&self.name);
            if cur_stake > 0 {
                TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
                    let already_forked = forked_validators
                        .lock()
                        .ok()
                        .map(|set| set.contains(&self.name))
                        .unwrap_or(false);

                    if !already_forked {
                        let should_fork = if full_halt {
                            // For full halt, fork enough nodes to reach validity threshold
                            *total_stake <= committee.validity_threshold()
                        } else {
                            // For partial fork, stay strictly below validity threshold
                            *total_stake + cur_stake < committee.validity_threshold()
                        };

                        if should_fork {
                            *total_stake += cur_stake;

                            if let Ok(mut external_set) = forked_validators.lock() {
                                external_set.insert(self.name);
                                info!("forked_validators: {:?}", external_set);
                            }
                        }
                    }

                    if let Ok(external_set) = forked_validators.lock() {
                        if external_set.contains(&self.name) {
                            // If effects_overrides is empty, deterministically select a tx_digest to fork with 1/100 probability.
                            // Fork this transaction and record the digest and the original effects to original_effects.
                            // If original_effects is nonempty and contains a key matching this transaction digest (i.e.
                            // the transaction was forked on a different validator), fork this txn as well.

                            let tx_digest = certificate.digest().to_string();
                            if let Ok(mut overrides) = effects_overrides.lock() {
                                if overrides.contains_key(&tx_digest)
                                    || overrides.is_empty()
                                        && sui_simulator::random::deterministic_probability(
                                            &tx_digest,
                                            fork_probability,
                                        )
                                {
                                    let original_effects_digest = effects.digest().to_string();
                                    overrides
                                        .insert(tx_digest.clone(), original_effects_digest.clone());
                                    info!(
                                        ?tx_digest,
                                        ?original_effects_digest,
                                        "Captured forked effects digest for transaction"
                                    );
                                    // Minimal perturbation: +1 computation cost changes the
                                    // effects digest without otherwise altering the outcome.
                                    effects.gas_cost_summary_mut_for_testing().computation_cost +=
                                        1;
                                }
                            }
                        }
                    }
                });
            }
        }
    }
3123
    /// Computes the owner-index and dynamic-field-index deltas produced by one
    /// executed transaction.
    ///
    /// Deleted/wrapped objects yield index deletions keyed by the owner the
    /// object had at its pre-transaction version; changed objects yield index
    /// insertions (plus deletions when a mutation changed the owner). Prior
    /// object versions are read back from `object_store`, which must still
    /// contain them — a missing version is an invariant violation and panics.
    fn process_object_index(
        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        effects: &TransactionEffects,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<ObjectIndexChanges> {
        // Layout resolution prefers packages written by this transaction, then
        // falls back to the backing package store.
        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
            epoch_store.protocol_config(),
            Box::new(PackageStoreWithFallback::new(
                inner_temporary_store,
                backing_package_store,
            )),
        );

        // Object id -> version it had before this transaction modified it.
        let modified_at_version = effects
            .modified_at_versions()
            .into_iter()
            .collect::<HashMap<_, _>>();

        let tx_digest = effects.transaction_digest();
        let mut deleted_owners = vec![];
        let mut deleted_dynamic_fields = vec![];
        // Deleted and wrapped objects disappear from the indexes; classify each
        // by the owner it had at its old version.
        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
            let old_version = modified_at_version.get(&id).unwrap();
            // When we process the index, the latest object hasn't been written yet so
            // the old object must be present.
            match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
                |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
            ) {
                Owner::AddressOwner(addr)
                | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
                Owner::ObjectOwner(object_id) => {
                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
                }
                // Shared and immutable owners are not tracked by these indexes.
                _ => {}
            }
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];

        for (oref, owner, kind) in effects.all_changed_objects() {
            let id = &oref.0;
            // For mutated objects, retrieve old owner and delete old index if there is a owner change.
            if let WriteKind::Mutate = kind {
                let Some(old_version) = modified_at_version.get(id) else {
                    panic!(
                        "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
                        tx_digest
                    );
                };
                // When we process the index, the latest object hasn't been written yet so
                // the old object must be present.
                let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
                    panic!(
                        "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
                        tx_digest, id, old_version
                    );
                };
                if old_object.owner != owner {
                    match old_object.owner {
                        Owner::AddressOwner(addr)
                        | Owner::ConsensusAddressOwner { owner: addr, .. } => {
                            deleted_owners.push((addr, *id));
                        }
                        Owner::ObjectOwner(object_id) => {
                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                        }
                        _ => {}
                    }
                }
            }

            // Record the new index entry according to the object's new owner.
            match owner {
                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
                    // TODO: We can remove the object fetching after we added ObjectType to TransactionEffects
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
                    );
                    // Sanity check: the written object must match the version in effects.
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let type_ = new_object
                        .type_()
                        .map(|type_| ObjectType::Struct(type_.clone()))
                        .unwrap_or(ObjectType::Package);

                    new_owners.push((
                        (addr, *id),
                        ObjectInfo {
                            object_id: *id,
                            version: oref.1,
                            digest: oref.2,
                            type_,
                            owner,
                            previous_transaction: *effects.transaction_digest(),
                        },
                    ));
                }
                Owner::ObjectOwner(owner) => {
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
                    );
                    // Sanity check: the written object must match the version in effects.
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    // Errors from dynamic-field extraction are logged and treated
                    // as "not a dynamic field" rather than failing indexing.
                    let Some(df_info) = Self::try_create_dynamic_field_info(
                        object_store,
                        new_object,
                        written,
                        layout_resolver.as_mut(),
                    )
                    .unwrap_or_else(|e| {
                        error!(
                            "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
                            e, new_object
                        );
                        None
                    }) else {
                        // Skip indexing for non dynamic field objects.
                        continue;
                    };
                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
                }
                _ => {}
            }
        }

        Ok(ObjectIndexChanges {
            deleted_owners,
            deleted_dynamic_fields,
            new_owners,
            new_dynamic_fields,
        })
    }
3275
    /// Builds a `DynamicFieldInfo` for `o` if it is a dynamic-field wrapper.
    ///
    /// Returns `Ok(None)` when `o` is not a Move object or its type is not a
    /// dynamic field. The field name is deserialized via the resolved annotated
    /// layout. For a dynamic *object* field, the pointed-to child object is
    /// looked up first in `written` (this transaction's outputs), then in
    /// `object_store`.
    ///
    /// # Errors
    /// `ObjectDeserializationError` when layout or value deserialization fails;
    /// `ObjectNotFound` when a dynamic object field's child cannot be located.
    fn try_create_dynamic_field_info(
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        o: &Object,
        written: &WrittenObjects,
        resolver: &mut dyn LayoutResolver,
    ) -> SuiResult<Option<DynamicFieldInfo>> {
        // Skip if not a move object
        let Some(move_object) = o.data.try_as_move().cloned() else {
            return Ok(None);
        };

        // We only index dynamic field objects
        if !move_object.type_().is_dynamic_field() {
            return Ok(None);
        }

        let layout = resolver
            .get_annotated_layout(&move_object.type_().clone().into())?
            .into_layout();

        // Visit the raw field bytes to split out name/value metadata.
        let field =
            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
                SuiErrorKind::ObjectDeserializationError {
                    error: e.to_string(),
                }
            })?;

        let type_ = field.kind;
        let name_type: TypeTag = field.name_layout.into();
        let bcs_name = field.name_bytes.to_owned();

        // Deserialize the field name into a JSON-representable value.
        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
            .map_err(|e| {
                warn!("{e}");
                SuiErrorKind::ObjectDeserializationError {
                    error: e.to_string(),
                }
            })?;

        let name = DynamicFieldName {
            type_: name_type,
            value: SuiMoveValue::from(name_value).to_json_value(),
        };

        let value_metadata = field.value_metadata().map_err(|e| {
            warn!("{e}");
            SuiErrorKind::ObjectDeserializationError {
                error: e.to_string(),
            }
        })?;

        Ok(Some(match value_metadata {
            // Plain dynamic field: the value lives inside the wrapper itself.
            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
                name,
                bcs_name,
                type_,
                object_type: object_type.to_canonical_string(/* with_prefix */ true),
                object_id: o.id(),
                version: o.version(),
                digest: o.digest(),
            },

            DFV::ValueMetadata::DynamicObjectField(object_id) => {
                // Find the actual object from storage using the object id obtained from the wrapper.

                // Try to find the object in the written objects first.
                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
                    let version = object.version();
                    let digest = object.digest();
                    let object_type = object.data.type_().unwrap().clone();
                    (version, digest, object_type)
                } else {
                    // If not found, try to find it in the database.
                    // NOTE(review): the child is looked up at the wrapper's version —
                    // assumes wrapper and child were last written at the same version; confirm.
                    let object = object_store
                        .get_object_by_key(&object_id, o.version())
                        .ok_or_else(|| UserInputError::ObjectNotFound {
                            object_id,
                            version: Some(o.version()),
                        })?;
                    let version = object.version();
                    let digest = object.digest();
                    let object_type = object.data.type_().unwrap().clone();
                    (version, digest, object_type)
                };

                DynamicFieldInfo {
                    name,
                    bcs_name,
                    type_,
                    object_type: object_type.to_string(),
                    object_id,
                    version,
                    digest,
                }
            }
        }))
    }
3373
    /// Runs per-transaction post-processing (index staging + event fan-out) for
    /// nodes that maintain an `IndexStore`; a no-op otherwise.
    ///
    /// The index sequence number is allocated on the calling thread so entries
    /// preserve execution order regardless of how the work is scheduled. In
    /// sync mode (`config.sync_post_process_one_tx`) the work runs inline and
    /// the batch is committed immediately with locks held; otherwise it is
    /// spawned onto a blocking thread (bounded by `post_processing_semaphore`)
    /// and the staged output is delivered through a oneshot channel registered
    /// in `pending_post_processing`.
    #[instrument(level = "trace", skip_all, err(level = "debug"))]
    fn post_process_one_tx(
        &self,
        certificate: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        // No index store configured => nothing to post-process.
        let Some(indexes) = &self.indexes else {
            return Ok(());
        };

        let tx_digest = *certificate.digest();

        // Allocate sequence number on the calling thread to preserve execution order.
        let sequence = indexes.allocate_sequence_number();

        if self.config.sync_post_process_one_tx {
            // Synchronous mode: run post-processing inline on the calling thread
            // and commit the index batch immediately with locks held.
            // Used as a rollback mechanism and for testing correctness against async mode.
            // TODO: delete this branch once async mode has shipped
            let result = Self::post_process_one_tx_impl(
                sequence,
                indexes,
                &self.subscription_handler,
                &self.metrics,
                self.name,
                self.get_backing_package_store(),
                self.get_object_store(),
                certificate,
                effects,
                inner_temporary_store,
                epoch_store,
                true, // acquire_locks
            );

            match result {
                Ok((raw_batch, cache_updates_with_locks)) => {
                    let mut db_batch = indexes.new_db_batch();
                    db_batch
                        .concat(vec![raw_batch])
                        .expect("failed to absorb raw index batch");
                    // Destructure to keep _locks alive through commit_index_batch.
                    let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
                        cache_updates_with_locks;
                    indexes
                        .commit_index_batch(db_batch, vec![inner])
                        .expect("failed to commit index batch");
                }
                Err(e) => {
                    self.metrics.post_processing_total_failures.inc();
                    error!(?tx_digest, "tx post processing failed: {e}");
                    return Err(e);
                }
            }

            return Ok(());
        }

        // Async mode: register a oneshot so consumers can await the staged output.
        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
        self.pending_post_processing.insert(tx_digest, done_rx);

        // Clone everything the detached task needs — it outlives this call.
        let indexes = indexes.clone();
        let subscription_handler = self.subscription_handler.clone();
        let metrics = self.metrics.clone();
        let name = self.name;
        let backing_package_store = self.get_backing_package_store().clone();
        let object_store = self.get_object_store().clone();
        let semaphore = self.post_processing_semaphore.clone();

        let certificate = certificate.clone();
        let effects = effects.clone();
        let inner_temporary_store = inner_temporary_store.clone();
        let epoch_store = epoch_store.clone();

        // spawn post processing on a blocking thread
        tokio::spawn(async move {
            // Acquire the concurrency permit before moving to the blocking pool.
            let permit = {
                let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
                semaphore
                    .acquire_owned()
                    .await
                    .expect("post-processing semaphore should not be closed")
            };

            let _ = tokio::task::spawn_blocking(move || {
                // Hold the permit for the duration of the blocking work.
                let _permit = permit;

                let result = Self::post_process_one_tx_impl(
                    sequence,
                    &indexes,
                    &subscription_handler,
                    &metrics,
                    name,
                    &backing_package_store,
                    &object_store,
                    &certificate,
                    &effects,
                    &inner_temporary_store,
                    &epoch_store,
                    false, // acquire_locks
                );

                match result {
                    Ok((raw_batch, cache_updates_with_locks)) => {
                        fail_point!("crash-after-post-process-one-tx");
                        let output = (raw_batch, cache_updates_with_locks.into_inner());
                        // Receiver may have been dropped; a send failure is benign.
                        let _ = done_tx.send(output);
                    }
                    Err(e) => {
                        // Async mode only records the failure; nothing to return it to.
                        metrics.post_processing_total_failures.inc();
                        error!(?tx_digest, "tx post processing failed: {e}");
                    }
                }
            })
            .await;
        });

        Ok(())
    }
3495
    /// Shared body for sync and async post-processing of one transaction.
    ///
    /// Stages index updates via `Self::index_tx` (an indexing failure is treated
    /// as fatal via `expect`), then converts effects/events into their RPC forms
    /// and pushes them to the subscription handler. Returns the staged index
    /// batch plus cache updates (holding locks when `acquire_locks` is set) for
    /// the caller to commit.
    fn post_process_one_tx_impl(
        sequence: u64,
        indexes: &Arc<IndexStore>,
        subscription_handler: &Arc<SubscriptionHandler>,
        metrics: &Arc<AuthorityMetrics>,
        name: AuthorityName,
        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        certificate: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        acquire_locks: bool,
    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
        let _scope = monitored_scope("Execution::post_process_one_tx");

        let tx_digest = certificate.digest();
        // Wall-clock timestamp recorded with the index entries and events.
        let timestamp_ms = Self::unixtime_now_ms();
        let events = &inner_temporary_store.events;
        let written = &inner_temporary_store.written;
        // Fullnode-only coin index data (None on validators).
        let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
            name,
            object_store,
            effects,
            inner_temporary_store,
            epoch_store,
        );

        // Stage the index writes; indexing failure is an invariant violation.
        let (raw_batch, cache_updates) = Self::index_tx(
            sequence,
            backing_package_store,
            object_store,
            indexes,
            tx_digest,
            certificate,
            effects,
            events,
            timestamp_ms,
            tx_coins,
            written,
            inner_temporary_store,
            epoch_store,
            acquire_locks,
        )
        .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
        .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
        .expect("Indexing tx should not fail");

        // Convert to the RPC-facing representations for subscribers.
        let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
        let events = Self::make_transaction_block_events(
            backing_package_store,
            events.clone(),
            *tx_digest,
            timestamp_ms,
            epoch_store,
            inner_temporary_store,
        )?;
        // Emit events
        subscription_handler
            .process_tx(certificate.data().transaction_data(), &effects, &events)
            .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
            .tap_err(|e| {
                warn!(
                    ?tx_digest,
                    "Post processing - Couldn't process events for tx: {}", e
                )
            })?;

        metrics
            .post_processing_total_events_emitted
            .inc_by(events.data.len() as u64);

        Ok((raw_batch, cache_updates))
    }
3570
3571    fn make_transaction_block_events(
3572        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3573        transaction_events: TransactionEvents,
3574        digest: TransactionDigest,
3575        timestamp_ms: u64,
3576        epoch_store: &Arc<AuthorityPerEpochStore>,
3577        inner_temporary_store: &InnerTemporaryStore,
3578    ) -> SuiResult<SuiTransactionBlockEvents> {
3579        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3580            epoch_store.protocol_config(),
3581            Box::new(PackageStoreWithFallback::new(
3582                inner_temporary_store,
3583                backing_package_store,
3584            )),
3585        );
3586        SuiTransactionBlockEvents::try_from(
3587            transaction_events,
3588            digest,
3589            Some(timestamp_ms),
3590            layout_resolver.as_mut(),
3591        )
3592    }
3593
3594    pub fn unixtime_now_ms() -> u64 {
3595        let now = SystemTime::now()
3596            .duration_since(UNIX_EPOCH)
3597            .expect("Time went backwards")
3598            .as_millis();
3599        u64::try_from(now).expect("Travelling in time machine")
3600    }
3601
3602    // TODO(fastpath): update this handler for Mysticeti fastpath.
3603    // There will no longer be validator quorum signed transactions or effects.
3604    // The proof of finality needs to come from checkpoints.
3605    #[instrument(level = "trace", skip_all)]
3606    pub async fn handle_transaction_info_request(
3607        &self,
3608        request: TransactionInfoRequest,
3609    ) -> SuiResult<TransactionInfoResponse> {
3610        let epoch_store = self.load_epoch_store_one_call_per_task();
3611        let (transaction, status) = self
3612            .get_transaction_status(&request.transaction_digest, &epoch_store)?
3613            .ok_or(SuiErrorKind::TransactionNotFound {
3614                digest: request.transaction_digest,
3615            })?;
3616        Ok(TransactionInfoResponse {
3617            transaction,
3618            status,
3619        })
3620    }
3621
3622    #[instrument(level = "trace", skip_all)]
3623    pub async fn handle_object_info_request(
3624        &self,
3625        request: ObjectInfoRequest,
3626    ) -> SuiResult<ObjectInfoResponse> {
3627        let epoch_store = self.load_epoch_store_one_call_per_task();
3628
3629        let requested_object_seq = match request.request_kind {
3630            ObjectInfoRequestKind::LatestObjectInfo => {
3631                let (_, seq, _) = self
3632                    .get_object_or_tombstone(request.object_id)
3633                    .await
3634                    .ok_or_else(|| {
3635                        SuiError::from(UserInputError::ObjectNotFound {
3636                            object_id: request.object_id,
3637                            version: None,
3638                        })
3639                    })?;
3640                seq
3641            }
3642            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3643        };
3644
3645        let object = self
3646            .get_object_store()
3647            .get_object_by_key(&request.object_id, requested_object_seq)
3648            .ok_or_else(|| {
3649                SuiError::from(UserInputError::ObjectNotFound {
3650                    object_id: request.object_id,
3651                    version: Some(requested_object_seq),
3652                })
3653            })?;
3654
3655        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3656            (request.generate_layout, object.data.try_as_move())
3657        {
3658            let epoch_store = self.load_epoch_store_one_call_per_task();
3659            Some(into_struct_layout(
3660                epoch_store
3661                    .executor()
3662                    .type_layout_resolver(
3663                        epoch_store.protocol_config(),
3664                        Box::new(self.get_backing_package_store().as_ref()),
3665                    )
3666                    .get_annotated_layout(&move_obj.type_().clone().into())?,
3667            )?)
3668        } else {
3669            None
3670        };
3671
3672        let lock = if !object.is_address_owned() {
3673            // Only address owned objects have locks.
3674            None
3675        } else {
3676            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3677                .await?
3678                .map(|s| s.into_inner())
3679        };
3680
3681        Ok(ObjectInfoResponse {
3682            object,
3683            layout,
3684            lock_for_debugging: lock,
3685        })
3686    }
3687
3688    #[instrument(level = "trace", skip_all)]
3689    pub fn handle_checkpoint_request(
3690        &self,
3691        request: &CheckpointRequest,
3692    ) -> SuiResult<CheckpointResponse> {
3693        let summary = match request.sequence_number {
3694            Some(seq) => self
3695                .checkpoint_store
3696                .get_checkpoint_by_sequence_number(seq)?,
3697            None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3698        }
3699        .map(|v| v.into_inner());
3700        let contents = match &summary {
3701            Some(s) => self
3702                .checkpoint_store
3703                .get_checkpoint_contents(&s.content_digest)?,
3704            None => None,
3705        };
3706        Ok(CheckpointResponse {
3707            checkpoint: summary,
3708            contents,
3709        })
3710    }
3711
3712    #[instrument(level = "trace", skip_all)]
3713    pub fn handle_checkpoint_request_v2(
3714        &self,
3715        request: &CheckpointRequestV2,
3716    ) -> SuiResult<CheckpointResponseV2> {
3717        let summary = if request.certified {
3718            let summary = match request.sequence_number {
3719                Some(seq) => self
3720                    .checkpoint_store
3721                    .get_checkpoint_by_sequence_number(seq)?,
3722                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3723            }
3724            .map(|v| v.into_inner());
3725            summary.map(CheckpointSummaryResponse::Certified)
3726        } else {
3727            let summary = match request.sequence_number {
3728                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3729                None => self
3730                    .checkpoint_store
3731                    .get_latest_locally_computed_checkpoint()?,
3732            };
3733            summary.map(CheckpointSummaryResponse::Pending)
3734        };
3735        let contents = match &summary {
3736            Some(s) => self
3737                .checkpoint_store
3738                .get_checkpoint_contents(&s.content_digest())?,
3739            None => None,
3740        };
3741        Ok(CheckpointResponseV2 {
3742            checkpoint: summary,
3743            contents,
3744        })
3745    }
3746
3747    fn check_protocol_version(
3748        supported_protocol_versions: SupportedProtocolVersions,
3749        current_version: ProtocolVersion,
3750    ) {
3751        info!("current protocol version is now {:?}", current_version);
3752        info!("supported versions are: {:?}", supported_protocol_versions);
3753        if !supported_protocol_versions.is_version_supported(current_version) {
3754            let msg = format!(
3755                "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3756                current_version, supported_protocol_versions,
3757            );
3758
3759            error!("{}", msg);
3760            eprintln!("{}", msg);
3761
3762            #[cfg(not(msim))]
3763            std::process::exit(1);
3764
3765            #[cfg(msim)]
3766            sui_simulator::task::shutdown_current_node();
3767        }
3768    }
3769
    /// Constructs a fully wired `AuthorityState` and spawns its background
    /// tasks (index fixing and the ready-certificate execution loop).
    ///
    /// Exits/panics the process if the node does not support the network's
    /// current protocol version, or — when
    /// `enable_multi_epoch_transaction_expiration` is on past epoch 0 — if the
    /// store has no executed-transaction data for the previous epoch.
    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        chain_identifier: ChainIdentifier,
        policy_config: Option<PolicyConfig>,
        firewall_config: Option<RemoteFirewallConfig>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Arc<Self> {
        // Shuts the node down if the current protocol version is unsupported.
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Channel feeding ready certificates from the scheduler to the
        // execution task spawned below.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let execution_scheduler = Arc::new(ExecutionScheduler::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers.account_funds_read.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            tx_ready_certificates,
            &epoch_store,
            config.funds_withdraw_scheduler_type,
            metrics.clone(),
            prometheus_registry,
        ));
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // The pruners are stored on the state only to keep their background
        // tasks alive (hence the leading underscores).
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            rpc_index.clone(),
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            pruner_watermarks,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let traffic_controller_metrics =
            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
        // Traffic control is only enabled when a policy config is provided.
        let traffic_controller = if let Some(policy_config) = policy_config {
            Some(Arc::new(
                TrafficController::init(
                    policy_config,
                    traffic_controller_metrics,
                    firewall_config.clone(),
                )
                .await,
            ))
        } else {
            None
        };

        let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
            ForkRecoveryState::new(Some(fork_config))
                .expect("Failed to initialize fork recovery state")
        });

        let coin_reservation_resolver = Arc::new(CachingCoinReservationResolver::new(
            execution_cache_trait_pointers.child_object_resolver.clone(),
        ));

        let object_funds_checker_metrics =
            Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
        let state = Arc::new(AuthorityState {
            name,
            secret,
            // The execution lock's value is the currently executing epoch; it
            // is updated under the write lock during reconfiguration.
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            coin_reservation_resolver,
            indexes,
            rpc_index,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            execution_scheduler,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new()),
            consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
            traffic_controller,
            fork_recovery_state,
            notify_epoch: tokio::sync::watch::channel(epoch).0,
            // Populated below by init_object_funds_checker (validators only).
            object_funds_checker: ArcSwapOption::empty(),
            object_funds_checker_metrics,
            pending_post_processing: Arc::new(DashMap::new()),
            post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
        });
        state.init_object_funds_checker().await;

        // Both background tasks hold only weak references so they do not keep
        // the state alive after it is dropped.
        let state_clone = Arc::downgrade(&state);
        spawn_monitored_task!(fix_indexes(state_clone));
        // Start a task to execute ready certificates.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));
        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        // Sanity check: multi-epoch transaction expiration needs the previous
        // epoch's executed-transaction digests to be present in the store.
        if epoch_store
            .protocol_config()
            .enable_multi_epoch_transaction_expiration()
            && epoch_store.epoch() > 0
        {
            use typed_store::Map;
            let previous_epoch = epoch_store.epoch() - 1;
            let start_key = (previous_epoch, TransactionDigest::ZERO);
            let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
            let has_previous_epoch_data = store
                .perpetual_tables
                .executed_transaction_digests
                .safe_range_iter(start_key..end_key)
                .next()
                .is_some();

            if !has_previous_epoch_data {
                panic!(
                    "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
                    This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
                    Please restore from a snapshot using the latest version of sui-tool.",
                    previous_epoch
                );
            }
        }

        state
    }
3930
3931    async fn init_object_funds_checker(&self) {
3932        let epoch_store = self.epoch_store.load();
3933        if self.is_validator(&epoch_store)
3934            && epoch_store.protocol_config().enable_object_funds_withdraw()
3935        {
3936            if self.object_funds_checker.load().is_none() {
3937                let inner = self
3938                    .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
3939                    .await
3940                    .map(|o| {
3941                        Arc::new(ObjectFundsChecker::new(
3942                            o.version(),
3943                            self.object_funds_checker_metrics.clone(),
3944                        ))
3945                    });
3946                self.object_funds_checker.store(inner);
3947            }
3948        } else {
3949            self.object_funds_checker.store(None);
3950        }
3951    }
3952
    // TODO: Consolidate our traits to reduce the number of methods here.
    /// Read-only view of the object cache.
    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
        &self.execution_cache_trait_pointers.object_cache_reader
    }

    /// Read-only view of cached transactions and their effects.
    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
        &self.execution_cache_trait_pointers.transaction_cache_reader
    }

    /// Writer for recording transaction outputs into the execution cache.
    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
        &self.execution_cache_trait_pointers.cache_writer
    }

    /// Combined backing store used as state input for transaction execution.
    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_store
    }

    /// Resolver for child (dynamic-field) objects.
    pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
        &self.execution_cache_trait_pointers.child_object_resolver
    }

    /// Read access to accumulator account funds.
    pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
        &self.execution_cache_trait_pointers.account_funds_read
    }

    /// Store of published Move packages.
    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_package_store
    }

    /// Generic object store view over the execution cache.
    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
        &self.execution_cache_trait_pointers.object_store
    }

    /// Transactions whose post-processing (indexing) output is still in
    /// flight, keyed by transaction digest.
    pub fn pending_post_processing(
        &self,
    ) -> &Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>> {
        &self.pending_post_processing
    }
3991
3992    pub async fn await_post_processing(
3993        &self,
3994        tx_digest: &TransactionDigest,
3995    ) -> Option<PostProcessingOutput> {
3996        if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
3997            // Tx was executed and post-processing is in flight.
3998            rx.await.ok()
3999        } else {
4000            // Tx was already persisted or post-processing already completed.
4001            None
4002        }
4003    }
4004
4005    /// Await post-processing for a transaction and commit the index batch immediately.
4006    /// Used in test helpers where there is no CheckpointExecutor to collect and commit
4007    /// index batches at checkpoint boundaries.
4008    pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
4009        if let Some(indexes) = &self.indexes
4010            && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
4011        {
4012            let mut db_batch = indexes.new_db_batch();
4013            db_batch
4014                .concat(vec![raw_batch])
4015                .expect("failed to build index batch");
4016            indexes
4017                .commit_index_batch(db_batch, vec![cache_updates])
4018                .expect("failed to commit index batch");
4019        }
4020    }
4021
    /// API used during reconfiguration to mutate the execution cache.
    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
        &self.execution_cache_trait_pointers.reconfig_api
    }

    /// Store backing the global state hash.
    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
        &self.execution_cache_trait_pointers.global_state_hash_store
    }

    /// Cache of checkpoint-related execution data.
    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
        &self.execution_cache_trait_pointers.checkpoint_cache
    }

    /// Store used by state sync to ingest synced data.
    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
        &self.execution_cache_trait_pointers.state_sync_store
    }

    /// API for committing cached execution results to durable storage.
    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
        &self.execution_cache_trait_pointers.cache_commit
    }

    /// Direct access to the underlying authority store. Test-only.
    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
        self.execution_cache_trait_pointers
            .testing_api
            .database_for_testing()
    }

    /// Direct access to the writeback cache. Test-only.
    pub fn cache_for_testing(&self) -> &WritebackCache {
        self.execution_cache_trait_pointers
            .testing_api
            .cache_for_testing()
    }
4053
4054    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
4055        &self,
4056        config: NodeConfig,
4057        metrics: Arc<AuthorityStorePruningMetrics>,
4058    ) -> anyhow::Result<()> {
4059        use crate::authority::authority_store_pruner::PrunerWatermarks;
4060        let watermarks = Arc::new(PrunerWatermarks::default());
4061        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
4062            &self.database_for_testing().perpetual_tables,
4063            &self.checkpoint_store,
4064            self.rpc_index.as_deref(),
4065            config.authority_store_pruning_config,
4066            metrics,
4067            EPOCH_DURATION_MS_FOR_TESTING,
4068            &watermarks,
4069        )
4070        .await
4071    }
4072
    /// The scheduler that dispatches certificates ready for execution.
    pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
        &self.execution_scheduler
    }
4076
4077    fn create_owner_index_if_empty(
4078        &self,
4079        genesis_objects: &[Object],
4080        epoch_store: &Arc<AuthorityPerEpochStore>,
4081    ) -> SuiResult {
4082        let Some(index_store) = &self.indexes else {
4083            return Ok(());
4084        };
4085        if !index_store.is_empty() {
4086            return Ok(());
4087        }
4088
4089        let mut new_owners = vec![];
4090        let mut new_dynamic_fields = vec![];
4091        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
4092            epoch_store.protocol_config(),
4093            Box::new(self.get_backing_package_store().as_ref()),
4094        );
4095        for o in genesis_objects.iter() {
4096            match o.owner {
4097                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
4098                    new_owners.push((
4099                        (addr, o.id()),
4100                        ObjectInfo::new(&o.compute_object_reference(), o),
4101                    ))
4102                }
4103                Owner::ObjectOwner(object_id) => {
4104                    let id = o.id();
4105                    let Some(info) = Self::try_create_dynamic_field_info(
4106                        self.get_object_store(),
4107                        o,
4108                        &BTreeMap::new(),
4109                        layout_resolver.as_mut(),
4110                    )?
4111                    else {
4112                        continue;
4113                    };
4114                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
4115                }
4116                _ => {}
4117            }
4118        }
4119
4120        index_store.insert_genesis_objects(ObjectIndexChanges {
4121            deleted_owners: vec![],
4122            deleted_dynamic_fields: vec![],
4123            new_owners,
4124            new_dynamic_fields,
4125        })
4126    }
4127
4128    /// Attempts to acquire execution lock for an executable transaction.
4129    /// Returns Some(lock) if the transaction is matching current executed epoch.
4130    /// Returns None if validator is halted at epoch end or epoch mismatch.
4131    pub fn execution_lock_for_executable_transaction(
4132        &self,
4133        transaction: &VerifiedExecutableTransaction,
4134    ) -> Option<ExecutionLockReadGuard<'_>> {
4135        let lock = self.execution_lock.try_read().ok()?;
4136        if *lock == transaction.auth_sig().epoch() {
4137            Some(lock)
4138        } else {
4139            // TODO: Can this still happen?
4140            None
4141        }
4142    }
4143
4144    /// Acquires the execution lock for the duration of transaction validation.
4145    /// This prevents reconfiguration from starting until we are finished validating the transaction.
4146    fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
4147        self.execution_lock
4148            .try_read()
4149            .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
4150    }
4151
    /// Takes the execution lock exclusively, waiting until all in-flight
    /// transaction executions (read-lock holders) have finished.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
4155
    /// Transitions this authority from `cur_epoch_store`'s epoch to the epoch
    /// of `new_committee`, returning the new per-epoch store.
    ///
    /// The sequence is order-critical: stop execution, terminate epoch tasks,
    /// close user certs, clear/restate caches, optionally snapshot DBs, then
    /// open the new epoch store and release the execution lock.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        // Shuts the node down if the new epoch's protocol version is unsupported.
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );

        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // Safe to begin reconfiguration now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        {
            let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                // Need to change this so that the consensus adapter does not accept
                // certificates from users. This can happen if our local validator did
                // not initiate epoch change locally, but 2f+1 nodes already concluded
                // the epoch.
                //
                // This lock is essentially a barrier for
                // `epoch_store.pending_consensus_certificates` table we are reading on the line after this block
                cur_epoch_store.close_user_certs(state);
            }
            // lock is dropped here
        }

        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
        // Re-accumulate the state hash if the protocol upgrade changed how it
        // is computed.
        self.maybe_reaccumulate_state_hash(
            cur_epoch_store,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.get_reconfig_api()
            .set_epoch_start_configuration(&epoch_start_configuration);
        // Optionally snapshot the databases at the epoch boundary.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
            && self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
        {
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
            self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        self.init_object_funds_checker().await;
        // Releasing execution by setting the lock's value to the new epoch.
        *execution_lock = new_epoch;

        self.notify_epoch(new_epoch);
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
4250
4251    fn notify_epoch(&self, new_epoch: EpochId) {
4252        self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
4253    }
4254
4255    pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4256        let mut rx = self.notify_epoch.subscribe();
4257        loop {
4258            let epoch = *rx.borrow();
4259            if epoch >= target_epoch {
4260                return Ok(epoch);
4261            }
4262            rx.changed().await?;
4263        }
4264    }
4265
4266    /// Executes accumulator settlement for testing purposes.
4267    /// Returns a list of (transaction, execution_env) pairs that can be replayed on another
4268    /// AuthorityState (e.g., a fullnode) using `replay_settlement_for_testing`.
4269    pub async fn settle_accumulator_for_testing(
4270        &self,
4271        effects: &[TransactionEffects],
4272        checkpoint_seq: Option<u64>,
4273    ) -> Vec<(VerifiedExecutableTransaction, ExecutionEnv)> {
4274        let accumulator_version = self
4275            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
4276            .await
4277            .unwrap()
4278            .version();
4279        // Use provided checkpoint sequence, or fall back to accumulator version.
4280        let ckpt_seq = checkpoint_seq.unwrap_or_else(|| accumulator_version.value());
4281        let builder = AccumulatorSettlementTxBuilder::new(
4282            Some(self.get_transaction_cache_reader().as_ref()),
4283            effects,
4284            ckpt_seq,
4285            0,
4286        );
4287        let balance_changes = builder.collect_funds_changes();
4288        let epoch_store = self.epoch_store_for_testing();
4289        let epoch = epoch_store.epoch();
4290        let accumulator_root_obj_initial_shared_version = epoch_store
4291            .epoch_start_config()
4292            .accumulator_root_obj_initial_shared_version()
4293            .unwrap();
4294        let settlements = builder.build_tx(
4295            epoch_store.protocol_config(),
4296            epoch,
4297            accumulator_root_obj_initial_shared_version,
4298            ckpt_seq,
4299            ckpt_seq,
4300        );
4301
4302        let settlements: Vec<_> = settlements
4303            .into_iter()
4304            .map(|tx| {
4305                VerifiedExecutableTransaction::new_system(
4306                    VerifiedTransaction::new_system_transaction(tx),
4307                    epoch,
4308                )
4309            })
4310            .collect();
4311
4312        let assigned_versions = epoch_store
4313            .assign_shared_object_versions_for_tests(
4314                self.get_object_cache_reader().as_ref(),
4315                &settlements,
4316            )
4317            .unwrap();
4318        let version_map = assigned_versions.into_map();
4319
4320        let mut replay_txns = Vec::new();
4321        let mut settlement_effects = Vec::with_capacity(settlements.len());
4322        for tx in settlements {
4323            let assigned = version_map.get(&tx.key()).unwrap().clone();
4324            let env = ExecutionEnv::new().with_assigned_versions(assigned);
4325            let (effects, _) = self
4326                .try_execute_immediately(&tx.clone(), env.clone(), &epoch_store)
4327                .await
4328                .unwrap();
4329            assert!(effects.status().is_ok());
4330            replay_txns.push((tx, env));
4331            settlement_effects.push(effects);
4332        }
4333
4334        let barrier = accumulators::build_accumulator_barrier_tx(
4335            epoch,
4336            accumulator_root_obj_initial_shared_version,
4337            ckpt_seq,
4338            &settlement_effects,
4339        );
4340        let barrier = VerifiedExecutableTransaction::new_system(
4341            VerifiedTransaction::new_system_transaction(barrier),
4342            epoch,
4343        );
4344
4345        let assigned_versions = epoch_store
4346            .assign_shared_object_versions_for_tests(
4347                self.get_object_cache_reader().as_ref(),
4348                std::slice::from_ref(&barrier),
4349            )
4350            .unwrap();
4351        let version_map = assigned_versions.into_map();
4352
4353        let barrier_assigned = version_map.get(&barrier.key()).unwrap().clone();
4354        let env = ExecutionEnv::new().with_assigned_versions(barrier_assigned);
4355        let (effects, _) = self
4356            .try_execute_immediately(&barrier.clone(), env.clone(), &epoch_store)
4357            .await
4358            .unwrap();
4359        assert!(effects.status().is_ok());
4360        replay_txns.push((barrier, env));
4361
4362        let next_accumulator_version = accumulator_version.next();
4363        self.execution_scheduler
4364            .settle_address_funds(FundsSettlement {
4365                funds_changes: balance_changes,
4366                next_accumulator_version,
4367            });
4368        // object funds are settled while executing the barrier transaction
4369
4370        replay_txns
4371    }
4372
4373    /// Replays settlement transactions on this AuthorityState.
4374    /// Used to sync a fullnode with settlement transactions executed on a validator.
4375    pub async fn replay_settlement_for_testing(
4376        &self,
4377        txns: &[(VerifiedExecutableTransaction, ExecutionEnv)],
4378    ) {
4379        let epoch_store = self.epoch_store_for_testing();
4380        for (tx, env) in txns {
4381            let (effects, _) = self
4382                .try_execute_immediately(tx, env.clone(), &epoch_store)
4383                .await
4384                .unwrap();
4385            assert!(effects.status().is_ok());
4386        }
4387    }
4388
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for testing.
    pub async fn reconfigure_for_testing(&self) {
        // Hold the reconfiguration side of the execution lock for the whole
        // swap so no transaction can execute mid-transition.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden and diverged from
        // the protocol config definitions. That override may have now been dropped when the initial guard was dropped.
        // We reapply the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly like to keep the protocol config the same
        // across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        // Seed the new epoch store with the last checkpoint of the old epoch
        // (0 if the old epoch produced none).
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        // Point the execution scheduler at the new epoch store before
        // publishing it, then swap the store and terminate the old epoch.
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        let new_epoch = new_epoch_store.epoch();
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;

        // Record the new epoch number in the execution lock before releasing it.
        *execution_lock = new_epoch;
    }
4422
4423    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
4424    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
4425    #[instrument(level = "error", skip_all)]
4426    fn maybe_reaccumulate_state_hash(
4427        &self,
4428        cur_epoch_store: &AuthorityPerEpochStore,
4429        new_protocol_version: ProtocolVersion,
4430    ) {
4431        self.get_reconfig_api()
4432            .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4433    }
4434
4435    #[instrument(level = "error", skip_all)]
4436    fn check_system_consistency(
4437        &self,
4438        cur_epoch_store: &AuthorityPerEpochStore,
4439        state_hasher: Arc<GlobalStateHasher>,
4440        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4441    ) {
4442        info!(
4443            "Performing sui conservation consistency check for epoch {}",
4444            cur_epoch_store.epoch()
4445        );
4446
4447        if let Err(err) = self
4448            .get_reconfig_api()
4449            .expensive_check_sui_conservation(cur_epoch_store)
4450        {
4451            if cfg!(debug_assertions) {
4452                panic!("{}", err);
4453            } else {
4454                // We cannot panic in production yet because it is known that there are some
4455                // inconsistencies in testnet. We will enable this once we make it balanced again in testnet.
4456                warn!("Sui conservation consistency check failed: {}", err);
4457            }
4458        } else {
4459            info!("Sui conservation consistency check passed");
4460        }
4461
4462        // check for root state hash consistency with live object set
4463        if expensive_safety_check_config.enable_state_consistency_check() {
4464            info!(
4465                "Performing state consistency check for epoch {}",
4466                cur_epoch_store.epoch()
4467            );
4468            self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4469        }
4470
4471        if expensive_safety_check_config.enable_secondary_index_checks()
4472            && let Some(indexes) = self.indexes.clone()
4473        {
4474            verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
4475                .expect("secondary indexes are inconsistent");
4476        }
4477
4478        // Verify all checkpointed transactions are present in transactions_seq.
4479        // This catches any post-processing gaps that could occur if async
4480        // post-processing failed to complete before persistence.
4481        if expensive_safety_check_config.enable_secondary_index_checks()
4482            && let Some(indexes) = self.indexes.clone()
4483        {
4484            let epoch = cur_epoch_store.epoch();
4485            // Only verify the current epoch's checkpoints. Previous epoch contents
4486            // may have been pruned, and we only need to verify that this epoch's
4487            // async post-processing completed correctly.
4488            let first_checkpoint = if epoch == 0 {
4489                0
4490            } else {
4491                self.checkpoint_store
4492                    .get_epoch_last_checkpoint_seq_number(epoch - 1)
4493                    .expect("Failed to get previous epoch's last checkpoint")
4494                    .expect("Previous epoch's last checkpoint missing")
4495                    + 1
4496            };
4497            let highest_executed = self
4498                .checkpoint_store
4499                .get_highest_executed_checkpoint_seq_number()
4500                .expect("Failed to get highest executed checkpoint")
4501                .expect("No executed checkpoints");
4502
4503            info!(
4504                "Verifying checkpointed transactions are in transactions_seq \
4505                 (checkpoints {first_checkpoint}..={highest_executed})"
4506            );
4507            for seq in first_checkpoint..=highest_executed {
4508                let checkpoint = self
4509                    .checkpoint_store
4510                    .get_checkpoint_by_sequence_number(seq)
4511                    .expect("Failed to get checkpoint")
4512                    .expect("Checkpoint missing");
4513                let contents = self
4514                    .checkpoint_store
4515                    .get_checkpoint_contents(&checkpoint.content_digest)
4516                    .expect("Failed to get checkpoint contents")
4517                    .expect("Checkpoint contents missing");
4518                for digests in contents.iter() {
4519                    let tx_digest = digests.transaction;
4520                    assert!(
4521                        indexes
4522                            .get_transaction_seq(&tx_digest)
4523                            .expect("Failed to read transactions_seq")
4524                            .is_some(),
4525                        "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
4526                    );
4527                }
4528            }
4529            info!("All checkpointed transactions verified in transactions_seq");
4530        }
4531    }
4532
4533    fn expensive_check_is_consistent_state(
4534        &self,
4535        state_hasher: Arc<GlobalStateHasher>,
4536        cur_epoch_store: &AuthorityPerEpochStore,
4537    ) {
4538        let live_object_set_hash = state_hasher.digest_live_object_set(
4539            !cur_epoch_store
4540                .protocol_config()
4541                .simplified_unwrap_then_delete(),
4542        );
4543
4544        let root_state_hash: ECMHLiveObjectSetDigest = self
4545            .get_global_state_hash_store()
4546            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4547            .expect("Retrieving root state hash cannot fail")
4548            .expect("Root state hash for epoch must exist")
4549            .1
4550            .digest()
4551            .into();
4552
4553        let is_inconsistent = root_state_hash != live_object_set_hash;
4554        if is_inconsistent {
4555            debug_fatal!(
4556                "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4557                root_state_hash,
4558                live_object_set_hash
4559            );
4560        } else {
4561            info!("State consistency check passed");
4562        }
4563
4564        state_hasher.set_inconsistent_state(is_inconsistent);
4565    }
4566
4567    pub fn current_epoch_for_testing(&self) -> EpochId {
4568        self.epoch_store_for_testing().epoch()
4569    }
4570
    /// Snapshots all of this node's databases (checkpoints, perpetual store,
    /// epochs, and optionally indexes) into `checkpoint_path`.
    ///
    /// The snapshot is first written to a `.tmp` sibling directory and then
    /// atomically renamed into place, so a partially written snapshot is never
    /// visible at `checkpoint_path`. If the target path already exists, the
    /// snapshot is skipped.
    #[instrument(level = "error", skip_all)]
    pub fn checkpoint_all_dbs(
        &self,
        checkpoint_path: &Path,
        cur_epoch_store: &AuthorityPerEpochStore,
        checkpoint_indexes: bool,
    ) -> SuiResult {
        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
        let current_epoch = cur_epoch_store.epoch();

        if checkpoint_path.exists() {
            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
            return Ok(());
        }

        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");

        // Remove any leftover temp directory from a previously interrupted run.
        if checkpoint_path_tmp.exists() {
            fs::remove_dir_all(&checkpoint_path_tmp)
                .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
        }

        fs::create_dir_all(&checkpoint_path_tmp)
            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
        fs::create_dir(&store_checkpoint_path_tmp)
            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;

        // NOTE: Do not change the order of invoking these checkpoint calls
        // We want to snapshot checkpoint db first to not race with state sync
        self.checkpoint_store
            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;

        self.get_reconfig_api()
            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;

        self.committee_store
            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;

        if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
        }

        // Atomically publish the finished snapshot.
        fs::rename(checkpoint_path_tmp, checkpoint_path)
            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
        Ok(())
    }
4618
    /// Load the current epoch store. This can change during reconfiguration. To ensure that
    /// we never end up accessing different epoch stores in a single task, we need to make sure
    /// that this is called once per task. Each call needs to be carefully audited to ensure it is
    /// the case. This also means we should minimize the number of call-sites. Only call it when
    /// there is no way to obtain it from somewhere else.
    ///
    /// Returns an `arc_swap` guard; the loaded store stays pinned while the
    /// guard is held, so keep the guard alive for the duration of the task.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
4627
    /// Load the epoch store; should be used in tests only.
    /// Thin alias for `load_epoch_store_one_call_per_task`, exempting tests
    /// from the one-call-per-task audit requirement.
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
4632
    /// Returns an owned copy of the current epoch's committee. Test-only.
    pub fn clone_committee_for_testing(&self) -> Committee {
        // Explicit `Committee::clone(..)` pins the clone to the `Committee`
        // type rather than whatever smart pointer `committee()` returns.
        Committee::clone(self.epoch_store_for_testing().committee())
    }
4636
    /// Looks up `object_id` in the object store, returning `None` when the
    /// store has no entry for it.
    #[instrument(level = "trace", skip_all)]
    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.get_object_store().get_object(object_id)
    }
4641
4642    pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4643        Ok(self
4644            .get_object(&SUI_SYSTEM_ADDRESS.into())
4645            .await
4646            .expect("framework object should always exist")
4647            .compute_object_reference())
4648    }
4649
    /// Reads the Sui system state object via the object cache's unsafe
    /// accessor. This function is only used for testing.
    pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
        self.get_object_cache_reader()
            .get_sui_system_state_object_unsafe()
    }
4655
    /// Returns the sequence number of the checkpoint that includes `digest`,
    /// if the epoch store has recorded one for this transaction.
    #[instrument(level = "trace", skip_all)]
    fn get_transaction_checkpoint_sequence(
        &self,
        digest: &TransactionDigest,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
        epoch_store.get_transaction_checkpoint(digest)
    }
4664
4665    #[instrument(level = "trace", skip_all)]
4666    pub fn get_checkpoint_by_sequence_number(
4667        &self,
4668        sequence_number: CheckpointSequenceNumber,
4669    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4670        Ok(self
4671            .checkpoint_store
4672            .get_checkpoint_by_sequence_number(sequence_number)?)
4673    }
4674
4675    #[instrument(level = "trace", skip_all)]
4676    pub fn get_transaction_checkpoint_for_tests(
4677        &self,
4678        digest: &TransactionDigest,
4679        epoch_store: &AuthorityPerEpochStore,
4680    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4681        let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4682        let Some(checkpoint) = checkpoint else {
4683            return Ok(None);
4684        };
4685        let checkpoint = self
4686            .checkpoint_store
4687            .get_checkpoint_by_sequence_number(checkpoint)?;
4688        Ok(checkpoint)
4689    }
4690
4691    #[instrument(level = "trace", skip_all)]
4692    pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4693        Ok(
4694            match self
4695                .get_object_cache_reader()
4696                .get_latest_object_or_tombstone(*object_id)
4697            {
4698                Some((_, ObjectOrTombstone::Object(object))) => {
4699                    let layout = self.get_object_layout(&object)?;
4700                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
4701                }
4702                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4703                None => ObjectRead::NotExists(*object_id),
4704            },
4705        )
4706    }
4707
    /// Chain Identifier is the digest of the genesis checkpoint.
    /// Returns a copy of the identifier cached on this state at construction.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
4712
    /// Returns the fork recovery state, if one is configured on this node.
    pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
        self.fork_recovery_state.as_ref()
    }
4716
4717    #[instrument(level = "trace", skip_all)]
4718    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4719    where
4720        T: DeserializeOwned,
4721    {
4722        let o = self.get_object_read(object_id)?.into_object()?;
4723        if let Some(move_object) = o.data.try_as_move() {
4724            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4725                SuiErrorKind::ObjectDeserializationError {
4726                    error: format!("{e}"),
4727                }
4728            })?)
4729        } else {
4730            Err(SuiErrorKind::ObjectDeserializationError {
4731                error: format!("Provided object : [{object_id}] is not a Move object."),
4732            }
4733            .into())
4734        }
4735    }
4736
4737    /// This function aims to serve rpc reads on past objects and
4738    /// we don't expect it to be called for other purposes.
4739    /// Depending on the object pruning policies that will be enforced in the
4740    /// future there is no software-level guarantee/SLA to retrieve an object
4741    /// with an old version even if it exists/existed.
4742    #[instrument(level = "trace", skip_all)]
4743    pub fn get_past_object_read(
4744        &self,
4745        object_id: &ObjectID,
4746        version: SequenceNumber,
4747    ) -> SuiResult<PastObjectRead> {
4748        // Firstly we see if the object ever existed by getting its latest data
4749        let Some(obj_ref) = self
4750            .get_object_cache_reader()
4751            .get_latest_object_ref_or_tombstone(*object_id)
4752        else {
4753            return Ok(PastObjectRead::ObjectNotExists(*object_id));
4754        };
4755
4756        if version > obj_ref.1 {
4757            return Ok(PastObjectRead::VersionTooHigh {
4758                object_id: *object_id,
4759                asked_version: version,
4760                latest_version: obj_ref.1,
4761            });
4762        }
4763
4764        if version < obj_ref.1 {
4765            // Read past objects
4766            return Ok(match self.read_object_at_version(object_id, version)? {
4767                Some((object, layout)) => {
4768                    let obj_ref = object.compute_object_reference();
4769                    PastObjectRead::VersionFound(obj_ref, object, layout)
4770                }
4771
4772                None => PastObjectRead::VersionNotFound(*object_id, version),
4773            });
4774        }
4775
4776        if !obj_ref.2.is_alive() {
4777            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4778        }
4779
4780        match self.read_object_at_version(object_id, obj_ref.1)? {
4781            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4782            None => {
4783                debug_fatal!(
4784                    "Object with in parent_entry is missing from object store, datastore is \
4785                     inconsistent"
4786                );
4787                Err(UserInputError::ObjectNotFound {
4788                    object_id: *object_id,
4789                    version: Some(obj_ref.1),
4790                }
4791                .into())
4792            }
4793        }
4794    }
4795
4796    #[instrument(level = "trace", skip_all)]
4797    fn read_object_at_version(
4798        &self,
4799        object_id: &ObjectID,
4800        version: SequenceNumber,
4801    ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4802        let Some(object) = self
4803            .get_object_cache_reader()
4804            .get_object_by_key(object_id, version)
4805        else {
4806            return Ok(None);
4807        };
4808
4809        let layout = self.get_object_layout(&object)?;
4810        Ok(Some((object, layout)))
4811    }
4812
    /// Resolves the annotated Move struct layout of `object` via the current
    /// epoch's executor. Returns `None` when the object has no Move contents
    /// (i.e. `try_as_move()` yields nothing).
    pub fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
        let layout = object
            .data
            .try_as_move()
            .map(|object| {
                let epoch_store = self.load_epoch_store_one_call_per_task();
                into_struct_layout(
                    epoch_store
                        .executor()
                        // TODO(cache) - must read through cache
                        .type_layout_resolver(
                            epoch_store.protocol_config(),
                            Box::new(self.get_backing_package_store().as_ref()),
                        )
                        .get_annotated_layout(&object.type_().clone().into())?,
                )
            })
            // Flip Option<Result<..>> into Result<Option<..>> so resolver
            // errors propagate while a non-Move object stays Ok(None).
            .transpose()?;
        Ok(layout)
    }
4833
    /// Returns a fake ObjectRef representing an address balance, along with the balance value
    /// and the previous transaction digest. The ObjectRef can be returned to JSON-RPC clients
    /// that don't understand address balances.
    ///
    /// Returns `Ok(None)` when no accumulator field exists for this
    /// `(owner, balance_type)` pair or when the balance is zero.
    #[instrument(level = "trace", skip_all)]
    pub fn get_address_balance_coin_info(
        &self,
        owner: SuiAddress,
        balance_type: TypeTag,
    ) -> SuiResult<Option<(ObjectRef, u64, TransactionDigest)>> {
        let accumulator_id = AccumulatorValue::get_field_id(owner, &balance_type)?;
        let accumulator_obj = AccumulatorValue::load_object_by_id(
            self.get_child_object_resolver().as_ref(),
            None,
            *accumulator_id.inner(),
        )?;

        // No accumulator object means there is no balance to report.
        let Some(accumulator_obj) = accumulator_obj else {
            return Ok(None);
        };

        // Extract the currency type from balance_type (e.g., SUI from Balance<SUI>).
        // get_balance expects the currency type, not the balance type.
        let currency_type =
            Balance::maybe_get_balance_type_param(&balance_type).unwrap_or(balance_type);

        let balance = crate::accumulators::balances::get_balance(
            owner,
            self.get_child_object_resolver().as_ref(),
            currency_type,
        )?;

        // A zero balance is indistinguishable from "no coin" for clients.
        if balance == 0 {
            return Ok(None);
        };

        // Encode the accumulator's identity, the epoch, and the balance into
        // a synthetic ObjectRef understood by coin_reservation decoding.
        let object_ref = coin_reservation::encode_object_ref(
            accumulator_obj.id(),
            accumulator_obj.version(),
            self.load_epoch_store_one_call_per_task().epoch(),
            balance,
            self.get_chain_identifier(),
        );

        Ok(Some((
            object_ref,
            balance,
            accumulator_obj.previous_transaction,
        )))
    }
4883
4884    /// Returns fake ObjectRefs for all address balances of an owner, keyed by coin type string.
4885    /// Used by get_all_coins to include fake coins for each coin type.
4886    #[instrument(level = "trace", skip_all)]
4887    pub fn get_all_address_balance_coin_infos(
4888        &self,
4889        owner: SuiAddress,
4890    ) -> SuiResult<std::collections::HashMap<String, (ObjectRef, u64, TransactionDigest)>> {
4891        let indexes = self
4892            .indexes
4893            .as_ref()
4894            .ok_or(SuiErrorKind::IndexStoreNotAvailable)?;
4895
4896        let mut result = std::collections::HashMap::new();
4897        for currency_type in indexes.get_address_balance_coin_types_iter(owner) {
4898            let balance_type = sui_types::balance::Balance::type_tag(currency_type.clone());
4899            if let Some((obj_ref, balance, prev_tx)) =
4900                self.get_address_balance_coin_info(owner, balance_type)?
4901            {
4902                // Use currency_type.to_string() to match the format in CoinIndexKey2
4903                // (e.g., "0x2::sui::SUI", not "0x2::coin::Coin<0x2::sui::SUI>")
4904                result.insert(currency_type.to_string(), (obj_ref, balance, prev_tx));
4905            }
4906        }
4907        Ok(result)
4908    }
4909
4910    fn get_owner_at_version(
4911        object_store: &Arc<dyn ObjectStore + Send + Sync>,
4912        object_id: &ObjectID,
4913        version: SequenceNumber,
4914    ) -> SuiResult<Owner> {
4915        object_store
4916            .get_object_by_key(object_id, version)
4917            .ok_or_else(|| {
4918                SuiError::from(UserInputError::ObjectNotFound {
4919                    object_id: *object_id,
4920                    version: Some(version),
4921                })
4922            })
4923            .map(|o| o.owner.clone())
4924    }
4925
4926    #[instrument(level = "trace", skip_all)]
4927    pub fn get_owner_objects(
4928        &self,
4929        owner: SuiAddress,
4930        // If `Some`, the query will start from the next item after the specified cursor
4931        cursor: Option<ObjectID>,
4932        limit: usize,
4933        filter: Option<SuiObjectDataFilter>,
4934    ) -> SuiResult<Vec<ObjectInfo>> {
4935        if let Some(indexes) = &self.indexes {
4936            indexes.get_owner_objects(owner, cursor, limit, filter)
4937        } else {
4938            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4939        }
4940    }
4941
4942    #[instrument(level = "trace", skip_all)]
4943    pub fn get_owned_coins_iterator_with_cursor(
4944        &self,
4945        owner: SuiAddress,
4946        // If `Some`, the query will start from the next item after the specified cursor
4947        cursor: (String, u64, ObjectID),
4948        limit: usize,
4949        one_coin_type_only: bool,
4950    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4951        if let Some(indexes) = &self.indexes {
4952            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4953        } else {
4954            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4955        }
4956    }
4957
4958    #[instrument(level = "trace", skip_all)]
4959    pub fn get_owner_objects_iterator(
4960        &self,
4961        owner: SuiAddress,
4962        // If `Some`, the query will start from the next item after the specified cursor
4963        cursor: Option<ObjectID>,
4964        filter: Option<SuiObjectDataFilter>,
4965    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4966        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4967        if let Some(indexes) = &self.indexes {
4968            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4969        } else {
4970            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4971        }
4972    }
4973
4974    #[instrument(level = "trace", skip_all)]
4975    pub async fn get_move_objects<T>(
4976        &self,
4977        owner: SuiAddress,
4978        type_: MoveObjectType,
4979    ) -> SuiResult<Vec<T>>
4980    where
4981        T: DeserializeOwned,
4982    {
4983        let object_ids = self
4984            .get_owner_objects_iterator(owner, None, None)?
4985            .filter(|o| match &o.type_ {
4986                ObjectType::Struct(s) => &type_ == s,
4987                ObjectType::Package => false,
4988            })
4989            .map(|info| ObjectKey(info.object_id, info.version))
4990            .collect::<Vec<_>>();
4991        let mut move_objects = vec![];
4992
4993        let objects = self
4994            .get_object_store()
4995            .multi_get_objects_by_key(&object_ids);
4996
4997        for (o, id) in objects.into_iter().zip_debug_eq(object_ids) {
4998            let object = o.ok_or_else(|| {
4999                SuiError::from(UserInputError::ObjectNotFound {
5000                    object_id: id.0,
5001                    version: Some(id.1),
5002                })
5003            })?;
5004            let move_object = object.data.try_as_move().ok_or_else(|| {
5005                SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
5006            })?;
5007            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
5008                SuiErrorKind::ObjectDeserializationError {
5009                    error: format!("{e}"),
5010                }
5011            })?);
5012        }
5013        Ok(move_objects)
5014    }
5015
5016    #[instrument(level = "trace", skip_all)]
5017    pub fn get_dynamic_fields(
5018        &self,
5019        owner: ObjectID,
5020        // If `Some`, the query will start from the next item after the specified cursor
5021        cursor: Option<ObjectID>,
5022        limit: usize,
5023    ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
5024        Ok(self
5025            .get_dynamic_fields_iterator(owner, cursor)?
5026            .take(limit)
5027            .collect::<Result<Vec<_>, _>>()?)
5028    }
5029
5030    fn get_dynamic_fields_iterator(
5031        &self,
5032        owner: ObjectID,
5033        // If `Some`, the query will start from the next item after the specified cursor
5034        cursor: Option<ObjectID>,
5035    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
5036    {
5037        if let Some(indexes) = &self.indexes {
5038            indexes.get_dynamic_fields_iterator(owner, cursor)
5039        } else {
5040            Err(SuiErrorKind::IndexStoreNotAvailable.into())
5041        }
5042    }
5043
5044    #[instrument(level = "trace", skip_all)]
5045    pub fn get_dynamic_field_object_id(
5046        &self,
5047        owner: ObjectID,
5048        name_type: TypeTag,
5049        name_bcs_bytes: &[u8],
5050    ) -> SuiResult<Option<ObjectID>> {
5051        if let Some(indexes) = &self.indexes {
5052            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
5053        } else {
5054            Err(SuiErrorKind::IndexStoreNotAvailable.into())
5055        }
5056    }
5057
5058    #[instrument(level = "trace", skip_all)]
5059    pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
5060        Ok(self.get_indexes()?.next_sequence_number())
5061    }
5062
5063    #[instrument(level = "trace", skip_all)]
5064    pub async fn get_executed_transaction_and_effects(
5065        &self,
5066        digest: TransactionDigest,
5067        kv_store: Arc<TransactionKeyValueStore>,
5068    ) -> SuiResult<(Transaction, TransactionEffects)> {
5069        let transaction = kv_store.get_tx(digest).await?;
5070        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
5071        Ok((transaction, effects))
5072    }
5073
5074    #[instrument(level = "trace", skip_all)]
5075    pub fn multi_get_checkpoint_by_sequence_number(
5076        &self,
5077        sequence_numbers: &[CheckpointSequenceNumber],
5078    ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
5079        Ok(self
5080            .checkpoint_store
5081            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
5082    }
5083
5084    #[instrument(level = "trace", skip_all)]
5085    pub fn get_transaction_events(
5086        &self,
5087        digest: &TransactionDigest,
5088    ) -> SuiResult<TransactionEvents> {
5089        self.get_transaction_cache_reader()
5090            .get_events(digest)
5091            .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
5092    }
5093
5094    pub fn get_transaction_input_objects(
5095        &self,
5096        effects: &TransactionEffects,
5097    ) -> SuiResult<Vec<Object>> {
5098        sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
5099            .map_err(Into::into)
5100    }
5101
5102    pub fn get_transaction_output_objects(
5103        &self,
5104        effects: &TransactionEffects,
5105    ) -> SuiResult<Vec<Object>> {
5106        sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
5107            .map_err(Into::into)
5108    }
5109
5110    fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
5111        match &self.indexes {
5112            Some(i) => Ok(i.clone()),
5113            None => Err(SuiErrorKind::UnsupportedFeatureError {
5114                error: "extended object indexing is not enabled on this server".into(),
5115            }
5116            .into()),
5117        }
5118    }
5119
5120    pub async fn get_transactions_for_tests(
5121        self: &Arc<Self>,
5122        filter: Option<TransactionFilter>,
5123        cursor: Option<TransactionDigest>,
5124        limit: Option<usize>,
5125        reverse: bool,
5126    ) -> SuiResult<Vec<TransactionDigest>> {
5127        let metrics = KeyValueStoreMetrics::new_for_tests();
5128        let kv_store = Arc::new(TransactionKeyValueStore::new(
5129            "rocksdb",
5130            metrics,
5131            self.clone(),
5132        ));
5133        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
5134            .await
5135    }
5136
    /// List transaction digests matching `filter`, paginated by `cursor` and
    /// `limit`, in forward or reverse order.
    ///
    /// A `TransactionFilter::Checkpoint` filter is answered directly from the
    /// checkpoint contents in `kv_store`; any other filter (or none) is
    /// delegated to the secondary index store, which must be enabled.
    #[instrument(level = "trace", skip_all)]
    pub async fn get_transactions(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        filter: Option<TransactionFilter>,
        // If `Some`, the query will start from the next item after the specified cursor
        cursor: Option<TransactionDigest>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
            let iter = checkpoint_contents.iter().map(|c| c.transaction);
            if reverse {
                // Drop everything up to and including the cursor — the extra
                // `skip(1)` makes the cursor exclusive — then apply the limit.
                let iter = iter
                    .rev()
                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
                    .skip(usize::from(cursor.is_some()));
                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
            } else {
                // Same cursor handling as above, but in checkpoint order.
                let iter = iter
                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
                    .skip(usize::from(cursor.is_some()));
                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
            }
        }
        self.get_indexes()?
            .get_transactions(filter, cursor, limit, reverse)
    }
5166
    /// Accessor for this authority's checkpoint store handle.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
5170
5171    pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
5172        self.get_checkpoint_store()
5173            .get_highest_executed_checkpoint_seq_number()?
5174            .ok_or(
5175                SuiErrorKind::UserInputError {
5176                    error: UserInputError::LatestCheckpointSequenceNumberNotFound,
5177                }
5178                .into(),
5179            )
5180    }
5181
5182    #[cfg(msim)]
5183    pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
5184        self.database_for_testing()
5185            .perpetual_tables
5186            .get_highest_pruned_checkpoint()
5187            .map(|c| c.unwrap_or(0))
5188            .map_err(Into::into)
5189    }
5190
5191    #[instrument(level = "trace", skip_all)]
5192    pub fn get_checkpoint_summary_by_sequence_number(
5193        &self,
5194        sequence_number: CheckpointSequenceNumber,
5195    ) -> SuiResult<CheckpointSummary> {
5196        let verified_checkpoint = self
5197            .get_checkpoint_store()
5198            .get_checkpoint_by_sequence_number(sequence_number)?;
5199        match verified_checkpoint {
5200            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5201            None => Err(SuiErrorKind::UserInputError {
5202                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5203            }
5204            .into()),
5205        }
5206    }
5207
5208    #[instrument(level = "trace", skip_all)]
5209    pub fn get_checkpoint_summary_by_digest(
5210        &self,
5211        digest: CheckpointDigest,
5212    ) -> SuiResult<CheckpointSummary> {
5213        let verified_checkpoint = self
5214            .get_checkpoint_store()
5215            .get_checkpoint_by_digest(&digest)?;
5216        match verified_checkpoint {
5217            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5218            None => Err(SuiErrorKind::UserInputError {
5219                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5220            }
5221            .into()),
5222        }
5223    }
5224
5225    #[instrument(level = "trace", skip_all)]
5226    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
5227        if is_system_package(package_id) {
5228            return self.find_genesis_txn_digest();
5229        }
5230        Ok(self
5231            .get_object_read(&package_id)?
5232            .into_object()?
5233            .previous_transaction)
5234    }
5235
5236    #[instrument(level = "trace", skip_all)]
5237    pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
5238        let summary = self
5239            .get_verified_checkpoint_by_sequence_number(0)?
5240            .into_message();
5241        let content = self.get_checkpoint_contents(summary.content_digest)?;
5242        let genesis_transaction = content.enumerate_transactions(&summary).next();
5243        Ok(genesis_transaction
5244            .ok_or(SuiErrorKind::UserInputError {
5245                error: UserInputError::GenesisTransactionNotFound,
5246            })?
5247            .1
5248            .transaction)
5249    }
5250
5251    #[instrument(level = "trace", skip_all)]
5252    pub fn get_verified_checkpoint_by_sequence_number(
5253        &self,
5254        sequence_number: CheckpointSequenceNumber,
5255    ) -> SuiResult<VerifiedCheckpoint> {
5256        let verified_checkpoint = self
5257            .get_checkpoint_store()
5258            .get_checkpoint_by_sequence_number(sequence_number)?;
5259        match verified_checkpoint {
5260            Some(verified_checkpoint) => Ok(verified_checkpoint),
5261            None => Err(SuiErrorKind::UserInputError {
5262                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5263            }
5264            .into()),
5265        }
5266    }
5267
5268    #[instrument(level = "trace", skip_all)]
5269    pub fn get_verified_checkpoint_summary_by_digest(
5270        &self,
5271        digest: CheckpointDigest,
5272    ) -> SuiResult<VerifiedCheckpoint> {
5273        let verified_checkpoint = self
5274            .get_checkpoint_store()
5275            .get_checkpoint_by_digest(&digest)?;
5276        match verified_checkpoint {
5277            Some(verified_checkpoint) => Ok(verified_checkpoint),
5278            None => Err(SuiErrorKind::UserInputError {
5279                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5280            }
5281            .into()),
5282        }
5283    }
5284
5285    #[instrument(level = "trace", skip_all)]
5286    pub fn get_checkpoint_contents(
5287        &self,
5288        digest: CheckpointContentsDigest,
5289    ) -> SuiResult<CheckpointContents> {
5290        self.get_checkpoint_store()
5291            .get_checkpoint_contents(&digest)?
5292            .ok_or(
5293                SuiErrorKind::UserInputError {
5294                    error: UserInputError::CheckpointContentsNotFound(digest),
5295                }
5296                .into(),
5297            )
5298    }
5299
5300    #[instrument(level = "trace", skip_all)]
5301    pub fn get_checkpoint_contents_by_sequence_number(
5302        &self,
5303        sequence_number: CheckpointSequenceNumber,
5304    ) -> SuiResult<CheckpointContents> {
5305        let verified_checkpoint = self
5306            .get_checkpoint_store()
5307            .get_checkpoint_by_sequence_number(sequence_number)?;
5308        match verified_checkpoint {
5309            Some(verified_checkpoint) => {
5310                let content_digest = verified_checkpoint.into_inner().content_digest;
5311                self.get_checkpoint_contents(content_digest)
5312            }
5313            None => Err(SuiErrorKind::UserInputError {
5314                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5315            }
5316            .into()),
5317        }
5318    }
5319
    /// Query events through the secondary index store.
    ///
    /// Returns up to `limit` `SuiEvent`s matching `query`, ordered by
    /// (transaction sequence, event sequence) — descending when `descending`
    /// is set. `cursor`, when provided, is exclusive: results begin with the
    /// item after it. Event payloads are fetched from `kv_store` and
    /// annotated via the current epoch's type-layout resolver.
    #[instrument(level = "trace", skip_all)]
    pub async fn query_events(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        query: EventFilter,
        // If `Some`, the query will start from the next item after the specified cursor
        cursor: Option<EventID>,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<SuiEvent>> {
        let index_store = self.get_indexes()?;

        //Get the tx_num from tx_digest
        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
                SuiErrorKind::TransactionNotFound {
                    digest: cursor.tx_digest,
                },
            )?;
            (tx_seq, cursor.event_seq as usize)
        } else if descending {
            // No cursor: start from the very end when descending ...
            (u64::MAX, usize::MAX)
        } else {
            // ... or from the very beginning when ascending.
            (0, 0)
        };

        // Fetch one extra key: when a cursor is given, the first key is the
        // (inclusive) cursor match itself and is dropped below.
        // NOTE(review): `limit + 1` would overflow for `limit == usize::MAX`;
        // presumably callers pass a bounded page size — confirm.
        let limit = limit + 1;
        let mut event_keys = match query {
            EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
            EventFilter::Transaction(digest) => {
                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveModule { package, module } => {
                let module_id = ModuleId::new(package.into(), module);
                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveEventType(struct_name) => index_store
                .events_by_move_event_struct_name(
                    &struct_name,
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            EventFilter::Sender(sender) => {
                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
            }
            EventFilter::TimeRange {
                start_time,
                end_time,
            } => index_store
                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
            EventFilter::MoveEventModule { package, module } => index_store
                .events_by_move_event_module(
                    &ModuleId::new(package.into(), module),
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            // not using "_ =>" because we want to make sure we remember to add new variants here
            EventFilter::Any(_) => {
                return Err(SuiErrorKind::UserInputError {
                    error: UserInputError::Unsupported(
                        "'Any' queries are not supported by the fullnode.".to_string(),
                    ),
                }
                .into());
            }
        };

        // skip one event if exclusive cursor is provided,
        // otherwise truncate to the original limit.
        if cursor.is_some() {
            if !event_keys.is_empty() {
                event_keys.remove(0);
            }
        } else {
            event_keys.truncate(limit - 1);
        }

        // get the unique set of digests from the event_keys
        // (each key is a (event_digest, tx_digest, event_seq, timestamp) tuple)
        let transaction_digests = event_keys
            .iter()
            .map(|(_, digest, _, _)| *digest)
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();

        let events = kv_store
            .multi_get_events_by_tx_digests(&transaction_digests)
            .await?;

        // Pair each queried digest with its (optional) events record.
        let events_map: HashMap<_, _> = transaction_digests
            .iter()
            .zip_debug_eq(events.into_iter())
            .collect();

        // Re-associate each index key with the concrete event payload at its
        // event sequence; a missing payload is an error, since the index
        // claimed the event exists.
        let stored_events = event_keys
            .into_iter()
            .map(|k| {
                (
                    k,
                    events_map
                        .get(&k.1)
                        .expect("fetched digest is missing")
                        .clone()
                        .and_then(|e| e.data.get(k.2).cloned()),
                )
            })
            .map(
                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
                    event
                        .map(|e| (e, tx_digest, event_seq, timestamp))
                        .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
                },
            )
            .collect::<Result<Vec<_>, _>>()?;

        // Convert raw events into SuiEvents, resolving each event type's
        // annotated Move layout through the backing package store.
        let epoch_store = self.load_epoch_store_one_call_per_task();
        let backing_store = self.get_backing_package_store().as_ref();
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(epoch_store.protocol_config(), Box::new(backing_store));
        let mut events = vec![];
        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
            events.push(SuiEvent::try_from(
                e.clone(),
                tx_digest,
                event_seq as u64,
                Some(timestamp),
                layout_resolver.get_annotated_layout(&e.type_)?,
            )?)
        }
        Ok(events)
    }
5456
5457    pub async fn insert_genesis_object(&self, object: Object) {
5458        self.get_reconfig_api().insert_genesis_object(object);
5459    }
5460
5461    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
5462        futures::future::join_all(
5463            objects
5464                .iter()
5465                .map(|o| self.insert_genesis_object(o.clone())),
5466        )
5467        .await;
5468    }
5469
    /// Make a status response for a transaction: `Executed` (with possibly
    /// re-signed effects and any events) when executed effects exist,
    /// `Signed` when only a signed transaction exists for this epoch, and
    /// `None` when neither is found.
    #[instrument(level = "trace", skip_all)]
    pub fn get_transaction_status(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
        // TODO: In the case of read path, we should not have to re-sign the effects.
        if let Some(effects) =
            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
        {
            if let Some(transaction) = self
                .get_transaction_cache_reader()
                .get_transaction_block(transaction_digest)
            {
                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
                // Only load events when the effects record an events digest.
                let events = if effects.events_digest().is_some() {
                    self.get_transaction_events(effects.transaction_digest())?
                } else {
                    TransactionEvents::default()
                };
                return Ok(Some((
                    (*transaction).clone().into_message(),
                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
                )));
            } else {
                // The read of effects and read of transaction are not atomic. It's possible that we reverted
                // the transaction (during epoch change) in between the above two reads, and we end up
                // having effects but not transaction. In this case, we just fall through.
                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
            }
        }
        // No executed effects: report a pending signed transaction if one exists.
        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
            self.metrics.tx_already_processed.inc();
            let (transaction, sig) = signed.into_inner().into_data_and_sig();
            Ok(Some((transaction, TransactionStatus::Signed(sig))))
        } else {
            Ok(None)
        }
    }
5510
    /// Get the signed effects of the given transaction. If the effects was signed in a previous
    /// epoch, re-sign it so that the caller is able to form a cert of the effects in the current
    /// epoch.
    ///
    /// Returns `Ok(None)` when no executed effects exist for `transaction_digest`.
    #[instrument(level = "trace", skip_all)]
    pub fn get_signed_effects_and_maybe_resign(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
        let effects = self
            .get_transaction_cache_reader()
            .get_executed_effects(transaction_digest);
        match effects {
            Some(effects) => {
                // If the transaction was executed in previous epochs, the validator will
                // re-sign the effects with new current epoch so that a client is always able to
                // obtain an effects certificate at the current epoch.
                //
                // Why is this necessary? Consider the following case:
                // - assume there are 4 validators
                // - Quorum driver gets 2 signed effects before reconfig halt
                // - The tx makes it into final checkpoint.
                // - 2 validators go away and are replaced in the new epoch.
                // - The new epoch begins.
                // - The quorum driver cannot complete the partial effects cert from the previous epoch,
                //   because it may not be able to reach either of the 2 former validators.
                // - But, if the 2 validators that stayed are willing to re-sign the effects in the new
                //   epoch, the QD can make a new effects cert and return it to the client.
                //
                // This is a considered a short-term workaround. Eventually, Quorum Driver should be able
                // to return either an effects certificate, -or- a proof of inclusion in a checkpoint. In
                // the case above, the Quorum Driver would return a proof of inclusion in the final
                // checkpoint, and this code would no longer be necessary.
                if effects.executed_epoch() != epoch_store.epoch() {
                    debug!(
                        tx_digest=?transaction_digest,
                        effects_epoch=?effects.executed_epoch(),
                        epoch=?epoch_store.epoch(),
                        "Re-signing the effects with the current epoch"
                    );
                }
                // `sign_effects` signs against `epoch_store.epoch()`, so the
                // re-signing itself happens there regardless of this branch.
                Ok(Some(self.sign_effects(effects, epoch_store)?))
            }
            None => Ok(None),
        }
    }
5557
5558    #[instrument(level = "trace", skip_all)]
5559    pub(crate) fn sign_effects(
5560        &self,
5561        effects: TransactionEffects,
5562        epoch_store: &Arc<AuthorityPerEpochStore>,
5563    ) -> SuiResult<VerifiedSignedTransactionEffects> {
5564        let tx_digest = *effects.transaction_digest();
5565        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5566            Some(sig) => {
5567                debug_assert!(sig.epoch == epoch_store.epoch());
5568                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5569            }
5570            _ => {
5571                let sig = AuthoritySignInfo::new(
5572                    epoch_store.epoch(),
5573                    &effects,
5574                    Intent::sui_app(IntentScope::TransactionEffects),
5575                    self.name,
5576                    &*self.secret,
5577                );
5578
5579                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5580
5581                epoch_store.insert_effects_digest_and_signature(
5582                    &tx_digest,
5583                    effects.digest(),
5584                    &sig,
5585                )?;
5586
5587                effects
5588            }
5589        };
5590
5591        Ok(VerifiedSignedTransactionEffects::new_unchecked(
5592            signed_effects,
5593        ))
5594    }
5595
    // Returns coin objects for indexing for fullnode if indexing is enabled.
    // The result is the pair (input coin objects, written coin objects) drawn
    // from the transaction's temporary store; validators get `None`.
    #[instrument(level = "trace", skip_all)]
    fn fullnode_only_get_tx_coins_for_indexing(
        name: AuthorityName,
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> Option<TxCoins> {
        // This node is a member of the current committee (i.e. a validator),
        // so skip coin indexing entirely.
        if epoch_store.committee().authority_exists(&name) {
            return None;
        }
        // Coins written (created or mutated) by the transaction.
        let written_coin_objects = inner_temporary_store
            .written
            .iter()
            .filter_map(|(k, v)| {
                if v.is_coin() {
                    Some((*k, v.clone()))
                } else {
                    None
                }
            })
            .collect();
        // Coins that entered execution as declared input objects.
        let mut input_coin_objects = inner_temporary_store
            .input_objects
            .iter()
            .filter_map(|(k, v)| {
                if v.is_coin() {
                    Some((*k, v.clone()))
                } else {
                    None
                }
            })
            .collect::<ObjectMap>();

        // Check for receiving objects that were actually used and modified during execution. Their
        // updated version will already showup in "written_coins" but their input isn't included in
        // the set of input objects in a inner_temporary_store.
        for (object_id, version) in effects.modified_at_versions() {
            // Only objects loaded at runtime (not declared inputs) need this
            // back-fill; the pre-execution version is read from the store.
            if inner_temporary_store
                .loaded_runtime_objects
                .contains_key(&object_id)
                && let Some(object) = object_store.get_object_by_key(&object_id, version)
                && object.is_coin()
            {
                input_coin_objects.insert(object_id, object);
            }
        }

        Some((input_coin_objects, written_coin_objects))
    }
5647
5648    /// Get the TransactionEnvelope that currently locks the given object, if any.
5649    /// Since object locks are only valid for one epoch, we also need the epoch_id in the query.
5650    /// Returns UserInputError::ObjectNotFound if no lock records for the given object can be found.
5651    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the object record is at a different version.
5652    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a certain transaction.
5653    /// Returns None if a lock record is initialized for the given ObjectRef but not yet locked by any transaction,
5654    ///     or cannot find the transaction in transaction table, because of data race etc.
5655    #[instrument(level = "trace", skip_all)]
5656    pub async fn get_transaction_lock(
5657        &self,
5658        object_ref: &ObjectRef,
5659        epoch_store: &AuthorityPerEpochStore,
5660    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5661        let lock_info = self
5662            .get_object_cache_reader()
5663            .get_lock(*object_ref, epoch_store)?;
5664        let lock_info = match lock_info {
5665            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5666                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5667                    provided_obj_ref: *object_ref,
5668                    current_version: locked_ref.1,
5669                }
5670                .into());
5671            }
5672            ObjectLockStatus::Initialized => {
5673                return Ok(None);
5674            }
5675            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5676        };
5677
5678        epoch_store.get_signed_transaction(&lock_info.tx_digest)
5679    }
5680
    /// Batch-read `objects` through the object cache reader; absent entries
    /// come back as `None` in the corresponding slot.
    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
        self.get_object_cache_reader().get_objects(objects)
    }
5684
    /// Return the latest object reference — or tombstone reference — for
    /// `object_id` from the object cache, if one exists.
    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
        self.get_object_cache_reader()
            .get_latest_object_ref_or_tombstone(object_id)
    }
5689
5690    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
5691    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the upgrade.
5692    ///
5693    /// This method can be used to dynamic adjust the amount of buffer. If set to 0, the upgrade
5694    /// will go through with only 2f+1 votes.
5695    ///
5696    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all should have the same
5697    /// value), or you risk halting the chain.
5698    pub fn set_override_protocol_upgrade_buffer_stake(
5699        &self,
5700        expected_epoch: EpochId,
5701        buffer_stake_bps: u64,
5702    ) -> SuiResult {
5703        let epoch_store = self.load_epoch_store_one_call_per_task();
5704        let actual_epoch = epoch_store.epoch();
5705        if actual_epoch != expected_epoch {
5706            return Err(SuiErrorKind::WrongEpoch {
5707                expected_epoch,
5708                actual_epoch,
5709            }
5710            .into());
5711        }
5712
5713        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5714    }
5715
5716    pub fn clear_override_protocol_upgrade_buffer_stake(
5717        &self,
5718        expected_epoch: EpochId,
5719    ) -> SuiResult {
5720        let epoch_store = self.load_epoch_store_one_call_per_task();
5721        let actual_epoch = epoch_store.epoch();
5722        if actual_epoch != expected_epoch {
5723            return Err(SuiErrorKind::WrongEpoch {
5724                expected_epoch,
5725                actual_epoch,
5726            }
5727            .into());
5728        }
5729
5730        epoch_store.clear_override_protocol_upgrade_buffer_stake()
5731    }
5732
    /// Get the set of system packages that are compiled in to this build, if those packages are
    /// compatible with the current versions of those packages on-chain.
    pub async fn get_available_system_packages(
        &self,
        binary_config: &BinaryConfig,
    ) -> Vec<ObjectRef> {
        let mut results = vec![];

        let system_packages = BuiltInFramework::iter_system_packages();

        // Add extra framework packages during simtest
        #[cfg(msim)]
        let extra_packages = framework_injection::get_extra_packages(self.name);
        #[cfg(msim)]
        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

        for system_package in system_packages {
            let modules = system_package.modules().to_vec();
            // In simtests, we could override the current built-in framework packages.
            #[cfg(msim)]
            let modules =
                match framework_injection::get_override_modules(&system_package.id, self.name) {
                    // An empty override means "drop this package from the vote".
                    Some(overrides) if overrides.is_empty() => continue,
                    Some(overrides) => overrides,
                    None => modules,
                };

            // `compare_system_package` yields `None` when this binary's copy
            // of the package is incompatible with the on-chain version; in
            // that case we advertise no packages at all (empty vote).
            let Some(obj_ref) = sui_framework::compare_system_package(
                &self.get_object_store(),
                &system_package.id,
                &modules,
                system_package.dependencies.to_vec(),
                binary_config,
            )
            .await
            else {
                return vec![];
            };
            results.push(obj_ref);
        }

        results
    }
5776
    /// Return the new versions, module bytes, and dependencies for the packages that have been
    /// committed to for a framework upgrade, in `system_packages`.  Loads the module contents from
    /// the binary, and performs the following checks:
    ///
    /// - Whether its contents matches what is on-chain already, in which case no upgrade is
    ///   required, and its contents are omitted from the output.
    /// - Whether the contents in the binary can form a package whose digest matches the input,
    ///   meaning the framework will be upgraded, and this authority can satisfy that upgrade, in
    ///   which case the contents are included in the output.
    ///
    /// If the current version of the framework can't be loaded, the binary does not contain the
    /// bytes for that framework ID, or the resulting package fails the digest check, `None` is
    /// returned indicating that this authority cannot run the upgrade that the network voted on.
    async fn get_system_package_bytes(
        &self,
        system_packages: Vec<ObjectRef>,
        binary_config: &BinaryConfig,
    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
        // Look up the current on-chain object for every requested package id.
        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
        let objects = self.get_objects(&ids).await;

        let mut res = Vec::with_capacity(system_packages.len());
        for (system_package_ref, object) in system_packages.into_iter().zip_debug_eq(objects.iter())
        {
            let prev_transaction = match object {
                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                    // Skip this one because it doesn't need to be upgraded.
                    info!("Framework {} does not need updating", system_package_ref.0);
                    continue;
                }

                Some(cur_object) => cur_object.previous_transaction,
                // Package does not exist on-chain yet; attribute it to genesis.
                None => TransactionDigest::genesis_marker(),
            };

            // In simtests an injected override may replace the built-in bytes.
            #[cfg(msim)]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
                .unwrap_or_else(|| {
                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
                });

            #[cfg(not(msim))]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

            // These bytes are compiled into the binary, so deserialization
            // failure would be a build bug — hence the unwrap.
            let modules: Vec<_> = bytes
                .iter()
                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
                .collect();

            // Rebuild the package object locally at the committed version so
            // its reference (including digest) can be compared to the vote.
            let new_object = Object::new_system_package(
                &modules,
                system_package_ref.1,
                dependencies.clone(),
                prev_transaction,
            );

            let new_ref = new_object.compute_object_reference();
            if new_ref != system_package_ref {
                if cfg!(msim) {
                    // debug_fatal required here for test_framework_upgrade_conflicting_versions to pass
                    debug_fatal!(
                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
                    );
                } else {
                    error!(
                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
                    );
                }
                return None;
            }

            res.push((system_package_ref.1, bytes, dependencies));
        }

        Some(res)
    }
5861
    /// Checks whether `proposed_protocol_version`, together with some set of system
    /// packages, has enough stake behind it to be adopted for the next epoch.
    ///
    /// Each validator's capabilities vote names the protocol versions it supports
    /// (with a config digest) and the system packages it has available. The upgrade
    /// is accepted only if a single (digest, package-set) combination is backed by at
    /// least quorum stake plus a safety margin derived from `buffer_stake_bps`.
    /// Returns the accepted version and its package set, or `None` if no combination
    /// reaches the effective threshold.
    fn is_protocol_version_supported_v2(
        current_protocol_version: ProtocolVersion,
        proposed_protocol_version: ProtocolVersion,
        protocol_config: &ProtocolConfig,
        committee: &Committee,
        capabilities: Vec<AuthorityCapabilitiesV2>,
        mut buffer_stake_bps: u64,
    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
        // Normally the version can only advance one step at a time; skipping ahead is
        // allowed only when the protocol config explicitly permits it.
        if proposed_protocol_version > current_protocol_version + 1
            && !protocol_config.advance_to_highest_supported_protocol_version()
        {
            return None;
        }

        if buffer_stake_bps > 10000 {
            warn!("clamping buffer_stake_bps to 10000");
            buffer_stake_bps = 10000;
        }

        // For each validator, gather the protocol version and system packages that it would like
        // to upgrade to in the next epoch.
        let mut desired_upgrades: Vec<_> = capabilities
            .into_iter()
            .filter_map(|mut cap| {
                // A validator that lists no packages is voting against any change at all.
                if cap.available_system_packages.is_empty() {
                    return None;
                }

                // Sort so that identical package sets compare equal in chunk_by below.
                cap.available_system_packages.sort();

                info!(
                    "validator {:?} supports {:?} with system packages: {:?}",
                    cap.authority.concise(),
                    cap.supported_protocol_versions,
                    cap.available_system_packages,
                );

                // A validator that only supports the current protocol version is also voting
                // against any change, because framework upgrades always require a protocol version
                // bump.
                cap.supported_protocol_versions
                    .get_version_digest(proposed_protocol_version)
                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
            })
            .collect();

        // There can only be one set of votes that have a majority, find one if it exists.
        // Sorting first makes identical (digest, packages) votes adjacent for chunk_by.
        desired_upgrades.sort();
        desired_upgrades
            .into_iter()
            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
            .into_iter()
            .find_map(|((digest, packages), group)| {
                // should have been filtered out earlier.
                assert!(!packages.is_empty());

                let mut stake_aggregator: StakeAggregator<(), true> =
                    StakeAggregator::new(Arc::new(committee.clone()));

                for (_, _, authority) in group {
                    stake_aggregator.insert_generic(authority, ());
                }

                let total_votes = stake_aggregator.total_votes();
                let quorum_threshold = committee.quorum_threshold();
                // `f` is the maximum faulty stake the committee tolerates.
                let f = committee.total_votes() - committee.quorum_threshold();

                // multiply by buffer_stake_bps / 10000, rounded up.
                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
                let effective_threshold = quorum_threshold + buffer_stake;

                info!(
                    protocol_config_digest = ?digest,
                    ?total_votes,
                    ?quorum_threshold,
                    ?buffer_stake_bps,
                    ?effective_threshold,
                    ?proposed_protocol_version,
                    ?packages,
                    "support for upgrade"
                );

                let has_support = total_votes >= effective_threshold;
                has_support.then_some((proposed_protocol_version, packages))
            })
    }
5949
5950    fn choose_protocol_version_and_system_packages_v2(
5951        current_protocol_version: ProtocolVersion,
5952        protocol_config: &ProtocolConfig,
5953        committee: &Committee,
5954        capabilities: Vec<AuthorityCapabilitiesV2>,
5955        buffer_stake_bps: u64,
5956    ) -> (ProtocolVersion, Vec<ObjectRef>) {
5957        assert!(protocol_config.authority_capabilities_v2());
5958        let mut next_protocol_version = current_protocol_version;
5959        let mut system_packages = vec![];
5960
5961        while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5962            current_protocol_version,
5963            next_protocol_version + 1,
5964            protocol_config,
5965            committee,
5966            capabilities.clone(),
5967            buffer_stake_bps,
5968        ) {
5969            next_protocol_version = version;
5970            system_packages = packages;
5971        }
5972
5973        (next_protocol_version, system_packages)
5974    }
5975
5976    #[instrument(level = "debug", skip_all)]
5977    fn create_authenticator_state_tx(
5978        &self,
5979        epoch_store: &Arc<AuthorityPerEpochStore>,
5980    ) -> Option<EndOfEpochTransactionKind> {
5981        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5982            info!("authenticator state transactions not enabled");
5983            return None;
5984        }
5985
5986        let authenticator_state_exists = epoch_store.authenticator_state_exists();
5987        let tx = if authenticator_state_exists {
5988            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5989            let min_epoch =
5990                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5991            let authenticator_obj_initial_shared_version = epoch_store
5992                .epoch_start_config()
5993                .authenticator_obj_initial_shared_version()
5994                .expect("initial version must exist");
5995
5996            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5997                min_epoch,
5998                authenticator_obj_initial_shared_version,
5999            );
6000
6001            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
6002
6003            tx
6004        } else {
6005            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
6006            info!("Creating AuthenticatorStateCreate tx");
6007            tx
6008        };
6009        Some(tx)
6010    }
6011
6012    #[instrument(level = "debug", skip_all)]
6013    fn create_randomness_state_tx(
6014        &self,
6015        epoch_store: &Arc<AuthorityPerEpochStore>,
6016    ) -> Option<EndOfEpochTransactionKind> {
6017        if !epoch_store.protocol_config().random_beacon() {
6018            info!("randomness state transactions not enabled");
6019            return None;
6020        }
6021
6022        if epoch_store.randomness_state_exists() {
6023            return None;
6024        }
6025
6026        let tx = EndOfEpochTransactionKind::new_randomness_state_create();
6027        info!("Creating RandomnessStateCreate tx");
6028        Some(tx)
6029    }
6030
6031    #[instrument(level = "debug", skip_all)]
6032    fn create_accumulator_root_tx(
6033        &self,
6034        epoch_store: &Arc<AuthorityPerEpochStore>,
6035    ) -> Option<EndOfEpochTransactionKind> {
6036        if !epoch_store
6037            .protocol_config()
6038            .create_root_accumulator_object()
6039        {
6040            info!("accumulator root creation not enabled");
6041            return None;
6042        }
6043
6044        if epoch_store.accumulator_root_exists() {
6045            return None;
6046        }
6047
6048        let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
6049        info!("Creating AccumulatorRootCreate tx");
6050        Some(tx)
6051    }
6052
6053    #[instrument(level = "debug", skip_all)]
6054    fn create_write_accumulator_storage_cost_tx(
6055        &self,
6056        epoch_store: &Arc<AuthorityPerEpochStore>,
6057    ) -> Option<EndOfEpochTransactionKind> {
6058        if !epoch_store.accumulator_root_exists() {
6059            info!("accumulator root does not exist yet");
6060            return None;
6061        }
6062        if !epoch_store.protocol_config().enable_accumulators() {
6063            info!("accumulators not enabled");
6064            return None;
6065        }
6066
6067        let object_store = self.get_object_store();
6068        let object_count =
6069            match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
6070                Ok(Some(count)) => count,
6071                Ok(None) => return None,
6072                Err(e) => {
6073                    fatal!("failed to read accumulator object count: {e}");
6074                }
6075            };
6076
6077        let storage_cost = object_count.saturating_mul(
6078            epoch_store
6079                .protocol_config()
6080                .accumulator_object_storage_cost(),
6081        );
6082
6083        let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
6084        info!(
6085            object_count,
6086            storage_cost, "Creating WriteAccumulatorStorageCost tx"
6087        );
6088        Some(tx)
6089    }
6090
6091    #[instrument(level = "debug", skip_all)]
6092    fn create_coin_registry_tx(
6093        &self,
6094        epoch_store: &Arc<AuthorityPerEpochStore>,
6095    ) -> Option<EndOfEpochTransactionKind> {
6096        if !epoch_store.protocol_config().enable_coin_registry() {
6097            info!("coin registry not enabled");
6098            return None;
6099        }
6100
6101        if epoch_store.coin_registry_exists() {
6102            return None;
6103        }
6104
6105        let tx = EndOfEpochTransactionKind::new_coin_registry_create();
6106        info!("Creating CoinRegistryCreate tx");
6107        Some(tx)
6108    }
6109
6110    #[instrument(level = "debug", skip_all)]
6111    fn create_display_registry_tx(
6112        &self,
6113        epoch_store: &Arc<AuthorityPerEpochStore>,
6114    ) -> Option<EndOfEpochTransactionKind> {
6115        if !epoch_store.protocol_config().enable_display_registry() {
6116            info!("display registry not enabled");
6117            return None;
6118        }
6119
6120        if epoch_store.display_registry_exists() {
6121            return None;
6122        }
6123
6124        let tx = EndOfEpochTransactionKind::new_display_registry_create();
6125        info!("Creating DisplayRegistryCreate tx");
6126        Some(tx)
6127    }
6128
6129    #[instrument(level = "debug", skip_all)]
6130    fn create_bridge_tx(
6131        &self,
6132        epoch_store: &Arc<AuthorityPerEpochStore>,
6133    ) -> Option<EndOfEpochTransactionKind> {
6134        if !epoch_store.protocol_config().enable_bridge() {
6135            info!("bridge not enabled");
6136            return None;
6137        }
6138        if epoch_store.bridge_exists() {
6139            return None;
6140        }
6141        let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
6142        info!("Creating Bridge Create tx");
6143        Some(tx)
6144    }
6145
6146    #[instrument(level = "debug", skip_all)]
6147    fn init_bridge_committee_tx(
6148        &self,
6149        epoch_store: &Arc<AuthorityPerEpochStore>,
6150    ) -> Option<EndOfEpochTransactionKind> {
6151        if !epoch_store.protocol_config().enable_bridge() {
6152            info!("bridge not enabled");
6153            return None;
6154        }
6155        if !epoch_store
6156            .protocol_config()
6157            .should_try_to_finalize_bridge_committee()
6158        {
6159            info!("should not try to finalize bridge committee yet");
6160            return None;
6161        }
6162        // Only create this transaction if bridge exists
6163        if !epoch_store.bridge_exists() {
6164            return None;
6165        }
6166
6167        if epoch_store.bridge_committee_initiated() {
6168            return None;
6169        }
6170
6171        let bridge_initial_shared_version = epoch_store
6172            .epoch_start_config()
6173            .bridge_obj_initial_shared_version()
6174            .expect("initial version must exist");
6175        let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
6176        info!("Init Bridge committee tx");
6177        Some(tx)
6178    }
6179
6180    #[instrument(level = "debug", skip_all)]
6181    fn create_deny_list_state_tx(
6182        &self,
6183        epoch_store: &Arc<AuthorityPerEpochStore>,
6184    ) -> Option<EndOfEpochTransactionKind> {
6185        if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
6186            return None;
6187        }
6188
6189        if epoch_store.coin_deny_list_state_exists() {
6190            return None;
6191        }
6192
6193        let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
6194        info!("Creating DenyListStateCreate tx");
6195        Some(tx)
6196    }
6197
6198    #[instrument(level = "debug", skip_all)]
6199    fn create_address_alias_state_tx(
6200        &self,
6201        epoch_store: &Arc<AuthorityPerEpochStore>,
6202    ) -> Option<EndOfEpochTransactionKind> {
6203        if !epoch_store.protocol_config().address_aliases() {
6204            info!("address aliases not enabled");
6205            return None;
6206        }
6207
6208        if epoch_store.address_alias_state_exists() {
6209            return None;
6210        }
6211
6212        let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
6213        info!("Creating AddressAliasStateCreate tx");
6214        Some(tx)
6215    }
6216
    /// Builds a `StoreExecutionTimeObservations` end-of-epoch transaction persisting
    /// execution-time observations for the next epoch.
    ///
    /// Only observations whose keys appear either in the commands of the last
    /// `stored_observations_num_included_checkpoints` checkpoints of this epoch, or
    /// in `end_of_epoch_observation_keys`, are kept (capped at
    /// `stored_observations_limit`). Returns `None` unless per-object congestion
    /// control is in `ExecutionTimeEstimate` mode.
    #[instrument(level = "debug", skip_all)]
    fn create_execution_time_observations_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
    ) -> Option<EndOfEpochTransactionKind> {
        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
            .protocol_config()
            .per_object_congestion_control_mode()
        else {
            return None;
        };

        // Load tx in the last N checkpoints before end-of-epoch, and save only the
        // execution time observations for commands in these checkpoints.
        let start_checkpoint = std::cmp::max(
            last_checkpoint_before_end_of_epoch
                .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
            // If we have <N checkpoints in the epoch, use all of them.
            // (For epoch 0 there is no preceding epoch, so start from checkpoint 1.)
            epoch_store
                .epoch()
                .checked_sub(1)
                .map(|prev_epoch| {
                    self.checkpoint_store
                        .get_epoch_last_checkpoint_seq_number(prev_epoch)
                        .expect("typed store must not fail")
                        .expect(
                            "sequence number of last checkpoint of preceding epoch must be saved",
                        )
                        + 1
                })
                .unwrap_or(1),
        );
        info!(
            "reading checkpoint range {:?}..={:?}",
            start_checkpoint, last_checkpoint_before_end_of_epoch
        );
        // Resolve each checkpoint in the range to its contents digest.
        let sequence_numbers =
            (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
        let contents_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_locally_computed_checkpoints(&sequence_numbers)
            .expect("typed store must not fail")
            .into_iter()
            .zip_debug_eq(sequence_numbers)
            .map(|(maybe_checkpoint, sequence_number)| {
                if let Some(checkpoint) = maybe_checkpoint {
                    checkpoint.content_digest
                } else {
                    // If locally computed checkpoint summary was already pruned, load the
                    // certified checkpoint.
                    self.checkpoint_store
                        .get_checkpoint_by_sequence_number(sequence_number)
                        .expect("typed store must not fail")
                        .unwrap_or_else(|| {
                            fatal!("preceding checkpoints must exist by end of epoch")
                        })
                        .data()
                        .content_digest
                }
            })
            .collect();
        // Flatten the checkpoint contents into the digests of every included transaction.
        let tx_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_checkpoint_content(&contents_digests)
            .expect("typed store must not fail")
            .into_iter()
            .flat_map(|maybe_contents| {
                maybe_contents
                    .expect("preceding checkpoint contents must exist by end of epoch")
                    .into_inner()
                    .into_iter()
                    .map(|digests| digests.transaction)
            })
            .collect();
        // Collect the observation keys for every PTB command in those transactions,
        // plus the explicitly requested end-of-epoch keys.
        let included_execution_time_observations: HashSet<_> = self
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(&tx_digests)
            .into_iter()
            .flat_map(|maybe_tx| {
                if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
                    .expect("preceding transaction must exist by end of epoch")
                    .transaction_data()
                    .kind()
                {
                    #[allow(clippy::unnecessary_to_owned)]
                    itertools::Either::Left(
                        ptb.commands
                            .to_owned()
                            .into_iter()
                            .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
                    )
                } else {
                    // Non-PTB transactions contribute no observation keys.
                    itertools::Either::Right(std::iter::empty())
                }
            })
            .chain(end_of_epoch_observation_keys.into_iter())
            .collect();

        // Keep only observations whose keys were seen above, bounded by the
        // configured storage limit.
        let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
            epoch_store
                .get_end_of_epoch_execution_time_observations()
                .filter_and_sort_v1(
                    |(key, _)| included_execution_time_observations.contains(key),
                    params.stored_observations_limit.try_into().unwrap(),
                ),
        );
        info!("Creating StoreExecutionTimeObservations tx");
        Some(tx)
    }
6328
    /// Creates and executes the advance epoch transaction to effects without committing it to the database.
    /// The effects of the change epoch tx are only written to the database after a certified checkpoint has been
    /// formed and executed by CheckpointExecutor.
    ///
    /// When a framework upgrade has been decided on, but the validator does not have the new
    /// versions of the packages locally, the validator cannot form the ChangeEpochTx. In this case
    /// it returns Err, indicating that the checkpoint builder should give up trying to make the
    /// final checkpoint. As long as the network is able to create a certified checkpoint (which
    /// should be ensured by the capabilities vote), it will arrive via state sync and be executed
    /// by CheckpointExecutor.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
        // >1 checkpoint.
        last_checkpoint: CheckpointSequenceNumber,
    ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
        // Accumulate all end-of-epoch sub-transactions that apply under the current
        // protocol config. Each helper returns None when its feature is disabled or
        // its work is already done.
        let mut txns = Vec::new();

        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_bridge_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_execution_time_observations_tx(
            epoch_store,
            end_of_epoch_observation_keys,
            last_checkpoint,
        ) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
            txns.push(tx);
        }

        if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_display_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();

        // Pick the next protocol version and system packages from the committee's
        // capabilities votes.
        let (next_epoch_protocol_version, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v2(
                epoch_store.protocol_version(),
                epoch_store.protocol_config(),
                epoch_store.committee(),
                epoch_store
                    .get_capabilities_v2()
                    .expect("read capabilities from db cannot fail"),
                buffer_stake_bps,
            );

        // since system packages are created during the current epoch, they should abide by the
        // rules of the current epoch, including the current epoch's max Move binary format version
        let config = epoch_store.protocol_config();
        let binary_config = config.binary_config(None);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            if next_epoch_protocol_version <= ProtocolVersion::MAX {
                // This case should only be hit if the validator supports the new protocol version,
                // but carries a different framework. The validator should still be able to
                // reconfigure, as the correct framework will be installed by the change epoch txn.
                debug_fatal!(
                    "upgraded system packages {:?} are not locally available, cannot create \
                    ChangeEpochTx. validator binary must be upgraded to the correct version!",
                    next_epoch_system_packages
                );
            } else {
                error!(
                    "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
                    next_epoch_protocol_version
                );
            }
            // the checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages locally,
            //   and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive it via
            //   state sync, and execute it. This will upgrade the framework packages, reconfigure,
            //   and most likely shut down in the new epoch (this validator likely doesn't support
            //   the new protocol version, or else it should have had the packages.)
            return Err(CheckpointBuilderError::SystemPackagesMissing);
        };

        // Wrap everything in a single end-of-epoch transaction, or fall back to the
        // legacy standalone ChangeEpoch transaction on older protocol versions.
        let tx = if epoch_store
            .protocol_config()
            .end_of_epoch_transaction_supported()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));

            VerifiedTransaction::new_end_of_epoch_transaction(txns)
        } else {
            VerifiedTransaction::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            )
        };

        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // The tx could have been executed by state sync already - if so simply return an error.
        // The checkpoint builder will shortly be terminated by reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .is_tx_already_executed(tx_digest)
        {
            warn!("change epoch tx has already been executed via state sync");
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        }

        let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
        else {
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        };

        // We must manually assign the shared object versions to the transaction before executing it.
        // This is because we do not sequence end-of-epoch transactions through consensus.
        let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::iter::once(&Schedulable::Transaction(&executable_tx)),
        )?;

        // Exactly one transaction was scheduled, so exactly one assignment comes back.
        assert_eq!(assigned_versions.0.len(), 1);
        let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;

        let input_objects = self.read_objects_for_execution(
            &tx_lock,
            &executable_tx,
            &assigned_versions,
            epoch_store,
        )?;

        let (transaction_outputs, _timings, _execution_error_opt) = self
            .execute_certificate(
                &execution_guard,
                &executable_tx,
                input_objects,
                None,
                ExecutionEnv::default(),
                epoch_store,
            )
            .unwrap();
        let system_obj = get_sui_system_state(&transaction_outputs.written)
            .expect("change epoch tx must write to system object");

        let effects = transaction_outputs.effects;
        // We must write tx and effects to the state sync tables so that state sync is able to
        // deliver the transaction to CheckpointExecutor after it is included in a certified
        // checkpoint.
        self.get_state_sync_store()
            .insert_transaction_and_effects(&tx, &effects);

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_ok());
        Ok((system_obj, effects))
    }
6554
    /// Opens the `AuthorityPerEpochStore` for the new epoch and swaps it into
    /// `self.epoch_store`, returning the new store. Called during reconfiguration.
    #[instrument(level = "error", skip_all)]
    async fn reopen_epoch_db(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        let new_epoch = new_committee.epoch;
        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
        // The start configuration and the committee must agree on which epoch is
        // beginning; a mismatch indicates a reconfiguration bug.
        assert_eq!(
            epoch_start_configuration.epoch_start_state().epoch(),
            new_committee.epoch
        );
        fail_point!("before-open-new-epoch-store");
        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
            self.name,
            new_committee,
            epoch_start_configuration,
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            expensive_safety_check_config,
            epoch_last_checkpoint,
        )?;
        // Publish the new store so subsequent readers observe the new epoch.
        self.epoch_store.store(new_epoch_store.clone());
        Ok(new_epoch_store)
    }
6583
6584    #[cfg(test)]
6585    pub(crate) fn iter_live_object_set_for_testing(
6586        &self,
6587    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6588        let include_wrapped_object = !self
6589            .epoch_store_for_testing()
6590            .protocol_config()
6591            .simplified_unwrap_then_delete();
6592        self.get_global_state_hash_store()
6593            .iter_cached_live_object_set_for_testing(include_wrapped_object)
6594    }
6595
6596    #[cfg(test)]
6597    pub(crate) fn shutdown_execution_for_test(&self) {
6598        self.tx_execution_shutdown
6599            .lock()
6600            .take()
6601            .unwrap()
6602            .send(())
6603            .unwrap();
6604    }
6605
6606    /// NOTE: this function is only to be used for fuzzing and testing. Never use in prod
6607    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6608        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6609        self.get_object_cache_reader()
6610            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6611        self.get_reconfig_api()
6612            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6613        Ok(())
6614    }
6615}
6616
/// Receives finished randomness rounds and turns each one into a
/// `RandomnessStateUpdate` system transaction (see the `impl` below).
pub struct RandomnessRoundReceiver {
    // Authority used to persist and track execution of the update transactions.
    authority_state: Arc<AuthorityState>,
    // Stream of (epoch, round, randomness bytes) tuples to process.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
6621
impl RandomnessRoundReceiver {
    /// Spawns the receiver's event loop as a monitored background task and
    /// returns its join handle.
    pub fn spawn(
        authority_state: Arc<AuthorityState>,
        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    ) -> JoinHandle<()> {
        let rrr = RandomnessRoundReceiver {
            authority_state,
            randomness_rx,
        };
        spawn_monitored_task!(rrr.run())
    }

    /// Event loop: handles rounds from `randomness_rx` one at a time until the
    /// sending side closes the channel.
    async fn run(mut self) {
        info!("RandomnessRoundReceiver event loop started");

        loop {
            tokio::select! {
                maybe_recv = self.randomness_rx.recv() => {
                    if let Some((epoch, round, bytes)) = maybe_recv {
                        self.handle_new_randomness(epoch, round, bytes).await;
                    } else {
                        // Channel closed: no more rounds will arrive.
                        break;
                    }
                },
            }
        }

        info!("RandomnessRoundReceiver event loop ended");
    }

    /// Builds, persists, and schedules the `RandomnessStateUpdate` transaction
    /// for one randomness round, then waits (in a background task) for it to
    /// execute successfully.
    ///
    /// Rounds for an epoch other than the currently loaded one are dropped
    /// with a warning.
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
        fail_point_async!("randomness-delay");

        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
        // Stale rounds (from another epoch) cannot be applied; discard them.
        if epoch_store.epoch() != epoch {
            warn!(
                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
                epoch_store.epoch()
            );
            return;
        }
        let key = TransactionKey::RandomnessRound(epoch, round);
        let transaction = VerifiedTransaction::new_randomness_state_update(
            epoch,
            round,
            bytes,
            epoch_store
                .epoch_start_config()
                .randomness_obj_initial_shared_version()
                .expect("randomness state obj must exist"),
        );
        debug!(
            "created randomness state update transaction with digest: {:?}",
            transaction.digest()
        );
        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
        let digest = *transaction.digest();

        // Randomness state updates contain the full bls signature for the random round,
        // which cannot necessarily be reconstructed again later. Therefore we must immediately
        // persist this transaction. If we crash before its outputs are committed, this
        // ensures we will be able to re-execute it.
        self.authority_state
            .get_cache_commit()
            .persist_transaction(&transaction);

        // Notify the scheduler that the transaction key now has a known digest
        if epoch_store.insert_tx_key(key, digest).is_err() {
            warn!("epoch ended while handling new randomness");
        }

        let authority_state = self.authority_state.clone();
        spawn_monitored_task!(async move {
            // Wait for transaction execution in a separate task, to avoid deadlock in case of
            // out-of-order randomness generation. (Each RandomnessStateUpdate depends on the
            // output of the RandomnessStateUpdate from the previous round.)
            //
            // We set a very long timeout so that in case this gets stuck for some reason, the
            // validator will eventually crash rather than continuing in a zombie mode.
            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
            let result = tokio::time::timeout(
                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
                authority_state
                    .get_transaction_cache_reader()
                    .notify_read_executed_effects(
                        "RandomnessRoundReceiver::notify_read_executed_effects_first",
                        &[digest],
                    ),
            )
            .await;
            let mut effects = match result {
                Ok(result) => result,
                Err(_) => {
                    // Crash on randomness update execution timeout in debug builds.
                    debug_fatal_no_invariant!(
                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                    );
                    // Continue waiting as long as necessary in non-debug builds.
                    authority_state
                        .get_transaction_cache_reader()
                        .notify_read_executed_effects(
                            "RandomnessRoundReceiver::notify_read_executed_effects_second",
                            &[digest],
                        )
                        .await
                }
            };

            let effects = effects.pop().expect("should return effects");
            // System transactions must execute successfully; anything else is fatal.
            if *effects.status() != ExecutionStatus::Success {
                fatal!(
                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
                );
            }
            debug!(
                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
            );
        });
    }
}
6743
6744#[async_trait]
6745impl TransactionKeyValueStoreTrait for AuthorityState {
6746    #[instrument(skip(self))]
6747    async fn multi_get(
6748        &self,
6749        transactions: &[TransactionDigest],
6750        effects: &[TransactionDigest],
6751    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6752        let txns = if !transactions.is_empty() {
6753            self.get_transaction_cache_reader()
6754                .multi_get_transaction_blocks(transactions)
6755                .into_iter()
6756                .map(|t| t.map(|t| (*t).clone().into_inner()))
6757                .collect()
6758        } else {
6759            vec![]
6760        };
6761
6762        let fx = if !effects.is_empty() {
6763            self.get_transaction_cache_reader()
6764                .multi_get_executed_effects(effects)
6765        } else {
6766            vec![]
6767        };
6768
6769        Ok((txns, fx))
6770    }
6771
6772    #[instrument(skip(self))]
6773    async fn multi_get_checkpoints(
6774        &self,
6775        checkpoint_summaries: &[CheckpointSequenceNumber],
6776        checkpoint_contents: &[CheckpointSequenceNumber],
6777        checkpoint_summaries_by_digest: &[CheckpointDigest],
6778    ) -> SuiResult<(
6779        Vec<Option<CertifiedCheckpointSummary>>,
6780        Vec<Option<CheckpointContents>>,
6781        Vec<Option<CertifiedCheckpointSummary>>,
6782    )> {
6783        // TODO: use multi-get methods if it ever becomes important (unlikely)
6784        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6785        let store = self.get_checkpoint_store();
6786        for seq in checkpoint_summaries {
6787            let checkpoint = store
6788                .get_checkpoint_by_sequence_number(*seq)?
6789                .map(|c| c.into_inner());
6790
6791            summaries.push(checkpoint);
6792        }
6793
6794        let mut contents = Vec::with_capacity(checkpoint_contents.len());
6795        for seq in checkpoint_contents {
6796            let checkpoint = store
6797                .get_checkpoint_by_sequence_number(*seq)?
6798                .and_then(|summary| {
6799                    store
6800                        .get_checkpoint_contents(&summary.content_digest)
6801                        .expect("db read cannot fail")
6802                });
6803            contents.push(checkpoint);
6804        }
6805
6806        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6807        for digest in checkpoint_summaries_by_digest {
6808            let checkpoint = store
6809                .get_checkpoint_by_digest(digest)?
6810                .map(|c| c.into_inner());
6811            summaries_by_digest.push(checkpoint);
6812        }
6813        Ok((summaries, contents, summaries_by_digest))
6814    }
6815
6816    #[instrument(skip(self))]
6817    async fn deprecated_get_transaction_checkpoint(
6818        &self,
6819        digest: TransactionDigest,
6820    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6821        Ok(self
6822            .get_checkpoint_cache()
6823            .deprecated_get_transaction_checkpoint(&digest)
6824            .map(|(_epoch, checkpoint)| checkpoint))
6825    }
6826
6827    #[instrument(skip(self))]
6828    async fn get_object(
6829        &self,
6830        object_id: ObjectID,
6831        version: VersionNumber,
6832    ) -> SuiResult<Option<Object>> {
6833        Ok(self
6834            .get_object_cache_reader()
6835            .get_object_by_key(&object_id, version))
6836    }
6837
6838    #[instrument(skip_all)]
6839    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6840        Ok(self
6841            .get_object_cache_reader()
6842            .multi_get_objects_by_key(object_keys))
6843    }
6844
6845    #[instrument(skip(self))]
6846    async fn multi_get_transaction_checkpoint(
6847        &self,
6848        digests: &[TransactionDigest],
6849    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6850        let res = self
6851            .get_checkpoint_cache()
6852            .deprecated_multi_get_transaction_checkpoint(digests);
6853
6854        Ok(res
6855            .into_iter()
6856            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6857            .collect())
6858    }
6859
6860    #[instrument(skip(self))]
6861    async fn multi_get_events_by_tx_digests(
6862        &self,
6863        digests: &[TransactionDigest],
6864    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6865        if digests.is_empty() {
6866            return Ok(vec![]);
6867        }
6868
6869        Ok(self
6870            .get_transaction_cache_reader()
6871            .multi_get_events(digests))
6872    }
6873}
6874
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::collections::BTreeMap;
    use std::{cell::RefCell, collections::BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package is overridden: the same modules for every validator, or a
    /// per-validator callback that may return different modules (or none).
    enum PackageOverrideConfig {
        Global(Framework),
        PerValidator(PackageUpgradeCallback),
    }

    /// Serialize each compiled module into its byte representation.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Override `package_id` with the same modules on every validator.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|cfg| {
            cfg.borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules));
        });
    }

    /// Override `package_id` with a per-validator callback.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|cfg| {
            cfg.borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func));
        });
    }

    /// Replace the full system-package set: packages present in `packages` are
    /// overridden with their modules; built-in packages *not* present are
    /// overridden with an empty module list.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        OVERRIDE.with(|cfg| {
            let mut config = cfg.borrow_mut();
            // Built-in package ids not covered by the new set.
            let mut not_included: BTreeSet<_> =
                BuiltInFramework::all_package_ids().into_iter().collect();
            for pkg in &packages {
                not_included.remove(&pkg.id);
            }
            for pkg in packages {
                config.insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
            }
            for empty_pkg in not_included {
                config.insert(empty_pkg, PackageOverrideConfig::Global(Vec::new()));
            }
        });
    }

    /// Serialized module bytes for an overridden package, if any override
    /// applies to this validator.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let config = cfg.borrow();
            match config.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            }
        })
    }

    /// Compiled modules for an overridden package, if any override applies to
    /// this validator.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let config = cfg.borrow();
            match config.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            }
        })
    }

    /// Build a `SystemPackage` for an overridden package id, if an override
    /// applies. Known system packages keep their declared dependencies.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if !is_system_package(*package_id) {
            // Assume that entirely new injected packages depend on all existing system packages.
            BuiltInFramework::all_package_ids()
        } else {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// All injected packages that are not built-in system packages.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        // Collect the extra ids first so the OVERRIDE borrow is released before
        // get_override_bytes accesses the thread local again below.
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .copied()
                .filter(|package| !built_in.contains(package))
                .collect()
        });

        extra
            .into_iter()
            .map(|package| SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            })
            .collect()
    }
}
7008
/// One object captured in a node state dump, keyed by its full
/// (id, version, digest) object reference.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    pub id: ObjectID,
    pub version: VersionNumber,
    pub digest: ObjectDigest,
    pub object: Object,
}
7016
7017impl ObjDumpFormat {
7018    fn new(object: Object) -> Self {
7019        let oref = object.compute_object_reference();
7020        Self {
7021            id: oref.0,
7022            version: oref.1,
7023            digest: oref.2,
7024            object,
7025        }
7026    }
7027}
7028
/// A self-contained snapshot of one transaction execution — the transaction,
/// its epoch parameters, every object category it touched, and both the
/// computed and expected effects — serializable to JSON for offline debugging.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    pub tx_digest: TransactionDigest,
    pub sender_signed_data: SenderSignedData,
    pub executed_epoch: u64,
    pub reference_gas_price: u64,
    pub protocol_version: u64,
    pub epoch_start_timestamp_ms: u64,
    // Effects this node computed for the transaction.
    pub computed_effects: TransactionEffects,
    // Effects digest the transaction was expected to produce (for comparison).
    pub expected_effects_digest: TransactionEffectsDigest,
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    pub shared_objects: Vec<ObjDumpFormat>,
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    pub modified_at_versions: Vec<ObjDumpFormat>,
    pub runtime_reads: Vec<ObjDumpFormat>,
    pub input_objects: Vec<ObjDumpFormat>,
}
7046
7047impl NodeStateDump {
7048    pub fn new(
7049        tx_digest: &TransactionDigest,
7050        effects: &TransactionEffects,
7051        expected_effects_digest: TransactionEffectsDigest,
7052        object_store: &dyn ObjectStore,
7053        epoch_store: &Arc<AuthorityPerEpochStore>,
7054        inner_temporary_store: &InnerTemporaryStore,
7055        certificate: &VerifiedExecutableTransaction,
7056    ) -> SuiResult<Self> {
7057        // Epoch info
7058        let executed_epoch = epoch_store.epoch();
7059        let reference_gas_price = epoch_store.reference_gas_price();
7060        let epoch_start_config = epoch_store.epoch_start_config();
7061        let protocol_version = epoch_store.protocol_version().as_u64();
7062        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
7063
7064        // Record all system packages at this version
7065        let mut relevant_system_packages = Vec::new();
7066        for sys_package_id in BuiltInFramework::all_package_ids() {
7067            if let Some(w) = object_store.get_object(&sys_package_id) {
7068                relevant_system_packages.push(ObjDumpFormat::new(w))
7069            }
7070        }
7071
7072        // Record all the shared objects
7073        let mut shared_objects = Vec::new();
7074        for kind in effects.input_consensus_objects() {
7075            match kind {
7076                InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
7077                    if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
7078                        shared_objects.push(ObjDumpFormat::new(w))
7079                    }
7080                }
7081                InputConsensusObject::ReadConsensusStreamEnded(..)
7082                | InputConsensusObject::MutateConsensusStreamEnded(..)
7083                | InputConsensusObject::Cancelled(..) => (), // TODO: consider record congested objects.
7084            }
7085        }
7086
7087        // Record all loaded child objects
7088        // Child objects which are read but not mutated are not tracked anywhere else
7089        let mut loaded_child_objects = Vec::new();
7090        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
7091            if let Some(w) = object_store.get_object_by_key(id, meta.version) {
7092                loaded_child_objects.push(ObjDumpFormat::new(w))
7093            }
7094        }
7095
7096        // Record all modified objects
7097        let mut modified_at_versions = Vec::new();
7098        for (id, ver) in effects.modified_at_versions() {
7099            if let Some(w) = object_store.get_object_by_key(&id, ver) {
7100                modified_at_versions.push(ObjDumpFormat::new(w))
7101            }
7102        }
7103
7104        // Packages read at runtime, which were not previously loaded into the temoorary store
7105        // Some packages may be fetched at runtime and wont show up in input objects
7106        let mut runtime_reads = Vec::new();
7107        for obj in inner_temporary_store
7108            .runtime_packages_loaded_from_db
7109            .values()
7110        {
7111            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
7112        }
7113
7114        // All other input objects should already be in `inner_temporary_store.objects`
7115
7116        Ok(Self {
7117            tx_digest: *tx_digest,
7118            executed_epoch,
7119            reference_gas_price,
7120            epoch_start_timestamp_ms,
7121            protocol_version,
7122            relevant_system_packages,
7123            shared_objects,
7124            loaded_child_objects,
7125            modified_at_versions,
7126            runtime_reads,
7127            sender_signed_data: certificate.clone().into_message(),
7128            input_objects: inner_temporary_store
7129                .input_objects
7130                .values()
7131                .map(|o| ObjDumpFormat::new(o.clone()))
7132                .collect(),
7133            computed_effects: effects.clone(),
7134            expected_effects_digest,
7135        })
7136    }
7137
7138    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
7139        let mut objects = Vec::new();
7140        objects.extend(self.relevant_system_packages.clone());
7141        objects.extend(self.shared_objects.clone());
7142        objects.extend(self.loaded_child_objects.clone());
7143        objects.extend(self.modified_at_versions.clone());
7144        objects.extend(self.runtime_reads.clone());
7145        objects.extend(self.input_objects.clone());
7146        objects
7147    }
7148
7149    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
7150        let file_name = format!(
7151            "{}_{}_NODE_DUMP.json",
7152            self.tx_digest,
7153            AuthorityState::unixtime_now_ms()
7154        );
7155        let mut path = path.to_path_buf();
7156        path.push(&file_name);
7157        let mut file = File::create(path.clone())?;
7158        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
7159        Ok(path)
7160    }
7161
7162    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
7163        let file = File::open(path)?;
7164        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
7165    }
7166}