// sui_core/authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::accumulators::coin_reservations::CachingCoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::transaction_rewriting::rewrite_transaction_for_coin_reservations;
10use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
11use crate::checkpoints::CheckpointBuilderError;
12use crate::checkpoints::CheckpointBuilderResult;
13use crate::congestion_tracker::CongestionTracker;
14use crate::consensus_adapter::ConsensusOverloadChecker;
15use crate::execution_cache::ExecutionCacheTraitPointers;
16use crate::execution_cache::TransactionCacheRead;
17use crate::execution_cache::writeback_cache::WritebackCache;
18use crate::execution_scheduler::ExecutionScheduler;
19use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
20use crate::jsonrpc_index::CoinIndexKey2;
21use crate::rpc_index::RpcIndexStore;
22use crate::traffic_controller::TrafficController;
23use crate::traffic_controller::metrics::TrafficControllerMetrics;
24use crate::transaction_outputs::TransactionOutputs;
25use crate::verify_indexes::{fix_indexes, verify_indexes};
26use arc_swap::{ArcSwap, ArcSwapOption, Guard};
27use async_trait::async_trait;
28use authority_per_epoch_store::CertLockGuard;
29use dashmap::DashMap;
30use fastcrypto::encoding::Base58;
31use fastcrypto::encoding::Encoding;
32use fastcrypto::hash::MultisetHash;
33use itertools::Itertools;
34use move_binary_format::CompiledModule;
35use move_binary_format::binary_config::BinaryConfig;
36use move_core_types::annotated_value::MoveStructLayout;
37use move_core_types::language_storage::ModuleId;
38use mysten_common::{assert_reachable, fatal};
39use parking_lot::Mutex;
40use prometheus::{
41    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
42    register_histogram_vec_with_registry, register_histogram_with_registry,
43    register_int_counter_vec_with_registry, register_int_counter_with_registry,
44    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
45};
46use serde::de::DeserializeOwned;
47use serde::{Deserialize, Serialize};
48use shared_object_version_manager::AssignedVersions;
49use shared_object_version_manager::Schedulable;
50use std::collections::BTreeMap;
51use std::collections::BTreeSet;
52use std::fs::File;
53use std::io::Write;
54use std::path::{Path, PathBuf};
55use std::sync::atomic::Ordering;
56use std::time::Duration;
57use std::time::Instant;
58use std::time::SystemTime;
59use std::time::UNIX_EPOCH;
60use std::{
61    collections::{HashMap, HashSet},
62    fs,
63    pin::Pin,
64    str::FromStr,
65    sync::Arc,
66    vec,
67};
68use sui_config::NodeConfig;
69use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
70use sui_execution::Executor;
71use sui_protocol_config::PerObjectCongestionControlMode;
72use sui_types::accumulator_root::AccumulatorObjId;
73use sui_types::crypto::RandomnessRound;
74use sui_types::dynamic_field::visitor as DFV;
75use sui_types::execution::ExecutionOutput;
76use sui_types::execution::ExecutionTimeObservationKey;
77use sui_types::execution::ExecutionTiming;
78use sui_types::execution_params::ExecutionOrEarlyError;
79use sui_types::execution_params::FundsWithdrawStatus;
80use sui_types::execution_params::get_early_execution_error;
81use sui_types::execution_status::ExecutionStatus;
82use sui_types::inner_temporary_store::PackageStoreWithFallback;
83use sui_types::layout_resolver::LayoutResolver;
84use sui_types::layout_resolver::into_struct_layout;
85use sui_types::messages_consensus::AuthorityCapabilitiesV2;
86use sui_types::object::bounded_visitor::BoundedVisitor;
87use sui_types::storage::ChildObjectResolver;
88use sui_types::storage::InputKey;
89use sui_types::storage::TrackingBackingStore;
90use sui_types::traffic_control::{
91    PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
92};
93use sui_types::transaction_executor::SimulateTransactionResult;
94use sui_types::transaction_executor::TransactionChecks;
95use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
96use tap::TapFallible;
97use tokio::sync::RwLock;
98use tokio::sync::mpsc::unbounded_channel;
99use tokio::sync::watch::error::RecvError;
100use tokio::sync::{mpsc, oneshot};
101use tokio::task::JoinHandle;
102use tokio::time::timeout;
103use tracing::{debug, error, info, instrument, warn};
104
105use self::authority_store::ExecutionLockWriteGuard;
106use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
107pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
108use mysten_metrics::{monitored_scope, spawn_monitored_task};
109
110use crate::jsonrpc_index::IndexStore;
111use crate::jsonrpc_index::{
112    CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
113};
114use mysten_common::{debug_fatal, debug_fatal_no_invariant};
115use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
116use sui_config::genesis::Genesis;
117use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
118use sui_framework::{BuiltInFramework, SystemPackage};
119use sui_json_rpc_types::{
120    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
121    SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
122    SuiTransactionBlockEvents, TransactionFilter,
123};
124use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
125use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
126use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
127use sui_types::accumulator_root::AccumulatorValue;
128use sui_types::authenticator_state::get_authenticator_state;
129use sui_types::balance::Balance;
130use sui_types::coin_reservation;
131use sui_types::committee::{EpochId, ProtocolVersion};
132use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
133use sui_types::deny_list_v1::check_coin_deny_list_v1;
134use sui_types::digests::ChainIdentifier;
135use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
136use sui_types::effects::{
137    InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
138    TransactionEvents, VerifiedSignedTransactionEffects,
139};
140use sui_types::error::{ExecutionError, ExecutionErrorTrait, SuiErrorKind, UserInputError};
141use sui_types::event::{Event, EventID};
142use sui_types::executable_transaction::VerifiedExecutableTransaction;
143use sui_types::execution_status::ExecutionErrorKind;
144use sui_types::gas::{GasCostSummary, SuiGasStatus};
145use sui_types::inner_temporary_store::{
146    InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
147};
148use sui_types::message_envelope::Message;
149use sui_types::messages_checkpoint::{
150    CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
151    CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
152    CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
153    CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
154};
155use sui_types::messages_grpc::{
156    LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
157    TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
158};
159use sui_types::metrics::{BytecodeVerifierMetrics, LimitsMetrics};
160use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
161use sui_types::signature::GenericSignature;
162use sui_types::storage::{
163    BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
164};
165use sui_types::sui_system_state::SuiSystemStateTrait;
166use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
167use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
168use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
169use sui_types::{
170    SUI_SYSTEM_ADDRESS,
171    base_types::*,
172    committee::Committee,
173    crypto::AuthoritySignature,
174    error::{SuiError, SuiResult},
175    object::{Object, ObjectRead},
176    transaction::*,
177};
178use sui_types::{TypeTag, is_system_package};
179use typed_store::TypedStoreError;
180use typed_store::rocks::StagedBatch;
181
182use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
183use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
184use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
185use crate::authority::authority_store_pruner::{
186    AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
187};
188use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
189use crate::authority::epoch_start_configuration::EpochStartConfiguration;
190use crate::checkpoints::CheckpointStore;
191use crate::epoch::committee_store::CommitteeStore;
192use crate::execution_cache::{
193    CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
194    ObjectCacheRead, StateSyncAPI,
195};
196use crate::execution_driver::execution_process;
197use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
198use crate::metrics::LatencyObserver;
199use crate::metrics::RateTracker;
200use crate::module_cache_metrics::ResolverMetrics;
201use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
202use crate::stake_aggregator::StakeAggregator;
203use crate::subscription_handler::SubscriptionHandler;
204use crate::transaction_input_loader::TransactionInputLoader;
205
206#[cfg(msim)]
207pub use crate::checkpoints::checkpoint_executor::utils::{
208    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
209};
210
211#[cfg(msim)]
212use sui_types::committee::CommitteeTrait;
213use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
214
215#[cfg(test)]
216#[path = "unit_tests/authority_tests.rs"]
217pub mod authority_tests;
218
219#[cfg(test)]
220#[path = "unit_tests/transaction_tests.rs"]
221pub mod transaction_tests;
222
223#[cfg(test)]
224#[path = "unit_tests/batch_transaction_tests.rs"]
225mod batch_transaction_tests;
226
227#[cfg(test)]
228#[path = "unit_tests/move_integration_tests.rs"]
229pub mod move_integration_tests;
230
231#[cfg(test)]
232#[path = "unit_tests/gas_tests.rs"]
233mod gas_tests;
234
235#[cfg(test)]
236#[path = "unit_tests/batch_verification_tests.rs"]
237mod batch_verification_tests;
238
239#[cfg(test)]
240#[path = "unit_tests/coin_deny_list_tests.rs"]
241mod coin_deny_list_tests;
242
243#[cfg(test)]
244#[path = "unit_tests/auth_unit_test_utils.rs"]
245pub mod auth_unit_test_utils;
246
247pub mod authority_test_utils;
248
249pub mod authority_per_epoch_store;
250pub mod authority_per_epoch_store_pruner;
251
252pub mod authority_store_pruner;
253pub mod authority_store_tables;
254pub mod authority_store_types;
255pub mod congestion_log;
256pub mod consensus_tx_status_cache;
257pub(crate) mod epoch_marker_key;
258pub mod epoch_start_configuration;
259pub mod execution_time_estimator;
260pub mod shared_object_congestion_tracker;
261pub mod shared_object_version_manager;
262pub mod submitted_transaction_cache;
263pub mod test_authority_builder;
264pub mod transaction_deferral;
265pub mod transaction_reject_reason_cache;
266mod weighted_moving_average;
267
268pub(crate) mod authority_store;
269pub mod backpressure;
270
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on.
///
/// Every field is registered against a single `prometheus::Registry` in
/// `AuthorityMetrics::new`; the metric help strings there describe each one in
/// more detail.
pub struct AuthorityMetrics {
    // Total number of transaction orders received.
    tx_orders: IntCounter,
    // Total number of transaction certificates handled.
    total_certs: IntCounter,
    // Total number of transaction effects produced (effects == finished transactions).
    total_effects: IntCounter,
    // TODO: this tracks consensus object tx, not just shared. Consider renaming.
    pub shared_obj_tx: IntCounter,
    // Number of sponsored transactions.
    sponsored_tx: IntCounter,
    // Number of transaction orders that had already been processed previously.
    tx_already_processed: IntCounter,
    // Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    // TODO: this tracks consensus object count, not just shared. Consider renaming.
    num_shared_objects: Histogram,
    // Distribution of transaction batch sizes.
    batch_size: Histogram,

    // Latency of voting on transactions without signing.
    authority_state_handle_vote_transaction_latency: Histogram,

    // Execution latencies, from loading inputs through committing results.
    internal_execution_latency: Histogram,
    execution_load_input_objects_latency: Histogram,
    prepare_certificate_latency: Histogram,
    commit_certificate_latency: Histogram,
    db_checkpoint_latency: Histogram,

    // TODO: Rename these metrics.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    // Execution-driver throughput and queueing metrics.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    pub(crate) execution_driver_paused_transactions: IntCounter,
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    pub(crate) execution_queueing_delay_s: Histogram,
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    pub(crate) execution_gas_latency_ratio: Histogram,

    // Consensus transactions skipped (total, and the subset found in the local cache).
    pub(crate) skipped_consensus_txns: IntCounter,
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
    pub(crate) consensus_handler_duplicate_tx_count: Histogram,

    // Overload / load-shedding state of this authority.
    pub(crate) authority_overload_status: IntGauge,
    pub(crate) authority_load_shedding_percentage: IntGauge,

    // Number of times each source indicated transaction overload, sliced by source.
    pub(crate) transaction_overload_sources: IntCounterVec,

    /// Post processing metrics
    post_processing_total_events_emitted: IntCounter,
    post_processing_total_tx_indexed: IntCounter,
    post_processing_total_tx_had_event_processed: IntCounter,
    post_processing_total_failures: IntCounter,

    /// Consensus commit and transaction handler metrics
    pub consensus_handler_processed: IntCounterVec,
    pub consensus_handler_transaction_sizes: HistogramVec,
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    pub consensus_handler_scores: IntGaugeVec,
    pub consensus_handler_deferred_transactions: IntCounter,
    pub consensus_handler_congested_transactions: IntCounter,
    pub consensus_handler_unpaid_amplification_deferrals: IntCounter,
    pub consensus_handler_cancelled_transactions: IntCounter,
    pub consensus_handler_max_object_costs: IntGaugeVec,
    pub consensus_committed_subdags: IntCounterVec,
    pub consensus_committed_messages: IntGaugeVec,
    pub consensus_committed_user_transactions: IntGaugeVec,
    pub consensus_finalized_user_transactions: IntGaugeVec,
    pub consensus_rejected_user_transactions: IntGaugeVec,
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,
    pub consensus_block_handler_block_processed: IntCounter,
    pub consensus_block_handler_txn_processed: IntCounterVec,
    pub consensus_block_handler_fastpath_executions: IntCounter,
    pub consensus_timestamp_bias: Histogram,

    pub limits_metrics: Arc<LimitsMetrics>,

    /// bytecode verifier metrics for tracking timeouts
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zklogin signatures
    pub zklogin_sig_count: IntCounter,
    /// Count of multisig signatures
    pub multisig_sig_count: IntCounter,

    // Tracks recent average txn queueing delay between when it is ready for execution
    // until it starts executing.
    pub execution_queueing_latency: LatencyObserver,

    // Tracks the rate of transactions become ready for execution in transaction manager.
    // The need for the Mutex is that the tracker is updated in transaction manager and read
    // in the overload_monitor. There should be low mutex contention because
    // transaction manager is single threaded and the read rate in overload_monitor is
    // low. In the case where transaction manager becomes multi-threaded, we can
    // create one rate tracker per thread.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    // Tracks the rate of transactions starts execution in execution driver.
    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
369
// Override default Prom buckets for positive numbers in the 0-10M range.
// Roughly logarithmic spacing with 1/2/5/7 steps per decade.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7.,
    10., 20., 50., 70.,
    100., 200., 500., 700.,
    1000., 2000., 5000., 7000.,
    10000., 20000., 50000., 70000.,
    100000., 200000., 500000., 700000.,
    1000000., 2000000., 5000000., 7000000.,
    10000000.,
];
376
/// Histogram buckets (in seconds) for general request/execution latencies,
/// spanning 0.5 ms up to 90 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005,
    0.01, 0.025, 0.05,
    0.1, 0.25, 0.5,
    1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];
381
// Buckets for low-latency samples (in seconds). Starts from 10us and
// extends up to 100s to still capture pathological outliers.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005,
    0.0001, 0.0002, 0.0005,
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1., 2., 5.,
    10., 20., 50., 100.,
];
387
// Buckets for consensus timestamp bias, in seconds.
// The expected bias is 200-300ms, so the buckets are densest in that range;
// a negative bucket catches clocks that run ahead.
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    -2.0, 0.0,
    0.2, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40,
    0.45, 0.50, 0.80,
    2.0, 10.0, 30.0,
];
394
/// Buckets for the ratio of computation gas to execution latency
/// (used by `prepare_cert_gas_latency_ratio` and `execution_gas_latency_ratio`).
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0,
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0,
    1000.0, 2000.0, 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0,
    10000.0, 50000.0, 100000.0, 1000000.0,
];
399
/// Value of the synthetic gas coin supplied for dev-inspect execution: 10^18.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 10u64.pow(18);
401
/// How long to wait for fastpath input objects to become available.
///
/// The transaction author has normally already observed these inputs as
/// finalized outputs, so the wait is expected to be short. Submissions via
/// TransactionDriver also retry quickly if this validator does not respond.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_millis(2_000);
406
407impl AuthorityMetrics {
408    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
409        Self {
410            tx_orders: register_int_counter_with_registry!(
411                "total_transaction_orders",
412                "Total number of transaction orders",
413                registry,
414            )
415            .unwrap(),
416            total_certs: register_int_counter_with_registry!(
417                "total_transaction_certificates",
418                "Total number of transaction certificates handled",
419                registry,
420            )
421            .unwrap(),
422            // total_effects == total transactions finished
423            total_effects: register_int_counter_with_registry!(
424                "total_transaction_effects",
425                "Total number of transaction effects produced",
426                registry,
427            )
428            .unwrap(),
429
430            shared_obj_tx: register_int_counter_with_registry!(
431                "num_shared_obj_tx",
432                "Number of transactions involving shared objects",
433                registry,
434            )
435            .unwrap(),
436
437            sponsored_tx: register_int_counter_with_registry!(
438                "num_sponsored_tx",
439                "Number of sponsored transactions",
440                registry,
441            )
442            .unwrap(),
443
444            tx_already_processed: register_int_counter_with_registry!(
445                "num_tx_already_processed",
446                "Number of transaction orders already processed previously",
447                registry,
448            )
449            .unwrap(),
450            num_input_objs: register_histogram_with_registry!(
451                "num_input_objects",
452                "Distribution of number of input TX objects per TX",
453                POSITIVE_INT_BUCKETS.to_vec(),
454                registry,
455            )
456            .unwrap(),
457            num_shared_objects: register_histogram_with_registry!(
458                "num_shared_objects",
459                "Number of shared input objects per TX",
460                POSITIVE_INT_BUCKETS.to_vec(),
461                registry,
462            )
463            .unwrap(),
464            batch_size: register_histogram_with_registry!(
465                "batch_size",
466                "Distribution of size of transaction batch",
467                POSITIVE_INT_BUCKETS.to_vec(),
468                registry,
469            )
470            .unwrap(),
471            authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
472                "authority_state_handle_vote_transaction_latency",
473                "Latency of voting on transactions without signing",
474                LATENCY_SEC_BUCKETS.to_vec(),
475                registry,
476            )
477            .unwrap(),
478            internal_execution_latency: register_histogram_with_registry!(
479                "authority_state_internal_execution_latency",
480                "Latency of actual certificate executions",
481                LATENCY_SEC_BUCKETS.to_vec(),
482                registry,
483            )
484            .unwrap(),
485            execution_load_input_objects_latency: register_histogram_with_registry!(
486                "authority_state_execution_load_input_objects_latency",
487                "Latency of loading input objects for execution",
488                LOW_LATENCY_SEC_BUCKETS.to_vec(),
489                registry,
490            )
491            .unwrap(),
492            prepare_certificate_latency: register_histogram_with_registry!(
493                "authority_state_prepare_certificate_latency",
494                "Latency of executing certificates, before committing the results",
495                LATENCY_SEC_BUCKETS.to_vec(),
496                registry,
497            )
498            .unwrap(),
499            commit_certificate_latency: register_histogram_with_registry!(
500                "authority_state_commit_certificate_latency",
501                "Latency of committing certificate execution results",
502                LATENCY_SEC_BUCKETS.to_vec(),
503                registry,
504            )
505            .unwrap(),
506            db_checkpoint_latency: register_histogram_with_registry!(
507                "db_checkpoint_latency",
508                "Latency of checkpointing dbs",
509                LATENCY_SEC_BUCKETS.to_vec(),
510                registry,
511            ).unwrap(),
512            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
513                "transaction_manager_num_enqueued_certificates",
514                "Current number of certificates enqueued to ExecutionScheduler",
515                &["result"],
516                registry,
517            )
518            .unwrap(),
519            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
520                "transaction_manager_num_pending_certificates",
521                "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
522                registry,
523            )
524            .unwrap(),
525            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
526                "transaction_manager_num_executing_certificates",
527                "Number of executing certificates, including queued and actually running certificates",
528                registry,
529            )
530            .unwrap(),
531            authority_overload_status: register_int_gauge_with_registry!(
532                "authority_overload_status",
533                "Whether authority is current experiencing overload and enters load shedding mode.",
534                registry)
535            .unwrap(),
536            authority_load_shedding_percentage: register_int_gauge_with_registry!(
537                "authority_load_shedding_percentage",
538                "The percentage of transactions is shed when the authority is in load shedding mode.",
539                registry)
540            .unwrap(),
541            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
542                "transaction_manager_transaction_queue_age_s",
543                "Time spent in waiting for transaction in the queue",
544                LATENCY_SEC_BUCKETS.to_vec(),
545                registry,
546            )
547            .unwrap(),
548            transaction_overload_sources: register_int_counter_vec_with_registry!(
549                "transaction_overload_sources",
550                "Number of times each source indicates transaction overload.",
551                &["source"],
552                registry)
553            .unwrap(),
554            execution_driver_executed_transactions: register_int_counter_with_registry!(
555                "execution_driver_executed_transactions",
556                "Cumulative number of transaction executed by execution driver",
557                registry,
558            )
559            .unwrap(),
560            execution_driver_paused_transactions: register_int_counter_with_registry!(
561                "execution_driver_paused_transactions",
562                "Cumulative number of transactions paused by execution driver",
563                registry,
564            )
565            .unwrap(),
566            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
567                "execution_driver_dispatch_queue",
568                "Number of transaction pending in execution driver dispatch queue",
569                registry,
570            )
571            .unwrap(),
572            execution_queueing_delay_s: register_histogram_with_registry!(
573                "execution_queueing_delay_s",
574                "Queueing delay between a transaction is ready for execution until it starts executing.",
575                LATENCY_SEC_BUCKETS.to_vec(),
576                registry
577            )
578            .unwrap(),
579            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
580                "prepare_cert_gas_latency_ratio",
581                "The ratio of computation gas divided by VM execution latency.",
582                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
583                registry
584            )
585            .unwrap(),
586            execution_gas_latency_ratio: register_histogram_with_registry!(
587                "execution_gas_latency_ratio",
588                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
589                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
590                registry
591            )
592            .unwrap(),
593            skipped_consensus_txns: register_int_counter_with_registry!(
594                "skipped_consensus_txns",
595                "Total number of consensus transactions skipped",
596                registry,
597            )
598            .unwrap(),
599            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
600                "skipped_consensus_txns_cache_hit",
601                "Total number of consensus transactions skipped because of local cache hit",
602                registry,
603            )
604            .unwrap(),
605            consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
606                "consensus_handler_duplicate_tx_count",
607                "Number of times each transaction appears in its first consensus commit",
608                POSITIVE_INT_BUCKETS.to_vec(),
609                registry,
610            )
611            .unwrap(),
612            post_processing_total_events_emitted: register_int_counter_with_registry!(
613                "post_processing_total_events_emitted",
614                "Total number of events emitted in post processing",
615                registry,
616            )
617            .unwrap(),
618            post_processing_total_tx_indexed: register_int_counter_with_registry!(
619                "post_processing_total_tx_indexed",
620                "Total number of txes indexed in post processing",
621                registry,
622            )
623            .unwrap(),
624            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
625                "post_processing_total_tx_had_event_processed",
626                "Total number of txes finished event processing in post processing",
627                registry,
628            )
629            .unwrap(),
630            post_processing_total_failures: register_int_counter_with_registry!(
631                "post_processing_total_failures",
632                "Total number of failure in post processing",
633                registry,
634            )
635            .unwrap(),
636            consensus_handler_processed: register_int_counter_vec_with_registry!(
637                "consensus_handler_processed",
638                "Number of transactions processed by consensus handler",
639                &["class"],
640                registry
641            ).unwrap(),
642            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
643                "consensus_handler_transaction_sizes",
644                "Sizes of each type of transactions processed by consensus handler",
645                &["class"],
646                POSITIVE_INT_BUCKETS.to_vec(),
647                registry
648            ).unwrap(),
649            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
650                "consensus_handler_num_low_scoring_authorities",
651                "Number of low scoring authorities based on reputation scores from consensus",
652                registry
653            ).unwrap(),
654            consensus_handler_scores: register_int_gauge_vec_with_registry!(
655                "consensus_handler_scores",
656                "scores from consensus for each authority",
657                &["authority"],
658                registry,
659            ).unwrap(),
660            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
661                "consensus_handler_deferred_transactions",
662                "Number of transactions deferred by consensus handler",
663                registry,
664            ).unwrap(),
665            consensus_handler_congested_transactions: register_int_counter_with_registry!(
666                "consensus_handler_congested_transactions",
667                "Number of transactions deferred by consensus handler due to congestion",
668                registry,
669            ).unwrap(),
670            consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
671                "consensus_handler_unpaid_amplification_deferrals",
672                "Number of transactions deferred due to unpaid consensus amplification",
673                registry,
674            ).unwrap(),
675            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
676                "consensus_handler_cancelled_transactions",
677                "Number of transactions cancelled by consensus handler",
678                registry,
679            ).unwrap(),
680            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
681                "consensus_handler_max_congestion_control_object_costs",
682                "Max object costs for congestion control in the current consensus commit",
683                &["commit_type"],
684                registry,
685            ).unwrap(),
686            consensus_committed_subdags: register_int_counter_vec_with_registry!(
687                "consensus_committed_subdags",
688                "Number of committed subdags, sliced by author",
689                &["authority"],
690                registry,
691            ).unwrap(),
692            consensus_committed_messages: register_int_gauge_vec_with_registry!(
693                "consensus_committed_messages",
694                "Total number of committed consensus messages, sliced by author",
695                &["authority"],
696                registry,
697            ).unwrap(),
698            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
699                "consensus_committed_user_transactions",
700                "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
701                &["authority"],
702                registry,
703            ).unwrap(),
704            consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
705                "consensus_finalized_user_transactions",
706                "Number of user transactions finalized, sliced by submitter",
707                &["authority"],
708                registry,
709            ).unwrap(),
710            consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
711                "consensus_rejected_user_transactions",
712                "Number of user transactions rejected, sliced by submitter",
713                &["authority"],
714                registry,
715            ).unwrap(),
716            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
717            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
718            zklogin_sig_count: register_int_counter_with_registry!(
719                "zklogin_sig_count",
720                "Count of zkLogin signatures",
721                registry,
722            )
723            .unwrap(),
724            multisig_sig_count: register_int_counter_with_registry!(
725                "multisig_sig_count",
726                "Count of zkLogin signatures",
727                registry,
728            )
729            .unwrap(),
730            consensus_calculated_throughput: register_int_gauge_with_registry!(
731                "consensus_calculated_throughput",
732                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
733                registry,
734            ).unwrap(),
735            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
736                "consensus_calculated_throughput_profile",
737                "The current active calculated throughput profile",
738                registry
739            ).unwrap(),
740            consensus_block_handler_block_processed: register_int_counter_with_registry!(
741                "consensus_block_handler_block_processed",
742                "Number of blocks processed by consensus block handler.",
743                registry
744            ).unwrap(),
745            consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
746                "consensus_block_handler_txn_processed",
747                "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
748                &["outcome"],
749                registry
750            ).unwrap(),
751            consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
752                "consensus_block_handler_fastpath_executions",
753                "Number of fastpath transactions sent for execution by consensus transaction handler",
754                registry,
755            ).unwrap(),
756            consensus_timestamp_bias: register_histogram_with_registry!(
757                "consensus_timestamp_bias",
758                "Bias/delay from consensus timestamp to system time",
759                TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
760                registry
761            ).unwrap(),
762            execution_queueing_latency: LatencyObserver::new(),
763            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
764            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
765        }
766    }
767}
768
/// A trait object for `Signer` that is:
/// - `Pin`, i.e. confined to one place in memory (we don't want to copy private keys).
/// - `Sync`, i.e. can be safely shared between threads.
///
/// Typically instantiated with `Arc::pin(keypair)` where keypair is a `KeyPair`
/// (note the alias wraps an `Arc`, not a `Box`).
///
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
776
/// Execution env contains the "environment" for the transaction to be executed in, that is,
/// all the information necessary for execution that is not specified by the transaction itself.
#[derive(Debug, Clone)]
pub struct ExecutionEnv {
    /// The assigned version of each shared object for the transaction.
    pub assigned_versions: AssignedVersions,
    /// The expected digest of the effects of the transaction, if executing from checkpoint or
    /// other sources where the effects are known in advance.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    /// Status of the address funds withdraw scheduling of the transaction,
    /// including both address and object funds withdraws.
    /// Starts as `MaybeSufficient` (see the `Default` impl in this file).
    pub funds_withdraw_status: FundsWithdrawStatus,
    /// Transactions that must finish before this transaction can be executed.
    /// Used to schedule barrier transactions after non-exclusive writes.
    pub barrier_dependencies: Vec<TransactionDigest>,
}
793
794impl Default for ExecutionEnv {
795    fn default() -> Self {
796        Self {
797            assigned_versions: Default::default(),
798            expected_effects_digest: None,
799            funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
800            barrier_dependencies: Default::default(),
801        }
802    }
803}
804
805impl ExecutionEnv {
806    pub fn new() -> Self {
807        Default::default()
808    }
809
810    pub fn with_expected_effects_digest(
811        mut self,
812        expected_effects_digest: TransactionEffectsDigest,
813    ) -> Self {
814        self.expected_effects_digest = Some(expected_effects_digest);
815        self
816    }
817
818    pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
819        self.assigned_versions = assigned_versions;
820        self
821    }
822
823    pub fn with_insufficient_funds(mut self) -> Self {
824        self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
825        self
826    }
827
828    pub fn with_barrier_dependencies(
829        mut self,
830        barrier_dependencies: BTreeSet<TransactionDigest>,
831    ) -> Self {
832        self.barrier_dependencies = barrier_dependencies.into_iter().collect();
833        self
834    }
835}
836
/// Per-transaction effects-digest overrides used during fork recovery
/// (see the `fork_recovery_state` field on `AuthorityState`).
#[derive(Debug)]
pub struct ForkRecoveryState {
    /// Transaction digest to effects digest overrides.
    /// Populated from `ForkRecoveryConfig::transaction_overrides` at construction.
    transaction_overrides:
        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
}
843
844impl Default for ForkRecoveryState {
845    fn default() -> Self {
846        Self {
847            transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
848        }
849    }
850}
851
852impl ForkRecoveryState {
853    pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
854        let Some(config) = config else {
855            return Ok(Self::default());
856        };
857
858        let mut transaction_overrides = HashMap::new();
859        for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
860            let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
861                SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
862            })?;
863            let effects_digest =
864                TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
865                    SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
866                })?;
867            transaction_overrides.insert(tx_digest, effects_digest);
868        }
869
870        Ok(Self {
871            transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
872        })
873    }
874
875    pub fn get_transaction_override(
876        &self,
877        tx_digest: &TransactionDigest,
878    ) -> Option<TransactionEffectsDigest> {
879        self.transaction_overrides.read().get(tx_digest).copied()
880    }
881}
882
883pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
884
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads transaction input objects and receiving objects for signing/execution.
    input_loader: TransactionInputLoader,
    /// Trait-object entry points into the execution cache layer.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    /// Caching resolver for coin reservations, used when processing funds
    /// withdrawals at signing time.
    coin_reservation_resolver: Arc<CachingCoinReservationResolver>,

    /// Per-epoch store; held in an `ArcSwap` so it can be replaced at reconfiguration.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it until all writes are complete.
    /// Reconfiguration acquires write lock, changes the epoch and revert all transactions
    /// from previous epoch that are executed but did not make into checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional JSON-RPC index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional RPC index store.
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Shared store of committees; exposed via `committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Schedules transaction execution.
    execution_scheduler: Arc<ExecutionScheduler>,

    /// Shuts down the execution task. Used only in testing.
    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    // Leading underscores: held only to keep the pruners alive, never read directly.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// The chain identifier is derived from the digest of the genesis checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Traffic controller for Sui core servers (json-rpc, validator service)
    pub traffic_controller: Option<Arc<TrafficController>>,

    /// Fork recovery state for handling equivocation after forks
    fork_recovery_state: Option<ForkRecoveryState>,

    /// Notification channel for reconfiguration
    notify_epoch: tokio::sync::watch::Sender<EpochId>,

    pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
    object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,

    /// Tracks transactions whose post-processing (indexing/events) is still in flight.
    /// CheckpointExecutor removes entries and collects the index batches before committing
    /// them atomically at checkpoint boundaries.
    pending_post_processing:
        Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,

    /// Limits the number of concurrent post-processing tasks to avoid overwhelming
    /// the blocking thread pool. Defaults to the number of available CPUs.
    post_processing_semaphore: Arc<tokio::sync::Semaphore>,
}
959
960/// The authority state encapsulates all state, drives execution, and ensures safety.
961///
962/// Note the authority operations can be accessed through a read ref (&) and do not
963/// require &mut. Internally a database is synchronized through a mutex lock.
964///
965/// Repeating valid commands should produce no changes and return no error.
966impl AuthorityState {
967    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
968        epoch_store.committee().authority_exists(&self.name)
969    }
970
971    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
972        !self.is_validator(epoch_store)
973    }
974
    /// Borrows the shared committee store.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
978
979    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
980        self.committee_store.clone()
981    }
982
    /// Borrows the node's authority overload configuration.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
986
    /// Returns the checkpoint state commitments recorded for `epoch`, if any.
    /// Delegates to the checkpoint store.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
993
    /// Runs deny list checks and processes funds withdrawals. Called before loading input
    /// objects, since these checks don't depend on object state.
    ///
    /// Returns the declared withdrawals, keyed by accumulator object id, each paired with
    /// the withdrawal amount and its type tag. Errors if the transaction is denied or the
    /// declared amounts are not available in the account funds store.
    fn pre_object_load_checks(
        &self,
        tx_data: &TransactionData,
        tx_signatures: &[GenericSignature],
        input_object_kinds: &[InputObjectKind],
        receiving_objects_refs: &[ObjectRef],
    ) -> SuiResult<BTreeMap<AccumulatorObjId, (u64, TypeTag)>> {
        // Note: the deny checks may do redundant package loads but:
        // - they only load packages when there is an active package deny map
        // - the loads are cached anyway
        sui_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            tx_signatures,
            input_object_kinds,
            receiving_objects_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Resolve the withdrawals declared by the transaction (resolving any
        // coin reservations through the caching resolver).
        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
            self.chain_identifier,
            self.coin_reservation_resolver.as_ref(),
        )?;

        // Reject early if the declared amounts exceed what the accounts hold.
        self.execution_cache_trait_pointers
            .account_funds_read
            .check_amounts_available(&declared_withdrawals)?;

        Ok(declared_withdrawals)
    }
1026
    /// Runs all signing-time validation of a transaction: pre-object-load deny and
    /// withdrawal checks, input object loading, input validation (including gas
    /// checks), and coin deny list checks. Returns the validated input objects.
    fn handle_transaction_deny_checks(
        &self,
        transaction: &VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<CheckedInputObjects> {
        let tx_digest = transaction.digest();
        let tx_data = transaction.data().transaction_data();

        let input_object_kinds = tx_data.input_objects()?;
        let receiving_objects_refs = tx_data.receiving_objects();

        // Deny-list and funds-withdrawal checks that don't need object state.
        self.pre_object_load_checks(
            tx_data,
            transaction.tx_signatures(),
            &input_object_kinds,
            &receiving_objects_refs,
        )?;

        // Load the declared input and receiving objects.
        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            Some(tx_digest),
            &input_object_kinds,
            &receiving_objects_refs,
            epoch_store.epoch(),
        )?;

        // Full input validation against the loaded objects; the gas status is
        // computed but not needed here.
        let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
            epoch_store.protocol_config(),
            epoch_store.reference_gas_price(),
            tx_data,
            input_objects,
            &receiving_objects,
            &self.metrics.bytecode_verifier_metrics,
            &self.config.verifier_signing_config,
        )?;

        // Coin deny list checks need the checked input objects, so they run last.
        self.handle_coin_deny_list_checks(
            tx_data,
            &checked_input_objects,
            &receiving_objects,
            epoch_store,
        )?;

        Ok(checked_input_objects)
    }
1071
1072    fn handle_coin_deny_list_checks(
1073        &self,
1074        tx_data: &TransactionData,
1075        checked_input_objects: &CheckedInputObjects,
1076        receiving_objects: &ReceivingObjects,
1077        epoch_store: &Arc<AuthorityPerEpochStore>,
1078    ) -> SuiResult<()> {
1079        use sui_types::balance::Balance;
1080
1081        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1082            self.chain_identifier,
1083            self.coin_reservation_resolver.as_ref(),
1084        )?;
1085
1086        let funds_withdraw_types = declared_withdrawals
1087            .values()
1088            .filter_map(|(_, type_tag)| {
1089                Balance::maybe_get_balance_type_param(type_tag)
1090                    .map(|ty| ty.to_canonical_string(false))
1091            })
1092            .collect::<BTreeSet<_>>();
1093
1094        if epoch_store.coin_deny_list_v1_enabled() {
1095            check_coin_deny_list_v1(
1096                tx_data.sender(),
1097                checked_input_objects,
1098                receiving_objects,
1099                funds_withdraw_types.clone(),
1100                &self.get_object_store(),
1101            )?;
1102        }
1103
1104        if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1105            check_coin_deny_list_v2_during_signing(
1106                tx_data.sender(),
1107                checked_input_objects,
1108                receiving_objects,
1109                funds_withdraw_types.clone(),
1110                &self.get_object_store(),
1111            )?;
1112        }
1113
1114        Ok(())
1115    }
1116
    /// Vote for a transaction, either when validator receives a submit_transaction request,
    /// or sees a transaction from consensus. Performs the same types of checks as
    /// transaction signing, but does not explicitly sign the transaction.
    /// Note that if the transaction has been executed, we still go through the
    /// same checks. If the transaction has only been executed through mysticeti fastpath,
    /// but not yet finalized, the signing will still work since the objects are still available.
    /// But if the transaction has been finalized, the signing will fail.
    /// TODO(mysticeti-fastpath): Assess whether we want to optimize the case when the transaction
    /// has already been finalized and executed.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    pub fn handle_vote_transaction(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
    ) -> SuiResult<()> {
        debug!("handle_vote_transaction");

        let _metrics_guard = self
            .metrics
            .authority_state_handle_vote_transaction_latency
            .start_timer();
        self.metrics.tx_orders.inc();

        // The should_accept_user_certs check here is best effort, because
        // between a validator signs a tx and a cert is formed, the validator
        // could close the window.
        if !epoch_store
            .get_reconfig_state_read_lock_guard()
            .should_accept_user_certs()
        {
            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
        }

        // Accept finalized transactions, instead of voting to reject them.
        // Checking executed transactions is limited to the current epoch.
        // Otherwise there can be a race where the transaction is accepted because it has executed,
        // but the executed effects are pruned post consensus, leading to failures.
        let tx_digest = *transaction.digest();
        if epoch_store.is_recently_finalized(&tx_digest)
            || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
        {
            assert_reachable!("transaction recently executed");
            return Ok(());
        }

        // A transaction executed in the previous epoch can no longer be voted on:
        // report it as already executed instead.
        if self
            .get_transaction_cache_reader()
            .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
        {
            assert_reachable!("transaction executed in last epoch");
            return Err(SuiErrorKind::TransactionAlreadyExecuted {
                digest: (*transaction.digest()),
            }
            .into());
        }

        // Ensure that validator cannot reconfigure while we are validating the transaction.
        let _execution_lock = self.execution_lock_for_validation()?;

        // Same deny/input checks as transaction signing.
        let checked_input_objects =
            self.handle_transaction_deny_checks(&transaction, epoch_store)?;

        let owned_objects = checked_input_objects.inner().filter_owned_objects();

        // Validate owned object versions without acquiring locks. Locking happens post-consensus
        // in the consensus handler. Validation still runs to prevent spam transactions with
        // invalid object versions, and is necessary to handle recently created objects.
        self.get_cache_writer()
            .validate_owned_object_versions(&owned_objects)
    }
1187
    /// Used for early client validation check for transactions before submission to server.
    /// Performs the same validation checks as handle_vote_transaction without acquiring locks.
    /// This allows for fast failure feedback to clients for non-retriable errors.
    ///
    /// The key addition is checking that owned object versions match live object versions.
    /// This is necessary because handle_transaction_deny_checks fetches objects at their
    /// requested version (which may exist in storage as historical versions), whereas
    /// validators check against the live/current version during locking (verify_live_object).
    pub fn check_transaction_validity(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: &VerifiedTransaction,
        enforce_live_input_objects: bool,
    ) -> SuiResult<()> {
        if !epoch_store
            .get_reconfig_state_read_lock_guard()
            .should_accept_user_certs()
        {
            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
        }

        transaction.validity_check(&epoch_store.tx_validity_check_context())?;

        let checked_input_objects =
            self.handle_transaction_deny_checks(transaction, epoch_store)?;

        // Check that owned object versions match live objects
        let owned_objects = checked_input_objects.inner().filter_owned_objects();
        let cache_reader = self.get_object_cache_reader();

        // For each owned input (id, version, digest), compare against the live object:
        // older-than-live is always rejected; newer-than-live only when enforcing;
        // equal versions must also match on digest.
        for obj_ref in &owned_objects {
            if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
                // Only reject if transaction references an old version. Allow newer
                // versions that may exist on validators but not yet synced to fullnode.
                if obj_ref.1 < live_object.version() {
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: live_object.version(),
                        },
                    }
                    .into());
                }

                if enforce_live_input_objects && live_object.version() < obj_ref.1 {
                    // Returns a non-retriable error when the input object version is required to be live.
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: live_object.version(),
                        },
                    }
                    .into());
                }

                // If version matches, verify digest also matches
                if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::InvalidObjectDigest {
                            object_id: obj_ref.0,
                            expected_digest: live_object.digest(),
                        },
                    }
                    .into());
                }
            }
        }

        Ok(())
    }
1258
    /// Whether the node is configured to run system overload checks at signing time.
    pub fn check_system_overload_at_signing(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_signing
    }
1264
    /// Whether the node is configured to run system overload checks at execution time.
    pub fn check_system_overload_at_execution(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_execution
    }
1270
1271    pub(crate) fn check_system_overload(
1272        &self,
1273        consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
1274        tx_data: &SenderSignedData,
1275        do_authority_overload_check: bool,
1276    ) -> SuiResult {
1277        if do_authority_overload_check {
1278            self.check_authority_overload(tx_data).tap_err(|_| {
1279                self.update_overload_metrics("execution_queue");
1280            })?;
1281        }
1282        self.execution_scheduler
1283            .check_execution_overload(self.overload_config(), tx_data)
1284            .tap_err(|_| {
1285                self.update_overload_metrics("execution_pending");
1286            })?;
1287        consensus_overload_checker
1288            .check_consensus_overload()
1289            .tap_err(|_| {
1290                self.update_overload_metrics("consensus");
1291            })?;
1292
1293        let pending_tx_count = self
1294            .get_cache_commit()
1295            .approximate_pending_transaction_count();
1296        if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1297            return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1298                retry_after_secs: 10,
1299            }
1300            .into());
1301        }
1302
1303        Ok(())
1304    }
1305
1306    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1307        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1308            return Ok(());
1309        }
1310
1311        let load_shedding_percentage = self
1312            .overload_info
1313            .load_shedding_percentage
1314            .load(Ordering::Relaxed);
1315        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1316    }
1317
1318    fn update_overload_metrics(&self, source: &str) {
1319        self.metrics
1320            .transaction_overload_sources
1321            .with_label_values(&[source])
1322            .inc();
1323    }
1324
    /// Waits for fastpath (owned, package) dependency objects to become available.
    /// Returns true if a chosen set of fastpath dependency objects are available,
    /// returns false otherwise after an internal timeout.
    pub(crate) async fn wait_for_fastpath_dependency_objects(
        &self,
        transaction: &VerifiedTransaction,
        epoch: EpochId,
    ) -> SuiResult<bool> {
        let txn_data = transaction.data().transaction_data();
        let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;

        // Gather and filter input objects to wait for.
        // Packages are always waited on; move objects only when
        // should_wait_for_dependency_object says waiting is useful.
        let fastpath_dependency_objects: Vec<_> = move_objects
            .into_iter()
            .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
            .chain(
                packages
                    .into_iter()
                    .map(|package_id| InputKey::Package { id: package_id }),
            )
            .collect();
        let receiving_keys: HashSet<_> = receiving_objects
            .into_iter()
            .filter_map(|receiving_obj_ref| {
                self.should_wait_for_dependency_object(receiving_obj_ref)
            })
            .collect();
        // Nothing to wait on: everything needed is already available.
        if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
            return Ok(true);
        }

        // Use shorter wait timeout in simtests to exercise server-side error paths and
        // client-side retry logic.
        let max_wait = if cfg!(msim) {
            Duration::from_millis(200)
        } else {
            WAIT_FOR_FASTPATH_INPUT_TIMEOUT
        };

        match timeout(
            max_wait,
            self.get_object_cache_reader().notify_read_input_objects(
                &fastpath_dependency_objects,
                &receiving_keys,
                epoch,
            ),
        )
        .await
        {
            Ok(()) => Ok(true),
            // Maybe return an error for unavailable input objects,
            // and allow the caller to skip the rest of input checks?
            Err(_) => Ok(false),
        }
    }
1380
1381    /// Returns Some(inputKey) if the object reference should be waited on until it is
1382    /// finalized, before proceeding to input checks.
1383    ///
1384    /// Incorrect decisions here should only affect user experience, not safety:
1385    /// - Waiting unnecessarily adds latency to transaction signing and submission.
1386    /// - Not waiting when needed may cause the transaction to be rejected because input object is unavailable.
1387    fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1388        let (obj_id, cur_version, _digest) = obj_ref;
1389        let Some(latest_obj_ref) = self
1390            .get_object_cache_reader()
1391            .get_latest_object_ref_or_tombstone(obj_id)
1392        else {
1393            // Object might not have been created.
1394            return Some(InputKey::VersionedObject {
1395                id: FullObjectID::new(obj_id, None),
1396                version: cur_version,
1397            });
1398        };
1399        let latest_digest = latest_obj_ref.2;
1400        if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1401            // Do not wait for deleted object and rely on input check instead.
1402            return None;
1403        }
1404        let latest_version = latest_obj_ref.1;
1405        if cur_version <= latest_version {
1406            // Do not wait for version that already exists or has been consumed.
1407            // Let the input check to handle them and return the proper error.
1408            return None;
1409        }
1410        // Wait for the object version to become available.
1411        Some(InputKey::VersionedObject {
1412            id: FullObjectID::new(obj_id, None),
1413            version: cur_version,
1414        })
1415    }
1416
1417    /// Wait for a transaction to be executed.
1418    /// For consensus transactions, it needs to be sequenced by the consensus.
1419    /// For owned object transactions, this function will enqueue the transaction for execution.
1420    ///
1421    /// Only use this in tests.
1422    #[instrument(level = "trace", skip_all)]
1423    pub async fn wait_for_transaction_execution_for_testing(
1424        &self,
1425        transaction: &VerifiedExecutableTransaction,
1426        epoch_store: &Arc<AuthorityPerEpochStore>,
1427    ) -> TransactionEffects {
1428        if !transaction.is_consensus_tx()
1429            && !epoch_store.protocol_config().disable_preconsensus_locking()
1430        {
1431            // Shared object transactions need to be sequenced by the consensus before enqueueing
1432            // for execution, done in AuthorityPerEpochStore::handle_consensus_transaction().
1433            //
1434            // For owned object transactions, they can be enqueued for execution immediately
1435            // ONLY when disable_preconsensus_locking is false (QD/original fastpath mode).
1436            // When disable_preconsensus_locking is true (MFP mode), all transactions including
1437            // owned object transactions must go through consensus before enqueuing for execution.
1438            self.execution_scheduler.enqueue(
1439                vec![(
1440                    Schedulable::Transaction(transaction.clone()),
1441                    ExecutionEnv::new(),
1442                )],
1443                epoch_store,
1444            );
1445        }
1446
1447        self.notify_read_effects_for_testing(
1448            "AuthorityState::wait_for_transaction_execution_for_testing",
1449            *transaction.digest(),
1450        )
1451        .await
1452    }
1453
    /// Internal logic to execute a certificate.
    ///
    /// Guarantees that
    /// - If input objects are available, return no permanent failure.
    /// - Execution and output commit are atomic. i.e. outputs are only written to storage,
    /// on successful execution; crashed execution has no observable effect and can be retried.
    ///
    /// It is caller's responsibility to ensure input objects are available and locks are set.
    /// If this cannot be satisfied by the caller, execute_certificate() should be called instead.
    ///
    /// Should only be called within sui-core.
    #[instrument(level = "trace", skip_all)]
    pub async fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        mut execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // If fork recovery is configured and has an override for this transaction,
        // replace the expected effects digest so re-execution can converge on the
        // overridden effects.
        if let Some(fork_recovery) = &self.fork_recovery_state
            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
        {
            warn!(
                ?tx_digest,
                original_digest = ?execution_env.expected_effects_digest,
                override_digest = ?override_digest,
                "Applying fork recovery override for transaction effects digest"
            );
            execution_env.expected_effects_digest = Some(override_digest);
        }

        // prevent concurrent executions of the same tx.
        let tx_guard = epoch_store.acquire_tx_guard(certificate);

        // Fast path: if effects are already committed, return them without
        // re-executing (execution is idempotent per digest).
        let tx_cache_reader = self.get_transaction_cache_reader();
        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest,
                    "Unexpected effects digest for transaction {:?}",
                    tx_digest
                );
            }
            tx_guard.release();
            return ExecutionOutput::Success((effects, None));
        }

        let execution_start_time = Instant::now();

        // Any caller that verifies the signatures on the certificate will have already checked the
        // epoch. But paths that don't verify sigs (e.g. execution from checkpoint, reading from db)
        // present the possibility of an epoch mismatch. If this cert is not finalized in previous
        // epoch, then it's invalid.
        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
        else {
            tx_guard.release();
            return ExecutionOutput::EpochEnded;
        };
        // Since we obtain a reference to the epoch store before taking the execution lock, it's
        // possible that reconfiguration has happened and they no longer match.
        // TODO: We may not need the following check anymore since the scheduler
        // should have checked that the certificate is from the same epoch as epoch_store.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return ExecutionOutput::EpochEnded;
        }

        let accumulator_version = execution_env.assigned_versions.accumulator_version;

        // Execute the certificate; non-success outputs (epoch ended, retry, fatal)
        // are propagated to the caller unchanged.
        let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
            &tx_guard,
            &execution_guard,
            certificate,
            execution_env,
            epoch_store,
        ) {
            ExecutionOutput::Success(result) => result,
            output => return output.unwrap_err(),
        };
        let transaction_outputs = Arc::new(transaction_outputs);

        // Allow tests to simulate a crash between execution and commit; the tx
        // guard has not committed yet, so the transaction can be retried.
        fail_point!("crash");

        let effects = transaction_outputs.effects.clone();
        debug!(
            ?tx_digest,
            fx_digest=?effects.digest(),
            "process_certificate succeeded in {:.3}ms",
            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
        );

        let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
        if let Err(err) = commit_result {
            error!(?tx_digest, "Error committing transaction: {err}");
            tx_guard.release();
            return ExecutionOutput::Fatal(err);
        }

        // Authenticator state updates must be reflected in the epoch store's
        // signature verifier immediately after commit.
        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // double check that the signature verifier always matches the authenticator state
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        // TODO: We should also settle address funds during execution,
        // instead of from checkpoint builder. It will improve performance
        // since we will be settling as soon as we can.
        if certificate
            .transaction_data()
            .kind()
            .is_accumulator_barrier_settle_tx()
        {
            let object_funds_checker = self.object_funds_checker.load();
            if let Some(object_funds_checker) = object_funds_checker.as_ref() {
                // unwrap safe because we assign accumulator version for every transaction
                // when accumulator is enabled.
                let next_accumulator_version = accumulator_version.unwrap().next();
                object_funds_checker.settle_accumulator_version(next_accumulator_version);
            }
        }

        tx_guard.commit_tx();

        let elapsed = execution_start_time.elapsed();
        epoch_store.record_local_execution_time(
            certificate.data().transaction_data(),
            &effects,
            timings,
            elapsed,
        );

        // Ratio of gas computation cost to wall-clock execution time, used for
        // congestion/pricing observability. Guard against a zero-duration division.
        let elapsed_us = elapsed.as_micros() as f64;
        if elapsed_us > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
        };
        ExecutionOutput::Success((effects, execution_error_opt))
    }
1624
1625    pub fn read_objects_for_execution(
1626        &self,
1627        tx_lock: &CertLockGuard,
1628        certificate: &VerifiedExecutableTransaction,
1629        assigned_shared_object_versions: &AssignedVersions,
1630        epoch_store: &Arc<AuthorityPerEpochStore>,
1631    ) -> SuiResult<InputObjects> {
1632        let _scope = monitored_scope("Execution::load_input_objects");
1633        let _metrics_guard = self
1634            .metrics
1635            .execution_load_input_objects_latency
1636            .start_timer();
1637        let input_objects = &certificate.data().transaction_data().input_objects()?;
1638        self.input_loader.read_objects_for_execution(
1639            &certificate.key(),
1640            tx_lock,
1641            input_objects,
1642            assigned_shared_object_versions,
1643            epoch_store.epoch(),
1644        )
1645    }
1646
1647    /// Test only wrapper for `try_execute_immediately()` above, useful for executing change epoch
1648    /// transactions. Assumes execution will always succeed.
1649    pub async fn try_execute_for_test(
1650        &self,
1651        certificate: &VerifiedCertificate,
1652        execution_env: ExecutionEnv,
1653    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1654        let epoch_store = self.epoch_store_for_testing();
1655        let (effects, execution_error_opt) = self
1656            .try_execute_immediately(
1657                &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1658                execution_env,
1659                &epoch_store,
1660            )
1661            .await
1662            .unwrap();
1663        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1664        (signed_effects, execution_error_opt)
1665    }
1666
1667    pub async fn try_execute_executable_for_test(
1668        &self,
1669        executable: &VerifiedExecutableTransaction,
1670        execution_env: ExecutionEnv,
1671    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1672        let epoch_store = self.epoch_store_for_testing();
1673        let (effects, execution_error_opt) = self
1674            .try_execute_immediately(executable, execution_env, &epoch_store)
1675            .await
1676            .unwrap();
1677        self.flush_post_processing(executable.digest()).await;
1678        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1679        (signed_effects, execution_error_opt)
1680    }
1681
1682    /// Wait until the effects of the given transaction are available and return them.
1683    /// Panics if the effects are not found.
1684    ///
1685    /// Only use this in tests where effects are expected to exist.
1686    pub async fn notify_read_effects_for_testing(
1687        &self,
1688        task_name: &'static str,
1689        digest: TransactionDigest,
1690    ) -> TransactionEffects {
1691        self.get_transaction_cache_reader()
1692            .notify_read_executed_effects(task_name, &[digest])
1693            .await
1694            .pop()
1695            .expect("must return correct number of effects")
1696    }
1697
1698    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1699        self.get_object_cache_reader()
1700            .check_owned_objects_are_live(owned_object_refs)
1701    }
1702
1703    /// This function captures the required state to debug a forked transaction.
1704    /// The dump is written to a file in dir `path`, with name prefixed by the transaction digest.
1705    /// NOTE: Since this info escapes the validator context,
1706    /// make sure not to leak any private info here
1707    pub(crate) fn debug_dump_transaction_state(
1708        &self,
1709        tx_digest: &TransactionDigest,
1710        effects: &TransactionEffects,
1711        expected_effects_digest: TransactionEffectsDigest,
1712        inner_temporary_store: &InnerTemporaryStore,
1713        certificate: &VerifiedExecutableTransaction,
1714        debug_dump_config: &StateDebugDumpConfig,
1715    ) -> SuiResult<PathBuf> {
1716        let dump_dir = debug_dump_config
1717            .dump_file_directory
1718            .as_ref()
1719            .cloned()
1720            .unwrap_or(std::env::temp_dir());
1721        let epoch_store = self.load_epoch_store_one_call_per_task();
1722
1723        NodeStateDump::new(
1724            tx_digest,
1725            effects,
1726            expected_effects_digest,
1727            self.get_object_store().as_ref(),
1728            &epoch_store,
1729            inner_temporary_store,
1730            certificate,
1731        )?
1732        .write_to_file(&dump_dir)
1733        .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1734    }
1735
1736    #[instrument(level = "trace", skip_all)]
1737    pub(crate) fn process_certificate(
1738        &self,
1739        tx_guard: &CertTxGuard,
1740        execution_guard: &ExecutionLockReadGuard<'_>,
1741        certificate: &VerifiedExecutableTransaction,
1742        execution_env: ExecutionEnv,
1743        epoch_store: &Arc<AuthorityPerEpochStore>,
1744    ) -> ExecutionOutput<(
1745        TransactionOutputs,
1746        Vec<ExecutionTiming>,
1747        Option<ExecutionError>,
1748    )> {
1749        let _scope = monitored_scope("Execution::process_certificate");
1750        let tx_digest = *certificate.digest();
1751
1752        let input_objects = match self.read_objects_for_execution(
1753            tx_guard.as_lock_guard(),
1754            certificate,
1755            &execution_env.assigned_versions,
1756            epoch_store,
1757        ) {
1758            Ok(objects) => objects,
1759            Err(e) => return ExecutionOutput::Fatal(e),
1760        };
1761
1762        let expected_effects_digest = match execution_env.expected_effects_digest {
1763            Some(expected_effects_digest) => Some(expected_effects_digest),
1764            None => {
1765                // We could be re-executing a previously executed but uncommitted transaction, perhaps after
1766                // restarting with a new binary. In this situation, if we have published an effects signature,
1767                // we must be sure not to equivocate.
1768                // TODO: read from cache instead of DB
1769                match epoch_store.get_signed_effects_digest(&tx_digest) {
1770                    Ok(digest) => digest,
1771                    Err(e) => return ExecutionOutput::Fatal(e),
1772                }
1773            }
1774        };
1775
1776        fail_point_if!("correlated-crash-process-certificate", || {
1777            if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1778                sui_simulator::task::kill_current_node(None);
1779            }
1780        });
1781
1782        // Errors originating from prepare_certificate may be transient (failure to read locks) or
1783        // non-transient (transaction input is invalid, move vm errors). However, all errors from
1784        // this function occur before we have written anything to the db, so we commit the tx
1785        // guard and rely on the client to retry the tx (if it was transient).
1786        self.execute_certificate(
1787            execution_guard,
1788            certificate,
1789            input_objects,
1790            expected_effects_digest,
1791            execution_env,
1792            epoch_store,
1793        )
1794    }
1795
1796    pub async fn reconfigure_traffic_control(
1797        &self,
1798        params: TrafficControlReconfigParams,
1799    ) -> Result<TrafficControlReconfigParams, SuiError> {
1800        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1801            traffic_controller.admin_reconfigure(params).await
1802        } else {
1803            Err(SuiErrorKind::InvalidAdminRequest(
1804                "Traffic controller is not configured on this node".to_string(),
1805            )
1806            .into())
1807        }
1808    }
1809
    /// Commits the outputs of an executed certificate: records the transaction as
    /// executed in the current epoch, persists any non-digest transaction key
    /// mapping, and writes the transaction outputs through the cache writer.
    /// Statement order here is deliberate — see the comments below.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        transaction_outputs: Arc<TransactionOutputs>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_digest = certificate.digest();

        // The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
        // we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_executed_in_epoch(tx_digest);
        let key = certificate.key();
        // Transactions addressed by a non-digest key need the key -> digest mapping
        // recorded so later lookups by that key can resolve to this transaction.
        if !matches!(key, TransactionKey::Digest(_)) {
            epoch_store.insert_tx_key(key, *tx_digest)?;
        }

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        self.get_cache_writer()
            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);

        // Fire an in-memory-only notification so the scheduler can detect that this
        // barrier transaction has been executed (e.g. by the checkpoint executor) and
        // stop waiting for the checkpoint builder.  We intentionally do NOT persist
        // this to the DB to avoid stale entries surviving a crash while effects may not.
        if let Some(settlement_key) = certificate
            .transaction_data()
            .kind()
            .accumulator_barrier_settlement_key()
        {
            epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
        }

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        Ok(())
    }
1858
1859    fn update_metrics(
1860        &self,
1861        certificate: &VerifiedExecutableTransaction,
1862        inner_temporary_store: &InnerTemporaryStore,
1863        effects: &TransactionEffects,
1864    ) {
1865        // count signature by scheme, for zklogin and multisig
1866        if certificate.has_zklogin_sig() {
1867            self.metrics.zklogin_sig_count.inc();
1868        } else if certificate.has_upgraded_multisig() {
1869            self.metrics.multisig_sig_count.inc();
1870        }
1871
1872        self.metrics.total_effects.inc();
1873        self.metrics.total_certs.inc();
1874
1875        let consensus_object_count = effects.input_consensus_objects().len();
1876        if consensus_object_count > 0 {
1877            self.metrics.shared_obj_tx.inc();
1878        }
1879
1880        if certificate.is_sponsored_tx() {
1881            self.metrics.sponsored_tx.inc();
1882        }
1883
1884        let input_object_count = inner_temporary_store.input_objects.len();
1885        self.metrics
1886            .num_input_objs
1887            .observe(input_object_count as f64);
1888        self.metrics
1889            .num_shared_objects
1890            .observe(consensus_object_count as f64);
1891        self.metrics.batch_size.observe(
1892            certificate
1893                .data()
1894                .intent_message()
1895                .value
1896                .kind()
1897                .num_commands() as f64,
1898        );
1899    }
1900
1901    /// Helper function that handles transaction rewriting for coin reservations and executes
1902    /// the transaction to effects. Returns the execution results along with the command offset
1903    /// used for rewriting failure command indices.
1904    fn execute_transaction_to_effects(
1905        &self,
1906        executor: &dyn Executor,
1907        store: &dyn BackingStore,
1908        protocol_config: &ProtocolConfig,
1909        enable_expensive_checks: bool,
1910        execution_params: ExecutionOrEarlyError,
1911        epoch_id: &EpochId,
1912        epoch_timestamp_ms: u64,
1913        input_objects: CheckedInputObjects,
1914        gas_data: GasData,
1915        gas_status: SuiGasStatus,
1916        sender: SuiAddress,
1917        mut kind: TransactionKind,
1918        signer: SuiAddress,
1919        tx_digest: TransactionDigest,
1920        accumulator_version: Option<SequenceNumber>,
1921    ) -> (
1922        InnerTemporaryStore,
1923        SuiGasStatus,
1924        TransactionEffects,
1925        Vec<ExecutionTiming>,
1926        Result<(), ExecutionError>,
1927    ) {
1928        let rewritten_inputs = rewrite_transaction_for_coin_reservations(
1929            self.chain_identifier,
1930            &*self.coin_reservation_resolver,
1931            sender,
1932            &mut kind,
1933            accumulator_version,
1934        );
1935
1936        let (inner_temp_store, gas_status, effects, timings, execution_error) = executor
1937            // TODO only run this function on FullNodes, use `execute_transaction_to_effects` on validators.
1938            .execute_transaction_to_effects_and_execution_error(
1939                store,
1940                protocol_config,
1941                self.metrics.limits_metrics.clone(),
1942                enable_expensive_checks,
1943                execution_params,
1944                epoch_id,
1945                epoch_timestamp_ms,
1946                input_objects,
1947                gas_data,
1948                gas_status,
1949                kind,
1950                rewritten_inputs,
1951                signer,
1952                tx_digest,
1953                &mut None,
1954            );
1955
1956        (
1957            inner_temp_store,
1958            gas_status,
1959            effects,
1960            timings,
1961            execution_error,
1962        )
1963    }
1964
1965    /// execute_certificate validates the transaction input, and executes the certificate,
1966    /// returning transaction outputs.
1967    ///
1968    /// It reads state from the db (both owned and shared locks), but it has no side effects.
1969    ///
1970    /// Executes a certificate and returns an ExecutionOutput.
1971    /// The function can fail with Fatal errors (e.g., the transaction input is invalid,
1972    /// locks are not held correctly, etc.) or transient errors (e.g., db read errors).
1973    #[instrument(level = "trace", skip_all)]
1974    fn execute_certificate(
1975        &self,
1976        _execution_guard: &ExecutionLockReadGuard<'_>,
1977        certificate: &VerifiedExecutableTransaction,
1978        input_objects: InputObjects,
1979        expected_effects_digest: Option<TransactionEffectsDigest>,
1980        execution_env: ExecutionEnv,
1981        epoch_store: &Arc<AuthorityPerEpochStore>,
1982    ) -> ExecutionOutput<(
1983        TransactionOutputs,
1984        Vec<ExecutionTiming>,
1985        Option<ExecutionError>,
1986    )> {
1987        let _scope = monitored_scope("Execution::prepare_certificate");
1988        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1989        let prepare_certificate_start_time = tokio::time::Instant::now();
1990
1991        // TODO: We need to move this to a more appropriate place to avoid redundant checks.
1992        let tx_data = certificate.data().transaction_data();
1993
1994        if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
1995            return ExecutionOutput::Fatal(e);
1996        }
1997
1998        // The cost of partially re-auditing a transaction before execution is tolerated.
1999        // This step is required for correctness because, for example, ConsensusAddressOwner
2000        // object owner may have changed between signing and execution.
2001        let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
2002            certificate,
2003            input_objects,
2004            epoch_store.protocol_config(),
2005            epoch_store.reference_gas_price(),
2006        ) {
2007            Ok(result) => result,
2008            Err(e) => return ExecutionOutput::Fatal(e),
2009        };
2010
2011        let owned_object_refs = input_objects.inner().filter_owned_objects();
2012        if let Err(e) = self.check_owned_locks(&owned_object_refs) {
2013            return ExecutionOutput::Fatal(e);
2014        }
2015        let tx_digest = *certificate.digest();
2016        let protocol_config = epoch_store.protocol_config();
2017        let transaction_data = &certificate.data().intent_message().value;
2018        let sender = transaction_data.sender();
2019        let (kind, signer, gas_data) = transaction_data.execution_parts();
2020        let early_execution_error = get_early_execution_error(
2021            &tx_digest,
2022            &input_objects,
2023            self.config.certificate_deny_config.certificate_deny_set(),
2024            &execution_env.funds_withdraw_status,
2025        );
2026        let execution_params = match early_execution_error {
2027            Some(error) => ExecutionOrEarlyError::Err(error),
2028            None => ExecutionOrEarlyError::Ok(()),
2029        };
2030
2031        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2032
2033        #[allow(unused_mut)]
2034        let (inner_temp_store, _, mut effects, timings, execution_error_opt) = self
2035            .execute_transaction_to_effects(
2036                &**epoch_store.executor(),
2037                &tracking_store,
2038                protocol_config,
2039                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
2040                // cyclic dependency w/ sui-adapter
2041                self.config
2042                    .expensive_safety_check_config
2043                    .enable_deep_per_tx_sui_conservation_check(),
2044                execution_params,
2045                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2046                epoch_store
2047                    .epoch_start_config()
2048                    .epoch_data()
2049                    .epoch_start_timestamp(),
2050                input_objects,
2051                gas_data,
2052                gas_status,
2053                sender,
2054                kind,
2055                signer,
2056                tx_digest,
2057                execution_env.assigned_versions.accumulator_version,
2058            );
2059
2060        let object_funds_checker = self.object_funds_checker.load();
2061        if let Some(object_funds_checker) = object_funds_checker.as_ref()
2062            && !object_funds_checker.should_commit_object_funds_withdraws(
2063                certificate,
2064                effects.status(),
2065                &inner_temp_store.accumulator_running_max_withdraws,
2066                &execution_env,
2067                self.get_account_funds_read(),
2068                &self.execution_scheduler,
2069                epoch_store,
2070            )
2071        {
2072            assert_reachable!("retry object withdraw later");
2073            return ExecutionOutput::RetryLater;
2074        }
2075
2076        if let Some(expected_effects_digest) = expected_effects_digest
2077            && effects.digest() != expected_effects_digest
2078        {
2079            // We dont want to mask the original error, so we log it and continue.
2080            match self.debug_dump_transaction_state(
2081                &tx_digest,
2082                &effects,
2083                expected_effects_digest,
2084                &inner_temp_store,
2085                certificate,
2086                &self.config.state_debug_dump_config,
2087            ) {
2088                Ok(out_path) => {
2089                    info!(
2090                        "Dumped node state for transaction {} to {}",
2091                        tx_digest,
2092                        out_path.as_path().display().to_string()
2093                    );
2094                }
2095                Err(e) => {
2096                    error!("Error dumping state for transaction {}: {e}", tx_digest);
2097                }
2098            }
2099            let expected_effects = self
2100                .get_transaction_cache_reader()
2101                .get_effects(&expected_effects_digest);
2102            error!(
2103                ?tx_digest,
2104                ?expected_effects_digest,
2105                actual_effects = ?effects,
2106                expected_effects = ?expected_effects,
2107                "fork detected!"
2108            );
2109            if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2110                tx_digest,
2111                expected_effects_digest,
2112                effects.digest(),
2113            ) {
2114                error!("Failed to record transaction fork: {e}");
2115            }
2116
2117            fail_point_if!("kill_transaction_fork_node", || {
2118                #[cfg(msim)]
2119                {
2120                    tracing::error!(
2121                        fatal = true,
2122                        "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2123                        tx_digest
2124                    );
2125                    sui_simulator::task::shutdown_current_node();
2126                }
2127            });
2128
2129            fatal!(
2130                "Transaction {} is expected to have effects digest {}, but got {}!",
2131                tx_digest,
2132                expected_effects_digest,
2133                effects.digest()
2134            );
2135        }
2136
2137        fail_point_arg!("simulate_fork_during_execution", |(
2138            forked_validators,
2139            full_halt,
2140            effects_overrides,
2141            fork_probability,
2142        ): (
2143            std::sync::Arc<
2144                std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2145            >,
2146            bool,
2147            std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2148            f32,
2149        )| {
2150            #[cfg(msim)]
2151            self.simulate_fork_during_execution(
2152                certificate,
2153                epoch_store,
2154                &mut effects,
2155                forked_validators,
2156                full_halt,
2157                effects_overrides,
2158                fork_probability,
2159            );
2160        });
2161
2162        let unchanged_loaded_runtime_objects =
2163            crate::transaction_outputs::unchanged_loaded_runtime_objects(
2164                certificate.transaction_data(),
2165                &effects,
2166                &tracking_store.into_read_objects(),
2167            );
2168
2169        // index certificate
2170        let _ = self
2171            .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2172            .tap_err(|e| {
2173                self.metrics.post_processing_total_failures.inc();
2174                error!(?tx_digest, "tx post processing failed: {e}");
2175            });
2176
2177        self.update_metrics(certificate, &inner_temp_store, &effects);
2178
2179        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2180            certificate.clone().into_unsigned(),
2181            effects,
2182            inner_temp_store,
2183            unchanged_loaded_runtime_objects,
2184        );
2185
2186        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2187        if elapsed > 0.0 {
2188            self.metrics.prepare_cert_gas_latency_ratio.observe(
2189                transaction_outputs
2190                    .effects
2191                    .gas_cost_summary()
2192                    .computation_cost as f64
2193                    / elapsed,
2194            );
2195        }
2196
2197        ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2198    }
2199
2200    pub fn prepare_certificate_for_benchmark(
2201        &self,
2202        certificate: &VerifiedExecutableTransaction,
2203        input_objects: InputObjects,
2204        epoch_store: &Arc<AuthorityPerEpochStore>,
2205    ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2206        let lock = RwLock::new(epoch_store.epoch());
2207        let execution_guard = lock.try_read().unwrap();
2208
2209        let (transaction_outputs, _timings, execution_error_opt) = self
2210            .execute_certificate(
2211                &execution_guard,
2212                certificate,
2213                input_objects,
2214                None,
2215                ExecutionEnv::default(),
2216                epoch_store,
2217            )
2218            .unwrap();
2219        Ok((transaction_outputs, execution_error_opt))
2220    }
2221
2222    #[instrument(skip_all)]
2223    #[allow(clippy::type_complexity)]
2224    pub async fn dry_exec_transaction(
2225        &self,
2226        transaction: TransactionData,
2227        transaction_digest: TransactionDigest,
2228    ) -> SuiResult<(
2229        DryRunTransactionBlockResponse,
2230        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2231        TransactionEffects,
2232        Option<ObjectID>,
2233    )> {
2234        let epoch_store = self.load_epoch_store_one_call_per_task();
2235        if !self.is_fullnode(&epoch_store) {
2236            return Err(SuiErrorKind::UnsupportedFeatureError {
2237                error: "dry-exec is only supported on fullnodes".to_string(),
2238            }
2239            .into());
2240        }
2241
2242        if transaction.kind().is_system_tx() {
2243            return Err(SuiErrorKind::UnsupportedFeatureError {
2244                error: "dry-exec does not support system transactions".to_string(),
2245            }
2246            .into());
2247        }
2248
2249        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2250    }
2251
2252    #[allow(clippy::type_complexity)]
2253    pub fn dry_exec_transaction_for_benchmark(
2254        &self,
2255        transaction: TransactionData,
2256        transaction_digest: TransactionDigest,
2257    ) -> SuiResult<(
2258        DryRunTransactionBlockResponse,
2259        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2260        TransactionEffects,
2261        Option<ObjectID>,
2262    )> {
2263        let epoch_store = self.load_epoch_store_one_call_per_task();
2264        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2265    }
2266
    /// Shared implementation behind `dry_exec_transaction` and
    /// `dry_exec_transaction_for_benchmark`.
    ///
    /// Validates the transaction, loads its input objects, fabricates a gas
    /// coin when the transaction provides none, executes against the backing
    /// store without committing any state, and packages the results for the
    /// RPC layer.
    ///
    /// Returns, in order:
    /// - the `DryRunTransactionBlockResponse` sent back to the caller,
    /// - written objects keyed by ID with their refs and `WriteKind`s,
    /// - the raw `TransactionEffects`,
    /// - the ID of the mocked gas coin, if one had to be created.
    #[allow(clippy::type_complexity)]
    fn dry_exec_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> SuiResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )> {
        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Apply this node's transaction deny rules before doing any heavier work.
        sui_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dry run.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // make a gas object if one was not provided
        let mut gas_data = transaction.gas_data().clone();
        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
            let sender = transaction.sender();
            // use a 1B sui coin
            const MIST_TO_SUI: u64 = 1_000_000_000;
            const DRY_RUN_SUI: u64 = 1_000_000_000;
            // 1e18 MIST total — comfortably within u64 range.
            let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
            let gas_object_id = ObjectID::random();
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            // Make the transaction pay with the fabricated coin.
            gas_data.payment = vec![gas_object_ref];
            (
                // Input-check variant that accepts a caller-supplied gas object
                // (the mock coin does not exist in the store).
                sui_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            (
                sui_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        let (kind, signer, _) = transaction.execution_parts();

        let silent = true;
        let executor = sui_execution::executor(protocol_config, silent)
            .expect("Creating an executor should not fail here");

        let expensive_checks = false;
        let early_execution_error = get_early_execution_error(
            &transaction_digest,
            &checked_input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            // TODO(address-balances): This does not currently support balance withdraws properly.
            // For address balance withdraws, this cannot detect insufficient balance. We need to
            // first check if the balance is sufficient, similar to how we schedule withdraws.
            // For object balance withdraws, we need to handle the case where object balance is
            // insufficient in the post-execution.
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        let (inner_temp_store, _, effects, _timings, execution_error) = self
            .execute_transaction_to_effects(
                executor.as_ref(),
                self.get_backing_store().as_ref(),
                protocol_config,
                expensive_checks,
                execution_params,
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_data,
                gas_status,
                transaction.sender(),
                kind,
                signer,
                transaction_digest,
                // Use latest accumulator version for dry run/dev inspect
                None,
            );

        let tx_digest = *effects.transaction_digest();

        // Module resolver that consults both the transaction's temporary store
        // and the epoch module cache.
        let module_cache =
            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));
        // Returning empty vector here because we recalculate changes in the rpc layer.
        let object_changes = Vec::new();

        // Returning empty vector here because we recalculate changes in the rpc layer.
        let balance_changes = Vec::new();

        // Pair every created/unwrapped/mutated object ref from the effects with
        // its written object and the corresponding WriteKind. Every such ref is
        // expected to appear in the temp store's written set, hence the unwrap.
        let written_with_kind = effects
            .created()
            .into_iter()
            .map(|(oref, _)| (oref, WriteKind::Create))
            .chain(
                effects
                    .unwrapped()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
            )
            .chain(
                effects
                    .mutated()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
            )
            .map(|(oref, kind)| {
                let obj = inner_temp_store.written.get(&oref.0).unwrap();
                // TODO: Avoid clones.
                (oref.0, (oref, obj.clone(), kind))
            })
            .collect();

        // Surface the underlying error source (if any) as a string for the RPC
        // response.
        let execution_error_source = execution_error
            .as_ref()
            .err()
            .and_then(|e| e.source_ref().as_ref().map(|e| e.to_string()));

        Ok((
            DryRunTransactionBlockResponse {
                suggested_gas_price: self
                    .congestion_tracker
                    .get_suggested_gas_prices(&transaction),
                input: SuiTransactionBlockData::try_from_with_module_cache(
                    transaction,
                    &module_cache,
                )
                .map_err(|e| SuiErrorKind::TransactionSerializationError {
                    error: format!(
                        "Failed to convert transaction to SuiTransactionBlockData: {}",
                        e
                    ),
                })?, // TODO: replace the underlying try_from to SuiError. This one goes deep
                effects: effects.clone().try_into()?,
                events: SuiTransactionBlockEvents::try_from(
                    inner_temp_store.events.clone(),
                    tx_digest,
                    None,
                    layout_resolver.as_mut(),
                )?,
                object_changes,
                balance_changes,
                execution_error_source,
            },
            written_with_kind,
            effects,
            mock_gas,
        ))
    }
2470
    /// Simulates a transaction without committing any state.
    ///
    /// `checks` selects between the full input-check path and the lighter
    /// dev-inspect path (checks disabled). When `allow_mock_gas_coin` is true
    /// and the transaction carries no gas payment, a mock gas coin is injected
    /// so execution can proceed; its ID is reported in the result.
    ///
    /// Only supported on fullnodes; system transactions are rejected, and the
    /// checks-disabled mode can be switched off via node config.
    pub fn simulate_transaction(
        &self,
        mut transaction: TransactionData,
        checks: TransactionChecks,
        allow_mock_gas_coin: bool,
    ) -> SuiResult<SimulateTransactionResult> {
        if transaction.kind().is_system_tx() {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate does not support system transactions".to_string(),
            }
            .into());
        }

        let epoch_store = self.load_epoch_store_one_call_per_task();
        if !self.is_fullnode(&epoch_store) {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate is only supported on fullnodes".to_string(),
            }
            .into());
        }

        // Disabled checks means we run the dev-inspect execution flavor below.
        let dev_inspect = checks.disabled();
        if dev_inspect && self.config.dev_inspect_disabled {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate with checks disabled is not allowed on this node".to_string(),
            }
            .into());
        }

        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Create and inject mock gas coin before pre_object_load_checks so that
        // funds withdrawal processing sees non-empty payment and doesn't incorrectly
        // create an address balance withdrawal for gas.
        let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() {
            let obj = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    ObjectID::MAX,
                    DEV_INSPECT_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(transaction.gas_data().owner),
                TransactionDigest::genesis_marker(),
            );
            transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
            Some(obj)
        } else {
            None
        };

        let declared_withdrawals = self.pre_object_load_checks(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
        )?;
        // IDs with declared address-balance withdrawals; used below to skip the
        // object-funds check for those accounts.
        let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a simulation.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Add mock gas to input objects after loading (it doesn't exist in the store).
        let mock_gas_id = mock_gas_object.map(|obj| {
            let id = obj.id();
            input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
            id
        });

        let protocol_config = epoch_store.protocol_config();

        // Lightweight input checks for dev-inspect, full checks otherwise.
        let (gas_status, checked_input_objects) = if dev_inspect {
            sui_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction,
                input_objects,
                receiving_objects,
                epoch_store.reference_gas_price(),
            )?
        } else {
            sui_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?
        };

        // TODO see if we can spin up a VM once and reuse it
        let executor = sui_execution::executor(
            protocol_config,
            true, // silent
        )
        .expect("Creating an executor should not fail here");

        let (kind, signer, gas_data) = transaction.execution_parts();
        let early_execution_error = get_early_execution_error(
            &transaction.digest(),
            &checked_input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        // Wrap the backing store so we can recover the set of objects the VM
        // read at runtime after execution finishes.
        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());

        // Clone inputs for potential retry if object funds check fails post-execution.
        let cloned_input_objects = checked_input_objects.clone();
        let cloned_gas = gas_data.clone();
        let cloned_kind = kind.clone();
        let tx_digest = transaction.digest();
        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_timestamp_ms = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            &tracking_store,
            protocol_config,
            self.metrics.limits_metrics.clone(),
            false, // expensive_checks
            execution_params,
            &epoch_id,
            epoch_timestamp_ms,
            checked_input_objects,
            gas_data,
            gas_status,
            kind,
            None, // rewritten_inputs - not needed for dev_inspect
            signer,
            tx_digest,
            dev_inspect,
        );

        // Post-execution: check object funds (non-address withdrawals discovered during execution).
        // If any such withdrawal exceeds the latest known balance, re-execute
        // the transaction with an InsufficientFundsForWithdraw early error so
        // the reported effects reflect the failure.
        let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
            let has_insufficient_object_funds = inner_temp_store
                .accumulator_running_max_withdraws
                .iter()
                .filter(|(id, _)| !address_funds.contains(id))
                .any(|(id, max_withdraw)| {
                    let (balance, _) = self.get_account_funds_read().get_latest_account_amount(id);
                    balance < *max_withdraw
                });

            if has_insufficient_object_funds {
                let retry_gas_status = SuiGasStatus::new(
                    cloned_gas.budget,
                    cloned_gas.price,
                    epoch_store.reference_gas_price(),
                    protocol_config,
                )?;
                let (store, _, effects, result) = executor.dev_inspect_transaction(
                    &tracking_store,
                    protocol_config,
                    self.metrics.limits_metrics.clone(),
                    false,
                    ExecutionOrEarlyError::Err(ExecutionErrorKind::InsufficientFundsForWithdraw),
                    &epoch_id,
                    epoch_timestamp_ms,
                    cloned_input_objects,
                    cloned_gas,
                    retry_gas_status,
                    cloned_kind,
                    None, // rewritten_inputs
                    signer,
                    tx_digest,
                    dev_inspect,
                );
                (store, effects, result)
            } else {
                (inner_temp_store, effects, execution_result)
            }
        } else {
            (inner_temp_store, effects, execution_result)
        };

        let loaded_runtime_objects = tracking_store.into_read_objects();
        let unchanged_loaded_runtime_objects =
            crate::transaction_outputs::unchanged_loaded_runtime_objects(
                &transaction,
                &effects,
                &loaded_runtime_objects,
            );

        // Assemble the object set referenced by this transaction: runtime-read
        // objects plus the temp store's inputs and writes, filtered down to the
        // keys the effects actually reference.
        let object_set = {
            let objects = {
                let mut objects = loaded_runtime_objects;

                for o in inner_temp_store
                    .input_objects
                    .into_values()
                    .chain(inner_temp_store.written.into_values())
                {
                    objects.insert(o);
                }

                objects
            };

            let object_keys = sui_types::storage::get_transaction_object_set(
                &transaction,
                &effects,
                &unchanged_loaded_runtime_objects,
            );

            let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
            for k in object_keys {
                if let Some(o) = objects.get(&k) {
                    set.insert(o.clone());
                }
            }

            set
        };

        Ok(SimulateTransactionResult {
            objects: object_set,
            // Only attach events when the effects actually reference them.
            events: effects.events_digest().map(|_| inner_temp_store.events),
            effects,
            execution_result,
            mock_gas_id,
            unchanged_loaded_runtime_objects,
            suggested_gas_price: self
                .congestion_tracker
                .get_suggested_gas_prices(&transaction),
        })
    }
2713
2714    /// The object ID for gas can be any object ID, even for an uncreated object
2715    #[allow(clippy::collapsible_else_if)]
2716    #[instrument(skip_all)]
2717    pub async fn dev_inspect_transaction_block(
2718        &self,
2719        sender: SuiAddress,
2720        transaction_kind: TransactionKind,
2721        gas_price: Option<u64>,
2722        gas_budget: Option<u64>,
2723        gas_sponsor: Option<SuiAddress>,
2724        gas_objects: Option<Vec<ObjectRef>>,
2725        show_raw_txn_data_and_effects: Option<bool>,
2726        skip_checks: Option<bool>,
2727    ) -> SuiResult<DevInspectResults> {
2728        let epoch_store = self.load_epoch_store_one_call_per_task();
2729
2730        if !self.is_fullnode(&epoch_store) {
2731            return Err(SuiErrorKind::UnsupportedFeatureError {
2732                error: "dev-inspect is only supported on fullnodes".to_string(),
2733            }
2734            .into());
2735        }
2736
2737        if transaction_kind.is_system_tx() {
2738            return Err(SuiErrorKind::UnsupportedFeatureError {
2739                error: "system transactions are not supported".to_string(),
2740            }
2741            .into());
2742        }
2743
2744        let skip_checks = skip_checks.unwrap_or(true);
2745        if skip_checks && self.config.dev_inspect_disabled {
2746            return Err(SuiErrorKind::UnsupportedFeatureError {
2747                error: "dev-inspect with skip_checks is not allowed on this node".to_string(),
2748            }
2749            .into());
2750        }
2751
2752        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2753        let reference_gas_price = epoch_store.reference_gas_price();
2754        let protocol_config = epoch_store.protocol_config();
2755        let max_tx_gas = protocol_config.max_tx_gas();
2756
2757        let price = gas_price.unwrap_or(reference_gas_price);
2758        let budget = gas_budget.unwrap_or(max_tx_gas);
2759        let owner = gas_sponsor.unwrap_or(sender);
2760        // Payment might be empty here, but it's fine we'll have to deal with it later after reading all the input objects.
2761        let payment = gas_objects.unwrap_or_default();
2762        let mut transaction = TransactionData::V1(TransactionDataV1 {
2763            kind: transaction_kind.clone(),
2764            sender,
2765            gas_data: GasData {
2766                payment,
2767                owner,
2768                price,
2769                budget,
2770            },
2771            expiration: TransactionExpiration::None,
2772        });
2773
2774        let raw_txn_data = if show_raw_txn_data_and_effects {
2775            bcs::to_bytes(&transaction).map_err(|_| {
2776                SuiErrorKind::TransactionSerializationError {
2777                    error: "Failed to serialize transaction during dev inspect".to_string(),
2778                }
2779            })?
2780        } else {
2781            vec![]
2782        };
2783
2784        transaction.validity_check_no_gas_check(protocol_config)?;
2785
2786        let input_object_kinds = transaction.input_objects()?;
2787        let receiving_object_refs = transaction.receiving_objects();
2788
2789        sui_transaction_checks::deny::check_transaction_for_signing(
2790            &transaction,
2791            &[],
2792            &input_object_kinds,
2793            &receiving_object_refs,
2794            &self.config.transaction_deny_config,
2795            self.get_backing_package_store().as_ref(),
2796        )?;
2797
2798        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2799            // We don't want to cache this transaction since it's a dev inspect.
2800            None,
2801            &input_object_kinds,
2802            &receiving_object_refs,
2803            epoch_store.epoch(),
2804        )?;
2805
2806        let (gas_status, checked_input_objects) = if skip_checks {
2807            // If we are skipping checks, then we call the check_dev_inspect_input function which will perform
2808            // only lightweight checks on the transaction input. And if the gas field is empty, that means we will
2809            // use the dummy gas object so we need to add it to the input objects vector.
2810            if transaction.gas().is_empty() {
2811                // Create and use a dummy gas object if there is no gas object provided.
2812                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2813                    DEV_INSPECT_GAS_COIN_VALUE,
2814                    transaction.gas_owner(),
2815                );
2816                let gas_object_ref = dummy_gas_object.compute_object_reference();
2817                transaction.gas_data_mut().payment = vec![gas_object_ref];
2818                input_objects.push(ObjectReadResult::new(
2819                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2820                    dummy_gas_object.into(),
2821                ));
2822            }
2823            sui_transaction_checks::check_dev_inspect_input(
2824                protocol_config,
2825                &transaction,
2826                input_objects,
2827                receiving_objects,
2828                reference_gas_price,
2829            )?
2830        } else {
2831            // If we are not skipping checks, then we call the check_transaction_input function and its dummy gas
2832            // variant which will perform full fledged checks just like a real transaction execution.
2833            if transaction.gas().is_empty() {
2834                // Create and use a dummy gas object if there is no gas object provided.
2835                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2836                    DEV_INSPECT_GAS_COIN_VALUE,
2837                    transaction.gas_owner(),
2838                );
2839                let gas_object_ref = dummy_gas_object.compute_object_reference();
2840                transaction.gas_data_mut().payment = vec![gas_object_ref];
2841                sui_transaction_checks::check_transaction_input_with_given_gas(
2842                    epoch_store.protocol_config(),
2843                    epoch_store.reference_gas_price(),
2844                    &transaction,
2845                    input_objects,
2846                    receiving_objects,
2847                    dummy_gas_object,
2848                    &self.metrics.bytecode_verifier_metrics,
2849                    &self.config.verifier_signing_config,
2850                )?
2851            } else {
2852                sui_transaction_checks::check_transaction_input(
2853                    epoch_store.protocol_config(),
2854                    epoch_store.reference_gas_price(),
2855                    &transaction,
2856                    input_objects,
2857                    &receiving_objects,
2858                    &self.metrics.bytecode_verifier_metrics,
2859                    &self.config.verifier_signing_config,
2860                )?
2861            }
2862        };
2863
2864        let executor = sui_execution::executor(protocol_config, /* silent */ true)
2865            .expect("Creating an executor should not fail here");
2866        let gas_data = transaction.gas_data().clone();
2867        let intent_msg = IntentMessage::new(
2868            Intent {
2869                version: IntentVersion::V0,
2870                scope: IntentScope::TransactionData,
2871                app_id: AppId::Sui,
2872            },
2873            transaction,
2874        );
2875        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2876        let early_execution_error = get_early_execution_error(
2877            &transaction_digest,
2878            &checked_input_objects,
2879            self.config.certificate_deny_config.certificate_deny_set(),
2880            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2881            &FundsWithdrawStatus::MaybeSufficient,
2882        );
2883        let execution_params = match early_execution_error {
2884            Some(error) => ExecutionOrEarlyError::Err(error),
2885            None => ExecutionOrEarlyError::Ok(()),
2886        };
2887
2888        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2889            self.get_backing_store().as_ref(),
2890            protocol_config,
2891            self.metrics.limits_metrics.clone(),
2892            /* expensive checks */ false,
2893            execution_params,
2894            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2895            epoch_store
2896                .epoch_start_config()
2897                .epoch_data()
2898                .epoch_start_timestamp(),
2899            checked_input_objects,
2900            gas_data,
2901            gas_status,
2902            transaction_kind,
2903            None, // rewritten_inputs - not needed for dev_inspect
2904            sender,
2905            transaction_digest,
2906            skip_checks,
2907        );
2908
2909        let raw_effects = if show_raw_txn_data_and_effects {
2910            bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2911                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2912            })?
2913        } else {
2914            vec![]
2915        };
2916
2917        let mut layout_resolver =
2918            epoch_store
2919                .executor()
2920                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2921                    &inner_temp_store,
2922                    self.get_backing_package_store(),
2923                )));
2924
2925        DevInspectResults::new(
2926            effects,
2927            inner_temp_store.events.clone(),
2928            execution_result,
2929            raw_txn_data,
2930            raw_effects,
2931            layout_resolver.as_mut(),
2932        )
2933    }
2934
2935    // Only used for testing because of how epoch store is loaded.
2936    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2937        let epoch_store = self.epoch_store_for_testing();
2938        Ok(epoch_store.reference_gas_price())
2939    }
2940
2941    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2942        self.get_transaction_cache_reader()
2943            .is_tx_already_executed(digest)
2944    }
2945
2946    #[instrument(level = "debug", skip_all, err(level = "debug"))]
2947    fn index_tx(
2948        sequence: u64,
2949        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2950        object_store: &Arc<dyn ObjectStore + Send + Sync>,
2951        indexes: &IndexStore,
2952        digest: &TransactionDigest,
2953        // TODO: index_tx really just need the transaction data here.
2954        cert: &VerifiedExecutableTransaction,
2955        effects: &TransactionEffects,
2956        events: &TransactionEvents,
2957        timestamp_ms: u64,
2958        tx_coins: Option<TxCoins>,
2959        written: &WrittenObjects,
2960        inner_temporary_store: &InnerTemporaryStore,
2961        epoch_store: &Arc<AuthorityPerEpochStore>,
2962        acquire_locks: bool,
2963    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
2964        let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
2965            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2966
2967        indexes.index_tx(
2968            sequence,
2969            cert.data().intent_message().value.sender(),
2970            cert.data()
2971                .intent_message()
2972                .value
2973                .input_objects()?
2974                .iter()
2975                .map(|o| o.object_id()),
2976            effects
2977                .all_changed_objects()
2978                .into_iter()
2979                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2980            cert.data()
2981                .intent_message()
2982                .value
2983                .move_calls()
2984                .into_iter()
2985                .map(|(_cmd_idx, package, module, function)| {
2986                    (*package, module.to_owned(), function.to_owned())
2987                }),
2988            events,
2989            changes,
2990            digest,
2991            timestamp_ms,
2992            tx_coins,
2993            effects.accumulator_events(),
2994            acquire_locks,
2995        )
2996    }
2997
    /// Simulator-only fault injection: may mutate `effects` so that this
    /// validator reports a different effects digest than its peers (an
    /// "effects fork").
    ///
    /// `forked_validators` records which validators have been selected to
    /// fork. `effects_overrides` maps tx digest -> original effects digest so
    /// that other selected validators fork the *same* transactions. With
    /// `full_halt`, enough stake is selected to reach the validity threshold;
    /// otherwise selection stays strictly below it. `fork_probability` is the
    /// per-transaction chance of initiating a new fork once selected.
    #[cfg(msim)]
    fn simulate_fork_during_execution(
        &self,
        certificate: &VerifiedExecutableTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        effects: &mut TransactionEffects,
        forked_validators: std::sync::Arc<
            std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
        >,
        full_halt: bool,
        effects_overrides: std::sync::Arc<
            std::sync::Mutex<std::collections::BTreeMap<String, String>>,
        >,
        fork_probability: f32,
    ) {
        use std::cell::RefCell;
        // Running total of stake selected to fork, accumulated per thread.
        // NOTE(review): assumes each simulated validator executes on its own
        // thread under msim — confirm.
        thread_local! {
            static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
        }
        // System transactions must produce identical effects everywhere; never
        // fork them.
        if !certificate.data().intent_message().value.is_system_tx() {
            let committee = epoch_store.committee();
            let cur_stake = (**committee).weight(&self.name);
            if cur_stake > 0 {
                TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
                    let already_forked = forked_validators
                        .lock()
                        .ok()
                        .map(|set| set.contains(&self.name))
                        .unwrap_or(false);

                    // Decide (once per validator) whether this node joins the
                    // forking set, based on accumulated forked stake.
                    if !already_forked {
                        let should_fork = if full_halt {
                            // For full halt, fork enough nodes to reach validity threshold
                            *total_stake <= committee.validity_threshold()
                        } else {
                            // For partial fork, stay strictly below validity threshold
                            *total_stake + cur_stake < committee.validity_threshold()
                        };

                        if should_fork {
                            *total_stake += cur_stake;

                            if let Ok(mut external_set) = forked_validators.lock() {
                                external_set.insert(self.name);
                                info!("forked_validators: {:?}", external_set);
                            }
                        }
                    }

                    if let Ok(external_set) = forked_validators.lock() {
                        if external_set.contains(&self.name) {
                            // If effects_overrides is empty, deterministically select a tx_digest to fork with 1/100 probability.
                            // Fork this transaction and record the digest and the original effects to original_effects.
                            // If original_effects is nonempty and contains a key matching this transaction digest (i.e.
                            // the transaction was forked on a different validator), fork this txn as well.

                            let tx_digest = certificate.digest().to_string();
                            if let Ok(mut overrides) = effects_overrides.lock() {
                                if overrides.contains_key(&tx_digest)
                                    || overrides.is_empty()
                                        && sui_simulator::random::deterministic_probability(
                                            &tx_digest,
                                            fork_probability,
                                        )
                                {
                                    let original_effects_digest = effects.digest().to_string();
                                    overrides
                                        .insert(tx_digest.clone(), original_effects_digest.clone());
                                    info!(
                                        ?tx_digest,
                                        ?original_effects_digest,
                                        "Captured forked effects digest for transaction"
                                    );
                                    // Minimal perturbation: bumping computation
                                    // cost by 1 changes the effects digest.
                                    effects.gas_cost_summary_mut_for_testing().computation_cost +=
                                        1;
                                }
                            }
                        }
                    }
                });
            }
        }
    }
3081
    /// Computes the owner-index and dynamic-field-index deltas
    /// (`ObjectIndexChanges`) produced by one transaction's effects.
    ///
    /// Must run before the transaction's outputs are committed: lookups of
    /// pre-transaction owners read the *old* object versions from
    /// `object_store`, and panic if they are missing (treated as an invariant
    /// violation, see comments below).
    fn process_object_index(
        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        effects: &TransactionEffects,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<ObjectIndexChanges> {
        // Resolve Move type layouts against this transaction's in-memory
        // outputs first, falling back to the backing package store.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    inner_temporary_store,
                    backing_package_store,
                )));

        // Map: object id -> version it had before this transaction touched it.
        let modified_at_version = effects
            .modified_at_versions()
            .into_iter()
            .collect::<HashMap<_, _>>();

        let tx_digest = effects.transaction_digest();
        let mut deleted_owners = vec![];
        let mut deleted_dynamic_fields = vec![];
        // Deleted and wrapped objects: drop their old index entries, looked up
        // at the pre-transaction version.
        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
            let old_version = modified_at_version.get(&id).unwrap();
            // When we process the index, the latest object hasn't been written yet so
            // the old object must be present.
            match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
                |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
            ) {
                Owner::AddressOwner(addr)
                | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
                Owner::ObjectOwner(object_id) => {
                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
                }
                // Other ownership kinds (e.g. shared/immutable) are not
                // tracked by these indexes.
                _ => {}
            }
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];

        for (oref, owner, kind) in effects.all_changed_objects() {
            let id = &oref.0;
            // For mutated objects, retrieve old owner and delete old index if there is a owner change.
            if let WriteKind::Mutate = kind {
                let Some(old_version) = modified_at_version.get(id) else {
                    panic!(
                        "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
                        tx_digest
                    );
                };
                // When we process the index, the latest object hasn't been written yet so
                // the old object must be present.
                let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
                    panic!(
                        "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
                        tx_digest, id, old_version
                    );
                };
                if old_object.owner != owner {
                    match old_object.owner {
                        Owner::AddressOwner(addr)
                        | Owner::ConsensusAddressOwner { owner: addr, .. } => {
                            deleted_owners.push((addr, *id));
                        }
                        Owner::ObjectOwner(object_id) => {
                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                        }
                        _ => {}
                    }
                }
            }

            // Add the new index entry for the object's post-transaction owner.
            match owner {
                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
                    // TODO: We can remove the object fetching after we added ObjectType to TransactionEffects
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let type_ = new_object
                        .type_()
                        .map(|type_| ObjectType::Struct(type_.clone()))
                        .unwrap_or(ObjectType::Package);

                    new_owners.push((
                        (addr, *id),
                        ObjectInfo {
                            object_id: *id,
                            version: oref.1,
                            digest: oref.2,
                            type_,
                            owner,
                            previous_transaction: *effects.transaction_digest(),
                        },
                    ));
                }
                Owner::ObjectOwner(owner) => {
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    // Object-owned objects are indexed as dynamic fields of
                    // their parent; failures here are logged and the object is
                    // skipped rather than aborting the whole transaction.
                    let Some(df_info) = Self::try_create_dynamic_field_info(
                        object_store,
                        new_object,
                        written,
                        layout_resolver.as_mut(),
                    )
                    .unwrap_or_else(|e| {
                        error!(
                            "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
                            e, new_object
                        );
                        None
                    }) else {
                        // Skip indexing for non dynamic field objects.
                        continue;
                    };
                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
                }
                _ => {}
            }
        }

        Ok(ObjectIndexChanges {
            deleted_owners,
            deleted_dynamic_fields,
            new_owners,
            new_dynamic_fields,
        })
    }
3233
    /// Builds `DynamicFieldInfo` for `o` if it is a dynamic field object.
    ///
    /// Returns `Ok(None)` when `o` is not a Move object or not a dynamic
    /// field. For dynamic *object* fields, the wrapped child object is looked
    /// up first in this transaction's `written` set, then in `object_store`.
    fn try_create_dynamic_field_info(
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        o: &Object,
        written: &WrittenObjects,
        resolver: &mut dyn LayoutResolver,
    ) -> SuiResult<Option<DynamicFieldInfo>> {
        // Skip if not a move object
        let Some(move_object) = o.data.try_as_move().cloned() else {
            return Ok(None);
        };

        // We only index dynamic field objects
        if !move_object.type_().is_dynamic_field() {
            return Ok(None);
        }

        // Resolve the field's annotated layout so its name can be
        // deserialized into a JSON-friendly value below.
        let layout = resolver
            .get_annotated_layout(&move_object.type_().clone().into())?
            .into_layout();

        let field =
            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
                SuiErrorKind::ObjectDeserializationError {
                    error: e.to_string(),
                }
            })?;

        let type_ = field.kind;
        let name_type: TypeTag = field.name_layout.into();
        let bcs_name = field.name_bytes.to_owned();

        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
            .map_err(|e| {
                warn!("{e}");
                SuiErrorKind::ObjectDeserializationError {
                    error: e.to_string(),
                }
            })?;

        let name = DynamicFieldName {
            type_: name_type,
            value: SuiMoveValue::from(name_value).to_json_value(),
        };

        let value_metadata = field.value_metadata().map_err(|e| {
            warn!("{e}");
            SuiErrorKind::ObjectDeserializationError {
                error: e.to_string(),
            }
        })?;

        Ok(Some(match value_metadata {
            // Plain dynamic field: the value lives inline in this object.
            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
                name,
                bcs_name,
                type_,
                object_type: object_type.to_canonical_string(/* with_prefix */ true),
                object_id: o.id(),
                version: o.version(),
                digest: o.digest(),
            },

            DFV::ValueMetadata::DynamicObjectField(object_id) => {
                // Find the actual object from storage using the object id obtained from the wrapper.

                // Try to find the object in the written objects first.
                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
                    let version = object.version();
                    let digest = object.digest();
                    let object_type = object.data.type_().unwrap().clone();
                    (version, digest, object_type)
                } else {
                    // If not found, try to find it in the database.
                    // NOTE(review): looks up the child at the wrapper's
                    // version — assumes their versions coincide; confirm.
                    let object = object_store
                        .get_object_by_key(&object_id, o.version())
                        .ok_or_else(|| UserInputError::ObjectNotFound {
                            object_id,
                            version: Some(o.version()),
                        })?;
                    let version = object.version();
                    let digest = object.digest();
                    let object_type = object.data.type_().unwrap().clone();
                    (version, digest, object_type)
                };

                DynamicFieldInfo {
                    name,
                    bcs_name,
                    type_,
                    object_type: object_type.to_string(),
                    object_id,
                    version,
                    digest,
                }
            }
        }))
    }
3331
    /// Kicks off indexing and event-subscription processing for one executed
    /// transaction. No-op on nodes without an `IndexStore`.
    ///
    /// In sync mode (`config.sync_post_process_one_tx`) the work runs inline
    /// and the index batch is committed immediately. Otherwise the work is
    /// spawned onto a blocking thread (bounded by
    /// `post_processing_semaphore`), and its output is delivered through a
    /// oneshot channel registered in `pending_post_processing`.
    #[instrument(level = "trace", skip_all, err(level = "debug"))]
    fn post_process_one_tx(
        &self,
        certificate: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let Some(indexes) = &self.indexes else {
            return Ok(());
        };

        let tx_digest = *certificate.digest();

        // Allocate sequence number on the calling thread to preserve execution order.
        let sequence = indexes.allocate_sequence_number();

        if self.config.sync_post_process_one_tx {
            // Synchronous mode: run post-processing inline on the calling thread
            // and commit the index batch immediately with locks held.
            // Used as a rollback mechanism and for testing correctness against async mode.
            // TODO: delete this branch once async mode has shipped
            let result = Self::post_process_one_tx_impl(
                sequence,
                indexes,
                &self.subscription_handler,
                &self.metrics,
                self.name,
                self.get_backing_package_store(),
                self.get_object_store(),
                certificate,
                effects,
                inner_temporary_store,
                epoch_store,
                true, // acquire_locks
            );

            match result {
                Ok((raw_batch, cache_updates_with_locks)) => {
                    let mut db_batch = indexes.new_db_batch();
                    db_batch
                        .concat(vec![raw_batch])
                        .expect("failed to absorb raw index batch");
                    // Destructure to keep _locks alive through commit_index_batch.
                    let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
                        cache_updates_with_locks;
                    indexes
                        .commit_index_batch(db_batch, vec![inner])
                        .expect("failed to commit index batch");
                }
                Err(e) => {
                    self.metrics.post_processing_total_failures.inc();
                    error!(?tx_digest, "tx post processing failed: {e}");
                    return Err(e);
                }
            }

            return Ok(());
        }

        // Async mode: register a completion channel keyed by digest so the
        // commit path can later collect this transaction's index batch.
        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
        self.pending_post_processing.insert(tx_digest, done_rx);

        // Clone everything the background task needs; it must not borrow self.
        let indexes = indexes.clone();
        let subscription_handler = self.subscription_handler.clone();
        let metrics = self.metrics.clone();
        let name = self.name;
        let backing_package_store = self.get_backing_package_store().clone();
        let object_store = self.get_object_store().clone();
        let semaphore = self.post_processing_semaphore.clone();

        let certificate = certificate.clone();
        let effects = effects.clone();
        let inner_temporary_store = inner_temporary_store.clone();
        let epoch_store = epoch_store.clone();

        // spawn post processing on a blocking thread
        tokio::spawn(async move {
            // Acquire the concurrency permit asynchronously, before moving to
            // a blocking thread; held for the duration of the work.
            let permit = {
                let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
                semaphore
                    .acquire_owned()
                    .await
                    .expect("post-processing semaphore should not be closed")
            };

            let _ = tokio::task::spawn_blocking(move || {
                let _permit = permit;

                let result = Self::post_process_one_tx_impl(
                    sequence,
                    &indexes,
                    &subscription_handler,
                    &metrics,
                    name,
                    &backing_package_store,
                    &object_store,
                    &certificate,
                    &effects,
                    &inner_temporary_store,
                    &epoch_store,
                    false, // acquire_locks
                );

                match result {
                    Ok((raw_batch, cache_updates_with_locks)) => {
                        fail_point!("crash-after-post-process-one-tx");
                        let output = (raw_batch, cache_updates_with_locks.into_inner());
                        // Receiver may have been dropped; that's fine.
                        let _ = done_tx.send(output);
                    }
                    Err(e) => {
                        // On failure nothing is sent: the pending entry's
                        // receiver will observe a closed channel.
                        metrics.post_processing_total_failures.inc();
                        error!(?tx_digest, "tx post processing failed: {e}");
                    }
                }
            })
            .await;
        });

        Ok(())
    }
3453
    /// Shared body of post-processing (used by both sync and async modes):
    /// builds the index batch for the transaction and pushes its effects and
    /// events to the subscription handler.
    ///
    /// Panics if `index_tx` fails — indexing failures here are treated as
    /// invariant violations rather than recoverable errors.
    fn post_process_one_tx_impl(
        sequence: u64,
        indexes: &Arc<IndexStore>,
        subscription_handler: &Arc<SubscriptionHandler>,
        metrics: &Arc<AuthorityMetrics>,
        name: AuthorityName,
        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        certificate: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        acquire_locks: bool,
    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
        let _scope = monitored_scope("Execution::post_process_one_tx");

        let tx_digest = certificate.digest();
        // Indexing timestamp is wall-clock time at post-processing, not
        // execution or checkpoint time.
        let timestamp_ms = Self::unixtime_now_ms();
        let events = &inner_temporary_store.events;
        let written = &inner_temporary_store.written;
        let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
            name,
            object_store,
            effects,
            inner_temporary_store,
            epoch_store,
        );

        let (raw_batch, cache_updates) = Self::index_tx(
            sequence,
            backing_package_store,
            object_store,
            indexes,
            tx_digest,
            certificate,
            effects,
            events,
            timestamp_ms,
            tx_coins,
            written,
            inner_temporary_store,
            epoch_store,
            acquire_locks,
        )
        .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
        .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
        .expect("Indexing tx should not fail");

        // Convert to the RPC-facing representations for subscribers.
        let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
        let events = Self::make_transaction_block_events(
            backing_package_store,
            events.clone(),
            *tx_digest,
            timestamp_ms,
            epoch_store,
            inner_temporary_store,
        )?;
        // Emit events
        subscription_handler
            .process_tx(certificate.data().transaction_data(), &effects, &events)
            .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
            .tap_err(|e| {
                warn!(
                    ?tx_digest,
                    "Post processing - Couldn't process events for tx: {}", e
                )
            })?;

        metrics
            .post_processing_total_events_emitted
            .inc_by(events.data.len() as u64);

        Ok((raw_batch, cache_updates))
    }
3528
3529    fn make_transaction_block_events(
3530        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3531        transaction_events: TransactionEvents,
3532        digest: TransactionDigest,
3533        timestamp_ms: u64,
3534        epoch_store: &Arc<AuthorityPerEpochStore>,
3535        inner_temporary_store: &InnerTemporaryStore,
3536    ) -> SuiResult<SuiTransactionBlockEvents> {
3537        let mut layout_resolver =
3538            epoch_store
3539                .executor()
3540                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3541                    inner_temporary_store,
3542                    backing_package_store,
3543                )));
3544        SuiTransactionBlockEvents::try_from(
3545            transaction_events,
3546            digest,
3547            Some(timestamp_ms),
3548            layout_resolver.as_mut(),
3549        )
3550    }
3551
3552    pub fn unixtime_now_ms() -> u64 {
3553        let now = SystemTime::now()
3554            .duration_since(UNIX_EPOCH)
3555            .expect("Time went backwards")
3556            .as_millis();
3557        u64::try_from(now).expect("Travelling in time machine")
3558    }
3559
3560    // TODO(fastpath): update this handler for Mysticeti fastpath.
3561    // There will no longer be validator quorum signed transactions or effects.
3562    // The proof of finality needs to come from checkpoints.
3563    #[instrument(level = "trace", skip_all)]
3564    pub async fn handle_transaction_info_request(
3565        &self,
3566        request: TransactionInfoRequest,
3567    ) -> SuiResult<TransactionInfoResponse> {
3568        let epoch_store = self.load_epoch_store_one_call_per_task();
3569        let (transaction, status) = self
3570            .get_transaction_status(&request.transaction_digest, &epoch_store)?
3571            .ok_or(SuiErrorKind::TransactionNotFound {
3572                digest: request.transaction_digest,
3573            })?;
3574        Ok(TransactionInfoResponse {
3575            transaction,
3576            status,
3577        })
3578    }
3579
3580    #[instrument(level = "trace", skip_all)]
3581    pub async fn handle_object_info_request(
3582        &self,
3583        request: ObjectInfoRequest,
3584    ) -> SuiResult<ObjectInfoResponse> {
3585        let epoch_store = self.load_epoch_store_one_call_per_task();
3586
3587        let requested_object_seq = match request.request_kind {
3588            ObjectInfoRequestKind::LatestObjectInfo => {
3589                let (_, seq, _) = self
3590                    .get_object_or_tombstone(request.object_id)
3591                    .await
3592                    .ok_or_else(|| {
3593                        SuiError::from(UserInputError::ObjectNotFound {
3594                            object_id: request.object_id,
3595                            version: None,
3596                        })
3597                    })?;
3598                seq
3599            }
3600            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3601        };
3602
3603        let object = self
3604            .get_object_store()
3605            .get_object_by_key(&request.object_id, requested_object_seq)
3606            .ok_or_else(|| {
3607                SuiError::from(UserInputError::ObjectNotFound {
3608                    object_id: request.object_id,
3609                    version: Some(requested_object_seq),
3610                })
3611            })?;
3612
3613        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3614            (request.generate_layout, object.data.try_as_move())
3615        {
3616            Some(into_struct_layout(
3617                self.load_epoch_store_one_call_per_task()
3618                    .executor()
3619                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3620                    .get_annotated_layout(&move_obj.type_().clone().into())?,
3621            )?)
3622        } else {
3623            None
3624        };
3625
3626        let lock = if !object.is_address_owned() {
3627            // Only address owned objects have locks.
3628            None
3629        } else {
3630            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3631                .await?
3632                .map(|s| s.into_inner())
3633        };
3634
3635        Ok(ObjectInfoResponse {
3636            object,
3637            layout,
3638            lock_for_debugging: lock,
3639        })
3640    }
3641
3642    #[instrument(level = "trace", skip_all)]
3643    pub fn handle_checkpoint_request(
3644        &self,
3645        request: &CheckpointRequest,
3646    ) -> SuiResult<CheckpointResponse> {
3647        let summary = match request.sequence_number {
3648            Some(seq) => self
3649                .checkpoint_store
3650                .get_checkpoint_by_sequence_number(seq)?,
3651            None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3652        }
3653        .map(|v| v.into_inner());
3654        let contents = match &summary {
3655            Some(s) => self
3656                .checkpoint_store
3657                .get_checkpoint_contents(&s.content_digest)?,
3658            None => None,
3659        };
3660        Ok(CheckpointResponse {
3661            checkpoint: summary,
3662            contents,
3663        })
3664    }
3665
3666    #[instrument(level = "trace", skip_all)]
3667    pub fn handle_checkpoint_request_v2(
3668        &self,
3669        request: &CheckpointRequestV2,
3670    ) -> SuiResult<CheckpointResponseV2> {
3671        let summary = if request.certified {
3672            let summary = match request.sequence_number {
3673                Some(seq) => self
3674                    .checkpoint_store
3675                    .get_checkpoint_by_sequence_number(seq)?,
3676                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3677            }
3678            .map(|v| v.into_inner());
3679            summary.map(CheckpointSummaryResponse::Certified)
3680        } else {
3681            let summary = match request.sequence_number {
3682                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3683                None => self
3684                    .checkpoint_store
3685                    .get_latest_locally_computed_checkpoint()?,
3686            };
3687            summary.map(CheckpointSummaryResponse::Pending)
3688        };
3689        let contents = match &summary {
3690            Some(s) => self
3691                .checkpoint_store
3692                .get_checkpoint_contents(&s.content_digest())?,
3693            None => None,
3694        };
3695        Ok(CheckpointResponseV2 {
3696            checkpoint: summary,
3697            contents,
3698        })
3699    }
3700
3701    fn check_protocol_version(
3702        supported_protocol_versions: SupportedProtocolVersions,
3703        current_version: ProtocolVersion,
3704    ) {
3705        info!("current protocol version is now {:?}", current_version);
3706        info!("supported versions are: {:?}", supported_protocol_versions);
3707        if !supported_protocol_versions.is_version_supported(current_version) {
3708            let msg = format!(
3709                "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3710                current_version, supported_protocol_versions,
3711            );
3712
3713            error!("{}", msg);
3714            eprintln!("{}", msg);
3715
3716            #[cfg(not(msim))]
3717            std::process::exit(1);
3718
3719            #[cfg(msim)]
3720            sui_simulator::task::shutdown_current_node();
3721        }
3722    }
3723
    /// Constructs a fully wired `AuthorityState`: metrics, execution
    /// scheduler, storage pruners, optional traffic control, and the
    /// background execution task. Also performs one-time startup work such as
    /// indexing genesis objects and sanity-checking the protocol version and
    /// restored snapshot data.
    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        chain_identifier: ChainIdentifier,
        policy_config: Option<PolicyConfig>,
        firewall_config: Option<RemoteFirewallConfig>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Arc<Self> {
        // Exits the process (or shuts down the simulated node) if this binary
        // cannot speak the network's current protocol version.
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Channel carrying ready-to-execute certificates from the scheduler to
        // the `execution_process` task spawned further down.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let execution_scheduler = Arc::new(ExecutionScheduler::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers.account_funds_read.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            tx_ready_certificates,
            &epoch_store,
            config.funds_withdraw_scheduler_type,
            metrics.clone(),
            prometheus_registry,
        ));
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // The pruners are kept alive by being moved into the state below (see
        // the `_pruner` / `_authority_per_epoch_pruner` fields).
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            rpc_index.clone(),
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            pruner_watermarks,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let traffic_controller_metrics =
            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
        // Traffic control is optional and only initialized when a policy
        // config was provided.
        let traffic_controller = if let Some(policy_config) = policy_config {
            Some(Arc::new(
                TrafficController::init(
                    policy_config,
                    traffic_controller_metrics,
                    firewall_config.clone(),
                )
                .await,
            ))
        } else {
            None
        };

        let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
            ForkRecoveryState::new(Some(fork_config))
                .expect("Failed to initialize fork recovery state")
        });

        let coin_reservation_resolver = Arc::new(CachingCoinReservationResolver::new(
            execution_cache_trait_pointers.child_object_resolver.clone(),
        ));

        let object_funds_checker_metrics =
            Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            coin_reservation_resolver,
            indexes,
            rpc_index,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            execution_scheduler,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new()),
            traffic_controller,
            fork_recovery_state,
            notify_epoch: tokio::sync::watch::channel(epoch).0,
            object_funds_checker: ArcSwapOption::empty(),
            object_funds_checker_metrics,
            pending_post_processing: Arc::new(DashMap::new()),
            post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
        });
        // Needs the fully constructed state: reads the accumulator root object
        // through the caches to seed the object funds checker.
        state.init_object_funds_checker().await;

        // Spawned tasks hold weak references so they do not keep the
        // AuthorityState alive after it is dropped.
        let state_clone = Arc::downgrade(&state);
        spawn_monitored_task!(fix_indexes(state_clone));
        // Start a task to execute ready certificates.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));
        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        // Sanity check for multi-epoch transaction expiration: the previous
        // epoch's executed transaction digests must exist, otherwise the node
        // was restored from a snapshot taken with an incompatible sui-tool.
        if epoch_store
            .protocol_config()
            .enable_multi_epoch_transaction_expiration()
            && epoch_store.epoch() > 0
        {
            use typed_store::Map;
            let previous_epoch = epoch_store.epoch() - 1;
            let start_key = (previous_epoch, TransactionDigest::ZERO);
            let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
            let has_previous_epoch_data = store
                .perpetual_tables
                .executed_transaction_digests
                .safe_range_iter(start_key..end_key)
                .next()
                .is_some();

            if !has_previous_epoch_data {
                panic!(
                    "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
                    This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
                    Please restore from a snapshot using the latest version of sui-tool.",
                    previous_epoch
                );
            }
        }

        state
    }
3883
3884    async fn init_object_funds_checker(&self) {
3885        let epoch_store = self.epoch_store.load();
3886        if self.is_validator(&epoch_store)
3887            && epoch_store.protocol_config().enable_object_funds_withdraw()
3888        {
3889            if self.object_funds_checker.load().is_none() {
3890                let inner = self
3891                    .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
3892                    .await
3893                    .map(|o| {
3894                        Arc::new(ObjectFundsChecker::new(
3895                            o.version(),
3896                            self.object_funds_checker_metrics.clone(),
3897                        ))
3898                    });
3899                self.object_funds_checker.store(inner);
3900            }
3901        } else {
3902            self.object_funds_checker.store(None);
3903        }
3904    }
3905
    // TODO: Consolidate our traits to reduce the number of methods here.
    /// Read-only access to the object cache.
    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
        &self.execution_cache_trait_pointers.object_cache_reader
    }
3910
    /// Read-only access to the transaction cache.
    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
        &self.execution_cache_trait_pointers.transaction_cache_reader
    }
3914
    /// Write access to the execution cache.
    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
        &self.execution_cache_trait_pointers.cache_writer
    }
3918
    /// The backing store used by execution.
    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_store
    }
3922
    /// Resolver for child (dynamic field) objects.
    pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
        &self.execution_cache_trait_pointers.child_object_resolver
    }
3926
    /// Read access to account funds (accumulator) state.
    pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
        &self.execution_cache_trait_pointers.account_funds_read
    }
3930
    /// Store used to resolve Move packages.
    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_package_store
    }
3934
    /// Generic object store view over the execution cache.
    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
        &self.execution_cache_trait_pointers.object_store
    }
3938
    /// Map of transactions whose post-processing output is still in flight,
    /// keyed by transaction digest.
    pub fn pending_post_processing(
        &self,
    ) -> &Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>> {
        &self.pending_post_processing
    }
3944
3945    pub async fn await_post_processing(
3946        &self,
3947        tx_digest: &TransactionDigest,
3948    ) -> Option<PostProcessingOutput> {
3949        if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
3950            // Tx was executed and post-processing is in flight.
3951            rx.await.ok()
3952        } else {
3953            // Tx was already persisted or post-processing already completed.
3954            None
3955        }
3956    }
3957
3958    /// Await post-processing for a transaction and commit the index batch immediately.
3959    /// Used in test helpers where there is no CheckpointExecutor to collect and commit
3960    /// index batches at checkpoint boundaries.
3961    pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
3962        if let Some(indexes) = &self.indexes
3963            && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
3964        {
3965            let mut db_batch = indexes.new_db_batch();
3966            db_batch
3967                .concat(vec![raw_batch])
3968                .expect("failed to build index batch");
3969            indexes
3970                .commit_index_batch(db_batch, vec![cache_updates])
3971                .expect("failed to commit index batch");
3972        }
3973    }
3974
    /// Cache API used during epoch reconfiguration.
    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
        &self.execution_cache_trait_pointers.reconfig_api
    }
3978
    /// Store for global state hashes.
    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
        &self.execution_cache_trait_pointers.global_state_hash_store
    }
3982
    /// Cache view used by checkpointing.
    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
        &self.execution_cache_trait_pointers.checkpoint_cache
    }
3986
    /// Store API used by state sync.
    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
        &self.execution_cache_trait_pointers.state_sync_store
    }
3990
    /// API for committing execution cache contents.
    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
        &self.execution_cache_trait_pointers.cache_commit
    }
3994
    /// Test-only direct access to the underlying `AuthorityStore`.
    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
        self.execution_cache_trait_pointers
            .testing_api
            .database_for_testing()
    }
4000
    /// Test-only direct access to the writeback cache.
    pub fn cache_for_testing(&self) -> &WritebackCache {
        self.execution_cache_trait_pointers
            .testing_api
            .cache_for_testing()
    }
4006
    /// Test helper: runs one checkpoint-pruning pass for all eligible epochs
    /// using the given pruning `config` and `metrics`.
    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
        &self,
        config: NodeConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        use crate::authority::authority_store_pruner::PrunerWatermarks;
        // Tests use default (fresh) watermarks rather than the node's own.
        let watermarks = Arc::new(PrunerWatermarks::default());
        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
            &self.database_for_testing().perpetual_tables,
            &self.checkpoint_store,
            self.rpc_index.as_deref(),
            config.authority_store_pruning_config,
            metrics,
            EPOCH_DURATION_MS_FOR_TESTING,
            &watermarks,
        )
        .await
    }
4025
    /// The scheduler that dispatches transactions for execution.
    pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
        &self.execution_scheduler
    }
4029
4030    fn create_owner_index_if_empty(
4031        &self,
4032        genesis_objects: &[Object],
4033        epoch_store: &Arc<AuthorityPerEpochStore>,
4034    ) -> SuiResult {
4035        let Some(index_store) = &self.indexes else {
4036            return Ok(());
4037        };
4038        if !index_store.is_empty() {
4039            return Ok(());
4040        }
4041
4042        let mut new_owners = vec![];
4043        let mut new_dynamic_fields = vec![];
4044        let mut layout_resolver = epoch_store
4045            .executor()
4046            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
4047        for o in genesis_objects.iter() {
4048            match o.owner {
4049                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
4050                    new_owners.push((
4051                        (addr, o.id()),
4052                        ObjectInfo::new(&o.compute_object_reference(), o),
4053                    ))
4054                }
4055                Owner::ObjectOwner(object_id) => {
4056                    let id = o.id();
4057                    let Some(info) = Self::try_create_dynamic_field_info(
4058                        self.get_object_store(),
4059                        o,
4060                        &BTreeMap::new(),
4061                        layout_resolver.as_mut(),
4062                    )?
4063                    else {
4064                        continue;
4065                    };
4066                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
4067                }
4068                _ => {}
4069            }
4070        }
4071
4072        index_store.insert_genesis_objects(ObjectIndexChanges {
4073            deleted_owners: vec![],
4074            deleted_dynamic_fields: vec![],
4075            new_owners,
4076            new_dynamic_fields,
4077        })
4078    }
4079
4080    /// Attempts to acquire execution lock for an executable transaction.
4081    /// Returns Some(lock) if the transaction is matching current executed epoch.
4082    /// Returns None if validator is halted at epoch end or epoch mismatch.
4083    pub fn execution_lock_for_executable_transaction(
4084        &self,
4085        transaction: &VerifiedExecutableTransaction,
4086    ) -> Option<ExecutionLockReadGuard<'_>> {
4087        let lock = self.execution_lock.try_read().ok()?;
4088        if *lock == transaction.auth_sig().epoch() {
4089            Some(lock)
4090        } else {
4091            // TODO: Can this still happen?
4092            None
4093        }
4094    }
4095
4096    /// Acquires the execution lock for the duration of transaction validation.
4097    /// This prevents reconfiguration from starting until we are finished validating the transaction.
4098    fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
4099        self.execution_lock
4100            .try_read()
4101            .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
4102    }
4103
    /// Takes the execution lock exclusively, waiting until all read holders
    /// (in-flight executions/validations) have released it.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
4107
    /// Transitions the authority from `cur_epoch_store`'s epoch to the epoch
    /// of `new_committee`: quiesces execution via the execution write lock,
    /// terminates epoch-scoped tasks, clears end-of-epoch state, optionally
    /// snapshots the DBs, reopens the per-epoch store, and reconfigures the
    /// execution scheduler. Returns the new epoch store.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        // Shut down if this binary does not support the new epoch's protocol version.
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );

        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // Safe to being reconfiguration now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        {
            let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                // Need to change this so that consensus adapter do not accept certificates from user.
                // This can happen if our local validator did not initiate epoch change locally,
                // but 2f+1 nodes already concluded the epoch.
                //
                // This lock is essentially a barrier for
                // `epoch_store.pending_consensus_certificates` table we are reading on the line after this block
                cur_epoch_store.close_user_certs(state);
            }
            // lock is dropped here
        }

        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
        self.maybe_reaccumulate_state_hash(
            cur_epoch_store,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.get_reconfig_api()
            .set_epoch_start_configuration(&epoch_start_configuration);
        // Optionally take per-epoch DB checkpoints (snapshots) before moving on.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
            && self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
        {
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
            self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        self.init_object_funds_checker().await;
        *execution_lock = new_epoch;

        self.notify_epoch(new_epoch);
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
4202
    /// Publishes `new_epoch` on the epoch watch channel, waking any
    /// `wait_for_epoch` callers.
    fn notify_epoch(&self, new_epoch: EpochId) {
        self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
    }
4206
4207    pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4208        let mut rx = self.notify_epoch.subscribe();
4209        loop {
4210            let epoch = *rx.borrow();
4211            if epoch >= target_epoch {
4212                return Ok(epoch);
4213            }
4214            rx.changed().await?;
4215        }
4216    }
4217
    /// Executes accumulator settlement for testing purposes.
    /// Returns a list of (transaction, execution_env) pairs that can be replayed on another
    /// AuthorityState (e.g., a fullnode) using `replay_settlement_for_testing`.
    pub async fn settle_accumulator_for_testing(
        &self,
        effects: &[TransactionEffects],
        checkpoint_seq: Option<u64>,
    ) -> Vec<(VerifiedExecutableTransaction, ExecutionEnv)> {
        let accumulator_version = self
            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
            .await
            .unwrap()
            .version();
        // Use provided checkpoint sequence, or fall back to accumulator version.
        let ckpt_seq = checkpoint_seq.unwrap_or_else(|| accumulator_version.value());
        // Build the settlement transactions from the funds changes in `effects`.
        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.get_transaction_cache_reader().as_ref()),
            effects,
            ckpt_seq,
            0,
        );
        let balance_changes = builder.collect_funds_changes();
        let epoch_store = self.epoch_store_for_testing();
        let epoch = epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .unwrap();
        let settlements = builder.build_tx(
            epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            ckpt_seq,
        );

        // Wrap each settlement as a system transaction for this epoch.
        let settlements: Vec<_> = settlements
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    epoch,
                )
            })
            .collect();

        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                &settlements,
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        // Execute each settlement immediately with its assigned shared-object
        // versions, recording (tx, env) pairs for later replay.
        let mut replay_txns = Vec::new();
        let mut settlement_effects = Vec::with_capacity(settlements.len());
        for tx in settlements {
            let assigned = version_map.get(&tx.key()).unwrap().clone();
            let env = ExecutionEnv::new().with_assigned_versions(assigned);
            let (effects, _) = self
                .try_execute_immediately(&tx.clone(), env.clone(), &epoch_store)
                .await
                .unwrap();
            assert!(effects.status().is_ok());
            replay_txns.push((tx, env));
            settlement_effects.push(effects);
        }

        // Build and execute the barrier transaction that follows the settlements.
        let barrier = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            &settlement_effects,
        );
        let barrier = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier),
            epoch,
        );

        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                std::slice::from_ref(&barrier),
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        let barrier_assigned = version_map.get(&barrier.key()).unwrap().clone();
        let env = ExecutionEnv::new().with_assigned_versions(barrier_assigned);
        let (effects, _) = self
            .try_execute_immediately(&barrier.clone(), env.clone(), &epoch_store)
            .await
            .unwrap();
        assert!(effects.status().is_ok());
        replay_txns.push((barrier, env));

        // Notify the scheduler that address funds are settled at the next
        // accumulator version.
        let next_accumulator_version = accumulator_version.next();
        self.execution_scheduler
            .settle_address_funds(FundsSettlement {
                funds_changes: balance_changes,
                next_accumulator_version,
            });
        // object funds are settled while executing the barrier transaction

        replay_txns
    }
4324
4325    /// Replays settlement transactions on this AuthorityState.
4326    /// Used to sync a fullnode with settlement transactions executed on a validator.
4327    pub async fn replay_settlement_for_testing(
4328        &self,
4329        txns: &[(VerifiedExecutableTransaction, ExecutionEnv)],
4330    ) {
4331        let epoch_store = self.epoch_store_for_testing();
4332        for (tx, env) in txns {
4333            let (effects, _) = self
4334                .try_execute_immediately(tx, env.clone(), &epoch_store)
4335                .await
4336                .unwrap();
4337            assert!(effects.status().is_ok());
4338        }
4339    }
4340
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for testing.
    pub async fn reconfigure_for_testing(&self) {
        // Take the execution write lock so no transactions run mid-switch.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden and diverged from
        // the protocol config definitions. That override may have now been dropped when the initial guard was dropped.
        // We reapply the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly like to keep the protocol config the same
        // across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        let new_epoch = new_epoch_store.epoch();
        // Swap in the new epoch store before terminating the old epoch's tasks.
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;

        *execution_lock = new_epoch;
    }
4374
4375    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
4376    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
4377    #[instrument(level = "error", skip_all)]
4378    fn maybe_reaccumulate_state_hash(
4379        &self,
4380        cur_epoch_store: &AuthorityPerEpochStore,
4381        new_protocol_version: ProtocolVersion,
4382    ) {
4383        self.get_reconfig_api()
4384            .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4385    }
4386
4387    #[instrument(level = "error", skip_all)]
4388    fn check_system_consistency(
4389        &self,
4390        cur_epoch_store: &AuthorityPerEpochStore,
4391        state_hasher: Arc<GlobalStateHasher>,
4392        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4393    ) {
4394        info!(
4395            "Performing sui conservation consistency check for epoch {}",
4396            cur_epoch_store.epoch()
4397        );
4398
4399        if let Err(err) = self
4400            .get_reconfig_api()
4401            .expensive_check_sui_conservation(cur_epoch_store)
4402        {
4403            if cfg!(debug_assertions) {
4404                panic!("{}", err);
4405            } else {
4406                // We cannot panic in production yet because it is known that there are some
4407                // inconsistencies in testnet. We will enable this once we make it balanced again in testnet.
4408                warn!("Sui conservation consistency check failed: {}", err);
4409            }
4410        } else {
4411            info!("Sui conservation consistency check passed");
4412        }
4413
4414        // check for root state hash consistency with live object set
4415        if expensive_safety_check_config.enable_state_consistency_check() {
4416            info!(
4417                "Performing state consistency check for epoch {}",
4418                cur_epoch_store.epoch()
4419            );
4420            self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4421        }
4422
4423        if expensive_safety_check_config.enable_secondary_index_checks()
4424            && let Some(indexes) = self.indexes.clone()
4425        {
4426            verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
4427                .expect("secondary indexes are inconsistent");
4428        }
4429
4430        // Verify all checkpointed transactions are present in transactions_seq.
4431        // This catches any post-processing gaps that could occur if async
4432        // post-processing failed to complete before persistence.
4433        if expensive_safety_check_config.enable_secondary_index_checks()
4434            && let Some(indexes) = self.indexes.clone()
4435        {
4436            let epoch = cur_epoch_store.epoch();
4437            // Only verify the current epoch's checkpoints. Previous epoch contents
4438            // may have been pruned, and we only need to verify that this epoch's
4439            // async post-processing completed correctly.
4440            let first_checkpoint = if epoch == 0 {
4441                0
4442            } else {
4443                self.checkpoint_store
4444                    .get_epoch_last_checkpoint_seq_number(epoch - 1)
4445                    .expect("Failed to get previous epoch's last checkpoint")
4446                    .expect("Previous epoch's last checkpoint missing")
4447                    + 1
4448            };
4449            let highest_executed = self
4450                .checkpoint_store
4451                .get_highest_executed_checkpoint_seq_number()
4452                .expect("Failed to get highest executed checkpoint")
4453                .expect("No executed checkpoints");
4454
4455            info!(
4456                "Verifying checkpointed transactions are in transactions_seq \
4457                 (checkpoints {first_checkpoint}..={highest_executed})"
4458            );
4459            for seq in first_checkpoint..=highest_executed {
4460                let checkpoint = self
4461                    .checkpoint_store
4462                    .get_checkpoint_by_sequence_number(seq)
4463                    .expect("Failed to get checkpoint")
4464                    .expect("Checkpoint missing");
4465                let contents = self
4466                    .checkpoint_store
4467                    .get_checkpoint_contents(&checkpoint.content_digest)
4468                    .expect("Failed to get checkpoint contents")
4469                    .expect("Checkpoint contents missing");
4470                for digests in contents.iter() {
4471                    let tx_digest = digests.transaction;
4472                    assert!(
4473                        indexes
4474                            .get_transaction_seq(&tx_digest)
4475                            .expect("Failed to read transactions_seq")
4476                            .is_some(),
4477                        "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
4478                    );
4479                }
4480            }
4481            info!("All checkpointed transactions verified in transactions_seq");
4482        }
4483    }
4484
4485    fn expensive_check_is_consistent_state(
4486        &self,
4487        state_hasher: Arc<GlobalStateHasher>,
4488        cur_epoch_store: &AuthorityPerEpochStore,
4489    ) {
4490        let live_object_set_hash = state_hasher.digest_live_object_set(
4491            !cur_epoch_store
4492                .protocol_config()
4493                .simplified_unwrap_then_delete(),
4494        );
4495
4496        let root_state_hash: ECMHLiveObjectSetDigest = self
4497            .get_global_state_hash_store()
4498            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4499            .expect("Retrieving root state hash cannot fail")
4500            .expect("Root state hash for epoch must exist")
4501            .1
4502            .digest()
4503            .into();
4504
4505        let is_inconsistent = root_state_hash != live_object_set_hash;
4506        if is_inconsistent {
4507            debug_fatal!(
4508                "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4509                root_state_hash,
4510                live_object_set_hash
4511            );
4512        } else {
4513            info!("State consistency check passed");
4514        }
4515
4516        state_hasher.set_inconsistent_state(is_inconsistent);
4517    }
4518
4519    pub fn current_epoch_for_testing(&self) -> EpochId {
4520        self.epoch_store_for_testing().epoch()
4521    }
4522
4523    #[instrument(level = "error", skip_all)]
4524    pub fn checkpoint_all_dbs(
4525        &self,
4526        checkpoint_path: &Path,
4527        cur_epoch_store: &AuthorityPerEpochStore,
4528        checkpoint_indexes: bool,
4529    ) -> SuiResult {
4530        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4531        let current_epoch = cur_epoch_store.epoch();
4532
4533        if checkpoint_path.exists() {
4534            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4535            return Ok(());
4536        }
4537
4538        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4539        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4540
4541        if checkpoint_path_tmp.exists() {
4542            fs::remove_dir_all(&checkpoint_path_tmp)
4543                .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4544        }
4545
4546        fs::create_dir_all(&checkpoint_path_tmp)
4547            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4548        fs::create_dir(&store_checkpoint_path_tmp)
4549            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4550
4551        // NOTE: Do not change the order of invoking these checkpoint calls
4552        // We want to snapshot checkpoint db first to not race with state sync
4553        self.checkpoint_store
4554            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4555
4556        self.get_reconfig_api()
4557            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4558
4559        self.committee_store
4560            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4561
4562        if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4563            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4564        }
4565
4566        fs::rename(checkpoint_path_tmp, checkpoint_path)
4567            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4568        Ok(())
4569    }
4570
4571    /// Load the current epoch store. This can change during reconfiguration. To ensure that
4572    /// we never end up accessing different epoch stores in a single task, we need to make sure
4573    /// that this is called once per task. Each call needs to be carefully audited to ensure it is
4574    /// the case. This also means we should minimize the number of call-sites. Only call it when
4575    /// there is no way to obtain it from somewhere else.
4576    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4577        self.epoch_store.load()
4578    }
4579
4580    // Load the epoch store, should be used in tests only.
4581    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4582        self.load_epoch_store_one_call_per_task()
4583    }
4584
4585    pub fn clone_committee_for_testing(&self) -> Committee {
4586        Committee::clone(self.epoch_store_for_testing().committee())
4587    }
4588
4589    #[instrument(level = "trace", skip_all)]
4590    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4591        self.get_object_store().get_object(object_id)
4592    }
4593
4594    pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4595        Ok(self
4596            .get_object(&SUI_SYSTEM_ADDRESS.into())
4597            .await
4598            .expect("framework object should always exist")
4599            .compute_object_reference())
4600    }
4601
4602    // This function is only used for testing.
4603    pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4604        self.get_object_cache_reader()
4605            .get_sui_system_state_object_unsafe()
4606    }
4607
4608    #[instrument(level = "trace", skip_all)]
4609    fn get_transaction_checkpoint_sequence(
4610        &self,
4611        digest: &TransactionDigest,
4612        epoch_store: &AuthorityPerEpochStore,
4613    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
4614        epoch_store.get_transaction_checkpoint(digest)
4615    }
4616
4617    #[instrument(level = "trace", skip_all)]
4618    pub fn get_checkpoint_by_sequence_number(
4619        &self,
4620        sequence_number: CheckpointSequenceNumber,
4621    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4622        Ok(self
4623            .checkpoint_store
4624            .get_checkpoint_by_sequence_number(sequence_number)?)
4625    }
4626
4627    #[instrument(level = "trace", skip_all)]
4628    pub fn get_transaction_checkpoint_for_tests(
4629        &self,
4630        digest: &TransactionDigest,
4631        epoch_store: &AuthorityPerEpochStore,
4632    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4633        let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4634        let Some(checkpoint) = checkpoint else {
4635            return Ok(None);
4636        };
4637        let checkpoint = self
4638            .checkpoint_store
4639            .get_checkpoint_by_sequence_number(checkpoint)?;
4640        Ok(checkpoint)
4641    }
4642
4643    #[instrument(level = "trace", skip_all)]
4644    pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4645        Ok(
4646            match self
4647                .get_object_cache_reader()
4648                .get_latest_object_or_tombstone(*object_id)
4649            {
4650                Some((_, ObjectOrTombstone::Object(object))) => {
4651                    let layout = self.get_object_layout(&object)?;
4652                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
4653                }
4654                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4655                None => ObjectRead::NotExists(*object_id),
4656            },
4657        )
4658    }
4659
4660    /// Chain Identifier is the digest of the genesis checkpoint.
4661    pub fn get_chain_identifier(&self) -> ChainIdentifier {
4662        self.chain_identifier
4663    }
4664
4665    pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
4666        self.fork_recovery_state.as_ref()
4667    }
4668
4669    #[instrument(level = "trace", skip_all)]
4670    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4671    where
4672        T: DeserializeOwned,
4673    {
4674        let o = self.get_object_read(object_id)?.into_object()?;
4675        if let Some(move_object) = o.data.try_as_move() {
4676            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4677                SuiErrorKind::ObjectDeserializationError {
4678                    error: format!("{e}"),
4679                }
4680            })?)
4681        } else {
4682            Err(SuiErrorKind::ObjectDeserializationError {
4683                error: format!("Provided object : [{object_id}] is not a Move object."),
4684            }
4685            .into())
4686        }
4687    }
4688
4689    /// This function aims to serve rpc reads on past objects and
4690    /// we don't expect it to be called for other purposes.
4691    /// Depending on the object pruning policies that will be enforced in the
4692    /// future there is no software-level guarantee/SLA to retrieve an object
4693    /// with an old version even if it exists/existed.
4694    #[instrument(level = "trace", skip_all)]
4695    pub fn get_past_object_read(
4696        &self,
4697        object_id: &ObjectID,
4698        version: SequenceNumber,
4699    ) -> SuiResult<PastObjectRead> {
4700        // Firstly we see if the object ever existed by getting its latest data
4701        let Some(obj_ref) = self
4702            .get_object_cache_reader()
4703            .get_latest_object_ref_or_tombstone(*object_id)
4704        else {
4705            return Ok(PastObjectRead::ObjectNotExists(*object_id));
4706        };
4707
4708        if version > obj_ref.1 {
4709            return Ok(PastObjectRead::VersionTooHigh {
4710                object_id: *object_id,
4711                asked_version: version,
4712                latest_version: obj_ref.1,
4713            });
4714        }
4715
4716        if version < obj_ref.1 {
4717            // Read past objects
4718            return Ok(match self.read_object_at_version(object_id, version)? {
4719                Some((object, layout)) => {
4720                    let obj_ref = object.compute_object_reference();
4721                    PastObjectRead::VersionFound(obj_ref, object, layout)
4722                }
4723
4724                None => PastObjectRead::VersionNotFound(*object_id, version),
4725            });
4726        }
4727
4728        if !obj_ref.2.is_alive() {
4729            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4730        }
4731
4732        match self.read_object_at_version(object_id, obj_ref.1)? {
4733            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4734            None => {
4735                debug_fatal!(
4736                    "Object with in parent_entry is missing from object store, datastore is \
4737                     inconsistent"
4738                );
4739                Err(UserInputError::ObjectNotFound {
4740                    object_id: *object_id,
4741                    version: Some(obj_ref.1),
4742                }
4743                .into())
4744            }
4745        }
4746    }
4747
4748    #[instrument(level = "trace", skip_all)]
4749    fn read_object_at_version(
4750        &self,
4751        object_id: &ObjectID,
4752        version: SequenceNumber,
4753    ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4754        let Some(object) = self
4755            .get_object_cache_reader()
4756            .get_object_by_key(object_id, version)
4757        else {
4758            return Ok(None);
4759        };
4760
4761        let layout = self.get_object_layout(&object)?;
4762        Ok(Some((object, layout)))
4763    }
4764
    /// Resolves the annotated Move struct layout for `object`.
    /// Returns `Ok(None)` when the object has no Move contents (i.e. it is a
    /// package), and propagates layout-resolution errors otherwise.
    pub fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
        let layout = object
            .data
            .try_as_move()
            // Only Move objects carry a struct layout; packages map to `None`.
            .map(|object| {
                into_struct_layout(
                    self.load_epoch_store_one_call_per_task()
                        .executor()
                        // TODO(cache) - must read through cache
                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                        .get_annotated_layout(&object.type_().clone().into())?,
                )
            })
            // Option<Result<_>> -> Result<Option<_>>, surfacing resolver errors.
            .transpose()?;
        Ok(layout)
    }
4781
4782    /// Returns a fake ObjectRef representing an address balance, along with the balance value
4783    /// and the previous transaction digest. The ObjectRef can be returned to JSON-RPC clients
4784    /// that don't understand address balances.
4785    #[instrument(level = "trace", skip_all)]
4786    pub fn get_address_balance_coin_info(
4787        &self,
4788        owner: SuiAddress,
4789        balance_type: TypeTag,
4790    ) -> SuiResult<Option<(ObjectRef, u64, TransactionDigest)>> {
4791        let accumulator_id = AccumulatorValue::get_field_id(owner, &balance_type)?;
4792        let accumulator_obj = AccumulatorValue::load_object_by_id(
4793            self.get_child_object_resolver().as_ref(),
4794            None,
4795            *accumulator_id.inner(),
4796        )?;
4797
4798        let Some(accumulator_obj) = accumulator_obj else {
4799            return Ok(None);
4800        };
4801
4802        // Extract the currency type from balance_type (e.g., SUI from Balance<SUI>).
4803        // get_balance expects the currency type, not the balance type.
4804        let currency_type =
4805            Balance::maybe_get_balance_type_param(&balance_type).unwrap_or(balance_type);
4806
4807        let balance = crate::accumulators::balances::get_balance(
4808            owner,
4809            self.get_child_object_resolver().as_ref(),
4810            currency_type,
4811        )?;
4812
4813        if balance == 0 {
4814            return Ok(None);
4815        };
4816
4817        let object_ref = coin_reservation::encode_object_ref(
4818            accumulator_obj.id(),
4819            accumulator_obj.version(),
4820            self.load_epoch_store_one_call_per_task().epoch(),
4821            balance,
4822            self.get_chain_identifier(),
4823        );
4824
4825        Ok(Some((
4826            object_ref,
4827            balance,
4828            accumulator_obj.previous_transaction,
4829        )))
4830    }
4831
4832    /// Returns fake ObjectRefs for all address balances of an owner, keyed by coin type string.
4833    /// Used by get_all_coins to include fake coins for each coin type.
4834    #[instrument(level = "trace", skip_all)]
4835    pub fn get_all_address_balance_coin_infos(
4836        &self,
4837        owner: SuiAddress,
4838    ) -> SuiResult<std::collections::HashMap<String, (ObjectRef, u64, TransactionDigest)>> {
4839        let indexes = self
4840            .indexes
4841            .as_ref()
4842            .ok_or(SuiErrorKind::IndexStoreNotAvailable)?;
4843
4844        let mut result = std::collections::HashMap::new();
4845        for currency_type in indexes.get_address_balance_coin_types_iter(owner) {
4846            let balance_type = sui_types::balance::Balance::type_tag(currency_type.clone());
4847            if let Some((obj_ref, balance, prev_tx)) =
4848                self.get_address_balance_coin_info(owner, balance_type)?
4849            {
4850                // Use currency_type.to_string() to match the format in CoinIndexKey2
4851                // (e.g., "0x2::sui::SUI", not "0x2::coin::Coin<0x2::sui::SUI>")
4852                result.insert(currency_type.to_string(), (obj_ref, balance, prev_tx));
4853            }
4854        }
4855        Ok(result)
4856    }
4857
4858    fn get_owner_at_version(
4859        object_store: &Arc<dyn ObjectStore + Send + Sync>,
4860        object_id: &ObjectID,
4861        version: SequenceNumber,
4862    ) -> SuiResult<Owner> {
4863        object_store
4864            .get_object_by_key(object_id, version)
4865            .ok_or_else(|| {
4866                SuiError::from(UserInputError::ObjectNotFound {
4867                    object_id: *object_id,
4868                    version: Some(version),
4869                })
4870            })
4871            .map(|o| o.owner.clone())
4872    }
4873
4874    #[instrument(level = "trace", skip_all)]
4875    pub fn get_owner_objects(
4876        &self,
4877        owner: SuiAddress,
4878        // If `Some`, the query will start from the next item after the specified cursor
4879        cursor: Option<ObjectID>,
4880        limit: usize,
4881        filter: Option<SuiObjectDataFilter>,
4882    ) -> SuiResult<Vec<ObjectInfo>> {
4883        if let Some(indexes) = &self.indexes {
4884            indexes.get_owner_objects(owner, cursor, limit, filter)
4885        } else {
4886            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4887        }
4888    }
4889
4890    #[instrument(level = "trace", skip_all)]
4891    pub fn get_owned_coins_iterator_with_cursor(
4892        &self,
4893        owner: SuiAddress,
4894        // If `Some`, the query will start from the next item after the specified cursor
4895        cursor: (String, u64, ObjectID),
4896        limit: usize,
4897        one_coin_type_only: bool,
4898    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4899        if let Some(indexes) = &self.indexes {
4900            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4901        } else {
4902            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4903        }
4904    }
4905
4906    #[instrument(level = "trace", skip_all)]
4907    pub fn get_owner_objects_iterator(
4908        &self,
4909        owner: SuiAddress,
4910        // If `Some`, the query will start from the next item after the specified cursor
4911        cursor: Option<ObjectID>,
4912        filter: Option<SuiObjectDataFilter>,
4913    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4914        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4915        if let Some(indexes) = &self.indexes {
4916            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4917        } else {
4918            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4919        }
4920    }
4921
4922    #[instrument(level = "trace", skip_all)]
4923    pub async fn get_move_objects<T>(
4924        &self,
4925        owner: SuiAddress,
4926        type_: MoveObjectType,
4927    ) -> SuiResult<Vec<T>>
4928    where
4929        T: DeserializeOwned,
4930    {
4931        let object_ids = self
4932            .get_owner_objects_iterator(owner, None, None)?
4933            .filter(|o| match &o.type_ {
4934                ObjectType::Struct(s) => &type_ == s,
4935                ObjectType::Package => false,
4936            })
4937            .map(|info| ObjectKey(info.object_id, info.version))
4938            .collect::<Vec<_>>();
4939        let mut move_objects = vec![];
4940
4941        let objects = self
4942            .get_object_store()
4943            .multi_get_objects_by_key(&object_ids);
4944
4945        for (o, id) in objects.into_iter().zip(object_ids) {
4946            let object = o.ok_or_else(|| {
4947                SuiError::from(UserInputError::ObjectNotFound {
4948                    object_id: id.0,
4949                    version: Some(id.1),
4950                })
4951            })?;
4952            let move_object = object.data.try_as_move().ok_or_else(|| {
4953                SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4954            })?;
4955            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4956                SuiErrorKind::ObjectDeserializationError {
4957                    error: format!("{e}"),
4958                }
4959            })?);
4960        }
4961        Ok(move_objects)
4962    }
4963
4964    #[instrument(level = "trace", skip_all)]
4965    pub fn get_dynamic_fields(
4966        &self,
4967        owner: ObjectID,
4968        // If `Some`, the query will start from the next item after the specified cursor
4969        cursor: Option<ObjectID>,
4970        limit: usize,
4971    ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4972        Ok(self
4973            .get_dynamic_fields_iterator(owner, cursor)?
4974            .take(limit)
4975            .collect::<Result<Vec<_>, _>>()?)
4976    }
4977
4978    fn get_dynamic_fields_iterator(
4979        &self,
4980        owner: ObjectID,
4981        // If `Some`, the query will start from the next item after the specified cursor
4982        cursor: Option<ObjectID>,
4983    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4984    {
4985        if let Some(indexes) = &self.indexes {
4986            indexes.get_dynamic_fields_iterator(owner, cursor)
4987        } else {
4988            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4989        }
4990    }
4991
4992    #[instrument(level = "trace", skip_all)]
4993    pub fn get_dynamic_field_object_id(
4994        &self,
4995        owner: ObjectID,
4996        name_type: TypeTag,
4997        name_bcs_bytes: &[u8],
4998    ) -> SuiResult<Option<ObjectID>> {
4999        if let Some(indexes) = &self.indexes {
5000            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
5001        } else {
5002            Err(SuiErrorKind::IndexStoreNotAvailable.into())
5003        }
5004    }
5005
5006    #[instrument(level = "trace", skip_all)]
5007    pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
5008        Ok(self.get_indexes()?.next_sequence_number())
5009    }
5010
5011    #[instrument(level = "trace", skip_all)]
5012    pub async fn get_executed_transaction_and_effects(
5013        &self,
5014        digest: TransactionDigest,
5015        kv_store: Arc<TransactionKeyValueStore>,
5016    ) -> SuiResult<(Transaction, TransactionEffects)> {
5017        let transaction = kv_store.get_tx(digest).await?;
5018        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
5019        Ok((transaction, effects))
5020    }
5021
5022    #[instrument(level = "trace", skip_all)]
5023    pub fn multi_get_checkpoint_by_sequence_number(
5024        &self,
5025        sequence_numbers: &[CheckpointSequenceNumber],
5026    ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
5027        Ok(self
5028            .checkpoint_store
5029            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
5030    }
5031
5032    #[instrument(level = "trace", skip_all)]
5033    pub fn get_transaction_events(
5034        &self,
5035        digest: &TransactionDigest,
5036    ) -> SuiResult<TransactionEvents> {
5037        self.get_transaction_cache_reader()
5038            .get_events(digest)
5039            .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
5040    }
5041
5042    pub fn get_transaction_input_objects(
5043        &self,
5044        effects: &TransactionEffects,
5045    ) -> SuiResult<Vec<Object>> {
5046        sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
5047            .map_err(Into::into)
5048    }
5049
5050    pub fn get_transaction_output_objects(
5051        &self,
5052        effects: &TransactionEffects,
5053    ) -> SuiResult<Vec<Object>> {
5054        sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
5055            .map_err(Into::into)
5056    }
5057
5058    fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
5059        match &self.indexes {
5060            Some(i) => Ok(i.clone()),
5061            None => Err(SuiErrorKind::UnsupportedFeatureError {
5062                error: "extended object indexing is not enabled on this server".into(),
5063            }
5064            .into()),
5065        }
5066    }
5067
5068    pub async fn get_transactions_for_tests(
5069        self: &Arc<Self>,
5070        filter: Option<TransactionFilter>,
5071        cursor: Option<TransactionDigest>,
5072        limit: Option<usize>,
5073        reverse: bool,
5074    ) -> SuiResult<Vec<TransactionDigest>> {
5075        let metrics = KeyValueStoreMetrics::new_for_tests();
5076        let kv_store = Arc::new(TransactionKeyValueStore::new(
5077            "rocksdb",
5078            metrics,
5079            self.clone(),
5080        ));
5081        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
5082            .await
5083    }
5084
5085    #[instrument(level = "trace", skip_all)]
5086    pub async fn get_transactions(
5087        &self,
5088        kv_store: &Arc<TransactionKeyValueStore>,
5089        filter: Option<TransactionFilter>,
5090        // If `Some`, the query will start from the next item after the specified cursor
5091        cursor: Option<TransactionDigest>,
5092        limit: Option<usize>,
5093        reverse: bool,
5094    ) -> SuiResult<Vec<TransactionDigest>> {
5095        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
5096            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
5097            let iter = checkpoint_contents.iter().map(|c| c.transaction);
5098            if reverse {
5099                let iter = iter
5100                    .rev()
5101                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
5102                    .skip(usize::from(cursor.is_some()));
5103                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
5104            } else {
5105                let iter = iter
5106                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
5107                    .skip(usize::from(cursor.is_some()));
5108                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
5109            }
5110        }
5111        self.get_indexes()?
5112            .get_transactions(filter, cursor, limit, reverse)
5113    }
5114
    /// Accessor for the checkpoint store owned by this authority state.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
5118
5119    pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
5120        self.get_checkpoint_store()
5121            .get_highest_executed_checkpoint_seq_number()?
5122            .ok_or(
5123                SuiErrorKind::UserInputError {
5124                    error: UserInputError::LatestCheckpointSequenceNumberNotFound,
5125                }
5126                .into(),
5127            )
5128    }
5129
5130    #[cfg(msim)]
5131    pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
5132        self.database_for_testing()
5133            .perpetual_tables
5134            .get_highest_pruned_checkpoint()
5135            .map(|c| c.unwrap_or(0))
5136            .map_err(Into::into)
5137    }
5138
5139    #[instrument(level = "trace", skip_all)]
5140    pub fn get_checkpoint_summary_by_sequence_number(
5141        &self,
5142        sequence_number: CheckpointSequenceNumber,
5143    ) -> SuiResult<CheckpointSummary> {
5144        let verified_checkpoint = self
5145            .get_checkpoint_store()
5146            .get_checkpoint_by_sequence_number(sequence_number)?;
5147        match verified_checkpoint {
5148            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5149            None => Err(SuiErrorKind::UserInputError {
5150                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5151            }
5152            .into()),
5153        }
5154    }
5155
5156    #[instrument(level = "trace", skip_all)]
5157    pub fn get_checkpoint_summary_by_digest(
5158        &self,
5159        digest: CheckpointDigest,
5160    ) -> SuiResult<CheckpointSummary> {
5161        let verified_checkpoint = self
5162            .get_checkpoint_store()
5163            .get_checkpoint_by_digest(&digest)?;
5164        match verified_checkpoint {
5165            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5166            None => Err(SuiErrorKind::UserInputError {
5167                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5168            }
5169            .into()),
5170        }
5171    }
5172
5173    #[instrument(level = "trace", skip_all)]
5174    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
5175        if is_system_package(package_id) {
5176            return self.find_genesis_txn_digest();
5177        }
5178        Ok(self
5179            .get_object_read(&package_id)?
5180            .into_object()?
5181            .previous_transaction)
5182    }
5183
5184    #[instrument(level = "trace", skip_all)]
5185    pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
5186        let summary = self
5187            .get_verified_checkpoint_by_sequence_number(0)?
5188            .into_message();
5189        let content = self.get_checkpoint_contents(summary.content_digest)?;
5190        let genesis_transaction = content.enumerate_transactions(&summary).next();
5191        Ok(genesis_transaction
5192            .ok_or(SuiErrorKind::UserInputError {
5193                error: UserInputError::GenesisTransactionNotFound,
5194            })?
5195            .1
5196            .transaction)
5197    }
5198
5199    #[instrument(level = "trace", skip_all)]
5200    pub fn get_verified_checkpoint_by_sequence_number(
5201        &self,
5202        sequence_number: CheckpointSequenceNumber,
5203    ) -> SuiResult<VerifiedCheckpoint> {
5204        let verified_checkpoint = self
5205            .get_checkpoint_store()
5206            .get_checkpoint_by_sequence_number(sequence_number)?;
5207        match verified_checkpoint {
5208            Some(verified_checkpoint) => Ok(verified_checkpoint),
5209            None => Err(SuiErrorKind::UserInputError {
5210                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5211            }
5212            .into()),
5213        }
5214    }
5215
5216    #[instrument(level = "trace", skip_all)]
5217    pub fn get_verified_checkpoint_summary_by_digest(
5218        &self,
5219        digest: CheckpointDigest,
5220    ) -> SuiResult<VerifiedCheckpoint> {
5221        let verified_checkpoint = self
5222            .get_checkpoint_store()
5223            .get_checkpoint_by_digest(&digest)?;
5224        match verified_checkpoint {
5225            Some(verified_checkpoint) => Ok(verified_checkpoint),
5226            None => Err(SuiErrorKind::UserInputError {
5227                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5228            }
5229            .into()),
5230        }
5231    }
5232
5233    #[instrument(level = "trace", skip_all)]
5234    pub fn get_checkpoint_contents(
5235        &self,
5236        digest: CheckpointContentsDigest,
5237    ) -> SuiResult<CheckpointContents> {
5238        self.get_checkpoint_store()
5239            .get_checkpoint_contents(&digest)?
5240            .ok_or(
5241                SuiErrorKind::UserInputError {
5242                    error: UserInputError::CheckpointContentsNotFound(digest),
5243                }
5244                .into(),
5245            )
5246    }
5247
5248    #[instrument(level = "trace", skip_all)]
5249    pub fn get_checkpoint_contents_by_sequence_number(
5250        &self,
5251        sequence_number: CheckpointSequenceNumber,
5252    ) -> SuiResult<CheckpointContents> {
5253        let verified_checkpoint = self
5254            .get_checkpoint_store()
5255            .get_checkpoint_by_sequence_number(sequence_number)?;
5256        match verified_checkpoint {
5257            Some(verified_checkpoint) => {
5258                let content_digest = verified_checkpoint.into_inner().content_digest;
5259                self.get_checkpoint_contents(content_digest)
5260            }
5261            None => Err(SuiErrorKind::UserInputError {
5262                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5263            }
5264            .into()),
5265        }
5266    }
5267
    /// Query events matching `query`, paginated by an exclusive `cursor`,
    /// returning at most `limit` events in ascending or `descending` order.
    ///
    /// Event keys come from the index store; event payloads are then fetched
    /// in bulk from the KV store and rendered with annotated Move layouts.
    /// Requires extended indexing to be enabled (see `get_indexes`).
    #[instrument(level = "trace", skip_all)]
    pub async fn query_events(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        query: EventFilter,
        // If `Some`, the query will start from the next item after the specified cursor
        cursor: Option<EventID>,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<SuiEvent>> {
        let index_store = self.get_indexes()?;

        // Translate the cursor's tx digest into the (tx sequence, event
        // sequence) pair the index scans from. Without a cursor, start from
        // the extreme end matching the scan direction.
        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
                SuiErrorKind::TransactionNotFound {
                    digest: cursor.tx_digest,
                },
            )?;
            (tx_seq, cursor.event_seq as usize)
        } else if descending {
            (u64::MAX, usize::MAX)
        } else {
            (0, 0)
        };

        // Over-fetch by one: when a cursor is present, the first key returned
        // is the cursor itself and is dropped below (exclusive pagination).
        let limit = limit + 1;
        let mut event_keys = match query {
            EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
            EventFilter::Transaction(digest) => {
                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveModule { package, module } => {
                let module_id = ModuleId::new(package.into(), module);
                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveEventType(struct_name) => index_store
                .events_by_move_event_struct_name(
                    &struct_name,
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            EventFilter::Sender(sender) => {
                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
            }
            EventFilter::TimeRange {
                start_time,
                end_time,
            } => index_store
                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
            EventFilter::MoveEventModule { package, module } => index_store
                .events_by_move_event_module(
                    &ModuleId::new(package.into(), module),
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            // not using "_ =>" because we want to make sure we remember to add new variants here
            EventFilter::Any(_) => {
                return Err(SuiErrorKind::UserInputError {
                    error: UserInputError::Unsupported(
                        "'Any' queries are not supported by the fullnode.".to_string(),
                    ),
                }
                .into());
            }
        };

        // skip one event if exclusive cursor is provided,
        // otherwise truncate to the original limit.
        if cursor.is_some() {
            if !event_keys.is_empty() {
                event_keys.remove(0);
            }
        } else {
            event_keys.truncate(limit - 1);
        }

        // get the unique set of digests from the event_keys
        let transaction_digests = event_keys
            .iter()
            .map(|(_, digest, _, _)| *digest)
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();

        // Bulk-fetch event payloads per transaction digest from the KV store.
        let events = kv_store
            .multi_get_events_by_tx_digests(&transaction_digests)
            .await?;

        // Zip preserves per-digest correspondence: `multi_get` returns one
        // (possibly-None) entry per requested digest, in request order.
        let events_map: HashMap<_, _> =
            transaction_digests.iter().zip(events.into_iter()).collect();

        // Join each event key (digest `k.1`, event index `k.2`) back to its
        // payload. The `expect` is an invariant: every key's digest was just
        // requested above, so it must be present in the map.
        let stored_events = event_keys
            .into_iter()
            .map(|k| {
                (
                    k,
                    events_map
                        .get(&k.1)
                        .expect("fetched digest is missing")
                        .clone()
                        .and_then(|e| e.data.get(k.2).cloned()),
                )
            })
            .map(
                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
                    event
                        .map(|e| (e, tx_digest, event_seq, timestamp))
                        .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
                },
            )
            .collect::<Result<Vec<_>, _>>()?;

        // Render raw events into `SuiEvent`s, resolving the annotated Move
        // type layout for each event's type via the current epoch's executor.
        let epoch_store = self.load_epoch_store_one_call_per_task();
        let backing_store = self.get_backing_package_store().as_ref();
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(backing_store));
        let mut events = vec![];
        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
            events.push(SuiEvent::try_from(
                e.clone(),
                tx_digest,
                event_seq as u64,
                Some(timestamp),
                layout_resolver.get_annotated_layout(&e.type_)?,
            )?)
        }
        Ok(events)
    }
5402
    /// Insert a single genesis object into the store via the reconfig API.
    pub async fn insert_genesis_object(&self, object: Object) {
        self.get_reconfig_api().insert_genesis_object(object);
    }
5406
5407    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
5408        futures::future::join_all(
5409            objects
5410                .iter()
5411                .map(|o| self.insert_genesis_object(o.clone())),
5412        )
5413        .await;
5414    }
5415
    /// Make a status response for a transaction.
    ///
    /// Returns `Executed` (with signed effects, optional cert signature, and
    /// events) when executed effects exist, `Signed` when this validator has
    /// signed but not yet executed the transaction, and `None` otherwise.
    #[instrument(level = "trace", skip_all)]
    pub fn get_transaction_status(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
        // TODO: In the case of read path, we should not have to re-sign the effects.
        if let Some(effects) =
            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
        {
            if let Some(transaction) = self
                .get_transaction_cache_reader()
                .get_transaction_block(transaction_digest)
            {
                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
                // Only look up events when the effects advertise an events
                // digest; otherwise the transaction emitted none.
                let events = if effects.events_digest().is_some() {
                    self.get_transaction_events(effects.transaction_digest())?
                } else {
                    TransactionEvents::default()
                };
                return Ok(Some((
                    (*transaction).clone().into_message(),
                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
                )));
            } else {
                // The read of effects and read of transaction are not atomic. It's possible that we reverted
                // the transaction (during epoch change) in between the above two reads, and we end up
                // having effects but not transaction. In this case, we just fall through.
                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
            }
        }
        // Not executed (or reverted): fall back to the per-epoch store for a
        // transaction this validator has merely signed.
        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
            self.metrics.tx_already_processed.inc();
            let (transaction, sig) = signed.into_inner().into_data_and_sig();
            Ok(Some((transaction, TransactionStatus::Signed(sig))))
        } else {
            Ok(None)
        }
    }
5456
5457    /// Get the signed effects of the given transaction. If the effects was signed in a previous
5458    /// epoch, re-sign it so that the caller is able to form a cert of the effects in the current
5459    /// epoch.
5460    #[instrument(level = "trace", skip_all)]
5461    pub fn get_signed_effects_and_maybe_resign(
5462        &self,
5463        transaction_digest: &TransactionDigest,
5464        epoch_store: &Arc<AuthorityPerEpochStore>,
5465    ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
5466        let effects = self
5467            .get_transaction_cache_reader()
5468            .get_executed_effects(transaction_digest);
5469        match effects {
5470            Some(effects) => {
5471                // If the transaction was executed in previous epochs, the validator will
5472                // re-sign the effects with new current epoch so that a client is always able to
5473                // obtain an effects certificate at the current epoch.
5474                //
5475                // Why is this necessary? Consider the following case:
5476                // - assume there are 4 validators
5477                // - Quorum driver gets 2 signed effects before reconfig halt
5478                // - The tx makes it into final checkpoint.
5479                // - 2 validators go away and are replaced in the new epoch.
5480                // - The new epoch begins.
5481                // - The quorum driver cannot complete the partial effects cert from the previous epoch,
5482                //   because it may not be able to reach either of the 2 former validators.
5483                // - But, if the 2 validators that stayed are willing to re-sign the effects in the new
5484                //   epoch, the QD can make a new effects cert and return it to the client.
5485                //
5486                // This is a considered a short-term workaround. Eventually, Quorum Driver should be able
5487                // to return either an effects certificate, -or- a proof of inclusion in a checkpoint. In
5488                // the case above, the Quorum Driver would return a proof of inclusion in the final
5489                // checkpoint, and this code would no longer be necessary.
5490                if effects.executed_epoch() != epoch_store.epoch() {
5491                    debug!(
5492                        tx_digest=?transaction_digest,
5493                        effects_epoch=?effects.executed_epoch(),
5494                        epoch=?epoch_store.epoch(),
5495                        "Re-signing the effects with the current epoch"
5496                    );
5497                }
5498                Ok(Some(self.sign_effects(effects, epoch_store)?))
5499            }
5500            None => Ok(None),
5501        }
5502    }
5503
    /// Sign `effects` under the current epoch, reusing an existing signature
    /// from the per-epoch store when one is available, otherwise creating a
    /// fresh one and persisting it (with the effects digest) before returning.
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn sign_effects(
        &self,
        effects: TransactionEffects,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<VerifiedSignedTransactionEffects> {
        let tx_digest = *effects.transaction_digest();
        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
            Some(sig) => {
                // The per-epoch store only holds signatures from this epoch.
                debug_assert!(sig.epoch == epoch_store.epoch());
                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
            }
            _ => {
                // No cached signature: sign with this authority's key under
                // the current epoch and the TransactionEffects intent.
                let sig = AuthoritySignInfo::new(
                    epoch_store.epoch(),
                    &effects,
                    Intent::sui_app(IntentScope::TransactionEffects),
                    self.name,
                    &*self.secret,
                );

                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());

                // Persist digest + signature so later calls reuse it instead
                // of re-signing.
                epoch_store.insert_effects_digest_and_signature(
                    &tx_digest,
                    effects.digest(),
                    &sig,
                )?;

                effects
            }
        };

        // Safe to skip verification: the signature was either produced by us
        // just above or previously stored by us.
        Ok(VerifiedSignedTransactionEffects::new_unchecked(
            signed_effects,
        ))
    }
5541
5542    // Returns coin objects for indexing for fullnode if indexing is enabled.
5543    #[instrument(level = "trace", skip_all)]
5544    fn fullnode_only_get_tx_coins_for_indexing(
5545        name: AuthorityName,
5546        object_store: &Arc<dyn ObjectStore + Send + Sync>,
5547        effects: &TransactionEffects,
5548        inner_temporary_store: &InnerTemporaryStore,
5549        epoch_store: &Arc<AuthorityPerEpochStore>,
5550    ) -> Option<TxCoins> {
5551        if epoch_store.committee().authority_exists(&name) {
5552            return None;
5553        }
5554        let written_coin_objects = inner_temporary_store
5555            .written
5556            .iter()
5557            .filter_map(|(k, v)| {
5558                if v.is_coin() {
5559                    Some((*k, v.clone()))
5560                } else {
5561                    None
5562                }
5563            })
5564            .collect();
5565        let mut input_coin_objects = inner_temporary_store
5566            .input_objects
5567            .iter()
5568            .filter_map(|(k, v)| {
5569                if v.is_coin() {
5570                    Some((*k, v.clone()))
5571                } else {
5572                    None
5573                }
5574            })
5575            .collect::<ObjectMap>();
5576
5577        // Check for receiving objects that were actually used and modified during execution. Their
5578        // updated version will already showup in "written_coins" but their input isn't included in
5579        // the set of input objects in a inner_temporary_store.
5580        for (object_id, version) in effects.modified_at_versions() {
5581            if inner_temporary_store
5582                .loaded_runtime_objects
5583                .contains_key(&object_id)
5584                && let Some(object) = object_store.get_object_by_key(&object_id, version)
5585                && object.is_coin()
5586            {
5587                input_coin_objects.insert(object_id, object);
5588            }
5589        }
5590
5591        Some((input_coin_objects, written_coin_objects))
5592    }
5593
5594    /// Get the TransactionEnvelope that currently locks the given object, if any.
5595    /// Since object locks are only valid for one epoch, we also need the epoch_id in the query.
5596    /// Returns UserInputError::ObjectNotFound if no lock records for the given object can be found.
5597    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the object record is at a different version.
5598    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a certain transaction.
5599    /// Returns None if a lock record is initialized for the given ObjectRef but not yet locked by any transaction,
5600    ///     or cannot find the transaction in transaction table, because of data race etc.
5601    #[instrument(level = "trace", skip_all)]
5602    pub async fn get_transaction_lock(
5603        &self,
5604        object_ref: &ObjectRef,
5605        epoch_store: &AuthorityPerEpochStore,
5606    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5607        let lock_info = self
5608            .get_object_cache_reader()
5609            .get_lock(*object_ref, epoch_store)?;
5610        let lock_info = match lock_info {
5611            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5612                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5613                    provided_obj_ref: *object_ref,
5614                    current_version: locked_ref.1,
5615                }
5616                .into());
5617            }
5618            ObjectLockStatus::Initialized => {
5619                return Ok(None);
5620            }
5621            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5622        };
5623
5624        epoch_store.get_signed_transaction(&lock_info.tx_digest)
5625    }
5626
    /// Bulk-fetch `objects` from the object cache; an entry is `None` when
    /// the cache returns no object for that id.
    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
        self.get_object_cache_reader().get_objects(objects)
    }
5630
    /// Latest object reference for `object_id` — per the cache API name this
    /// includes tombstone refs for deleted objects; `None` when unknown.
    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
        self.get_object_cache_reader()
            .get_latest_object_ref_or_tombstone(object_id)
    }
5635
5636    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
5637    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the upgrade.
5638    ///
5639    /// This method can be used to dynamic adjust the amount of buffer. If set to 0, the upgrade
5640    /// will go through with only 2f+1 votes.
5641    ///
5642    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all should have the same
5643    /// value), or you risk halting the chain.
5644    pub fn set_override_protocol_upgrade_buffer_stake(
5645        &self,
5646        expected_epoch: EpochId,
5647        buffer_stake_bps: u64,
5648    ) -> SuiResult {
5649        let epoch_store = self.load_epoch_store_one_call_per_task();
5650        let actual_epoch = epoch_store.epoch();
5651        if actual_epoch != expected_epoch {
5652            return Err(SuiErrorKind::WrongEpoch {
5653                expected_epoch,
5654                actual_epoch,
5655            }
5656            .into());
5657        }
5658
5659        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5660    }
5661
5662    pub fn clear_override_protocol_upgrade_buffer_stake(
5663        &self,
5664        expected_epoch: EpochId,
5665    ) -> SuiResult {
5666        let epoch_store = self.load_epoch_store_one_call_per_task();
5667        let actual_epoch = epoch_store.epoch();
5668        if actual_epoch != expected_epoch {
5669            return Err(SuiErrorKind::WrongEpoch {
5670                expected_epoch,
5671                actual_epoch,
5672            }
5673            .into());
5674        }
5675
5676        epoch_store.clear_override_protocol_upgrade_buffer_stake()
5677    }
5678
    /// Get the set of system packages that are compiled in to this build, if those packages are
    /// compatible with the current versions of those packages on-chain.
    ///
    /// Returns one object ref per compatible package; returns an empty vec as
    /// soon as any package fails the compatibility comparison, signalling that
    /// this binary cannot vote for an upgrade.
    pub async fn get_available_system_packages(
        &self,
        binary_config: &BinaryConfig,
    ) -> Vec<ObjectRef> {
        let mut results = vec![];

        let system_packages = BuiltInFramework::iter_system_packages();

        // Add extra framework packages during simtest
        #[cfg(msim)]
        let extra_packages = framework_injection::get_extra_packages(self.name);
        // NOTE(review): the identity `.map(|p| p)` appears to be needed so the
        // chained iterator type-checks under msim — confirm before removing.
        #[cfg(msim)]
        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

        for system_package in system_packages {
            let modules = system_package.modules().to_vec();
            // In simtests, we could override the current built-in framework packages.
            // An empty override set means "drop this package from the vote".
            #[cfg(msim)]
            let modules =
                match framework_injection::get_override_modules(&system_package.id, self.name) {
                    Some(overrides) if overrides.is_empty() => continue,
                    Some(overrides) => overrides,
                    None => modules,
                };

            // Any incompatible package makes the whole vote empty: this
            // binary either supports the full upgrade set or none of it.
            let Some(obj_ref) = sui_framework::compare_system_package(
                &self.get_object_store(),
                &system_package.id,
                &modules,
                system_package.dependencies.to_vec(),
                binary_config,
            )
            .await
            else {
                return vec![];
            };
            results.push(obj_ref);
        }

        results
    }
5722
    /// Return the new versions, module bytes, and dependencies for the packages that have been
    /// committed to for a framework upgrade, in `system_packages`.  Loads the module contents from
    /// the binary, and performs the following checks:
    ///
    /// - Whether its contents matches what is on-chain already, in which case no upgrade is
    ///   required, and its contents are omitted from the output.
    /// - Whether the contents in the binary can form a package whose digest matches the input,
    ///   meaning the framework will be upgraded, and this authority can satisfy that upgrade, in
    ///   which case the contents are included in the output.
    ///
    /// If the current version of the framework can't be loaded, the binary does not contain the
    /// bytes for that framework ID, or the resulting package fails the digest check, `None` is
    /// returned indicating that this authority cannot run the upgrade that the network voted on.
    async fn get_system_package_bytes(
        &self,
        system_packages: Vec<ObjectRef>,
        binary_config: &BinaryConfig,
    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
        let objects = self.get_objects(&ids).await;

        let mut res = Vec::with_capacity(system_packages.len());
        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
            // Determine the `previous_transaction` for the rebuilt package:
            // the current on-chain package's tx, or the genesis marker if the
            // package does not exist on-chain yet.
            let prev_transaction = match object {
                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                    // Skip this one because it doesn't need to be upgraded.
                    info!("Framework {} does not need updating", system_package_ref.0);
                    continue;
                }

                Some(cur_object) => cur_object.previous_transaction,
                None => TransactionDigest::genesis_marker(),
            };

            // In simtests, allow injected override packages to stand in for
            // the built-in framework bytes.
            #[cfg(msim)]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
                .unwrap_or_else(|| {
                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
                });

            #[cfg(not(msim))]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

            // Built-in framework bytes are produced by our own build, so a
            // deserialization failure is a bug, not a recoverable error.
            let modules: Vec<_> = bytes
                .iter()
                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
                .collect();

            // Rebuild the package object at the committed version and check
            // that its computed reference (including digest) matches the one
            // the network voted on.
            let new_object = Object::new_system_package(
                &modules,
                system_package_ref.1,
                dependencies.clone(),
                prev_transaction,
            );

            let new_ref = new_object.compute_object_reference();
            if new_ref != system_package_ref {
                if cfg!(msim) {
                    // debug_fatal required here for test_framework_upgrade_conflicting_versions to pass
                    debug_fatal!(
                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
                    );
                } else {
                    error!(
                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
                    );
                }
                return None;
            }

            res.push((system_package_ref.1, bytes, dependencies));
        }

        Some(res)
    }
5806
5807    fn is_protocol_version_supported_v2(
5808        current_protocol_version: ProtocolVersion,
5809        proposed_protocol_version: ProtocolVersion,
5810        protocol_config: &ProtocolConfig,
5811        committee: &Committee,
5812        capabilities: Vec<AuthorityCapabilitiesV2>,
5813        mut buffer_stake_bps: u64,
5814    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5815        if proposed_protocol_version > current_protocol_version + 1
5816            && !protocol_config.advance_to_highest_supported_protocol_version()
5817        {
5818            return None;
5819        }
5820
5821        if buffer_stake_bps > 10000 {
5822            warn!("clamping buffer_stake_bps to 10000");
5823            buffer_stake_bps = 10000;
5824        }
5825
5826        // For each validator, gather the protocol version and system packages that it would like
5827        // to upgrade to in the next epoch.
5828        let mut desired_upgrades: Vec<_> = capabilities
5829            .into_iter()
5830            .filter_map(|mut cap| {
5831                // A validator that lists no packages is voting against any change at all.
5832                if cap.available_system_packages.is_empty() {
5833                    return None;
5834                }
5835
5836                cap.available_system_packages.sort();
5837
5838                info!(
5839                    "validator {:?} supports {:?} with system packages: {:?}",
5840                    cap.authority.concise(),
5841                    cap.supported_protocol_versions,
5842                    cap.available_system_packages,
5843                );
5844
5845                // A validator that only supports the current protocol version is also voting
5846                // against any change, because framework upgrades always require a protocol version
5847                // bump.
5848                cap.supported_protocol_versions
5849                    .get_version_digest(proposed_protocol_version)
5850                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
5851            })
5852            .collect();
5853
5854        // There can only be one set of votes that have a majority, find one if it exists.
5855        desired_upgrades.sort();
5856        desired_upgrades
5857            .into_iter()
5858            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5859            .into_iter()
5860            .find_map(|((digest, packages), group)| {
5861                // should have been filtered out earlier.
5862                assert!(!packages.is_empty());
5863
5864                let mut stake_aggregator: StakeAggregator<(), true> =
5865                    StakeAggregator::new(Arc::new(committee.clone()));
5866
5867                for (_, _, authority) in group {
5868                    stake_aggregator.insert_generic(authority, ());
5869                }
5870
5871                let total_votes = stake_aggregator.total_votes();
5872                let quorum_threshold = committee.quorum_threshold();
5873                let f = committee.total_votes() - committee.quorum_threshold();
5874
5875                // multiple by buffer_stake_bps / 10000, rounded up.
5876                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5877                let effective_threshold = quorum_threshold + buffer_stake;
5878
5879                info!(
5880                    protocol_config_digest = ?digest,
5881                    ?total_votes,
5882                    ?quorum_threshold,
5883                    ?buffer_stake_bps,
5884                    ?effective_threshold,
5885                    ?proposed_protocol_version,
5886                    ?packages,
5887                    "support for upgrade"
5888                );
5889
5890                let has_support = total_votes >= effective_threshold;
5891                has_support.then_some((proposed_protocol_version, packages))
5892            })
5893    }
5894
5895    fn choose_protocol_version_and_system_packages_v2(
5896        current_protocol_version: ProtocolVersion,
5897        protocol_config: &ProtocolConfig,
5898        committee: &Committee,
5899        capabilities: Vec<AuthorityCapabilitiesV2>,
5900        buffer_stake_bps: u64,
5901    ) -> (ProtocolVersion, Vec<ObjectRef>) {
5902        assert!(protocol_config.authority_capabilities_v2());
5903        let mut next_protocol_version = current_protocol_version;
5904        let mut system_packages = vec![];
5905
5906        while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5907            current_protocol_version,
5908            next_protocol_version + 1,
5909            protocol_config,
5910            committee,
5911            capabilities.clone(),
5912            buffer_stake_bps,
5913        ) {
5914            next_protocol_version = version;
5915            system_packages = packages;
5916        }
5917
5918        (next_protocol_version, system_packages)
5919    }
5920
5921    #[instrument(level = "debug", skip_all)]
5922    fn create_authenticator_state_tx(
5923        &self,
5924        epoch_store: &Arc<AuthorityPerEpochStore>,
5925    ) -> Option<EndOfEpochTransactionKind> {
5926        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5927            info!("authenticator state transactions not enabled");
5928            return None;
5929        }
5930
5931        let authenticator_state_exists = epoch_store.authenticator_state_exists();
5932        let tx = if authenticator_state_exists {
5933            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5934            let min_epoch =
5935                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5936            let authenticator_obj_initial_shared_version = epoch_store
5937                .epoch_start_config()
5938                .authenticator_obj_initial_shared_version()
5939                .expect("initial version must exist");
5940
5941            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5942                min_epoch,
5943                authenticator_obj_initial_shared_version,
5944            );
5945
5946            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5947
5948            tx
5949        } else {
5950            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5951            info!("Creating AuthenticatorStateCreate tx");
5952            tx
5953        };
5954        Some(tx)
5955    }
5956
5957    #[instrument(level = "debug", skip_all)]
5958    fn create_randomness_state_tx(
5959        &self,
5960        epoch_store: &Arc<AuthorityPerEpochStore>,
5961    ) -> Option<EndOfEpochTransactionKind> {
5962        if !epoch_store.protocol_config().random_beacon() {
5963            info!("randomness state transactions not enabled");
5964            return None;
5965        }
5966
5967        if epoch_store.randomness_state_exists() {
5968            return None;
5969        }
5970
5971        let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5972        info!("Creating RandomnessStateCreate tx");
5973        Some(tx)
5974    }
5975
5976    #[instrument(level = "debug", skip_all)]
5977    fn create_accumulator_root_tx(
5978        &self,
5979        epoch_store: &Arc<AuthorityPerEpochStore>,
5980    ) -> Option<EndOfEpochTransactionKind> {
5981        if !epoch_store
5982            .protocol_config()
5983            .create_root_accumulator_object()
5984        {
5985            info!("accumulator root creation not enabled");
5986            return None;
5987        }
5988
5989        if epoch_store.accumulator_root_exists() {
5990            return None;
5991        }
5992
5993        let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5994        info!("Creating AccumulatorRootCreate tx");
5995        Some(tx)
5996    }
5997
5998    #[instrument(level = "debug", skip_all)]
5999    fn create_write_accumulator_storage_cost_tx(
6000        &self,
6001        epoch_store: &Arc<AuthorityPerEpochStore>,
6002    ) -> Option<EndOfEpochTransactionKind> {
6003        if !epoch_store.accumulator_root_exists() {
6004            info!("accumulator root does not exist yet");
6005            return None;
6006        }
6007        if !epoch_store.protocol_config().enable_accumulators() {
6008            info!("accumulators not enabled");
6009            return None;
6010        }
6011
6012        let object_store = self.get_object_store();
6013        let object_count =
6014            match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
6015                Ok(Some(count)) => count,
6016                Ok(None) => return None,
6017                Err(e) => {
6018                    fatal!("failed to read accumulator object count: {e}");
6019                }
6020            };
6021
6022        let storage_cost = object_count.saturating_mul(
6023            epoch_store
6024                .protocol_config()
6025                .accumulator_object_storage_cost(),
6026        );
6027
6028        let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
6029        info!(
6030            object_count,
6031            storage_cost, "Creating WriteAccumulatorStorageCost tx"
6032        );
6033        Some(tx)
6034    }
6035
6036    #[instrument(level = "debug", skip_all)]
6037    fn create_coin_registry_tx(
6038        &self,
6039        epoch_store: &Arc<AuthorityPerEpochStore>,
6040    ) -> Option<EndOfEpochTransactionKind> {
6041        if !epoch_store.protocol_config().enable_coin_registry() {
6042            info!("coin registry not enabled");
6043            return None;
6044        }
6045
6046        if epoch_store.coin_registry_exists() {
6047            return None;
6048        }
6049
6050        let tx = EndOfEpochTransactionKind::new_coin_registry_create();
6051        info!("Creating CoinRegistryCreate tx");
6052        Some(tx)
6053    }
6054
6055    #[instrument(level = "debug", skip_all)]
6056    fn create_display_registry_tx(
6057        &self,
6058        epoch_store: &Arc<AuthorityPerEpochStore>,
6059    ) -> Option<EndOfEpochTransactionKind> {
6060        if !epoch_store.protocol_config().enable_display_registry() {
6061            info!("display registry not enabled");
6062            return None;
6063        }
6064
6065        if epoch_store.display_registry_exists() {
6066            return None;
6067        }
6068
6069        let tx = EndOfEpochTransactionKind::new_display_registry_create();
6070        info!("Creating DisplayRegistryCreate tx");
6071        Some(tx)
6072    }
6073
6074    #[instrument(level = "debug", skip_all)]
6075    fn create_bridge_tx(
6076        &self,
6077        epoch_store: &Arc<AuthorityPerEpochStore>,
6078    ) -> Option<EndOfEpochTransactionKind> {
6079        if !epoch_store.protocol_config().enable_bridge() {
6080            info!("bridge not enabled");
6081            return None;
6082        }
6083        if epoch_store.bridge_exists() {
6084            return None;
6085        }
6086        let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
6087        info!("Creating Bridge Create tx");
6088        Some(tx)
6089    }
6090
6091    #[instrument(level = "debug", skip_all)]
6092    fn init_bridge_committee_tx(
6093        &self,
6094        epoch_store: &Arc<AuthorityPerEpochStore>,
6095    ) -> Option<EndOfEpochTransactionKind> {
6096        if !epoch_store.protocol_config().enable_bridge() {
6097            info!("bridge not enabled");
6098            return None;
6099        }
6100        if !epoch_store
6101            .protocol_config()
6102            .should_try_to_finalize_bridge_committee()
6103        {
6104            info!("should not try to finalize bridge committee yet");
6105            return None;
6106        }
6107        // Only create this transaction if bridge exists
6108        if !epoch_store.bridge_exists() {
6109            return None;
6110        }
6111
6112        if epoch_store.bridge_committee_initiated() {
6113            return None;
6114        }
6115
6116        let bridge_initial_shared_version = epoch_store
6117            .epoch_start_config()
6118            .bridge_obj_initial_shared_version()
6119            .expect("initial version must exist");
6120        let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
6121        info!("Init Bridge committee tx");
6122        Some(tx)
6123    }
6124
6125    #[instrument(level = "debug", skip_all)]
6126    fn create_deny_list_state_tx(
6127        &self,
6128        epoch_store: &Arc<AuthorityPerEpochStore>,
6129    ) -> Option<EndOfEpochTransactionKind> {
6130        if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
6131            return None;
6132        }
6133
6134        if epoch_store.coin_deny_list_state_exists() {
6135            return None;
6136        }
6137
6138        let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
6139        info!("Creating DenyListStateCreate tx");
6140        Some(tx)
6141    }
6142
6143    #[instrument(level = "debug", skip_all)]
6144    fn create_address_alias_state_tx(
6145        &self,
6146        epoch_store: &Arc<AuthorityPerEpochStore>,
6147    ) -> Option<EndOfEpochTransactionKind> {
6148        if !epoch_store.protocol_config().address_aliases() {
6149            info!("address aliases not enabled");
6150            return None;
6151        }
6152
6153        if epoch_store.address_alias_state_exists() {
6154            return None;
6155        }
6156
6157        let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
6158        info!("Creating AddressAliasStateCreate tx");
6159        Some(tx)
6160    }
6161
    /// Builds the end-of-epoch StoreExecutionTimeObservations transaction, which persists
    /// execution-time observations -- but only for command keys that actually appeared in the
    /// last N checkpoints of the epoch (N = `stored_observations_num_included_checkpoints`),
    /// plus the explicitly supplied `end_of_epoch_observation_keys`.
    ///
    /// Returns `None` unless per-object congestion control is in ExecutionTimeEstimate mode.
    #[instrument(level = "debug", skip_all)]
    fn create_execution_time_observations_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
    ) -> Option<EndOfEpochTransactionKind> {
        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
            .protocol_config()
            .per_object_congestion_control_mode()
        else {
            return None;
        };

        // Load tx in the last N checkpoints before end-of-epoch, and save only the
        // execution time observations for commands in these checkpoints.
        let start_checkpoint = std::cmp::max(
            last_checkpoint_before_end_of_epoch
                .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
            // If we have <N checkpoints in the epoch, use all of them.
            epoch_store
                .epoch()
                .checked_sub(1)
                .map(|prev_epoch| {
                    self.checkpoint_store
                        .get_epoch_last_checkpoint_seq_number(prev_epoch)
                        .expect("typed store must not fail")
                        .expect(
                            "sequence number of last checkpoint of preceding epoch must be saved",
                        )
                        + 1
                })
                .unwrap_or(1),
        );
        info!(
            "reading checkpoint range {:?}..={:?}",
            start_checkpoint, last_checkpoint_before_end_of_epoch
        );
        // Resolve the content digest for every checkpoint in the range.
        let sequence_numbers =
            (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
        let contents_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_locally_computed_checkpoints(&sequence_numbers)
            .expect("typed store must not fail")
            .into_iter()
            .zip(sequence_numbers)
            .map(|(maybe_checkpoint, sequence_number)| {
                if let Some(checkpoint) = maybe_checkpoint {
                    checkpoint.content_digest
                } else {
                    // If locally computed checkpoint summary was already pruned, load the
                    // certified checkpoint.
                    self.checkpoint_store
                        .get_checkpoint_by_sequence_number(sequence_number)
                        .expect("typed store must not fail")
                        .unwrap_or_else(|| {
                            fatal!("preceding checkpoints must exist by end of epoch")
                        })
                        .data()
                        .content_digest
                }
            })
            .collect();
        // Flatten checkpoint contents into the full list of transaction digests in range.
        let tx_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_checkpoint_content(&contents_digests)
            .expect("typed store must not fail")
            .into_iter()
            .flat_map(|maybe_contents| {
                maybe_contents
                    .expect("preceding checkpoint contents must exist by end of epoch")
                    .into_inner()
                    .into_iter()
                    .map(|digests| digests.transaction)
            })
            .collect();
        // Derive the set of observation keys to retain: one per PTB command seen in the
        // range, plus the explicitly requested end-of-epoch keys.
        let included_execution_time_observations: HashSet<_> = self
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(&tx_digests)
            .into_iter()
            .flat_map(|maybe_tx| {
                if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
                    .expect("preceding transaction must exist by end of epoch")
                    .transaction_data()
                    .kind()
                {
                    #[allow(clippy::unnecessary_to_owned)]
                    itertools::Either::Left(
                        ptb.commands
                            .to_owned()
                            .into_iter()
                            .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
                    )
                } else {
                    itertools::Either::Right(std::iter::empty())
                }
            })
            .chain(end_of_epoch_observation_keys.into_iter())
            .collect();

        // Persist only the retained observations, capped at the configured limit.
        let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
            epoch_store
                .get_end_of_epoch_execution_time_observations()
                .filter_and_sort_v1(
                    |(key, _)| included_execution_time_observations.contains(key),
                    params.stored_observations_limit.try_into().unwrap(),
                ),
        );
        info!("Creating StoreExecutionTimeObservations tx");
        Some(tx)
    }
6273
    /// Creates and execute the advance epoch transaction to effects without committing it to the database.
    /// The effects of the change epoch tx are only written to the database after a certified checkpoint has been
    /// formed and executed by CheckpointExecutor.
    ///
    /// When a framework upgraded has been decided on, but the validator does not have the new
    /// versions of the packages locally, the validator cannot form the ChangeEpochTx. In this case
    /// it returns Err, indicating that the checkpoint builder should give up trying to make the
    /// final checkpoint. As long as the network is able to create a certified checkpoint (which
    /// should be ensured by the capabilities vote), it will arrive via state sync and be executed
    /// by CheckpointExecutor.
    ///
    /// On success, returns the post-transaction `SuiSystemState` and the transaction effects.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
        // >1 checkpoint.
        last_checkpoint: CheckpointSequenceNumber,
    ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
        // Collect the optional system transactions (state-object creation, bridge init,
        // observation storage, etc.) that run as part of the end-of-epoch transaction,
        // in order; each helper returns None when its feature is off or already done.
        let mut txns = Vec::new();

        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_bridge_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_execution_time_observations_tx(
            epoch_store,
            end_of_epoch_observation_keys,
            last_checkpoint,
        ) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
            txns.push(tx);
        }

        if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_display_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();

        // Decide the next protocol version and system package set from validator
        // capabilities gathered during this epoch.
        let (next_epoch_protocol_version, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v2(
                epoch_store.protocol_version(),
                epoch_store.protocol_config(),
                epoch_store.committee(),
                epoch_store
                    .get_capabilities_v2()
                    .expect("read capabilities from db cannot fail"),
                buffer_stake_bps,
            );

        // since system packages are created during the current epoch, they should abide by the
        // rules of the current epoch, including the current epoch's max Move binary format version
        let config = epoch_store.protocol_config();
        let binary_config = config.binary_config(None);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            if next_epoch_protocol_version <= ProtocolVersion::MAX {
                // This case should only be hit if the validator supports the new protocol version,
                // but carries a different framework. The validator should still be able to
                // reconfigure, as the correct framework will be installed by the change epoch txn.
                debug_fatal!(
                    "upgraded system packages {:?} are not locally available, cannot create \
                    ChangeEpochTx. validator binary must be upgraded to the correct version!",
                    next_epoch_system_packages
                );
            } else {
                error!(
                    "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
                    next_epoch_protocol_version
                );
            }
            // the checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages locally,
            //   and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive it via
            //   state sync, and execute it. This will upgrade the framework packages, reconfigure,
            //   and most likely shut down in the new epoch (this validator likely doesn't support
            //   the new protocol version, or else it should have had the packages.)
            return Err(CheckpointBuilderError::SystemPackagesMissing);
        };

        // Newer protocol configs bundle everything into a single EndOfEpoch transaction;
        // older ones use a standalone ChangeEpoch transaction.
        let tx = if epoch_store
            .protocol_config()
            .end_of_epoch_transaction_supported()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));

            VerifiedTransaction::new_end_of_epoch_transaction(txns)
        } else {
            VerifiedTransaction::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            )
        };

        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        // Hold the per-transaction lock for the remainder of the function.
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // The tx could have been executed by state sync already - if so simply return an error.
        // The checkpoint builder will shortly be terminated by reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .is_tx_already_executed(tx_digest)
        {
            warn!("change epoch tx has already been executed via state sync");
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        }

        let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
        else {
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        };

        // We must manually assign the shared object versions to the transaction before executing it.
        // This is because we do not sequence end-of-epoch transactions through consensus.
        let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::iter::once(&Schedulable::Transaction(&executable_tx)),
        )?;

        // Exactly one transaction was submitted, so exactly one assignment comes back.
        assert_eq!(assigned_versions.0.len(), 1);
        let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;

        let input_objects = self.read_objects_for_execution(
            &tx_lock,
            &executable_tx,
            &assigned_versions,
            epoch_store,
        )?;

        let (transaction_outputs, _timings, _execution_error_opt) = self
            .execute_certificate(
                &execution_guard,
                &executable_tx,
                input_objects,
                None,
                ExecutionEnv::default(),
                epoch_store,
            )
            .unwrap();
        let system_obj = get_sui_system_state(&transaction_outputs.written)
            .expect("change epoch tx must write to system object");

        let effects = transaction_outputs.effects;
        // We must write tx and effects to the state sync tables so that state sync is able to
        // deliver to the transaction to CheckpointExecutor after it is included in a certified
        // checkpoint.
        self.get_state_sync_store()
            .insert_transaction_and_effects(&tx, &effects);

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_ok());
        Ok((system_obj, effects))
    }
6499
6500    #[instrument(level = "error", skip_all)]
6501    async fn reopen_epoch_db(
6502        &self,
6503        cur_epoch_store: &AuthorityPerEpochStore,
6504        new_committee: Committee,
6505        epoch_start_configuration: EpochStartConfiguration,
6506        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6507        epoch_last_checkpoint: CheckpointSequenceNumber,
6508    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6509        let new_epoch = new_committee.epoch;
6510        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6511        assert_eq!(
6512            epoch_start_configuration.epoch_start_state().epoch(),
6513            new_committee.epoch
6514        );
6515        fail_point!("before-open-new-epoch-store");
6516        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6517            self.name,
6518            new_committee,
6519            epoch_start_configuration,
6520            self.get_backing_package_store().clone(),
6521            self.get_object_store().clone(),
6522            expensive_safety_check_config,
6523            epoch_last_checkpoint,
6524        )?;
6525        self.epoch_store.store(new_epoch_store.clone());
6526        Ok(new_epoch_store)
6527    }
6528
6529    #[cfg(test)]
6530    pub(crate) fn iter_live_object_set_for_testing(
6531        &self,
6532    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6533        let include_wrapped_object = !self
6534            .epoch_store_for_testing()
6535            .protocol_config()
6536            .simplified_unwrap_then_delete();
6537        self.get_global_state_hash_store()
6538            .iter_cached_live_object_set_for_testing(include_wrapped_object)
6539    }
6540
6541    #[cfg(test)]
6542    pub(crate) fn shutdown_execution_for_test(&self) {
6543        self.tx_execution_shutdown
6544            .lock()
6545            .take()
6546            .unwrap()
6547            .send(())
6548            .unwrap();
6549    }
6550
6551    /// NOTE: this function is only to be used for fuzzing and testing. Never use in prod
6552    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6553        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6554        self.get_object_cache_reader()
6555            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6556        self.get_reconfig_api()
6557            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6558        Ok(())
6559    }
6560}
6561
/// Receives completed randomness rounds over a channel and turns each one into
/// a `RandomnessStateUpdate` system transaction on the authority.
pub struct RandomnessRoundReceiver {
    // Authority used to build, persist, and await execution of the update transactions.
    authority_state: Arc<AuthorityState>,
    // Stream of (epoch, round, randomness bytes) tuples to process.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
6566
impl RandomnessRoundReceiver {
    /// Spawns the receiver's event loop on a monitored task and returns its
    /// join handle.
    pub fn spawn(
        authority_state: Arc<AuthorityState>,
        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    ) -> JoinHandle<()> {
        let rrr = RandomnessRoundReceiver {
            authority_state,
            randomness_rx,
        };
        spawn_monitored_task!(rrr.run())
    }

    /// Event loop: processes each received (epoch, round, bytes) message until
    /// the sending side of the channel is dropped.
    async fn run(mut self) {
        info!("RandomnessRoundReceiver event loop started");

        loop {
            tokio::select! {
                maybe_recv = self.randomness_rx.recv() => {
                    if let Some((epoch, round, bytes)) = maybe_recv {
                        self.handle_new_randomness(epoch, round, bytes).await;
                    } else {
                        // Channel closed: no more randomness will be produced.
                        break;
                    }
                },
            }
        }

        info!("RandomnessRoundReceiver event loop ended");
    }

    /// Builds and persists a RandomnessStateUpdate system transaction for the
    /// given round, registers its digest with the scheduler, and waits (in a
    /// background task) for it to execute successfully. Drops the round if it
    /// belongs to a different epoch than the current one.
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
        fail_point_async!("randomness-delay");

        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
        // Stale rounds from a previous epoch are ignored.
        if epoch_store.epoch() != epoch {
            warn!(
                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
                epoch_store.epoch()
            );
            return;
        }
        let key = TransactionKey::RandomnessRound(epoch, round);
        let transaction = VerifiedTransaction::new_randomness_state_update(
            epoch,
            round,
            bytes,
            epoch_store
                .epoch_start_config()
                .randomness_obj_initial_shared_version()
                .expect("randomness state obj must exist"),
        );
        debug!(
            "created randomness state update transaction with digest: {:?}",
            transaction.digest()
        );
        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
        let digest = *transaction.digest();

        // Randomness state updates contain the full bls signature for the random round,
        // which cannot necessarily be reconstructed again later. Therefore we must immediately
        // persist this transaction. If we crash before its outputs are committed, this
        // ensures we will be able to re-execute it.
        self.authority_state
            .get_cache_commit()
            .persist_transaction(&transaction);

        // Notify the scheduler that the transaction key now has a known digest
        if epoch_store.insert_tx_key(key, digest).is_err() {
            warn!("epoch ended while handling new randomness");
        }

        let authority_state = self.authority_state.clone();
        spawn_monitored_task!(async move {
            // Wait for transaction execution in a separate task, to avoid deadlock in case of
            // out-of-order randomness generation. (Each RandomnessStateUpdate depends on the
            // output of the RandomnessStateUpdate from the previous round.)
            //
            // We set a very long timeout so that in case this gets stuck for some reason, the
            // validator will eventually crash rather than continuing in a zombie mode.
            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
            let result = tokio::time::timeout(
                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
                authority_state
                    .get_transaction_cache_reader()
                    .notify_read_executed_effects(
                        "RandomnessRoundReceiver::notify_read_executed_effects_first",
                        &[digest],
                    ),
            )
            .await;
            let mut effects = match result {
                Ok(result) => result,
                Err(_) => {
                    // Crash on randomness update execution timeout in debug builds.
                    debug_fatal_no_invariant!(
                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                    );
                    // Continue waiting as long as necessary in non-debug builds.
                    authority_state
                        .get_transaction_cache_reader()
                        .notify_read_executed_effects(
                            "RandomnessRoundReceiver::notify_read_executed_effects_second",
                            &[digest],
                        )
                        .await
                }
            };

            // A system transaction that fails to execute successfully is an
            // unrecoverable invariant violation.
            let effects = effects.pop().expect("should return effects");
            if *effects.status() != ExecutionStatus::Success {
                fatal!(
                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
                );
            }
            debug!(
                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
            );
        });
    }
}
6688
6689#[async_trait]
6690impl TransactionKeyValueStoreTrait for AuthorityState {
6691    #[instrument(skip(self))]
6692    async fn multi_get(
6693        &self,
6694        transactions: &[TransactionDigest],
6695        effects: &[TransactionDigest],
6696    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6697        let txns = if !transactions.is_empty() {
6698            self.get_transaction_cache_reader()
6699                .multi_get_transaction_blocks(transactions)
6700                .into_iter()
6701                .map(|t| t.map(|t| (*t).clone().into_inner()))
6702                .collect()
6703        } else {
6704            vec![]
6705        };
6706
6707        let fx = if !effects.is_empty() {
6708            self.get_transaction_cache_reader()
6709                .multi_get_executed_effects(effects)
6710        } else {
6711            vec![]
6712        };
6713
6714        Ok((txns, fx))
6715    }
6716
6717    #[instrument(skip(self))]
6718    async fn multi_get_checkpoints(
6719        &self,
6720        checkpoint_summaries: &[CheckpointSequenceNumber],
6721        checkpoint_contents: &[CheckpointSequenceNumber],
6722        checkpoint_summaries_by_digest: &[CheckpointDigest],
6723    ) -> SuiResult<(
6724        Vec<Option<CertifiedCheckpointSummary>>,
6725        Vec<Option<CheckpointContents>>,
6726        Vec<Option<CertifiedCheckpointSummary>>,
6727    )> {
6728        // TODO: use multi-get methods if it ever becomes important (unlikely)
6729        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6730        let store = self.get_checkpoint_store();
6731        for seq in checkpoint_summaries {
6732            let checkpoint = store
6733                .get_checkpoint_by_sequence_number(*seq)?
6734                .map(|c| c.into_inner());
6735
6736            summaries.push(checkpoint);
6737        }
6738
6739        let mut contents = Vec::with_capacity(checkpoint_contents.len());
6740        for seq in checkpoint_contents {
6741            let checkpoint = store
6742                .get_checkpoint_by_sequence_number(*seq)?
6743                .and_then(|summary| {
6744                    store
6745                        .get_checkpoint_contents(&summary.content_digest)
6746                        .expect("db read cannot fail")
6747                });
6748            contents.push(checkpoint);
6749        }
6750
6751        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6752        for digest in checkpoint_summaries_by_digest {
6753            let checkpoint = store
6754                .get_checkpoint_by_digest(digest)?
6755                .map(|c| c.into_inner());
6756            summaries_by_digest.push(checkpoint);
6757        }
6758        Ok((summaries, contents, summaries_by_digest))
6759    }
6760
6761    #[instrument(skip(self))]
6762    async fn deprecated_get_transaction_checkpoint(
6763        &self,
6764        digest: TransactionDigest,
6765    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6766        Ok(self
6767            .get_checkpoint_cache()
6768            .deprecated_get_transaction_checkpoint(&digest)
6769            .map(|(_epoch, checkpoint)| checkpoint))
6770    }
6771
6772    #[instrument(skip(self))]
6773    async fn get_object(
6774        &self,
6775        object_id: ObjectID,
6776        version: VersionNumber,
6777    ) -> SuiResult<Option<Object>> {
6778        Ok(self
6779            .get_object_cache_reader()
6780            .get_object_by_key(&object_id, version))
6781    }
6782
6783    #[instrument(skip_all)]
6784    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6785        Ok(self
6786            .get_object_cache_reader()
6787            .multi_get_objects_by_key(object_keys))
6788    }
6789
6790    #[instrument(skip(self))]
6791    async fn multi_get_transaction_checkpoint(
6792        &self,
6793        digests: &[TransactionDigest],
6794    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6795        let res = self
6796            .get_checkpoint_cache()
6797            .deprecated_multi_get_transaction_checkpoint(digests);
6798
6799        Ok(res
6800            .into_iter()
6801            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6802            .collect())
6803    }
6804
6805    #[instrument(skip(self))]
6806    async fn multi_get_events_by_tx_digests(
6807        &self,
6808        digests: &[TransactionDigest],
6809    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6810        if digests.is_empty() {
6811            return Ok(vec![]);
6812        }
6813
6814        Ok(self
6815            .get_transaction_cache_reader()
6816            .multi_get_events(digests))
6817    }
6818}
6819
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::collections::BTreeMap;
    use std::{cell::RefCell, collections::BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    // An override is either the same for every validator, or computed per
    // validator via a callback.
    enum PackageOverrideConfig {
        Global(Framework),
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module at its own bytecode version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Installs a global (all-validator) override for `package_id`.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|bs| {
            let override_cfg = PackageOverrideConfig::Global(modules);
            bs.borrow_mut().insert(package_id, override_cfg)
        });
    }

    /// Installs a per-validator override callback for `package_id`.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|bs| {
            let override_cfg = PackageOverrideConfig::PerValidator(func);
            bs.borrow_mut().insert(package_id, override_cfg)
        });
    }

    /// Makes `packages` the complete set of system packages: each provided
    /// package is overridden with its modules, and every built-in package not
    /// in the list is overridden with an empty framework.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        OVERRIDE.with(|bs| {
            let mut absent_built_ins: BTreeSet<_> =
                BuiltInFramework::all_package_ids().into_iter().collect();
            let mut cfg = bs.borrow_mut();
            for pkg in packages {
                absent_built_ins.remove(&pkg.id);
                cfg.insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
            }
            for empty_pkg in absent_built_ins {
                cfg.insert(empty_pkg, PackageOverrideConfig::Global(vec![]));
            }
        });
    }

    /// Returns the override module bytes for `package_id` on validator
    /// `name`, if any override applies.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let cfg = cfg.borrow();
            match cfg.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            }
        })
    }

    /// Returns the override compiled modules for `package_id` on validator
    /// `name`, if any override applies.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let cfg = cfg.borrow();
            match cfg.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            }
        })
    }

    /// Builds a `SystemPackage` for an overridden package, deriving its
    /// dependency list from the built-in framework.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if is_system_package(*package_id) {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system packages.
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns all overridden packages that are not built-in system packages.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        // Collect ids first so the thread-local borrow is released before
        // get_override_bytes borrows it again below.
        let extra_ids: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .copied()
                .filter(|package| !built_in.contains(package))
                .collect()
        });

        extra_ids
            .into_iter()
            .map(|package| SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            })
            .collect()
    }
}
6953
/// JSON-serializable dump entry for an object: the components of its object
/// reference (id, version, digest) alongside the full object itself.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    pub id: ObjectID,
    pub version: VersionNumber,
    pub digest: ObjectDigest,
    pub object: Object,
}
6961
6962impl ObjDumpFormat {
6963    fn new(object: Object) -> Self {
6964        let oref = object.compute_object_reference();
6965        Self {
6966            id: oref.0,
6967            version: oref.1,
6968            digest: oref.2,
6969            object,
6970        }
6971    }
6972}
6973
/// Complete snapshot of the node-side state needed to replay a single
/// transaction: epoch parameters, the transaction itself, its effects, and
/// every object the execution touched.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    pub tx_digest: TransactionDigest,
    pub sender_signed_data: SenderSignedData,
    pub executed_epoch: u64,
    pub reference_gas_price: u64,
    pub protocol_version: u64,
    pub epoch_start_timestamp_ms: u64,
    // Effects computed locally by this node.
    pub computed_effects: TransactionEffects,
    // Effects digest this node was expected to produce (for fork debugging).
    pub expected_effects_digest: TransactionEffectsDigest,
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    pub shared_objects: Vec<ObjDumpFormat>,
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    pub modified_at_versions: Vec<ObjDumpFormat>,
    // Packages fetched from the db at runtime (not among the input objects).
    pub runtime_reads: Vec<ObjDumpFormat>,
    pub input_objects: Vec<ObjDumpFormat>,
}
6991
6992impl NodeStateDump {
6993    pub fn new(
6994        tx_digest: &TransactionDigest,
6995        effects: &TransactionEffects,
6996        expected_effects_digest: TransactionEffectsDigest,
6997        object_store: &dyn ObjectStore,
6998        epoch_store: &Arc<AuthorityPerEpochStore>,
6999        inner_temporary_store: &InnerTemporaryStore,
7000        certificate: &VerifiedExecutableTransaction,
7001    ) -> SuiResult<Self> {
7002        // Epoch info
7003        let executed_epoch = epoch_store.epoch();
7004        let reference_gas_price = epoch_store.reference_gas_price();
7005        let epoch_start_config = epoch_store.epoch_start_config();
7006        let protocol_version = epoch_store.protocol_version().as_u64();
7007        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
7008
7009        // Record all system packages at this version
7010        let mut relevant_system_packages = Vec::new();
7011        for sys_package_id in BuiltInFramework::all_package_ids() {
7012            if let Some(w) = object_store.get_object(&sys_package_id) {
7013                relevant_system_packages.push(ObjDumpFormat::new(w))
7014            }
7015        }
7016
7017        // Record all the shared objects
7018        let mut shared_objects = Vec::new();
7019        for kind in effects.input_consensus_objects() {
7020            match kind {
7021                InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
7022                    if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
7023                        shared_objects.push(ObjDumpFormat::new(w))
7024                    }
7025                }
7026                InputConsensusObject::ReadConsensusStreamEnded(..)
7027                | InputConsensusObject::MutateConsensusStreamEnded(..)
7028                | InputConsensusObject::Cancelled(..) => (), // TODO: consider record congested objects.
7029            }
7030        }
7031
7032        // Record all loaded child objects
7033        // Child objects which are read but not mutated are not tracked anywhere else
7034        let mut loaded_child_objects = Vec::new();
7035        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
7036            if let Some(w) = object_store.get_object_by_key(id, meta.version) {
7037                loaded_child_objects.push(ObjDumpFormat::new(w))
7038            }
7039        }
7040
7041        // Record all modified objects
7042        let mut modified_at_versions = Vec::new();
7043        for (id, ver) in effects.modified_at_versions() {
7044            if let Some(w) = object_store.get_object_by_key(&id, ver) {
7045                modified_at_versions.push(ObjDumpFormat::new(w))
7046            }
7047        }
7048
7049        // Packages read at runtime, which were not previously loaded into the temoorary store
7050        // Some packages may be fetched at runtime and wont show up in input objects
7051        let mut runtime_reads = Vec::new();
7052        for obj in inner_temporary_store
7053            .runtime_packages_loaded_from_db
7054            .values()
7055        {
7056            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
7057        }
7058
7059        // All other input objects should already be in `inner_temporary_store.objects`
7060
7061        Ok(Self {
7062            tx_digest: *tx_digest,
7063            executed_epoch,
7064            reference_gas_price,
7065            epoch_start_timestamp_ms,
7066            protocol_version,
7067            relevant_system_packages,
7068            shared_objects,
7069            loaded_child_objects,
7070            modified_at_versions,
7071            runtime_reads,
7072            sender_signed_data: certificate.clone().into_message(),
7073            input_objects: inner_temporary_store
7074                .input_objects
7075                .values()
7076                .map(|o| ObjDumpFormat::new(o.clone()))
7077                .collect(),
7078            computed_effects: effects.clone(),
7079            expected_effects_digest,
7080        })
7081    }
7082
7083    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
7084        let mut objects = Vec::new();
7085        objects.extend(self.relevant_system_packages.clone());
7086        objects.extend(self.shared_objects.clone());
7087        objects.extend(self.loaded_child_objects.clone());
7088        objects.extend(self.modified_at_versions.clone());
7089        objects.extend(self.runtime_reads.clone());
7090        objects.extend(self.input_objects.clone());
7091        objects
7092    }
7093
7094    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
7095        let file_name = format!(
7096            "{}_{}_NODE_DUMP.json",
7097            self.tx_digest,
7098            AuthorityState::unixtime_now_ms()
7099        );
7100        let mut path = path.to_path_buf();
7101        path.push(&file_name);
7102        let mut file = File::create(path.clone())?;
7103        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
7104        Ok(path)
7105    }
7106
7107    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
7108        let file = File::open(path)?;
7109        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
7110    }
7111}