sui_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::accumulators::coin_reservations::CachingCoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::transaction_rewriting::rewrite_transaction_for_coin_reservations;
10use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
11use crate::checkpoints::CheckpointBuilderError;
12use crate::checkpoints::CheckpointBuilderResult;
13use crate::congestion_tracker::CongestionTracker;
14use crate::execution_cache::ExecutionCacheTraitPointers;
15use crate::execution_cache::TransactionCacheRead;
16use crate::execution_cache::writeback_cache::WritebackCache;
17use crate::execution_scheduler::ExecutionScheduler;
18use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
19use crate::gasless_rate_limiter::ConsensusGaslessCounter;
20use crate::jsonrpc_index::CoinIndexKey2;
21use crate::rpc_index::RpcIndexStore;
22use crate::traffic_controller::TrafficController;
23use crate::traffic_controller::metrics::TrafficControllerMetrics;
24use crate::transaction_outputs::TransactionOutputs;
25use arc_swap::{ArcSwap, ArcSwapOption, Guard};
26use async_trait::async_trait;
27use authority_per_epoch_store::CertLockGuard;
28use dashmap::DashMap;
29use fastcrypto::encoding::Base58;
30use fastcrypto::encoding::Encoding;
31use fastcrypto::hash::MultisetHash;
32use itertools::Itertools;
33use move_binary_format::CompiledModule;
34use move_binary_format::binary_config::BinaryConfig;
35use move_core_types::annotated_value::MoveStructLayout;
36use move_core_types::language_storage::ModuleId;
37use mysten_common::ZipDebugEqIteratorExt;
38use mysten_common::{assert_reachable, fatal};
39use nonempty::NonEmpty;
40use parking_lot::Mutex;
41use prometheus::{
42    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
43    register_histogram_vec_with_registry, register_histogram_with_registry,
44    register_int_counter_vec_with_registry, register_int_counter_with_registry,
45    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
46};
47use serde::de::DeserializeOwned;
48use serde::{Deserialize, Serialize};
49use shared_object_version_manager::AssignedVersions;
50use shared_object_version_manager::Schedulable;
51use std::collections::BTreeMap;
52use std::collections::BTreeSet;
53use std::fs::File;
54use std::io::Write;
55use std::path::{Path, PathBuf};
56use std::sync::atomic::Ordering;
57use std::time::Duration;
58use std::time::Instant;
59use std::time::SystemTime;
60use std::time::UNIX_EPOCH;
61use std::{
62    collections::{HashMap, HashSet},
63    fs,
64    pin::Pin,
65    str::FromStr,
66    sync::Arc,
67    vec,
68};
69use sui_config::NodeConfig;
70use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
71use sui_execution::Executor;
72use sui_protocol_config::PerObjectCongestionControlMode;
73use sui_types::accumulator_root::AccumulatorObjId;
74use sui_types::dynamic_field::visitor as DFV;
75use sui_types::execution::ExecutionOutput;
76use sui_types::execution::ExecutionTimeObservationKey;
77use sui_types::execution::ExecutionTiming;
78use sui_types::execution_params::ExecutionOrEarlyError;
79use sui_types::execution_params::FundsWithdrawStatus;
80use sui_types::execution_params::get_early_execution_error;
81use sui_types::inner_temporary_store::PackageStoreWithFallback;
82use sui_types::layout_resolver::LayoutResolver;
83use sui_types::layout_resolver::into_struct_layout;
84use sui_types::messages_consensus::AuthorityCapabilitiesV2;
85use sui_types::node_role::NodeRole;
86use sui_types::object::bounded_visitor::BoundedVisitor;
87use sui_types::storage::ChildObjectResolver;
88use sui_types::storage::InputKey;
89use sui_types::storage::OverlayBackingPackageStore;
90use sui_types::storage::TrackingBackingStore;
91use sui_types::traffic_control::{
92    PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
93};
94use sui_types::transaction_executor::SimulateTransactionResult;
95use sui_types::transaction_executor::TransactionChecks;
96use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
97use tap::TapFallible;
98use tokio::sync::RwLock;
99use tokio::sync::mpsc::unbounded_channel;
100use tokio::sync::oneshot;
101use tokio::sync::watch::error::RecvError;
102use tokio::time::timeout;
103use tracing::{debug, error, info, instrument, warn};
104
105use self::authority_store::ExecutionLockWriteGuard;
106use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
107pub use authority_store::{AuthorityStore, ResolverWrapper};
108use mysten_metrics::{monitored_scope, spawn_monitored_task};
109
110use crate::jsonrpc_index::IndexStore;
111use crate::jsonrpc_index::{
112    CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
113};
114use mysten_common::debug_fatal;
115use shared_crypto::intent::{Intent, IntentScope};
116use sui_config::genesis::Genesis;
117use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
118use sui_framework::{BuiltInFramework, SystemPackage};
119use sui_json_rpc_types::{
120    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
121    SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
122    SuiTransactionBlockEvents, TransactionFilter,
123};
124use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
125use sui_rpc_store::Store as RpcStore;
126use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
127use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
128use sui_types::accumulator_root::AccumulatorValue;
129use sui_types::authenticator_state::get_authenticator_state;
130use sui_types::balance::Balance;
131use sui_types::coin_reservation;
132use sui_types::committee::{EpochId, ProtocolVersion};
133use sui_types::crypto::{AuthoritySignInfo, Signer};
134use sui_types::deny_list_v1::check_coin_deny_list_v1;
135use sui_types::digests::ChainIdentifier;
136use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
137use sui_types::effects::{
138    InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
139    TransactionEvents, VerifiedSignedTransactionEffects,
140};
141use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
142use sui_types::event::EventID;
143use sui_types::executable_transaction::VerifiedExecutableTransaction;
144use sui_types::execution_status::ExecutionErrorKind;
145use sui_types::gas::{GasCostSummary, SuiGasStatus};
146use sui_types::inner_temporary_store::{InnerTemporaryStore, ObjectMap, TxCoins, WrittenObjects};
147use sui_types::message_envelope::Message;
148use sui_types::messages_checkpoint::{
149    CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
150    CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
151    CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
152    CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
153};
154use sui_types::messages_grpc::{
155    LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
156    TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
157};
158use sui_types::metrics::{BytecodeVerifierMetrics, ExecutionMetrics};
159use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
160use sui_types::signature::GenericSignature;
161use sui_types::storage::{
162    BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
163};
164use sui_types::sui_system_state::SuiSystemStateTrait;
165use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
166use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
167use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
168use sui_types::{
169    SUI_SYSTEM_ADDRESS,
170    base_types::*,
171    committee::Committee,
172    crypto::AuthoritySignature,
173    error::{SuiError, SuiResult},
174    object::{Object, ObjectRead},
175    transaction::*,
176};
177use sui_types::{TypeTag, is_system_package};
178use typed_store::TypedStoreError;
179use typed_store::rocks::StagedBatch;
180
181use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
182use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
183use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
184use crate::authority::authority_store_pruner::{
185    AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
186};
187use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
188use crate::authority::epoch_start_configuration::EpochStartConfiguration;
189use crate::checkpoints::CheckpointStore;
190use crate::epoch::committee_store::CommitteeStore;
191use crate::execution_cache::{
192    CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
193    ObjectCacheRead, StateSyncAPI,
194};
195use crate::execution_driver::execution_process;
196use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
197use crate::metrics::LatencyObserver;
198use crate::metrics::RateTracker;
199use crate::module_cache_metrics::ResolverMetrics;
200use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
201use crate::stake_aggregator::StakeAggregator;
202use crate::subscription_handler::SubscriptionHandler;
203use crate::transaction_input_loader::TransactionInputLoader;
204
205#[cfg(msim)]
206pub use crate::checkpoints::checkpoint_executor::utils::{
207    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
208};
209
210#[cfg(msim)]
211use sui_types::committee::CommitteeTrait;
212use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
213
214#[cfg(test)]
215#[path = "unit_tests/authority_tests.rs"]
216pub mod authority_tests;
217
218#[cfg(test)]
219#[path = "unit_tests/transaction_tests.rs"]
220pub mod transaction_tests;
221
222#[cfg(test)]
223#[path = "unit_tests/batch_transaction_tests.rs"]
224mod batch_transaction_tests;
225
226#[cfg(test)]
227#[path = "unit_tests/move_integration_tests.rs"]
228pub mod move_integration_tests;
229
230#[cfg(test)]
231#[path = "unit_tests/gas_tests.rs"]
232mod gas_tests;
233
234#[cfg(test)]
235#[path = "unit_tests/gas_data_tests.rs"]
236mod gas_data_tests;
237
238#[cfg(test)]
239#[path = "unit_tests/batch_verification_tests.rs"]
240mod batch_verification_tests;
241
242#[cfg(test)]
243#[path = "unit_tests/coin_deny_list_tests.rs"]
244mod coin_deny_list_tests;
245
246#[cfg(test)]
247#[path = "unit_tests/auth_unit_test_utils.rs"]
248pub mod auth_unit_test_utils;
249
250pub mod authority_test_utils;
251
252pub mod authority_per_epoch_store;
253pub mod authority_per_epoch_store_pruner;
254
255pub mod authority_store_pruner;
256pub mod authority_store_tables;
257pub mod authority_store_types;
258pub mod congestion_log;
259pub mod consensus_tx_status_cache;
260pub(crate) mod epoch_marker_key;
261pub mod epoch_start_configuration;
262pub mod execution_time_estimator;
263pub mod finalized_transactions_cache;
264pub mod shared_object_congestion_tracker;
265pub mod shared_object_version_manager;
266pub mod submitted_transaction_cache;
267pub mod test_authority_builder;
268pub mod transaction_deferral;
269pub mod transaction_reject_reason_cache;
270mod weighted_moving_average;
271
272pub(crate) mod authority_store;
273pub mod backpressure;
274
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on.
///
/// Each field is a handle to one metric; the handles are created and registered against a
/// Prometheus registry in `AuthorityMetrics::new`.
pub struct AuthorityMetrics {
    /// Total number of transaction orders.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Total number of transaction effects produced (equals total transactions finished).
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    // TODO: this tracks consensus object tx, not just shared. Consider renaming.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders already processed previously.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    /// Number of shared input objects per transaction.
    // TODO: this tracks consensus object count, not just shared. Consider renaming.
    num_shared_objects: Histogram,
    /// Distribution of the size of transaction batches.
    batch_size: Histogram,

    /// Latency of voting on transactions without signing.
    authority_state_handle_vote_transaction_latency: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing dbs.
    db_checkpoint_latency: Histogram,

    // TODO: Rename these metrics.
    /// Certificates enqueued to ExecutionScheduler, labelled by `result`.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Certificates pending in ExecutionScheduler, with at least one missing input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Executing certificates, including queued and actually running certificates.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Time transactions spend waiting in the queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Cumulative number of transactions paused by the execution driver.
    pub(crate) execution_driver_paused_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Queueing delay from a transaction being ready for execution until it starts executing.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas divided by VM execution latency.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas divided by certificate execution latency, including
    /// committing the certificate.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Total number of consensus transactions skipped because of a local cache hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
    /// Number of times each transaction appears in its first consensus commit.
    pub(crate) consensus_handler_duplicate_tx_count: Histogram,

    /// Whether the authority is currently experiencing overload and is in load shedding mode.
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while the authority is in load shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each source indicates transaction overload, labelled by `source`.
    pub(crate) transaction_overload_sources: IntCounterVec,

    // Post processing metrics
    /// Total number of events emitted in post processing.
    post_processing_total_events_emitted: IntCounter,
    /// Total number of transactions indexed in post processing.
    post_processing_total_tx_indexed: IntCounter,
    /// Total number of transactions that finished event processing in post processing.
    post_processing_total_tx_had_event_processed: IntCounter,
    /// Total number of failures in post processing.
    post_processing_total_failures: IntCounter,

    // Consensus commit and transaction handler metrics
    /// Transactions processed by the consensus handler, labelled by `class` and `outcome`.
    pub consensus_handler_processed: IntCounterVec,
    /// User transactions processed by the consensus handler, labelled by `outcome` and
    /// `authority` (the block author).
    pub consensus_handler_processed_user_transactions: IntCounterVec,
    /// Sizes of transactions processed by the consensus handler, labelled by `class` and
    /// `outcome`.
    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Number of transactions deferred by the consensus handler.
    pub consensus_handler_deferred_transactions: IntCounter,
    /// Number of transactions deferred by the consensus handler due to congestion.
    pub consensus_handler_congested_transactions: IntCounter,
    /// Number of transactions deferred due to unpaid consensus amplification.
    pub consensus_handler_unpaid_amplification_deferrals: IntCounter,
    /// Number of transactions cancelled by the consensus handler.
    pub consensus_handler_cancelled_transactions: IntCounter,
    /// Number of transactions dropped by the consensus handler.
    pub consensus_handler_dropped_transactions: IntCounter,
    /// Max object costs for congestion control in the current consensus commit, labelled by
    /// `commit_type`.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Number of committed subdags, labelled by `authority` (the author).
    pub consensus_committed_subdags: IntCounterVec,
    /// Total accumulator deposit (merge) events processed in settlement.
    pub accumulator_deposits: IntCounter,
    /// Total accumulator withdrawal (split) events processed in settlement.
    pub accumulator_withdrawals: IntCounter,
    /// Total number of committed consensus messages, labelled by `authority` (the author).
    pub consensus_committed_messages: IntGaugeVec,
    pub consensus_committed_user_transactions: IntGaugeVec,
    pub consensus_finalized_user_transactions: IntGaugeVec,
    pub consensus_rejected_user_transactions: IntGaugeVec,
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,
    pub consensus_block_handler_block_processed: IntCounter,
    pub consensus_block_handler_txn_processed: IntCounterVec,
    pub consensus_block_handler_fastpath_executions: IntCounter,
    pub consensus_timestamp_bias: Histogram,

    pub execution_metrics: Arc<ExecutionMetrics>,

    /// Bytecode verifier metrics for tracking timeouts.
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zklogin signatures
    pub zklogin_sig_count: IntCounter,
    /// Count of multisig signatures
    pub multisig_sig_count: IntCounter,

    // Tracks the recent average queueing delay of a transaction between the moment it is
    // ready for execution and the moment it starts executing.
    pub execution_queueing_latency: LatencyObserver,

    // Tracks the rate at which transactions become ready for execution in transaction manager.
    // The Mutex is needed because the tracker is updated in transaction manager and read
    // in the overload_monitor. Mutex contention should be low because transaction manager
    // is single threaded and the read rate in overload_monitor is low. If transaction
    // manager becomes multi-threaded, we can create one rate tracker per thread.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    // Tracks the rate at which transactions start execution in execution driver.
    // A Mutex is used here for the same reason as for `txn_ready_rate_tracker`.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
375
/// Histogram buckets for positive counts, replacing the default Prometheus buckets.
///
/// Covers 1 through 10M with a 1-2-5-7 progression in every decade; each row below is
/// one decade.
#[rustfmt::skip]
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 7.0,
    10.0, 20.0, 50.0, 70.0,
    100.0, 200.0, 500.0, 700.0,
    1_000.0, 2_000.0, 5_000.0, 7_000.0,
    10_000.0, 20_000.0, 50_000.0, 70_000.0,
    100_000.0, 200_000.0, 500_000.0, 700_000.0,
    1_000_000.0, 2_000_000.0, 5_000_000.0, 7_000_000.0,
    10_000_000.0,
];
382
/// Histogram buckets, in seconds, for general latency metrics.
///
/// Ranges from 0.5ms up to 90s: sub-second values first, then one bucket per second
/// from 1s to 9s, then a coarse tail.
#[rustfmt::skip]
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second.
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
    // One bucket per second.
    1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
    // Long tail.
    10.0, 20.0, 30.0, 60.0, 90.0,
];
387
/// Histogram buckets, in seconds, for low latency samples.
///
/// Starts from 10us and goes up to 100s with a 1-2-5 progression; each row below is one
/// decade.
#[rustfmt::skip]
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005,
    0.0001, 0.0002, 0.0005,
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0,
];
393
/// Histogram buckets, in seconds, for the consensus timestamp bias.
///
/// The expected bias is 200-300ms, so the buckets are packed more tightly around that
/// range (20ms steps from 0.2s to 0.4s) and are coarse elsewhere.
#[rustfmt::skip]
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    // Negative / zero bias.
    -2.0, 0.0,
    // Dense region around the expected value.
    0.20, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40,
    // Coarse tail.
    0.45, 0.50, 0.80, 2.0, 10.0, 30.0,
];
400
/// Histogram buckets for the gas-to-latency ratio metrics (computation gas divided by
/// execution latency).
///
/// Steps of 100 between 100 and 1000, steps of 1000 between 1000 and 10000, with a few
/// coarse buckets on either side.
#[rustfmt::skip]
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0,
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0,
    1_000.0, 2_000.0, 3_000.0, 4_000.0, 5_000.0, 6_000.0, 7_000.0, 8_000.0, 9_000.0,
    10_000.0, 50_000.0, 100_000.0, 1_000_000.0,
];
405
/// Gas coin value of 10^18 (1_000_000_000_000_000_000).
// NOTE(review): presumably the balance given to the gas coin in dev-inspect runs —
// confirm against the use sites.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 10u64.pow(18);
407
/// How long to wait for fastpath input objects (2 seconds).
///
/// The transaction author should already have observed the input objects as finalized
/// output, so the wait usually does not need to be long. When the transaction is submitted
/// by TransactionDriver, it also retries quickly if this validator does not respond.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_millis(2_000);
412
413impl AuthorityMetrics {
414    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
415        Self {
416            tx_orders: register_int_counter_with_registry!(
417                "total_transaction_orders",
418                "Total number of transaction orders",
419                registry,
420            )
421            .unwrap(),
422            total_certs: register_int_counter_with_registry!(
423                "total_transaction_certificates",
424                "Total number of transaction certificates handled",
425                registry,
426            )
427            .unwrap(),
428            // total_effects == total transactions finished
429            total_effects: register_int_counter_with_registry!(
430                "total_transaction_effects",
431                "Total number of transaction effects produced",
432                registry,
433            )
434            .unwrap(),
435
436            shared_obj_tx: register_int_counter_with_registry!(
437                "num_shared_obj_tx",
438                "Number of transactions involving shared objects",
439                registry,
440            )
441            .unwrap(),
442
443            sponsored_tx: register_int_counter_with_registry!(
444                "num_sponsored_tx",
445                "Number of sponsored transactions",
446                registry,
447            )
448            .unwrap(),
449
450            tx_already_processed: register_int_counter_with_registry!(
451                "num_tx_already_processed",
452                "Number of transaction orders already processed previously",
453                registry,
454            )
455            .unwrap(),
456            num_input_objs: register_histogram_with_registry!(
457                "num_input_objects",
458                "Distribution of number of input TX objects per TX",
459                POSITIVE_INT_BUCKETS.to_vec(),
460                registry,
461            )
462            .unwrap(),
463            num_shared_objects: register_histogram_with_registry!(
464                "num_shared_objects",
465                "Number of shared input objects per TX",
466                POSITIVE_INT_BUCKETS.to_vec(),
467                registry,
468            )
469            .unwrap(),
470            batch_size: register_histogram_with_registry!(
471                "batch_size",
472                "Distribution of size of transaction batch",
473                POSITIVE_INT_BUCKETS.to_vec(),
474                registry,
475            )
476            .unwrap(),
477            authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
478                "authority_state_handle_vote_transaction_latency",
479                "Latency of voting on transactions without signing",
480                LATENCY_SEC_BUCKETS.to_vec(),
481                registry,
482            )
483            .unwrap(),
484            internal_execution_latency: register_histogram_with_registry!(
485                "authority_state_internal_execution_latency",
486                "Latency of actual certificate executions",
487                LATENCY_SEC_BUCKETS.to_vec(),
488                registry,
489            )
490            .unwrap(),
491            execution_load_input_objects_latency: register_histogram_with_registry!(
492                "authority_state_execution_load_input_objects_latency",
493                "Latency of loading input objects for execution",
494                LOW_LATENCY_SEC_BUCKETS.to_vec(),
495                registry,
496            )
497            .unwrap(),
498            prepare_certificate_latency: register_histogram_with_registry!(
499                "authority_state_prepare_certificate_latency",
500                "Latency of executing certificates, before committing the results",
501                LATENCY_SEC_BUCKETS.to_vec(),
502                registry,
503            )
504            .unwrap(),
505            commit_certificate_latency: register_histogram_with_registry!(
506                "authority_state_commit_certificate_latency",
507                "Latency of committing certificate execution results",
508                LATENCY_SEC_BUCKETS.to_vec(),
509                registry,
510            )
511            .unwrap(),
512            db_checkpoint_latency: register_histogram_with_registry!(
513                "db_checkpoint_latency",
514                "Latency of checkpointing dbs",
515                LATENCY_SEC_BUCKETS.to_vec(),
516                registry,
517            ).unwrap(),
518            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
519                "transaction_manager_num_enqueued_certificates",
520                "Current number of certificates enqueued to ExecutionScheduler",
521                &["result"],
522                registry,
523            )
524            .unwrap(),
525            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
526                "transaction_manager_num_pending_certificates",
527                "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
528                registry,
529            )
530            .unwrap(),
531            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
532                "transaction_manager_num_executing_certificates",
533                "Number of executing certificates, including queued and actually running certificates",
534                registry,
535            )
536            .unwrap(),
537            authority_overload_status: register_int_gauge_with_registry!(
538                "authority_overload_status",
539                "Whether authority is current experiencing overload and enters load shedding mode.",
540                registry)
541            .unwrap(),
542            authority_load_shedding_percentage: register_int_gauge_with_registry!(
543                "authority_load_shedding_percentage",
544                "The percentage of transactions is shed when the authority is in load shedding mode.",
545                registry)
546            .unwrap(),
547            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
548                "transaction_manager_transaction_queue_age_s",
549                "Time spent in waiting for transaction in the queue",
550                LATENCY_SEC_BUCKETS.to_vec(),
551                registry,
552            )
553            .unwrap(),
554            transaction_overload_sources: register_int_counter_vec_with_registry!(
555                "transaction_overload_sources",
556                "Number of times each source indicates transaction overload.",
557                &["source"],
558                registry)
559            .unwrap(),
560            execution_driver_executed_transactions: register_int_counter_with_registry!(
561                "execution_driver_executed_transactions",
562                "Cumulative number of transaction executed by execution driver",
563                registry,
564            )
565            .unwrap(),
566            execution_driver_paused_transactions: register_int_counter_with_registry!(
567                "execution_driver_paused_transactions",
568                "Cumulative number of transactions paused by execution driver",
569                registry,
570            )
571            .unwrap(),
572            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
573                "execution_driver_dispatch_queue",
574                "Number of transaction pending in execution driver dispatch queue",
575                registry,
576            )
577            .unwrap(),
578            execution_queueing_delay_s: register_histogram_with_registry!(
579                "execution_queueing_delay_s",
580                "Queueing delay between a transaction is ready for execution until it starts executing.",
581                LATENCY_SEC_BUCKETS.to_vec(),
582                registry
583            )
584            .unwrap(),
585            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
586                "prepare_cert_gas_latency_ratio",
587                "The ratio of computation gas divided by VM execution latency.",
588                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
589                registry
590            )
591            .unwrap(),
592            execution_gas_latency_ratio: register_histogram_with_registry!(
593                "execution_gas_latency_ratio",
594                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
595                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
596                registry
597            )
598            .unwrap(),
599            skipped_consensus_txns: register_int_counter_with_registry!(
600                "skipped_consensus_txns",
601                "Total number of consensus transactions skipped",
602                registry,
603            )
604            .unwrap(),
605            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
606                "skipped_consensus_txns_cache_hit",
607                "Total number of consensus transactions skipped because of local cache hit",
608                registry,
609            )
610            .unwrap(),
611            consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
612                "consensus_handler_duplicate_tx_count",
613                "Number of times each transaction appears in its first consensus commit",
614                POSITIVE_INT_BUCKETS.to_vec(),
615                registry,
616            )
617            .unwrap(),
618            post_processing_total_events_emitted: register_int_counter_with_registry!(
619                "post_processing_total_events_emitted",
620                "Total number of events emitted in post processing",
621                registry,
622            )
623            .unwrap(),
624            post_processing_total_tx_indexed: register_int_counter_with_registry!(
625                "post_processing_total_tx_indexed",
626                "Total number of txes indexed in post processing",
627                registry,
628            )
629            .unwrap(),
630            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
631                "post_processing_total_tx_had_event_processed",
632                "Total number of txes finished event processing in post processing",
633                registry,
634            )
635            .unwrap(),
636            post_processing_total_failures: register_int_counter_with_registry!(
637                "post_processing_total_failures",
638                "Total number of failure in post processing",
639                registry,
640            )
641            .unwrap(),
642            consensus_handler_processed: register_int_counter_vec_with_registry!(
643                "consensus_handler_processed",
644                "Number of transactions processed by consensus handler, sliced by class and commit outcome (accepted/rejected)",
645                &["class", "outcome"],
646                registry
647            ).unwrap(),
648            consensus_handler_processed_user_transactions: register_int_counter_vec_with_registry!(
649                "consensus_handler_processed_user_transactions",
650                "Number of user transactions processed by consensus handler, sliced by commit outcome (accepted/rejected) and block author",
651                &["outcome", "authority"],
652                registry
653            ).unwrap(),
654            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
655                "consensus_handler_transaction_sizes",
656                "Sizes of each type of transactions processed by consensus handler, sliced by class and commit outcome (accepted/rejected)",
657                &["class", "outcome"],
658                POSITIVE_INT_BUCKETS.to_vec(),
659                registry
660            ).unwrap(),
661            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
662                "consensus_handler_deferred_transactions",
663                "Number of transactions deferred by consensus handler",
664                registry,
665            ).unwrap(),
666            consensus_handler_congested_transactions: register_int_counter_with_registry!(
667                "consensus_handler_congested_transactions",
668                "Number of transactions deferred by consensus handler due to congestion",
669                registry,
670            ).unwrap(),
671            consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
672                "consensus_handler_unpaid_amplification_deferrals",
673                "Number of transactions deferred due to unpaid consensus amplification",
674                registry,
675            ).unwrap(),
676            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
677                "consensus_handler_cancelled_transactions",
678                "Number of transactions cancelled by consensus handler",
679                registry,
680            ).unwrap(),
681            consensus_handler_dropped_transactions: register_int_counter_with_registry!(
682                "consensus_handler_dropped_transactions",
683                "Number of transactions dropped by consensus handler",
684                registry,
685            ).unwrap(),
686            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
687                "consensus_handler_max_congestion_control_object_costs",
688                "Max object costs for congestion control in the current consensus commit",
689                &["commit_type"],
690                registry,
691            ).unwrap(),
692            consensus_committed_subdags: register_int_counter_vec_with_registry!(
693                "consensus_committed_subdags",
694                "Number of committed subdags, sliced by author",
695                &["authority"],
696                registry,
697            ).unwrap(),
698            accumulator_deposits: register_int_counter_with_registry!(
699                "accumulator_deposits",
700                "Total accumulator deposit (merge) events processed in settlement",
701                registry,
702            ).unwrap(),
703            accumulator_withdrawals: register_int_counter_with_registry!(
704                "accumulator_withdrawals",
705                "Total accumulator withdrawal (split) events processed in settlement",
706                registry,
707            ).unwrap(),
708            consensus_committed_messages: register_int_gauge_vec_with_registry!(
709                "consensus_committed_messages",
710                "Total number of committed consensus messages, sliced by author",
711                &["authority"],
712                registry,
713            ).unwrap(),
714            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
715                "consensus_committed_user_transactions",
716                "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
717                &["authority"],
718                registry,
719            ).unwrap(),
720            consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
721                "consensus_finalized_user_transactions",
722                "Number of user transactions finalized, sliced by submitter",
723                &["authority"],
724                registry,
725            ).unwrap(),
726            consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
727                "consensus_rejected_user_transactions",
728                "Number of user transactions rejected, sliced by submitter",
729                &["authority"],
730                registry,
731            ).unwrap(),
732            execution_metrics: Arc::new(ExecutionMetrics::new(registry)),
733            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
734            zklogin_sig_count: register_int_counter_with_registry!(
735                "zklogin_sig_count",
736                "Count of zkLogin signatures",
737                registry,
738            )
739            .unwrap(),
740            multisig_sig_count: register_int_counter_with_registry!(
741                "multisig_sig_count",
742                "Count of zkLogin signatures",
743                registry,
744            )
745            .unwrap(),
746            consensus_calculated_throughput: register_int_gauge_with_registry!(
747                "consensus_calculated_throughput",
748                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
749                registry,
750            ).unwrap(),
751            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
752                "consensus_calculated_throughput_profile",
753                "The current active calculated throughput profile",
754                registry
755            ).unwrap(),
756            consensus_block_handler_block_processed: register_int_counter_with_registry!(
757                "consensus_block_handler_block_processed",
758                "Number of blocks processed by consensus block handler.",
759                registry
760            ).unwrap(),
761            consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
762                "consensus_block_handler_txn_processed",
763                "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
764                &["outcome"],
765                registry
766            ).unwrap(),
767            consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
768                "consensus_block_handler_fastpath_executions",
769                "Number of fastpath transactions sent for execution by consensus transaction handler",
770                registry,
771            ).unwrap(),
772            consensus_timestamp_bias: register_histogram_with_registry!(
773                "consensus_timestamp_bias",
774                "Bias/delay from consensus timestamp to system time",
775                TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
776                registry
777            ).unwrap(),
778            execution_queueing_latency: LatencyObserver::new(),
779            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
780            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
781        }
782    }
783}
784
/// A pinned, reference-counted trait object for `Signer<AuthoritySignature>` that is:
/// - `Pin`, i.e. confined to one place in memory (we don't want to copy private keys).
/// - `Send + Sync`, i.e. can be safely moved to and shared between threads.
///
/// Because the alias is `Pin<Arc<..>>`, it is typically instantiated with
/// `Arc::pin(keypair)` (not `Box::pin`), where `keypair` is a `KeyPair`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
792
/// Execution env contains the "environment" for the transaction to be executed in, that is,
/// all the information necessary for execution that is not specified by the transaction itself.
///
/// Values are assembled with `ExecutionEnv::new()` (or `Default`) followed by the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
pub struct ExecutionEnv {
    /// The assigned version of each shared object for the transaction.
    pub assigned_versions: AssignedVersions,
    /// The expected digest of the effects of the transaction, if executing from checkpoint or
    /// other sources where the effects are known in advance. `None` when no digest is expected.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    /// Status of the funds withdraw scheduling of the transaction,
    /// including both address and object funds withdraws.
    /// The `Default` value is `FundsWithdrawStatus::MaybeSufficient`.
    pub funds_withdraw_status: FundsWithdrawStatus,
    /// Transactions that must finish before this transaction can be executed.
    /// Used to schedule barrier transactions after non-exclusive writes.
    pub barrier_dependencies: Vec<TransactionDigest>,
}
809
810impl Default for ExecutionEnv {
811    fn default() -> Self {
812        Self {
813            assigned_versions: Default::default(),
814            expected_effects_digest: None,
815            funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
816            barrier_dependencies: Default::default(),
817        }
818    }
819}
820
821impl ExecutionEnv {
822    pub fn new() -> Self {
823        Default::default()
824    }
825
826    pub fn with_expected_effects_digest(
827        mut self,
828        expected_effects_digest: TransactionEffectsDigest,
829    ) -> Self {
830        self.expected_effects_digest = Some(expected_effects_digest);
831        self
832    }
833
834    pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
835        self.assigned_versions = assigned_versions;
836        self
837    }
838
839    pub fn with_insufficient_funds(mut self) -> Self {
840        self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
841        self
842    }
843
844    pub fn with_barrier_dependencies(
845        mut self,
846        barrier_dependencies: BTreeSet<TransactionDigest>,
847    ) -> Self {
848        self.barrier_dependencies = barrier_dependencies.into_iter().collect();
849        self
850    }
851}
852
/// Holds the fork-recovery overrides: a lock-guarded map from a transaction digest to the
/// effects digest registered for that transaction.
#[derive(Debug)]
pub struct ForkRecoveryState {
    /// Transaction digest to effects digest overrides.
    /// Populated in `new` from the string pairs of the fork-recovery configuration and read
    /// through `get_transaction_override`.
    transaction_overrides:
        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
}
859
860impl Default for ForkRecoveryState {
861    fn default() -> Self {
862        Self {
863            transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
864        }
865    }
866}
867
868impl ForkRecoveryState {
869    pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
870        let Some(config) = config else {
871            return Ok(Self::default());
872        };
873
874        let mut transaction_overrides = HashMap::new();
875        for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
876            let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
877                SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
878            })?;
879            let effects_digest =
880                TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
881                    SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
882                })?;
883            transaction_overrides.insert(tx_digest, effects_digest);
884        }
885
886        Ok(Self {
887            transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
888        })
889    }
890
891    pub fn get_transaction_override(
892        &self,
893        tx_digest: &TransactionDigest,
894    ) -> Option<TransactionEffectsDigest> {
895        self.transaction_overrides.read().get(tx_digest).copied()
896    }
897}
898
/// Value delivered over the oneshot channels tracked in
/// `AuthorityState::pending_post_processing`: a staged batch paired with the index store
/// cache updates.
pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
900
/// State held by an authority node: its identity, handles to storage and caches, the current
/// per-epoch store, the execution scheduler, metrics, and node configuration.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads the input and receiving objects of a transaction
    /// (e.g. via `read_objects_for_signing`).
    input_loader: TransactionInputLoader,
    /// Handles into the execution cache; for example `account_funds_read` is reached
    /// through this field.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    /// Resolver handed to `process_funds_withdrawals_for_signing`.
    coin_reservation_resolver: Arc<CachingCoinReservationResolver>,

    /// The per-epoch store, held in an `ArcSwap`.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes the current 'execution epoch'.
    /// Execution acquires the read lock, checks the certificate epoch and holds the lock until
    /// all writes are complete.
    /// Reconfiguration acquires the write lock, changes the epoch and reverts all transactions
    /// from the previous epoch that were executed but did not make it into a checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional RPC index store.
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Exposed through `committee_store()` and `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Schedules transaction execution.
    execution_scheduler: Arc<ExecutionScheduler>,

    /// Shuts down the execution task. Used only in testing.
    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Metrics recorded by this authority.
    pub metrics: Arc<AuthorityMetrics>,
    // NOTE(review): the underscore-prefixed pruners are presumably held only to keep their
    // background work alive for the lifetime of this struct -- confirm.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// The node configuration (deny config, overload config, execution cache settings, ...).
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// The chain identifier is derived from the digest of the genesis checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Consumed by gasless tx rate limiter.
    pub(crate) consensus_gasless_counter: Arc<ConsensusGaslessCounter>,

    /// Traffic controller for Sui core servers (json-rpc, validator service)
    pub traffic_controller: Option<Arc<TrafficController>>,

    /// Fork recovery state for handling equivocation after forks
    fork_recovery_state: Option<ForkRecoveryState>,

    /// Notification channel for reconfiguration
    notify_epoch: tokio::sync::watch::Sender<EpochId>,

    pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
    object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,

    /// Tracks transactions whose post-processing (indexing/events) is still in flight.
    /// CheckpointExecutor removes entries and collects the index batches before committing
    /// them atomically at checkpoint boundaries.
    pending_post_processing:
        Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,

    /// Limits the number of concurrent post-processing tasks to avoid overwhelming
    /// the blocking thread pool. Defaults to the number of available CPUs.
    post_processing_semaphore: Arc<tokio::sync::Semaphore>,
}
978
979/// The authority state encapsulates all state, drives execution, and ensures safety.
980///
981/// Note the authority operations can be accessed through a read ref (&) and do not
982/// require &mut. Internally a database is synchronized through a mutex lock.
983///
984/// Repeating valid commands should produce no changes and return no error.
985impl AuthorityState {
    /// Returns the node role reported by `epoch_store`.
    pub fn node_role(&self, epoch_store: &AuthorityPerEpochStore) -> NodeRole {
        epoch_store.node_role()
    }
989
990    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
991        epoch_store.node_role().is_validator()
992    }
993
994    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
995        epoch_store.node_role().is_fullnode()
996    }
997
    /// Borrows the shared committee store without bumping its reference count.
    /// Use `clone_committee_store` when an owned handle is needed.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
1001
1002    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1003        self.committee_store.clone()
1004    }
1005
    /// Borrows the `authority_overload_config` section of the node configuration.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
1009
    /// Looks up the state commitments recorded for `epoch` by delegating to the checkpoint
    /// store; the result (including `None` and errors) is returned unchanged.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
1016
1017    /// Runs deny list checks and processes funds withdrawals. Called before loading input
1018    /// objects, since these checks don't depend on object state.
1019    fn pre_object_load_checks(
1020        &self,
1021        tx_data: &TransactionData,
1022        tx_signatures: &[GenericSignature],
1023        input_object_kinds: &[InputObjectKind],
1024        receiving_objects_refs: &[ObjectRef],
1025        protocol_config: &ProtocolConfig,
1026    ) -> SuiResult<BTreeMap<AccumulatorObjId, (u64, TypeTag)>> {
1027        // Note: the deny checks may do redundant package loads but:
1028        // - they only load packages when there is an active package deny map
1029        // - the loads are cached anyway
1030        sui_transaction_checks::deny::check_transaction_for_signing(
1031            tx_data,
1032            tx_signatures,
1033            input_object_kinds,
1034            receiving_objects_refs,
1035            &self.config.transaction_deny_config,
1036            self.get_backing_package_store().as_ref(),
1037        )?;
1038
1039        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1040            self.chain_identifier,
1041            self.coin_reservation_resolver.as_ref(),
1042        )?;
1043
1044        self.execution_cache_trait_pointers
1045            .account_funds_read
1046            .check_amounts_available(&declared_withdrawals)?;
1047
1048        if protocol_config.gasless_verify_remaining_balance() && tx_data.is_gasless_transaction() {
1049            let min_amounts =
1050                sui_types::transaction::get_gasless_allowed_token_types(protocol_config);
1051            self.execution_cache_trait_pointers
1052                .account_funds_read
1053                .check_remaining_amounts_after_withdrawal(&declared_withdrawals, &min_amounts)?;
1054        }
1055
1056        Ok(declared_withdrawals)
1057    }
1058
1059    fn handle_transaction_deny_checks(
1060        &self,
1061        transaction: &VerifiedTransaction,
1062        epoch_store: &Arc<AuthorityPerEpochStore>,
1063    ) -> SuiResult<CheckedInputObjects> {
1064        let tx_digest = transaction.digest();
1065        let tx_data = transaction.data().transaction_data();
1066
1067        let input_object_kinds = tx_data.input_objects()?;
1068        let receiving_objects_refs = tx_data.receiving_objects();
1069
1070        self.pre_object_load_checks(
1071            tx_data,
1072            transaction.tx_signatures(),
1073            &input_object_kinds,
1074            &receiving_objects_refs,
1075            epoch_store.protocol_config(),
1076        )?;
1077
1078        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1079            Some(tx_digest),
1080            &input_object_kinds,
1081            &receiving_objects_refs,
1082            epoch_store.epoch(),
1083        )?;
1084
1085        let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1086            epoch_store.protocol_config(),
1087            epoch_store.reference_gas_price(),
1088            tx_data,
1089            input_objects,
1090            &receiving_objects,
1091            &self.metrics.bytecode_verifier_metrics,
1092            &self.config.verifier_signing_config,
1093        )?;
1094
1095        self.handle_coin_deny_list_checks(
1096            tx_data,
1097            &checked_input_objects,
1098            &receiving_objects,
1099            epoch_store,
1100        )?;
1101
1102        Ok(checked_input_objects)
1103    }
1104
1105    fn handle_coin_deny_list_checks(
1106        &self,
1107        tx_data: &TransactionData,
1108        checked_input_objects: &CheckedInputObjects,
1109        receiving_objects: &ReceivingObjects,
1110        epoch_store: &Arc<AuthorityPerEpochStore>,
1111    ) -> SuiResult<()> {
1112        use sui_types::balance::Balance;
1113
1114        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1115            self.chain_identifier,
1116            self.coin_reservation_resolver.as_ref(),
1117        )?;
1118
1119        let funds_withdraw_types = declared_withdrawals
1120            .values()
1121            .filter_map(|(_, type_tag)| {
1122                Balance::maybe_get_balance_type_param(type_tag)
1123                    .map(|ty| ty.to_canonical_string(false))
1124            })
1125            .collect::<BTreeSet<_>>();
1126
1127        if epoch_store.coin_deny_list_v1_enabled() {
1128            check_coin_deny_list_v1(
1129                tx_data.sender(),
1130                checked_input_objects,
1131                receiving_objects,
1132                funds_withdraw_types.clone(),
1133                &self.get_object_store(),
1134            )?;
1135        }
1136
1137        if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1138            check_coin_deny_list_v2_during_signing(
1139                tx_data.sender(),
1140                checked_input_objects,
1141                receiving_objects,
1142                funds_withdraw_types.clone(),
1143                &self.get_object_store(),
1144            )?;
1145        }
1146
1147        Ok(())
1148    }
1149
1150    /// Vote for a transaction, either when validator receives a submit_transaction request,
1151    /// or sees a transaction from consensus. Performs the same types of checks as
1152    /// transaction signing, but does not explicitly sign the transaction.
1153    /// Note that if the transaction has been executed, we still go through the
1154    /// same checks. If the transaction has only been executed through mysticeti fastpath,
1155    /// but not yet finalized, the signing will still work since the objects are still available.
1156    /// But if the transaction has been finalized, the signing will fail.
1157    /// TODO(mysticeti-fastpath): Assess whether we want to optimize the case when the transaction
1158    /// has already been finalized executed.
1159    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1160    pub fn handle_vote_transaction(
1161        &self,
1162        epoch_store: &Arc<AuthorityPerEpochStore>,
1163        transaction: VerifiedTransaction,
1164    ) -> SuiResult<()> {
1165        debug!("handle_vote_transaction");
1166
1167        let _metrics_guard = self
1168            .metrics
1169            .authority_state_handle_vote_transaction_latency
1170            .start_timer();
1171        self.metrics.tx_orders.inc();
1172
1173        // The should_accept_user_certs check here is best effort, because
1174        // between a validator signs a tx and a cert is formed, the validator
1175        // could close the window.
1176        if !epoch_store
1177            .get_reconfig_state_read_lock_guard()
1178            .should_accept_user_certs()
1179        {
1180            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1181        }
1182
1183        // Accept finalized transactions, instead of voting to reject them.
1184        // Checking executed transactions is limited to the current epoch.
1185        // Otherwise there can be a race where the transaction is accepted because it has executed,
1186        // but the executed effects are pruned post consensus, leading to failures.
1187        let tx_digest = *transaction.digest();
1188        if epoch_store.is_recently_finalized(&tx_digest)
1189            || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
1190        {
1191            assert_reachable!("transaction recently executed");
1192            return Ok(());
1193        }
1194
1195        if self
1196            .get_transaction_cache_reader()
1197            .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
1198        {
1199            return Err(SuiErrorKind::TransactionAlreadyExecuted {
1200                digest: (*transaction.digest()),
1201            }
1202            .into());
1203        }
1204
1205        // Ensure that validator cannot reconfigure while we are validating the transaction.
1206        let _execution_lock = self.execution_lock_for_validation()?;
1207
1208        let checked_input_objects =
1209            self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1210
1211        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1212
1213        // Validate owned object versions without acquiring locks. Locking happens post-consensus
1214        // in the consensus handler. Validation still runs to prevent spam transactions with
1215        // invalid object versions, and is necessary to handle recently created objects.
1216        self.get_cache_writer()
1217            .validate_owned_object_versions(&owned_objects)
1218    }
1219
1220    /// Used for early client validation check for transactions before submission to server.
1221    /// Performs the same validation checks as handle_vote_transaction without acquiring locks.
1222    /// This allows for fast failure feedback to clients for non-retriable errors.
1223    ///
1224    /// The key addition is checking that owned object versions match live object versions.
1225    /// This is necessary because handle_transaction_deny_checks fetches objects at their
1226    /// requested version (which may exist in storage as historical versions), whereas
1227    /// validators check against the live/current version during locking (verify_live_object).
1228    pub fn check_transaction_validity(
1229        &self,
1230        epoch_store: &Arc<AuthorityPerEpochStore>,
1231        transaction: &VerifiedTransaction,
1232        enforce_live_input_objects: bool,
1233    ) -> SuiResult<()> {
1234        if !epoch_store
1235            .get_reconfig_state_read_lock_guard()
1236            .should_accept_user_certs()
1237        {
1238            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1239        }
1240
1241        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1242
1243        let checked_input_objects =
1244            self.handle_transaction_deny_checks(transaction, epoch_store)?;
1245
1246        // Check that owned object versions match live objects
1247        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1248        let cache_reader = self.get_object_cache_reader();
1249
1250        for obj_ref in &owned_objects {
1251            if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1252                // Only reject if transaction references an old version. Allow newer
1253                // versions that may exist on validators but not yet synced to fullnode.
1254                if obj_ref.1 < live_object.version() {
1255                    return Err(SuiErrorKind::UserInputError {
1256                        error: UserInputError::ObjectVersionUnavailableForConsumption {
1257                            provided_obj_ref: *obj_ref,
1258                            current_version: live_object.version(),
1259                        },
1260                    }
1261                    .into());
1262                }
1263
1264                if enforce_live_input_objects && live_object.version() < obj_ref.1 {
1265                    // Returns a non-retriable error when the input object version is required to be live.
1266                    return Err(SuiErrorKind::UserInputError {
1267                        error: UserInputError::ObjectVersionUnavailableForConsumption {
1268                            provided_obj_ref: *obj_ref,
1269                            current_version: live_object.version(),
1270                        },
1271                    }
1272                    .into());
1273                }
1274
1275                // If version matches, verify digest also matches
1276                if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1277                    return Err(SuiErrorKind::UserInputError {
1278                        error: UserInputError::InvalidObjectDigest {
1279                            object_id: obj_ref.0,
1280                            expected_digest: live_object.digest(),
1281                        },
1282                    }
1283                    .into());
1284                }
1285            }
1286        }
1287
1288        Ok(())
1289    }
1290
    /// Returns the `check_system_overload_at_signing` flag from the authority overload
    /// configuration.
    pub fn check_system_overload_at_signing(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_signing
    }
1296
1297    pub(crate) fn check_system_overload(
1298        &self,
1299        tx_data: &SenderSignedData,
1300        do_authority_overload_check: bool,
1301    ) -> SuiResult {
1302        if do_authority_overload_check {
1303            self.check_authority_overload(tx_data).tap_err(|_| {
1304                self.update_overload_metrics("execution_queue");
1305            })?;
1306        }
1307        self.execution_scheduler
1308            .check_execution_overload(self.overload_config(), tx_data)
1309            .tap_err(|_| {
1310                self.update_overload_metrics("execution_pending");
1311            })?;
1312        // Consensus overload is handled by the admission queue in authority_server.rs.
1313
1314        let pending_tx_count = self
1315            .get_cache_commit()
1316            .approximate_pending_transaction_count();
1317        if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1318            return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1319                retry_after_secs: 10,
1320            }
1321            .into());
1322        }
1323
1324        Ok(())
1325    }
1326
1327    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1328        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1329            return Ok(());
1330        }
1331
1332        let load_shedding_percentage = self
1333            .overload_info
1334            .load_shedding_percentage
1335            .load(Ordering::Relaxed);
1336        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1337    }
1338
1339    pub(crate) fn update_overload_metrics(&self, source: &str) {
1340        self.metrics
1341            .transaction_overload_sources
1342            .with_label_values(&[source])
1343            .inc();
1344    }
1345
1346    /// Waits for fastpath (owned, package) dependency objects to become available.
1347    /// Returns true if a chosen set of fastpath dependency objects are available,
1348    /// returns false otherwise after an internal timeout.
1349    pub(crate) async fn wait_for_fastpath_dependency_objects(
1350        &self,
1351        transaction: &VerifiedTransaction,
1352        epoch: EpochId,
1353    ) -> SuiResult<bool> {
1354        let txn_data = transaction.data().transaction_data();
1355        let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1356
1357        // Gather and filter input objects to wait for.
1358        let fastpath_dependency_objects: Vec<_> = move_objects
1359            .into_iter()
1360            .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1361            .chain(
1362                packages
1363                    .into_iter()
1364                    .map(|package_id| InputKey::Package { id: package_id }),
1365            )
1366            .collect();
1367        let receiving_keys: HashSet<_> = receiving_objects
1368            .into_iter()
1369            .filter_map(|receiving_obj_ref| {
1370                self.should_wait_for_dependency_object(receiving_obj_ref)
1371            })
1372            .collect();
1373        if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1374            return Ok(true);
1375        }
1376
1377        // Use shorter wait timeout in simtests to exercise server-side error paths and
1378        // client-side retry logic.
1379        let max_wait = if cfg!(msim) {
1380            Duration::from_millis(200)
1381        } else {
1382            WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1383        };
1384
1385        match timeout(
1386            max_wait,
1387            self.get_object_cache_reader().notify_read_input_objects(
1388                &fastpath_dependency_objects,
1389                &receiving_keys,
1390                epoch,
1391            ),
1392        )
1393        .await
1394        {
1395            Ok(()) => Ok(true),
1396            // Maybe return an error for unavailable input objects,
1397            // and allow the caller to skip the rest of input checks?
1398            Err(_) => Ok(false),
1399        }
1400    }
1401
1402    /// Returns Some(inputKey) if the object reference should be waited on until it is
1403    /// finalized, before proceeding to input checks.
1404    ///
1405    /// Incorrect decisions here should only affect user experience, not safety:
1406    /// - Waiting unnecessarily adds latency to transaction signing and submission.
1407    /// - Not waiting when needed may cause the transaction to be rejected because input object is unavailable.
1408    fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1409        let (obj_id, cur_version, _digest) = obj_ref;
1410        let Some(latest_obj_ref) = self
1411            .get_object_cache_reader()
1412            .get_latest_object_ref_or_tombstone(obj_id)
1413        else {
1414            // Object might not have been created.
1415            return Some(InputKey::VersionedObject {
1416                id: FullObjectID::new(obj_id, None),
1417                version: cur_version,
1418            });
1419        };
1420        let latest_digest = latest_obj_ref.2;
1421        if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1422            // Do not wait for deleted object and rely on input check instead.
1423            return None;
1424        }
1425        let latest_version = latest_obj_ref.1;
1426        if cur_version <= latest_version {
1427            // Do not wait for version that already exists or has been consumed.
1428            // Let the input check to handle them and return the proper error.
1429            return None;
1430        }
1431        // Wait for the object version to become available.
1432        Some(InputKey::VersionedObject {
1433            id: FullObjectID::new(obj_id, None),
1434            version: cur_version,
1435        })
1436    }
1437
1438    /// Wait for a transaction to be executed. Transactions need to be sequenced by consensus
1439    /// before being enqueued for execution.
1440    ///
1441    /// Only use this in tests.
1442    #[instrument(level = "trace", skip_all)]
1443    pub async fn wait_for_transaction_execution_for_testing(
1444        &self,
1445        transaction: &VerifiedExecutableTransaction,
1446    ) -> TransactionEffects {
1447        self.notify_read_effects_for_testing(
1448            "AuthorityState::wait_for_transaction_execution_for_testing",
1449            *transaction.digest(),
1450        )
1451        .await
1452    }
1453
1454    /// Internal logic to execute a certificate.
1455    ///
1456    /// Guarantees that
1457    /// - If input objects are available, return no permanent failure.
1458    /// - Execution and output commit are atomic. i.e. outputs are only written to storage,
1459    /// on successful execution; crashed execution has no observable effect and can be retried.
1460    ///
1461    /// It is caller's responsibility to ensure input objects are available and locks are set.
1462    /// If this cannot be satisfied by the caller, execute_certificate() should be called instead.
1463    ///
1464    /// Should only be called within sui-core.
1465    #[instrument(level = "trace", skip_all)]
1466    pub fn try_execute_immediately(
1467        &self,
1468        certificate: &VerifiedExecutableTransaction,
1469        mut execution_env: ExecutionEnv,
1470        epoch_store: &Arc<AuthorityPerEpochStore>,
1471    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
1472        let _scope = monitored_scope("Execution::try_execute_immediately");
1473        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1474
1475        let tx_digest = certificate.digest();
1476
1477        if let Some(fork_recovery) = &self.fork_recovery_state
1478            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
1479        {
1480            warn!(
1481                ?tx_digest,
1482                original_digest = ?execution_env.expected_effects_digest,
1483                override_digest = ?override_digest,
1484                "Applying fork recovery override for transaction effects digest"
1485            );
1486            execution_env.expected_effects_digest = Some(override_digest);
1487        }
1488
1489        // prevent concurrent executions of the same tx.
1490        let tx_guard = epoch_store.acquire_tx_guard(certificate);
1491
1492        let tx_cache_reader = self.get_transaction_cache_reader();
1493        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
1494            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
1495                assert_eq!(
1496                    effects.digest(),
1497                    expected_effects_digest,
1498                    "Unexpected effects digest for transaction {:?}",
1499                    tx_digest
1500                );
1501            }
1502            tx_guard.release();
1503            return ExecutionOutput::Success((effects, None));
1504        }
1505
1506        let execution_start_time = Instant::now();
1507
1508        // Any caller that verifies the signatures on the certificate will have already checked the
1509        // epoch. But paths that don't verify sigs (e.g. execution from checkpoint, reading from db)
1510        // present the possibility of an epoch mismatch. If this cert is not finalzied in previous
1511        // epoch, then it's invalid.
1512        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
1513        else {
1514            tx_guard.release();
1515            return ExecutionOutput::EpochEnded;
1516        };
1517        // Since we obtain a reference to the epoch store before taking the execution lock, it's
1518        // possible that reconfiguration has happened and they no longer match.
1519        // TODO: We may not need the following check anymore since the scheduler
1520        // should have checked that the certificate is from the same epoch as epoch_store.
1521        if *execution_guard != epoch_store.epoch() {
1522            tx_guard.release();
1523            info!("The epoch of the execution_guard doesn't match the epoch store");
1524            return ExecutionOutput::EpochEnded;
1525        }
1526
1527        let accumulator_version = execution_env.assigned_versions.accumulator_version;
1528
1529        let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
1530            &tx_guard,
1531            &execution_guard,
1532            certificate,
1533            execution_env,
1534            epoch_store,
1535        ) {
1536            ExecutionOutput::Success(result) => result,
1537            output => return output.unwrap_err(),
1538        };
1539        let transaction_outputs = Arc::new(transaction_outputs);
1540
1541        fail_point!("crash");
1542
1543        let effects = transaction_outputs.effects.clone();
1544        debug!(
1545            ?tx_digest,
1546            fx_digest=?effects.digest(),
1547            "process_certificate succeeded in {:.3}ms",
1548            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
1549        );
1550
1551        let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
1552        if let Err(err) = commit_result {
1553            error!(?tx_digest, "Error committing transaction: {err}");
1554            tx_guard.release();
1555            return ExecutionOutput::Fatal(err);
1556        }
1557
1558        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
1559            certificate.data().transaction_data().kind()
1560        {
1561            if let Some(err) = &execution_error_opt {
1562                debug_fatal!("Authenticator state update failed: {:?}", err);
1563            }
1564            epoch_store.update_authenticator_state(auth_state);
1565
1566            // double check that the signature verifier always matches the authenticator state
1567            if cfg!(debug_assertions) {
1568                let authenticator_state = get_authenticator_state(self.get_object_store())
1569                    .expect("Read cannot fail")
1570                    .expect("Authenticator state must exist");
1571
1572                let mut sys_jwks: Vec<_> = authenticator_state
1573                    .active_jwks
1574                    .into_iter()
1575                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1576                    .collect();
1577                let mut active_jwks: Vec<_> = epoch_store
1578                    .signature_verifier
1579                    .get_jwks()
1580                    .into_iter()
1581                    .collect();
1582                sys_jwks.sort();
1583                active_jwks.sort();
1584
1585                assert_eq!(sys_jwks, active_jwks);
1586            }
1587        }
1588
1589        if certificate
1590            .transaction_data()
1591            .kind()
1592            .is_accumulator_barrier_settle_tx()
1593        {
1594            let object_funds_checker = self.object_funds_checker.load();
1595            if let Some(object_funds_checker) = object_funds_checker.as_ref() {
1596                // unwrap safe because we assign accumulator version for every transaction
1597                // when accumulator is enabled.
1598                let next_accumulator_version = accumulator_version.unwrap().next();
1599                object_funds_checker.settle_accumulator_version(next_accumulator_version);
1600            }
1601        }
1602
1603        tx_guard.commit_tx();
1604
1605        let elapsed = execution_start_time.elapsed();
1606        epoch_store.record_local_execution_time(
1607            certificate.data().transaction_data(),
1608            &effects,
1609            timings,
1610            elapsed,
1611        );
1612
1613        let elapsed_us = elapsed.as_micros() as f64;
1614        if elapsed_us > 0.0 {
1615            self.metrics
1616                .execution_gas_latency_ratio
1617                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
1618        };
1619        ExecutionOutput::Success((effects, execution_error_opt))
1620    }
1621
1622    pub fn read_objects_for_execution(
1623        &self,
1624        tx_lock: &CertLockGuard,
1625        certificate: &VerifiedExecutableTransaction,
1626        assigned_shared_object_versions: &AssignedVersions,
1627        epoch_store: &Arc<AuthorityPerEpochStore>,
1628    ) -> SuiResult<InputObjects> {
1629        let _scope = monitored_scope("Execution::load_input_objects");
1630        let _metrics_guard = self
1631            .metrics
1632            .execution_load_input_objects_latency
1633            .start_timer();
1634        let input_objects = &certificate.data().transaction_data().input_objects()?;
1635        self.input_loader.read_objects_for_execution(
1636            &certificate.key(),
1637            tx_lock,
1638            input_objects,
1639            assigned_shared_object_versions,
1640            epoch_store.epoch(),
1641        )
1642    }
1643
1644    pub async fn try_execute_executable_for_test(
1645        &self,
1646        executable: &VerifiedExecutableTransaction,
1647        execution_env: ExecutionEnv,
1648    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1649        let epoch_store = self.epoch_store_for_testing();
1650        let (effects, execution_error_opt) = self
1651            .try_execute_immediately(executable, execution_env, &epoch_store)
1652            .unwrap();
1653        self.flush_post_processing(executable.digest()).await;
1654        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1655        (signed_effects, execution_error_opt)
1656    }
1657
1658    /// Wait until the effects of the given transaction are available and return them.
1659    /// Panics if the effects are not found.
1660    ///
1661    /// Only use this in tests where effects are expected to exist.
1662    pub async fn notify_read_effects_for_testing(
1663        &self,
1664        task_name: &'static str,
1665        digest: TransactionDigest,
1666    ) -> TransactionEffects {
1667        self.get_transaction_cache_reader()
1668            .notify_read_executed_effects(task_name, &[digest])
1669            .await
1670            .pop()
1671            .expect("must return correct number of effects")
1672    }
1673
1674    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1675        self.get_object_cache_reader()
1676            .check_owned_objects_are_live(owned_object_refs)
1677    }
1678
1679    /// This function captures the required state to debug a forked transaction.
1680    /// The dump is written to a file in dir `path`, with name prefixed by the transaction digest.
1681    /// NOTE: Since this info escapes the validator context,
1682    /// make sure not to leak any private info here
1683    pub(crate) fn debug_dump_transaction_state(
1684        &self,
1685        tx_digest: &TransactionDigest,
1686        effects: &TransactionEffects,
1687        expected_effects_digest: TransactionEffectsDigest,
1688        inner_temporary_store: &InnerTemporaryStore,
1689        certificate: &VerifiedExecutableTransaction,
1690        debug_dump_config: &StateDebugDumpConfig,
1691    ) -> SuiResult<PathBuf> {
1692        let dump_dir = debug_dump_config
1693            .dump_file_directory
1694            .as_ref()
1695            .cloned()
1696            .unwrap_or(std::env::temp_dir());
1697        let epoch_store = self.load_epoch_store_one_call_per_task();
1698
1699        NodeStateDump::new(
1700            tx_digest,
1701            effects,
1702            expected_effects_digest,
1703            self.get_object_store().as_ref(),
1704            &epoch_store,
1705            inner_temporary_store,
1706            certificate,
1707        )?
1708        .write_to_file(&dump_dir)
1709        .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1710    }
1711
1712    #[instrument(level = "trace", skip_all)]
1713    pub(crate) fn process_certificate(
1714        &self,
1715        tx_guard: &CertTxGuard,
1716        execution_guard: &ExecutionLockReadGuard<'_>,
1717        certificate: &VerifiedExecutableTransaction,
1718        execution_env: ExecutionEnv,
1719        epoch_store: &Arc<AuthorityPerEpochStore>,
1720    ) -> ExecutionOutput<(
1721        TransactionOutputs,
1722        Vec<ExecutionTiming>,
1723        Option<ExecutionError>,
1724    )> {
1725        let _scope = monitored_scope("Execution::process_certificate");
1726        let tx_digest = *certificate.digest();
1727
1728        let input_objects = match self.read_objects_for_execution(
1729            tx_guard.as_lock_guard(),
1730            certificate,
1731            &execution_env.assigned_versions,
1732            epoch_store,
1733        ) {
1734            Ok(objects) => objects,
1735            Err(e) => return ExecutionOutput::Fatal(e),
1736        };
1737
1738        let expected_effects_digest = match execution_env.expected_effects_digest {
1739            Some(expected_effects_digest) => Some(expected_effects_digest),
1740            None => {
1741                // We could be re-executing a previously executed but uncommitted transaction, perhaps after
1742                // restarting with a new binary. In this situation, if we have published an effects signature,
1743                // we must be sure not to equivocate.
1744                match epoch_store.get_signed_effects_digest(&tx_digest) {
1745                    Ok(digest) => digest,
1746                    Err(e) => return ExecutionOutput::Fatal(e),
1747                }
1748            }
1749        };
1750
1751        fail_point_if!("correlated-crash-process-certificate", || {
1752            if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1753                sui_simulator::task::kill_current_node(None);
1754            }
1755        });
1756
1757        // Errors originating from prepare_certificate may be transient (failure to read locks) or
1758        // non-transient (transaction input is invalid, move vm errors). However, all errors from
1759        // this function occur before we have written anything to the db, so we commit the tx
1760        // guard and rely on the client to retry the tx (if it was transient).
1761        self.execute_certificate(
1762            execution_guard,
1763            certificate,
1764            input_objects,
1765            expected_effects_digest,
1766            execution_env,
1767            epoch_store,
1768        )
1769    }
1770
1771    pub async fn reconfigure_traffic_control(
1772        &self,
1773        params: TrafficControlReconfigParams,
1774    ) -> Result<TrafficControlReconfigParams, SuiError> {
1775        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1776            traffic_controller.admin_reconfigure(params).await
1777        } else {
1778            Err(SuiErrorKind::InvalidAdminRequest(
1779                "Traffic controller is not configured on this node".to_string(),
1780            )
1781            .into())
1782        }
1783    }
1784
1785    #[instrument(level = "trace", skip_all)]
1786    fn commit_certificate(
1787        &self,
1788        certificate: &VerifiedExecutableTransaction,
1789        transaction_outputs: Arc<TransactionOutputs>,
1790        epoch_store: &Arc<AuthorityPerEpochStore>,
1791    ) -> SuiResult {
1792        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
1793            monitored_scope("Execution::commit_certificate");
1794        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1795
1796        let tx_digest = certificate.digest();
1797
1798        // The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
1799        // we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
1800        epoch_store.insert_executed_in_epoch(tx_digest);
1801        let key = certificate.key();
1802        if !matches!(key, TransactionKey::Digest(_)) {
1803            epoch_store.insert_tx_key(key, *tx_digest)?;
1804        }
1805
1806        // Allow testing what happens if we crash here.
1807        fail_point!("crash");
1808
1809        self.get_cache_writer()
1810            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);
1811
1812        // Fire an in-memory-only notification so the scheduler can detect that this
1813        // barrier transaction has been executed (e.g. by the checkpoint executor) and
1814        // stop waiting for the checkpoint builder.  We intentionally do NOT persist
1815        // this to the DB to avoid stale entries surviving a crash while effects may not.
1816        if let Some(settlement_key) = certificate
1817            .transaction_data()
1818            .kind()
1819            .accumulator_barrier_settlement_key()
1820        {
1821            epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
1822        }
1823
1824        if certificate.transaction_data().is_end_of_epoch_tx() {
1825            // At the end of epoch, since system packages may have been upgraded, force
1826            // reload them in the cache.
1827            self.get_object_cache_reader()
1828                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1829        }
1830
1831        Ok(())
1832    }
1833
1834    fn update_metrics(
1835        &self,
1836        certificate: &VerifiedExecutableTransaction,
1837        inner_temporary_store: &InnerTemporaryStore,
1838        effects: &TransactionEffects,
1839    ) {
1840        // count signature by scheme, for zklogin and multisig
1841        if certificate.has_zklogin_sig() {
1842            self.metrics.zklogin_sig_count.inc();
1843        } else if certificate.has_upgraded_multisig() {
1844            self.metrics.multisig_sig_count.inc();
1845        }
1846
1847        self.metrics.total_effects.inc();
1848        self.metrics.total_certs.inc();
1849
1850        let consensus_object_count = effects.input_consensus_objects().len();
1851        if consensus_object_count > 0 {
1852            self.metrics.shared_obj_tx.inc();
1853        }
1854
1855        if certificate.is_sponsored_tx() {
1856            self.metrics.sponsored_tx.inc();
1857        }
1858
1859        let input_object_count = inner_temporary_store.input_objects.len();
1860        self.metrics
1861            .num_input_objs
1862            .observe(input_object_count as f64);
1863        self.metrics
1864            .num_shared_objects
1865            .observe(consensus_object_count as f64);
1866        self.metrics.batch_size.observe(
1867            certificate
1868                .data()
1869                .intent_message()
1870                .value
1871                .kind()
1872                .num_commands() as f64,
1873        );
1874    }
1875
1876    /// Runs the executor on an already-prepared transaction; the caller is responsible for any
1877    /// coin reservation rewriting.
1878    fn execute_transaction_to_effects(
1879        &self,
1880        executor: &dyn Executor,
1881        store: &dyn BackingStore,
1882        protocol_config: &ProtocolConfig,
1883        enable_expensive_checks: bool,
1884        execution_params: ExecutionOrEarlyError,
1885        epoch_id: &EpochId,
1886        epoch_timestamp_ms: u64,
1887        input_objects: CheckedInputObjects,
1888        gas_data: GasData,
1889        gas_status: SuiGasStatus,
1890        kind: TransactionKind,
1891        rewritten_inputs: Option<Vec<bool>>,
1892        signer: SuiAddress,
1893        tx_digest: TransactionDigest,
1894    ) -> (
1895        InnerTemporaryStore,
1896        SuiGasStatus,
1897        TransactionEffects,
1898        Vec<ExecutionTiming>,
1899        Result<(), ExecutionError>,
1900    ) {
1901        let (inner_temp_store, gas_status, effects, timings, execution_error) = executor
1902            // TODO only run this function on FullNodes, use `execute_transaction_to_effects` on validators.
1903            .execute_transaction_to_effects_and_execution_error(
1904                store,
1905                protocol_config,
1906                self.metrics.execution_metrics.clone(),
1907                enable_expensive_checks,
1908                execution_params,
1909                epoch_id,
1910                epoch_timestamp_ms,
1911                input_objects,
1912                gas_data,
1913                gas_status,
1914                kind,
1915                rewritten_inputs,
1916                signer,
1917                tx_digest,
1918                &mut None,
1919            );
1920
1921        (
1922            inner_temp_store,
1923            gas_status,
1924            effects,
1925            timings,
1926            execution_error,
1927        )
1928    }
1929
1930    /// execute_certificate validates the transaction input, and executes the certificate,
1931    /// returning transaction outputs.
1932    ///
1933    /// It reads state from the db (both owned and shared locks), but it has no side effects.
1934    ///
1935    /// Executes a certificate and returns an ExecutionOutput.
1936    /// The function can fail with Fatal errors (e.g., the transaction input is invalid,
1937    /// locks are not held correctly, etc.) or transient errors (e.g., db read errors).
1938    #[instrument(level = "trace", skip_all)]
1939    fn execute_certificate(
1940        &self,
1941        _execution_guard: &ExecutionLockReadGuard<'_>,
1942        certificate: &VerifiedExecutableTransaction,
1943        input_objects: InputObjects,
1944        expected_effects_digest: Option<TransactionEffectsDigest>,
1945        execution_env: ExecutionEnv,
1946        epoch_store: &Arc<AuthorityPerEpochStore>,
1947    ) -> ExecutionOutput<(
1948        TransactionOutputs,
1949        Vec<ExecutionTiming>,
1950        Option<ExecutionError>,
1951    )> {
1952        let _scope = monitored_scope("Execution::prepare_certificate");
1953        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1954        let prepare_certificate_start_time = tokio::time::Instant::now();
1955
1956        // TODO: We need to move this to a more appropriate place to avoid redundant checks.
1957        let tx_data = certificate.data().transaction_data();
1958
1959        if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
1960            return ExecutionOutput::Fatal(e);
1961        }
1962
1963        // The cost of partially re-auditing a transaction before execution is tolerated.
1964        // This step is required for correctness because, for example, ConsensusAddressOwner
1965        // object owner may have changed between signing and execution.
1966        let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
1967            certificate,
1968            input_objects,
1969            epoch_store.protocol_config(),
1970            epoch_store.reference_gas_price(),
1971        ) {
1972            Ok(result) => result,
1973            Err(e) => return ExecutionOutput::Fatal(e),
1974        };
1975
1976        let owned_object_refs = input_objects.inner().filter_owned_objects();
1977        if let Err(e) = self.check_owned_locks(&owned_object_refs) {
1978            return ExecutionOutput::Fatal(e);
1979        }
1980        let tx_digest = *certificate.digest();
1981        let protocol_config = epoch_store.protocol_config();
1982        let transaction_data = &certificate.data().intent_message().value;
1983        let sender = transaction_data.sender();
1984        let (mut kind, signer, gas_data) = transaction_data.execution_parts();
1985        let early_execution_error = get_early_execution_error(
1986            &tx_digest,
1987            &input_objects,
1988            self.config.certificate_deny_config.certificate_deny_set(),
1989            &execution_env.funds_withdraw_status,
1990        );
1991        let accumulator_version = execution_env.assigned_versions.accumulator_version;
1992        let execution_params = match early_execution_error {
1993            None => ExecutionOrEarlyError::ok(accumulator_version),
1994            Some(errors) => ExecutionOrEarlyError::failed(errors, accumulator_version),
1995        };
1996
1997        // Skip on early error: the tx will fail anyway and rewriting may fail if the accumulator
1998        // was deleted.
1999        let rewritten_inputs = if execution_params.is_ok() {
2000            rewrite_transaction_for_coin_reservations(
2001                self.chain_identifier,
2002                &*self.coin_reservation_resolver,
2003                sender,
2004                &mut kind,
2005                execution_env.assigned_versions.accumulator_version,
2006            )
2007            .expect("rewriting must succeed for a certified transaction")
2008        } else {
2009            None
2010        };
2011
2012        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2013
2014        #[allow(unused_mut)]
2015        let (inner_temp_store, _, mut effects, timings, execution_error_opt) = self
2016            .execute_transaction_to_effects(
2017                &**epoch_store.executor(),
2018                &tracking_store,
2019                protocol_config,
2020                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
2021                // cyclic dependency w/ sui-adapter
2022                self.config
2023                    .expensive_safety_check_config
2024                    .enable_deep_per_tx_sui_conservation_check(),
2025                execution_params,
2026                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2027                epoch_store
2028                    .epoch_start_config()
2029                    .epoch_data()
2030                    .epoch_start_timestamp(),
2031                input_objects,
2032                gas_data,
2033                gas_status,
2034                kind,
2035                rewritten_inputs,
2036                signer,
2037                tx_digest,
2038            );
2039
2040        let object_funds_checker = self.object_funds_checker.load();
2041        if let Some(object_funds_checker) = object_funds_checker.as_ref()
2042            && !object_funds_checker.should_commit_object_funds_withdraws(
2043                certificate,
2044                effects.status(),
2045                &inner_temp_store.accumulator_running_max_withdraws,
2046                &execution_env,
2047                self.get_account_funds_read(),
2048                &self.execution_scheduler,
2049                epoch_store,
2050            )
2051        {
2052            assert_reachable!("retry object withdraw later");
2053            return ExecutionOutput::RetryLater;
2054        }
2055
2056        if let Some(expected_effects_digest) = expected_effects_digest
2057            && effects.digest() != expected_effects_digest
2058        {
2059            // We dont want to mask the original error, so we log it and continue.
2060            match self.debug_dump_transaction_state(
2061                &tx_digest,
2062                &effects,
2063                expected_effects_digest,
2064                &inner_temp_store,
2065                certificate,
2066                &self.config.state_debug_dump_config,
2067            ) {
2068                Ok(out_path) => {
2069                    info!(
2070                        "Dumped node state for transaction {} to {}",
2071                        tx_digest,
2072                        out_path.as_path().display().to_string()
2073                    );
2074                }
2075                Err(e) => {
2076                    error!("Error dumping state for transaction {}: {e}", tx_digest);
2077                }
2078            }
2079            let expected_effects = self
2080                .get_transaction_cache_reader()
2081                .get_effects(&expected_effects_digest);
2082            error!(
2083                ?tx_digest,
2084                ?expected_effects_digest,
2085                actual_effects = ?effects,
2086                expected_effects = ?expected_effects,
2087                "fork detected!"
2088            );
2089            if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2090                tx_digest,
2091                expected_effects_digest,
2092                effects.digest(),
2093            ) {
2094                error!("Failed to record transaction fork: {e}");
2095            }
2096
2097            fail_point_if!("kill_transaction_fork_node", || {
2098                #[cfg(msim)]
2099                {
2100                    tracing::error!(
2101                        fatal = true,
2102                        "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2103                        tx_digest
2104                    );
2105                    sui_simulator::task::shutdown_current_node();
2106                }
2107            });
2108
2109            fatal!(
2110                "Transaction {} is expected to have effects digest {}, but got {}!",
2111                tx_digest,
2112                expected_effects_digest,
2113                effects.digest()
2114            );
2115        }
2116
2117        fail_point_arg!("simulate_fork_during_execution", |(
2118            forked_validators,
2119            full_halt,
2120            effects_overrides,
2121            fork_probability,
2122        ): (
2123            std::sync::Arc<
2124                std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2125            >,
2126            bool,
2127            std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2128            f32,
2129        )| {
2130            #[cfg(msim)]
2131            self.simulate_fork_during_execution(
2132                certificate,
2133                epoch_store,
2134                &mut effects,
2135                forked_validators,
2136                full_halt,
2137                effects_overrides,
2138                fork_probability,
2139            );
2140        });
2141
2142        let unchanged_loaded_runtime_objects =
2143            crate::transaction_outputs::unchanged_loaded_runtime_objects(
2144                certificate.transaction_data(),
2145                &effects,
2146                &tracking_store.into_read_objects(),
2147            );
2148
2149        // index certificate
2150        let _ = self
2151            .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2152            .tap_err(|e| {
2153                self.metrics.post_processing_total_failures.inc();
2154                error!(?tx_digest, "tx post processing failed: {e}");
2155            });
2156
2157        self.update_metrics(certificate, &inner_temp_store, &effects);
2158
2159        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2160            certificate.clone().into_unsigned(),
2161            effects,
2162            inner_temp_store,
2163            unchanged_loaded_runtime_objects,
2164        );
2165
2166        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2167        if elapsed > 0.0 {
2168            self.metrics.prepare_cert_gas_latency_ratio.observe(
2169                transaction_outputs
2170                    .effects
2171                    .gas_cost_summary()
2172                    .computation_cost as f64
2173                    / elapsed,
2174            );
2175        }
2176
2177        ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2178    }
2179
2180    pub fn prepare_certificate_for_benchmark(
2181        &self,
2182        certificate: &VerifiedExecutableTransaction,
2183        input_objects: InputObjects,
2184        epoch_store: &Arc<AuthorityPerEpochStore>,
2185    ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2186        let lock = RwLock::new(epoch_store.epoch());
2187        let execution_guard = lock.try_read().unwrap();
2188
2189        let (transaction_outputs, _timings, execution_error_opt) = self
2190            .execute_certificate(
2191                &execution_guard,
2192                certificate,
2193                input_objects,
2194                None,
2195                ExecutionEnv::default(),
2196                epoch_store,
2197            )
2198            .unwrap();
2199        Ok((transaction_outputs, execution_error_opt))
2200    }
2201
2202    #[instrument(skip_all)]
2203    #[allow(clippy::type_complexity)]
2204    pub async fn dry_exec_transaction(
2205        &self,
2206        transaction: TransactionData,
2207    ) -> SuiResult<(
2208        DryRunTransactionBlockResponse,
2209        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2210        TransactionEffects,
2211        Option<ObjectID>,
2212    )> {
2213        let epoch_store = self.load_epoch_store_one_call_per_task();
2214        if !self.is_fullnode(&epoch_store) {
2215            return Err(SuiErrorKind::UnsupportedFeatureError {
2216                error: "dry-exec is only supported on fullnodes".to_string(),
2217            }
2218            .into());
2219        }
2220
2221        if transaction.kind().is_system_tx() {
2222            return Err(SuiErrorKind::UnsupportedFeatureError {
2223                error: "dry-exec does not support system transactions".to_string(),
2224            }
2225            .into());
2226        }
2227
2228        self.dry_exec_transaction_impl(&epoch_store, transaction)
2229    }
2230
2231    #[allow(clippy::type_complexity)]
2232    fn dry_exec_transaction_impl(
2233        &self,
2234        epoch_store: &AuthorityPerEpochStore,
2235        transaction: TransactionData,
2236    ) -> SuiResult<(
2237        DryRunTransactionBlockResponse,
2238        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2239        TransactionEffects,
2240        Option<ObjectID>,
2241    )> {
2242        // Route through `simulate_transaction` -- `dry-exec` matches
2243        // `simulate_transaction(_, TransactionChecks::Enabled, _)`
2244        let sim = self.simulate_transaction(
2245            transaction.clone(),
2246            TransactionChecks::Enabled,
2247            /* allow_mock_gas_coin */ true,
2248        )?;
2249
2250        self.build_dry_run_response(epoch_store, transaction, sim)
2251    }
2252
2253    /// Adapt a `SimulateTransactionResult` into the
2254    /// `(DryRunTransactionBlockResponse, written_objects_with_kind, effects, mock_gas_id)`
2255    /// tuple that the JSON-RPC dry-run layer consumes. Shared between
2256    /// `dry_exec_transaction_impl` and `dry_exec_transaction_for_benchmark`'s
2257    /// callers; pulls together:
2258    ///   - layout resolution over the simulate-produced `ObjectSet`,
2259    ///   - `(ObjectRef, Object, WriteKind)` derivation by walking
2260    ///     `effects.created / unwrapped / mutated` against that `ObjectSet`,
2261    ///   - `execution_error_source` from `sim.execution_result`,
2262    ///   - the `SuiTransactionBlockData` / `SuiTransactionBlockEffects` /
2263    ///     `SuiTransactionBlockEvents` conversions.
2264    #[allow(clippy::type_complexity)]
2265    fn build_dry_run_response(
2266        &self,
2267        epoch_store: &AuthorityPerEpochStore,
2268        transaction: TransactionData,
2269        sim: SimulateTransactionResult,
2270    ) -> SuiResult<(
2271        DryRunTransactionBlockResponse,
2272        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2273        TransactionEffects,
2274        Option<ObjectID>,
2275    )> {
2276        let SimulateTransactionResult {
2277            effects,
2278            events,
2279            objects,
2280            execution_result,
2281            mock_gas_id,
2282            suggested_gas_price,
2283            ..
2284        } = sim;
2285
2286        let tx_digest = *effects.transaction_digest();
2287
2288        // Walk effects' created / unwrapped / mutated lists against the
2289        // simulate-produced ObjectSet (which carries both input and written
2290        // objects). Refs missing from `objects` are dropped silently — that
2291        // would indicate a simulate-vs-effects inconsistency.
2292        let written_with_kind: BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)> = effects
2293            .created()
2294            .into_iter()
2295            .map(|(oref, _)| (oref, WriteKind::Create))
2296            .chain(
2297                effects
2298                    .unwrapped()
2299                    .into_iter()
2300                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2301            )
2302            .chain(
2303                effects
2304                    .mutated()
2305                    .into_iter()
2306                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
2307            )
2308            .filter_map(|(oref, kind)| {
2309                objects
2310                    .get(&ObjectKey(oref.0, oref.1))
2311                    .map(|obj| (oref.0, (oref, obj.clone(), kind)))
2312            })
2313            .collect();
2314
2315        // Resolve event/object layouts against the simulate-produced ObjectSet
2316        // (newly-published packages from the simulation appear there), with the
2317        // node's backing package store as fallback for already-on-chain packages.
2318        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2319            epoch_store.protocol_config(),
2320            Box::new(OverlayBackingPackageStore::new(
2321                &objects,
2322                self.get_backing_package_store(),
2323            )),
2324        );
2325
2326        let execution_error_source = execution_result
2327            .as_ref()
2328            .err()
2329            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2330
2331        let response = DryRunTransactionBlockResponse {
2332            suggested_gas_price,
2333            input: SuiTransactionBlockData::try_from_with_module_cache(
2334                transaction,
2335                &epoch_store.module_cache().clone(),
2336            )
2337            .map_err(|e| SuiErrorKind::TransactionSerializationError {
2338                error: format!("Failed to convert transaction to SuiTransactionBlockData: {e}"),
2339            })?,
2340            effects: effects.clone().try_into()?,
2341            events: SuiTransactionBlockEvents::try_from(
2342                events.unwrap_or_default(),
2343                tx_digest,
2344                None,
2345                layout_resolver.as_mut(),
2346            )?,
2347            // The RPC layer recalculates object_changes / balance_changes from
2348            // the written_objects map and effects.
2349            object_changes: Vec::new(),
2350            balance_changes: Vec::new(),
2351            execution_error_source,
2352        };
2353
2354        Ok((response, written_with_kind, effects, mock_gas_id))
2355    }
2356
2357    pub fn simulate_transaction(
2358        &self,
2359        mut transaction: TransactionData,
2360        checks: TransactionChecks,
2361        allow_mock_gas_coin: bool,
2362    ) -> SuiResult<SimulateTransactionResult> {
2363        if transaction.kind().is_system_tx() {
2364            return Err(SuiErrorKind::UnsupportedFeatureError {
2365                error: "simulate does not support system transactions".to_string(),
2366            }
2367            .into());
2368        }
2369
2370        let epoch_store = self.load_epoch_store_one_call_per_task();
2371        if !self.is_fullnode(&epoch_store) {
2372            return Err(SuiErrorKind::UnsupportedFeatureError {
2373                error: "simulate is only supported on fullnodes".to_string(),
2374            }
2375            .into());
2376        }
2377
2378        let dev_inspect = checks.disabled();
2379        if dev_inspect && self.config.dev_inspect_disabled {
2380            return Err(SuiErrorKind::UnsupportedFeatureError {
2381                error: "simulate with checks disabled is not allowed on this node".to_string(),
2382            }
2383            .into());
2384        }
2385
2386        // Reject coin reservations in gas payment when the execution engine
2387        // doesn't support them.
2388        let protocol_config = epoch_store.protocol_config();
2389        if !protocol_config.enable_coin_reservation_obj_refs()
2390            && transaction.gas().iter().any(|obj_ref| {
2391                sui_types::coin_reservation::ParsedDigest::is_coin_reservation_digest(&obj_ref.2)
2392            })
2393        {
2394            return Err(SuiErrorKind::UnsupportedFeatureError {
2395                error:
2396                    "coin reservations in gas payment are not supported at this protocol version"
2397                        .to_string(),
2398            }
2399            .into());
2400        }
2401
2402        // Compute input/receiving object kinds before mock gas injection so the mock
2403        // gas reference is not included in input_object_kinds (it is added to
2404        // input_objects directly after object loading).
2405        let input_object_kinds = transaction.input_objects()?;
2406        let receiving_object_refs = transaction.receiving_objects();
2407
2408        // Inject mock gas coin before validity_check so that on protocol versions
2409        // where address-balance gas payments are not yet enabled, the non-empty
2410        // payment check in validity_check passes for simulate/dev-inspect requests
2411        // submitted without explicit gas.
2412        // Also required before pre_object_load_checks so that funds-withdrawal
2413        // processing sees non-empty payment and doesn't create an address-balance
2414        // withdrawal for gas.
2415        // Skip mock gas for gasless transactions — they don't use gas coins.
2416        let is_gasless = protocol_config.enable_gasless() && transaction.is_gasless_transaction();
2417        let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() && !is_gasless
2418        {
2419            let obj = Object::new_move(
2420                MoveObject::new_gas_coin(
2421                    OBJECT_START_VERSION,
2422                    ObjectID::MAX,
2423                    DEV_INSPECT_GAS_COIN_VALUE,
2424                ),
2425                Owner::AddressOwner(transaction.gas_data().owner),
2426                TransactionDigest::genesis_marker(),
2427            );
2428            transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
2429            Some(obj)
2430        } else {
2431            None
2432        };
2433
2434        // Full validity check including gas budget and price.
2435        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
2436
2437        let declared_withdrawals = self.pre_object_load_checks(
2438            &transaction,
2439            &[],
2440            &input_object_kinds,
2441            &receiving_object_refs,
2442            epoch_store.protocol_config(),
2443        )?;
2444        let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();
2445
2446        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2447            // We don't want to cache this transaction since it's a simulation.
2448            None,
2449            &input_object_kinds,
2450            &receiving_object_refs,
2451            epoch_store.epoch(),
2452        )?;
2453
2454        // Add mock gas to input objects after loading (it doesn't exist in the store).
2455        let mock_gas_id = mock_gas_object.map(|obj| {
2456            let id = obj.id();
2457            input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
2458            id
2459        });
2460
2461        let protocol_config = epoch_store.protocol_config();
2462
2463        let (gas_status, checked_input_objects) = if dev_inspect {
2464            sui_transaction_checks::check_dev_inspect_input(
2465                protocol_config,
2466                &transaction,
2467                input_objects,
2468                receiving_objects,
2469                epoch_store.reference_gas_price(),
2470            )?
2471        } else {
2472            sui_transaction_checks::check_transaction_input(
2473                epoch_store.protocol_config(),
2474                epoch_store.reference_gas_price(),
2475                &transaction,
2476                input_objects,
2477                &receiving_objects,
2478                &self.metrics.bytecode_verifier_metrics,
2479                &self.config.verifier_signing_config,
2480            )?
2481        };
2482
2483        // TODO see if we can spin up a VM once and reuse it
2484        let executor = sui_execution::executor(
2485            protocol_config,
2486            true, // silent
2487        )
2488        .expect("Creating an executor should not fail here");
2489
2490        let (mut kind, signer, gas_data) = transaction.execution_parts();
2491        let rewritten_inputs = rewrite_transaction_for_coin_reservations(
2492            self.chain_identifier,
2493            &*self.coin_reservation_resolver,
2494            signer,
2495            &mut kind,
2496            None,
2497        )?;
2498        let early_execution_error = get_early_execution_error(
2499            &transaction.digest(),
2500            &checked_input_objects,
2501            self.config.certificate_deny_config.certificate_deny_set(),
2502            &FundsWithdrawStatus::MaybeSufficient,
2503        );
2504        // Dev-inspect/simulation path (not committed): no assigned accumulator version here, so the
2505        // IFFW short-circuit applies unconditionally (`None`), matching non-mainnet execution.
2506        let execution_params = match early_execution_error {
2507            None => ExecutionOrEarlyError::ok(None),
2508            Some(errors) => ExecutionOrEarlyError::failed(errors, None),
2509        };
2510
2511        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2512
2513        // Clone inputs for potential retry if object funds check fails post-execution.
2514        let cloned_input_objects = checked_input_objects.clone();
2515        let cloned_gas = gas_data.clone();
2516        let cloned_kind = kind.clone();
2517        let tx_digest = transaction.digest();
2518        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
2519        let epoch_timestamp_ms = epoch_store
2520            .epoch_start_config()
2521            .epoch_data()
2522            .epoch_start_timestamp();
2523        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2524            &tracking_store,
2525            protocol_config,
2526            self.metrics.execution_metrics.clone(),
2527            false, // expensive_checks
2528            execution_params,
2529            &epoch_id,
2530            epoch_timestamp_ms,
2531            checked_input_objects,
2532            gas_data,
2533            gas_status,
2534            kind,
2535            rewritten_inputs.clone(),
2536            signer,
2537            tx_digest,
2538            dev_inspect,
2539        );
2540
2541        // Post-execution: check object funds (non-address withdrawals discovered during execution).
2542        let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
2543            let has_insufficient_object_funds = inner_temp_store
2544                .accumulator_running_max_withdraws
2545                .iter()
2546                .filter(|(id, _)| !address_funds.contains(id))
2547                .any(|(id, max_withdraw)| {
2548                    let balance = self.get_account_funds_read().get_latest_account_amount(id);
2549                    balance < *max_withdraw
2550                });
2551
2552            if has_insufficient_object_funds {
2553                let retry_gas_status = SuiGasStatus::new(
2554                    cloned_gas.budget,
2555                    cloned_gas.price,
2556                    epoch_store.reference_gas_price(),
2557                    protocol_config,
2558                )?;
2559                let (store, _, effects, result) = executor.dev_inspect_transaction(
2560                    &tracking_store,
2561                    protocol_config,
2562                    self.metrics.execution_metrics.clone(),
2563                    false,
2564                    ExecutionOrEarlyError::failed(
2565                        NonEmpty::new(ExecutionErrorKind::InsufficientFundsForWithdraw),
2566                        None,
2567                    ),
2568                    &epoch_id,
2569                    epoch_timestamp_ms,
2570                    cloned_input_objects,
2571                    cloned_gas,
2572                    retry_gas_status,
2573                    cloned_kind,
2574                    rewritten_inputs,
2575                    signer,
2576                    tx_digest,
2577                    dev_inspect,
2578                );
2579                (store, effects, result)
2580            } else {
2581                (inner_temp_store, effects, execution_result)
2582            }
2583        } else {
2584            (inner_temp_store, effects, execution_result)
2585        };
2586
2587        let loaded_runtime_objects = tracking_store.into_read_objects();
2588        let unchanged_loaded_runtime_objects =
2589            crate::transaction_outputs::unchanged_loaded_runtime_objects(
2590                &transaction,
2591                &effects,
2592                &loaded_runtime_objects,
2593            );
2594
2595        let object_set = {
2596            let objects = {
2597                let mut objects = loaded_runtime_objects;
2598
2599                for o in inner_temp_store
2600                    .input_objects
2601                    .into_values()
2602                    .chain(inner_temp_store.written.into_values())
2603                {
2604                    objects.insert(o);
2605                }
2606
2607                objects
2608            };
2609
2610            let object_keys = sui_types::storage::get_transaction_object_set(
2611                &transaction,
2612                &effects,
2613                &unchanged_loaded_runtime_objects,
2614            );
2615
2616            let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2617            for k in object_keys {
2618                if let Some(o) = objects.get(&k) {
2619                    set.insert(o.clone());
2620                }
2621            }
2622
2623            set
2624        };
2625
2626        Ok(SimulateTransactionResult {
2627            objects: object_set,
2628            events: effects.events_digest().map(|_| inner_temp_store.events),
2629            effects,
2630            execution_result,
2631            mock_gas_id,
2632            unchanged_loaded_runtime_objects,
2633            suggested_gas_price: self
2634                .congestion_tracker
2635                .get_suggested_gas_prices(&transaction),
2636        })
2637    }
2638
2639    /// The object ID for gas can be any object ID, even for an uncreated object
2640    #[instrument(skip_all)]
2641    pub async fn dev_inspect_transaction_block(
2642        &self,
2643        sender: SuiAddress,
2644        transaction_kind: TransactionKind,
2645        gas_price: Option<u64>,
2646        gas_budget: Option<u64>,
2647        gas_sponsor: Option<SuiAddress>,
2648        gas_objects: Option<Vec<ObjectRef>>,
2649        show_raw_txn_data_and_effects: Option<bool>,
2650        skip_checks: Option<bool>,
2651    ) -> SuiResult<DevInspectResults> {
2652        let epoch_store = self.load_epoch_store_one_call_per_task();
2653        let protocol_config = epoch_store.protocol_config();
2654        let reference_gas_price = epoch_store.reference_gas_price();
2655
2656        let skip_checks = skip_checks.unwrap_or(true);
2657        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2658
2659        // Synthesize the full TransactionData the caller would have signed.
2660        let price = gas_price.unwrap_or(reference_gas_price);
2661        let budget = gas_budget.unwrap_or(protocol_config.max_tx_gas());
2662        let owner = gas_sponsor.unwrap_or(sender);
2663        let payment = gas_objects.unwrap_or_default();
2664        let transaction = TransactionData::V1(TransactionDataV1 {
2665            kind: transaction_kind,
2666            sender,
2667            gas_data: GasData {
2668                payment,
2669                owner,
2670                price,
2671                budget,
2672            },
2673            expiration: TransactionExpiration::None,
2674        });
2675
2676        // Capture raw bytes before simulate (which may mutate gas_data for mock
2677        // gas injection).
2678        let raw_txn_data = if show_raw_txn_data_and_effects {
2679            bcs::to_bytes(&transaction).map_err(|_| {
2680                SuiErrorKind::TransactionSerializationError {
2681                    error: "Failed to serialize transaction during dev inspect".to_string(),
2682                }
2683            })?
2684        } else {
2685            vec![]
2686        };
2687
2688        // Route through `simulate_transaction`:
2689        //   skip_checks = true  → TransactionChecks::Disabled
2690        //   skip_checks = false → TransactionChecks::Enabled
2691        let checks = if skip_checks {
2692            TransactionChecks::Disabled
2693        } else {
2694            TransactionChecks::Enabled
2695        };
2696        let sim =
2697            self.simulate_transaction(transaction, checks, /* allow_mock_gas_coin */ true)?;
2698
2699        let raw_effects = if show_raw_txn_data_and_effects {
2700            bcs::to_bytes(&sim.effects).map_err(|_| {
2701                SuiErrorKind::TransactionSerializationError {
2702                    error: "Failed to serialize transaction effects during dev inspect".to_string(),
2703                }
2704            })?
2705        } else {
2706            vec![]
2707        };
2708
2709        // Resolve event/object layouts against the simulate-produced ObjectSet
2710        // (newly-published packages from the simulation appear there), with the
2711        // node's backing package store as fallback for already-on-chain packages.
2712        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2713            epoch_store.protocol_config(),
2714            Box::new(OverlayBackingPackageStore::new(
2715                &sim.objects,
2716                self.get_backing_package_store(),
2717            )),
2718        );
2719
2720        DevInspectResults::new(
2721            sim.effects,
2722            sim.events.unwrap_or_default(),
2723            sim.execution_result,
2724            raw_txn_data,
2725            raw_effects,
2726            layout_resolver.as_mut(),
2727        )
2728    }
2729
2730    // Only used for testing because of how epoch store is loaded.
2731    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2732        let epoch_store = self.epoch_store_for_testing();
2733        Ok(epoch_store.reference_gas_price())
2734    }
2735
2736    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2737        self.get_transaction_cache_reader()
2738            .is_tx_already_executed(digest)
2739    }
2740
2741    #[instrument(level = "debug", skip_all, err(level = "debug"))]
2742    fn index_tx(
2743        sequence: u64,
2744        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2745        object_store: &Arc<dyn ObjectStore + Send + Sync>,
2746        indexes: &IndexStore,
2747        digest: &TransactionDigest,
2748        // TODO: index_tx really just need the transaction data here.
2749        cert: &VerifiedExecutableTransaction,
2750        effects: &TransactionEffects,
2751        events: &TransactionEvents,
2752        timestamp_ms: u64,
2753        tx_coins: Option<TxCoins>,
2754        written: &WrittenObjects,
2755        inner_temporary_store: &InnerTemporaryStore,
2756        epoch_store: &Arc<AuthorityPerEpochStore>,
2757        acquire_locks: bool,
2758    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
2759        let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
2760            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2761
2762        indexes.index_tx(
2763            sequence,
2764            cert.data().intent_message().value.sender(),
2765            cert.data()
2766                .intent_message()
2767                .value
2768                .input_objects()?
2769                .iter()
2770                .map(|o| o.object_id()),
2771            effects
2772                .all_changed_objects()
2773                .into_iter()
2774                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2775            cert.data()
2776                .intent_message()
2777                .value
2778                .move_calls()
2779                .into_iter()
2780                .map(|(_cmd_idx, package, module, function)| {
2781                    (*package, module.to_owned(), function.to_owned())
2782                }),
2783            events,
2784            changes,
2785            digest,
2786            timestamp_ms,
2787            tx_coins,
2788            effects.accumulator_events(),
2789            acquire_locks,
2790        )
2791    }
2792
2793    #[cfg(msim)]
2794    fn simulate_fork_during_execution(
2795        &self,
2796        certificate: &VerifiedExecutableTransaction,
2797        epoch_store: &Arc<AuthorityPerEpochStore>,
2798        effects: &mut TransactionEffects,
2799        forked_validators: std::sync::Arc<
2800            std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2801        >,
2802        full_halt: bool,
2803        effects_overrides: std::sync::Arc<
2804            std::sync::Mutex<std::collections::BTreeMap<String, String>>,
2805        >,
2806        fork_probability: f32,
2807    ) {
2808        use std::cell::RefCell;
2809        thread_local! {
2810            static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
2811        }
2812        if !certificate.data().intent_message().value.is_system_tx() {
2813            let committee = epoch_store.committee();
2814            let cur_stake = (**committee).weight(&self.name);
2815            if cur_stake > 0 {
2816                TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
2817                    let already_forked = forked_validators
2818                        .lock()
2819                        .ok()
2820                        .map(|set| set.contains(&self.name))
2821                        .unwrap_or(false);
2822
2823                    if !already_forked {
2824                        let should_fork = if full_halt {
2825                            // For full halt, fork enough nodes to reach validity threshold
2826                            *total_stake <= committee.validity_threshold()
2827                        } else {
2828                            // For partial fork, stay strictly below validity threshold
2829                            *total_stake + cur_stake < committee.validity_threshold()
2830                        };
2831
2832                        if should_fork {
2833                            *total_stake += cur_stake;
2834
2835                            if let Ok(mut external_set) = forked_validators.lock() {
2836                                external_set.insert(self.name);
2837                                info!("forked_validators: {:?}", external_set);
2838                            }
2839                        }
2840                    }
2841
2842                    if let Ok(external_set) = forked_validators.lock() {
2843                        if external_set.contains(&self.name) {
2844                            // If effects_overrides is empty, deterministically select a tx_digest to fork with 1/100 probability.
2845                            // Fork this transaction and record the digest and the original effects to original_effects.
2846                            // If original_effects is nonempty and contains a key matching this transaction digest (i.e.
2847                            // the transaction was forked on a different validator), fork this txn as well.
2848
2849                            let tx_digest = certificate.digest().to_string();
2850                            if let Ok(mut overrides) = effects_overrides.lock() {
2851                                if overrides.contains_key(&tx_digest)
2852                                    || overrides.is_empty()
2853                                        && sui_simulator::random::deterministic_probability(
2854                                            &tx_digest,
2855                                            fork_probability,
2856                                        )
2857                                {
2858                                    let original_effects_digest = effects.digest().to_string();
2859                                    overrides
2860                                        .insert(tx_digest.clone(), original_effects_digest.clone());
2861                                    info!(
2862                                        ?tx_digest,
2863                                        ?original_effects_digest,
2864                                        "Captured forked effects digest for transaction"
2865                                    );
2866                                    effects.gas_cost_summary_mut_for_testing().computation_cost +=
2867                                        1;
2868                                }
2869                            }
2870                        }
2871                    }
2872                });
2873            }
2874        }
2875    }
2876
2877    fn process_object_index(
2878        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2879        object_store: &Arc<dyn ObjectStore + Send + Sync>,
2880        effects: &TransactionEffects,
2881        written: &WrittenObjects,
2882        inner_temporary_store: &InnerTemporaryStore,
2883        epoch_store: &Arc<AuthorityPerEpochStore>,
2884    ) -> SuiResult<ObjectIndexChanges> {
2885        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2886            epoch_store.protocol_config(),
2887            Box::new(PackageStoreWithFallback::new(
2888                inner_temporary_store,
2889                backing_package_store,
2890            )),
2891        );
2892
2893        let modified_at_version = effects
2894            .modified_at_versions()
2895            .into_iter()
2896            .collect::<HashMap<_, _>>();
2897
2898        let tx_digest = effects.transaction_digest();
2899        let mut deleted_owners = vec![];
2900        let mut deleted_dynamic_fields = vec![];
2901        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2902            let old_version = modified_at_version.get(&id).unwrap();
2903            // When we process the index, the latest object hasn't been written yet so
2904            // the old object must be present.
2905            match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
2906                |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
2907            ) {
2908                Owner::AddressOwner(addr)
2909                | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
2910                Owner::ObjectOwner(object_id) => {
2911                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2912                }
2913                _ => {}
2914            }
2915        }
2916
2917        let mut new_owners = vec![];
2918        let mut new_dynamic_fields = vec![];
2919
2920        for (oref, owner, kind) in effects.all_changed_objects() {
2921            let id = &oref.0;
2922            // For mutated objects, retrieve old owner and delete old index if there is a owner change.
2923            if let WriteKind::Mutate = kind {
2924                let Some(old_version) = modified_at_version.get(id) else {
2925                    panic!(
2926                        "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
2927                        tx_digest
2928                    );
2929                };
2930                // When we process the index, the latest object hasn't been written yet so
2931                // the old object must be present.
2932                let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
2933                    panic!(
2934                        "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
2935                        tx_digest, id, old_version
2936                    );
2937                };
2938                if old_object.owner != owner {
2939                    match old_object.owner {
2940                        Owner::AddressOwner(addr)
2941                        | Owner::ConsensusAddressOwner { owner: addr, .. } => {
2942                            deleted_owners.push((addr, *id));
2943                        }
2944                        Owner::ObjectOwner(object_id) => {
2945                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2946                        }
2947                        _ => {}
2948                    }
2949                }
2950            }
2951
2952            match owner {
2953                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
2954                    // TODO: We can remove the object fetching after we added ObjectType to TransactionEffects
2955                    let new_object = written.get(id).unwrap_or_else(
2956                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2957                    );
2958                    assert_eq!(
2959                        new_object.version(),
2960                        oref.1,
2961                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2962                        tx_digest,
2963                        id,
2964                        new_object.version(),
2965                        oref.1
2966                    );
2967
2968                    let type_ = new_object
2969                        .type_()
2970                        .map(|type_| ObjectType::Struct(type_.clone()))
2971                        .unwrap_or(ObjectType::Package);
2972
2973                    new_owners.push((
2974                        (addr, *id),
2975                        ObjectInfo {
2976                            object_id: *id,
2977                            version: oref.1,
2978                            digest: oref.2,
2979                            type_,
2980                            owner,
2981                            previous_transaction: *effects.transaction_digest(),
2982                        },
2983                    ));
2984                }
2985                Owner::ObjectOwner(owner) => {
2986                    let new_object = written.get(id).unwrap_or_else(
2987                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2988                    );
2989                    assert_eq!(
2990                        new_object.version(),
2991                        oref.1,
2992                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2993                        tx_digest,
2994                        id,
2995                        new_object.version(),
2996                        oref.1
2997                    );
2998
2999                    let Some(df_info) = Self::try_create_dynamic_field_info(
3000                        object_store,
3001                        new_object,
3002                        written,
3003                        layout_resolver.as_mut(),
3004                    )
3005                    .unwrap_or_else(|e| {
3006                        error!(
3007                            "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
3008                            e, new_object
3009                        );
3010                        None
3011                    }) else {
3012                        // Skip indexing for non dynamic field objects.
3013                        continue;
3014                    };
3015                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3016                }
3017                _ => {}
3018            }
3019        }
3020
3021        Ok(ObjectIndexChanges {
3022            deleted_owners,
3023            deleted_dynamic_fields,
3024            new_owners,
3025            new_dynamic_fields,
3026        })
3027    }
3028
3029    fn try_create_dynamic_field_info(
3030        object_store: &Arc<dyn ObjectStore + Send + Sync>,
3031        o: &Object,
3032        written: &WrittenObjects,
3033        resolver: &mut dyn LayoutResolver,
3034    ) -> SuiResult<Option<DynamicFieldInfo>> {
3035        // Skip if not a move object
3036        let Some(move_object) = o.data.try_as_move().cloned() else {
3037            return Ok(None);
3038        };
3039
3040        // We only index dynamic field objects
3041        if !move_object.type_().is_dynamic_field() {
3042            return Ok(None);
3043        }
3044
3045        let layout = resolver
3046            .get_annotated_layout(&move_object.type_().clone().into())?
3047            .into_layout();
3048
3049        let field =
3050            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3051                SuiErrorKind::ObjectDeserializationError {
3052                    error: e.to_string(),
3053                }
3054            })?;
3055
3056        let type_ = field.kind;
3057        let name_type: TypeTag = field.name_layout.into();
3058        let bcs_name = field.name_bytes.to_owned();
3059
3060        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3061            .map_err(|e| {
3062                warn!("{e}");
3063                SuiErrorKind::ObjectDeserializationError {
3064                    error: e.to_string(),
3065                }
3066            })?;
3067
3068        let name = DynamicFieldName {
3069            type_: name_type,
3070            value: SuiMoveValue::from(name_value).to_json_value(),
3071        };
3072
3073        let value_metadata = field.value_metadata().map_err(|e| {
3074            warn!("{e}");
3075            SuiErrorKind::ObjectDeserializationError {
3076                error: e.to_string(),
3077            }
3078        })?;
3079
3080        Ok(Some(match value_metadata {
3081            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3082                name,
3083                bcs_name,
3084                type_,
3085                object_type: object_type.to_canonical_string(/* with_prefix */ true),
3086                object_id: o.id(),
3087                version: o.version(),
3088                digest: o.digest(),
3089            },
3090
3091            DFV::ValueMetadata::DynamicObjectField(object_id) => {
3092                // Find the actual object from storage using the object id obtained from the wrapper.
3093
3094                // Try to find the object in the written objects first.
3095                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3096                    let version = object.version();
3097                    let digest = object.digest();
3098                    let object_type = object.data.type_().unwrap().clone();
3099                    (version, digest, object_type)
3100                } else {
3101                    // If not found, try to find it in the database.
3102                    let object = object_store
3103                        .get_object_by_key(&object_id, o.version())
3104                        .ok_or_else(|| UserInputError::ObjectNotFound {
3105                            object_id,
3106                            version: Some(o.version()),
3107                        })?;
3108                    let version = object.version();
3109                    let digest = object.digest();
3110                    let object_type = object.data.type_().unwrap().clone();
3111                    (version, digest, object_type)
3112                };
3113
3114                DynamicFieldInfo {
3115                    name,
3116                    bcs_name,
3117                    type_,
3118                    object_type: object_type.to_string(),
3119                    object_id,
3120                    version,
3121                    digest,
3122                }
3123            }
3124        }))
3125    }
3126
3127    #[instrument(level = "trace", skip_all, err(level = "debug"))]
3128    fn post_process_one_tx(
3129        &self,
3130        certificate: &VerifiedExecutableTransaction,
3131        effects: &TransactionEffects,
3132        inner_temporary_store: &InnerTemporaryStore,
3133        epoch_store: &Arc<AuthorityPerEpochStore>,
3134    ) -> SuiResult {
3135        let Some(indexes) = &self.indexes else {
3136            return Ok(());
3137        };
3138
3139        let tx_digest = *certificate.digest();
3140
3141        // Allocate sequence number on the calling thread to preserve execution order.
3142        let sequence = indexes.allocate_sequence_number();
3143
3144        if self.config.sync_post_process_one_tx {
3145            // Synchronous mode: run post-processing inline on the calling thread
3146            // and commit the index batch immediately with locks held.
3147            // Used as a rollback mechanism and for testing correctness against async mode.
3148            // TODO: delete this branch once async mode has shipped
3149            let result = Self::post_process_one_tx_impl(
3150                sequence,
3151                indexes,
3152                &self.subscription_handler,
3153                &self.metrics,
3154                self.name,
3155                self.get_backing_package_store(),
3156                self.get_object_store(),
3157                certificate,
3158                effects,
3159                inner_temporary_store,
3160                epoch_store,
3161                true, // acquire_locks
3162            );
3163
3164            match result {
3165                Ok((raw_batch, cache_updates_with_locks)) => {
3166                    let mut db_batch = indexes.new_db_batch();
3167                    db_batch
3168                        .concat(vec![raw_batch])
3169                        .expect("failed to absorb raw index batch");
3170                    // Destructure to keep _locks alive through commit_index_batch.
3171                    let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
3172                        cache_updates_with_locks;
3173                    indexes
3174                        .commit_index_batch(db_batch, vec![inner])
3175                        .expect("failed to commit index batch");
3176                }
3177                Err(e) => {
3178                    self.metrics.post_processing_total_failures.inc();
3179                    error!(?tx_digest, "tx post processing failed: {e}");
3180                    return Err(e);
3181                }
3182            }
3183
3184            return Ok(());
3185        }
3186
3187        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
3188        self.pending_post_processing.insert(tx_digest, done_rx);
3189
3190        let indexes = indexes.clone();
3191        let subscription_handler = self.subscription_handler.clone();
3192        let metrics = self.metrics.clone();
3193        let name = self.name;
3194        let backing_package_store = self.get_backing_package_store().clone();
3195        let object_store = self.get_object_store().clone();
3196        let semaphore = self.post_processing_semaphore.clone();
3197
3198        let certificate = certificate.clone();
3199        let effects = effects.clone();
3200        let inner_temporary_store = inner_temporary_store.clone();
3201        let epoch_store = epoch_store.clone();
3202
3203        // spawn post processing on a blocking thread
3204        tokio::spawn(async move {
3205            let permit = {
3206                let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
3207                semaphore
3208                    .acquire_owned()
3209                    .await
3210                    .expect("post-processing semaphore should not be closed")
3211            };
3212
3213            let _ = tokio::task::spawn_blocking(move || {
3214                let _permit = permit;
3215
3216                let result = Self::post_process_one_tx_impl(
3217                    sequence,
3218                    &indexes,
3219                    &subscription_handler,
3220                    &metrics,
3221                    name,
3222                    &backing_package_store,
3223                    &object_store,
3224                    &certificate,
3225                    &effects,
3226                    &inner_temporary_store,
3227                    &epoch_store,
3228                    false, // acquire_locks
3229                );
3230
3231                match result {
3232                    Ok((raw_batch, cache_updates_with_locks)) => {
3233                        fail_point!("crash-after-post-process-one-tx");
3234                        let output = (raw_batch, cache_updates_with_locks.into_inner());
3235                        let _ = done_tx.send(output);
3236                    }
3237                    Err(e) => {
3238                        metrics.post_processing_total_failures.inc();
3239                        error!(?tx_digest, "tx post processing failed: {e}");
3240                    }
3241                }
3242            })
3243            .await;
3244        });
3245
3246        Ok(())
3247    }
3248
3249    fn post_process_one_tx_impl(
3250        sequence: u64,
3251        indexes: &Arc<IndexStore>,
3252        subscription_handler: &Arc<SubscriptionHandler>,
3253        metrics: &Arc<AuthorityMetrics>,
3254        name: AuthorityName,
3255        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3256        object_store: &Arc<dyn ObjectStore + Send + Sync>,
3257        certificate: &VerifiedExecutableTransaction,
3258        effects: &TransactionEffects,
3259        inner_temporary_store: &InnerTemporaryStore,
3260        epoch_store: &Arc<AuthorityPerEpochStore>,
3261        acquire_locks: bool,
3262    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
3263        let _scope = monitored_scope("Execution::post_process_one_tx");
3264
3265        let tx_digest = certificate.digest();
3266        let timestamp_ms = Self::unixtime_now_ms();
3267        let events = &inner_temporary_store.events;
3268        let written = &inner_temporary_store.written;
3269        let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
3270            name,
3271            object_store,
3272            effects,
3273            inner_temporary_store,
3274            epoch_store,
3275        );
3276
3277        let (raw_batch, cache_updates) = Self::index_tx(
3278            sequence,
3279            backing_package_store,
3280            object_store,
3281            indexes,
3282            tx_digest,
3283            certificate,
3284            effects,
3285            events,
3286            timestamp_ms,
3287            tx_coins,
3288            written,
3289            inner_temporary_store,
3290            epoch_store,
3291            acquire_locks,
3292        )
3293        .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
3294        .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3295        .expect("Indexing tx should not fail");
3296
3297        let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3298        let events = Self::make_transaction_block_events(
3299            backing_package_store,
3300            events.clone(),
3301            *tx_digest,
3302            timestamp_ms,
3303            epoch_store,
3304            inner_temporary_store,
3305        )?;
3306        // Emit events
3307        subscription_handler
3308            .process_tx(certificate.data().transaction_data(), &effects, &events)
3309            .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
3310            .tap_err(|e| {
3311                warn!(
3312                    ?tx_digest,
3313                    "Post processing - Couldn't process events for tx: {}", e
3314                )
3315            })?;
3316
3317        metrics
3318            .post_processing_total_events_emitted
3319            .inc_by(events.data.len() as u64);
3320
3321        Ok((raw_batch, cache_updates))
3322    }
3323
3324    fn make_transaction_block_events(
3325        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3326        transaction_events: TransactionEvents,
3327        digest: TransactionDigest,
3328        timestamp_ms: u64,
3329        epoch_store: &Arc<AuthorityPerEpochStore>,
3330        inner_temporary_store: &InnerTemporaryStore,
3331    ) -> SuiResult<SuiTransactionBlockEvents> {
3332        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3333            epoch_store.protocol_config(),
3334            Box::new(PackageStoreWithFallback::new(
3335                inner_temporary_store,
3336                backing_package_store,
3337            )),
3338        );
3339        SuiTransactionBlockEvents::try_from(
3340            transaction_events,
3341            digest,
3342            Some(timestamp_ms),
3343            layout_resolver.as_mut(),
3344        )
3345    }
3346
3347    pub fn unixtime_now_ms() -> u64 {
3348        let now = SystemTime::now()
3349            .duration_since(UNIX_EPOCH)
3350            .expect("Time went backwards")
3351            .as_millis();
3352        u64::try_from(now).expect("Travelling in time machine")
3353    }
3354
3355    // TODO(fastpath): update this handler for Mysticeti fastpath.
3356    // There will no longer be validator quorum signed transactions or effects.
3357    // The proof of finality needs to come from checkpoints.
3358    #[instrument(level = "trace", skip_all)]
3359    pub async fn handle_transaction_info_request(
3360        &self,
3361        request: TransactionInfoRequest,
3362    ) -> SuiResult<TransactionInfoResponse> {
3363        let epoch_store = self.load_epoch_store_one_call_per_task();
3364        let (transaction, status) = self
3365            .get_transaction_status(&request.transaction_digest, &epoch_store)?
3366            .ok_or(SuiErrorKind::TransactionNotFound {
3367                digest: request.transaction_digest,
3368            })?;
3369        Ok(TransactionInfoResponse {
3370            transaction,
3371            status,
3372        })
3373    }
3374
3375    #[instrument(level = "trace", skip_all)]
3376    pub async fn handle_object_info_request(
3377        &self,
3378        request: ObjectInfoRequest,
3379    ) -> SuiResult<ObjectInfoResponse> {
3380        let epoch_store = self.load_epoch_store_one_call_per_task();
3381
3382        let requested_object_seq = match request.request_kind {
3383            ObjectInfoRequestKind::LatestObjectInfo => {
3384                let (_, seq, _) =
3385                    self.get_object_or_tombstone(request.object_id)
3386                        .ok_or_else(|| {
3387                            SuiError::from(UserInputError::ObjectNotFound {
3388                                object_id: request.object_id,
3389                                version: None,
3390                            })
3391                        })?;
3392                seq
3393            }
3394            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3395        };
3396
3397        let object = self
3398            .get_object_store()
3399            .get_object_by_key(&request.object_id, requested_object_seq)
3400            .ok_or_else(|| {
3401                SuiError::from(UserInputError::ObjectNotFound {
3402                    object_id: request.object_id,
3403                    version: Some(requested_object_seq),
3404                })
3405            })?;
3406
3407        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3408            (request.generate_layout, object.data.try_as_move())
3409        {
3410            let epoch_store = self.load_epoch_store_one_call_per_task();
3411            Some(into_struct_layout(
3412                epoch_store
3413                    .executor()
3414                    .type_layout_resolver(
3415                        epoch_store.protocol_config(),
3416                        Box::new(self.get_backing_package_store().as_ref()),
3417                    )
3418                    .get_annotated_layout(&move_obj.type_().clone().into())?,
3419            )?)
3420        } else {
3421            None
3422        };
3423
3424        let lock = if !object.is_address_owned() {
3425            // Only address owned objects have locks.
3426            None
3427        } else {
3428            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)?
3429                .map(|s| s.into_inner())
3430        };
3431
3432        Ok(ObjectInfoResponse {
3433            object,
3434            layout,
3435            lock_for_debugging: lock,
3436        })
3437    }
3438
3439    #[instrument(level = "trace", skip_all)]
3440    pub fn handle_checkpoint_request(
3441        &self,
3442        request: &CheckpointRequest,
3443    ) -> SuiResult<CheckpointResponse> {
3444        let summary = match request.sequence_number {
3445            Some(seq) => self
3446                .checkpoint_store
3447                .get_checkpoint_by_sequence_number(seq)?,
3448            None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3449        }
3450        .map(|v| v.into_inner());
3451        let contents = match &summary {
3452            Some(s) => self
3453                .checkpoint_store
3454                .get_checkpoint_contents(&s.content_digest)?,
3455            None => None,
3456        };
3457        Ok(CheckpointResponse {
3458            checkpoint: summary,
3459            contents,
3460        })
3461    }
3462
3463    #[instrument(level = "trace", skip_all)]
3464    pub fn handle_checkpoint_request_v2(
3465        &self,
3466        request: &CheckpointRequestV2,
3467    ) -> SuiResult<CheckpointResponseV2> {
3468        let summary = if request.certified {
3469            let summary = match request.sequence_number {
3470                Some(seq) => self
3471                    .checkpoint_store
3472                    .get_checkpoint_by_sequence_number(seq)?,
3473                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3474            }
3475            .map(|v| v.into_inner());
3476            summary.map(CheckpointSummaryResponse::Certified)
3477        } else {
3478            let summary = match request.sequence_number {
3479                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3480                None => self
3481                    .checkpoint_store
3482                    .get_latest_locally_computed_checkpoint()?,
3483            };
3484            summary.map(CheckpointSummaryResponse::Pending)
3485        };
3486        let contents = match &summary {
3487            Some(s) => self
3488                .checkpoint_store
3489                .get_checkpoint_contents(&s.content_digest())?,
3490            None => None,
3491        };
3492        Ok(CheckpointResponseV2 {
3493            checkpoint: summary,
3494            contents,
3495        })
3496    }
3497
3498    fn check_protocol_version(
3499        supported_protocol_versions: SupportedProtocolVersions,
3500        current_version: ProtocolVersion,
3501    ) {
3502        info!("current protocol version is now {:?}", current_version);
3503        info!("supported versions are: {:?}", supported_protocol_versions);
3504        if !supported_protocol_versions.is_version_supported(current_version) {
3505            let msg = format!(
3506                "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3507                current_version, supported_protocol_versions,
3508            );
3509
3510            error!("{}", msg);
3511            eprintln!("{}", msg);
3512
3513            #[cfg(not(msim))]
3514            std::process::exit(1);
3515
3516            #[cfg(msim)]
3517            sui_simulator::task::shutdown_current_node();
3518        }
3519    }
3520
3521    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
3522    #[allow(clippy::too_many_arguments)]
3523    pub async fn new(
3524        name: AuthorityName,
3525        secret: StableSyncAuthoritySigner,
3526        supported_protocol_versions: SupportedProtocolVersions,
3527        store: Arc<AuthorityStore>,
3528        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3529        epoch_store: Arc<AuthorityPerEpochStore>,
3530        committee_store: Arc<CommitteeStore>,
3531        indexes: Option<Arc<IndexStore>>,
3532        rpc_index: Option<Arc<RpcIndexStore>>,
3533        rpc_store: Option<RpcStore>,
3534        checkpoint_store: Arc<CheckpointStore>,
3535        prometheus_registry: &Registry,
3536        genesis_objects: &[Object],
3537        db_checkpoint_config: &DBCheckpointConfig,
3538        config: NodeConfig,
3539        chain_identifier: ChainIdentifier,
3540        policy_config: Option<PolicyConfig>,
3541        firewall_config: Option<RemoteFirewallConfig>,
3542        pruner_watermarks: Arc<PrunerWatermarks>,
3543    ) -> Arc<Self> {
3544        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3545
3546        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3547        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3548        let execution_scheduler = Arc::new(ExecutionScheduler::new(
3549            execution_cache_trait_pointers.object_cache_reader.clone(),
3550            execution_cache_trait_pointers.account_funds_read.clone(),
3551            execution_cache_trait_pointers
3552                .transaction_cache_reader
3553                .clone(),
3554            tx_ready_certificates,
3555            &epoch_store,
3556            config.funds_withdraw_scheduler_type,
3557            metrics.clone(),
3558            prometheus_registry,
3559        ));
3560        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3561
3562        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3563            epoch_store.get_parent_path(),
3564            &config.authority_store_pruning_config,
3565        );
3566        let _pruner = AuthorityStorePruner::new(
3567            store.perpetual_tables.clone(),
3568            checkpoint_store.clone(),
3569            rpc_index.clone(),
3570            rpc_store,
3571            indexes.clone(),
3572            config.authority_store_pruning_config.clone(),
3573            epoch_store.committee().authority_exists(&name),
3574            epoch_store.epoch_start_state().epoch_duration_ms(),
3575            prometheus_registry,
3576            pruner_watermarks,
3577        );
3578        let input_loader =
3579            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3580        let epoch = epoch_store.epoch();
3581        let traffic_controller_metrics =
3582            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3583        let traffic_controller = if let Some(policy_config) = policy_config {
3584            Some(Arc::new(
3585                TrafficController::init(
3586                    policy_config,
3587                    traffic_controller_metrics,
3588                    firewall_config.clone(),
3589                )
3590                .await,
3591            ))
3592        } else {
3593            None
3594        };
3595
3596        let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3597            ForkRecoveryState::new(Some(fork_config))
3598                .expect("Failed to initialize fork recovery state")
3599        });
3600
3601        let coin_reservation_resolver = Arc::new(CachingCoinReservationResolver::new(
3602            execution_cache_trait_pointers.child_object_resolver.clone(),
3603        ));
3604
3605        let object_funds_checker_metrics =
3606            Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
3607        let state = Arc::new(AuthorityState {
3608            name,
3609            secret,
3610            execution_lock: RwLock::new(epoch),
3611            epoch_store: ArcSwap::new(epoch_store.clone()),
3612            input_loader,
3613            execution_cache_trait_pointers,
3614            coin_reservation_resolver,
3615            indexes,
3616            rpc_index,
3617            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3618            checkpoint_store,
3619            committee_store,
3620            execution_scheduler,
3621            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3622            metrics,
3623            _pruner,
3624            _authority_per_epoch_pruner,
3625            db_checkpoint_config: db_checkpoint_config.clone(),
3626            config,
3627            overload_info: AuthorityOverloadInfo::default(),
3628            chain_identifier,
3629            congestion_tracker: Arc::new(CongestionTracker::new()),
3630            consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
3631            traffic_controller,
3632            fork_recovery_state,
3633            notify_epoch: tokio::sync::watch::channel(epoch).0,
3634            object_funds_checker: ArcSwapOption::empty(),
3635            object_funds_checker_metrics,
3636            pending_post_processing: Arc::new(DashMap::new()),
3637            post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
3638        });
3639        state.init_object_funds_checker().await;
3640
3641        // Start a task to execute ready certificates.
3642        let authority_state = Arc::downgrade(&state);
3643        spawn_monitored_task!(execution_process(
3644            authority_state,
3645            rx_ready_certificates,
3646            rx_execution_shutdown,
3647        ));
3648        // TODO: This doesn't belong to the constructor of AuthorityState.
3649        state
3650            .create_owner_index_if_empty(genesis_objects, &epoch_store)
3651            .expect("Error indexing genesis objects.");
3652
3653        if epoch_store
3654            .protocol_config()
3655            .enable_multi_epoch_transaction_expiration()
3656            && epoch_store.epoch() > 0
3657        {
3658            use typed_store::Map;
3659            let previous_epoch = epoch_store.epoch() - 1;
3660            let start_key = (previous_epoch, TransactionDigest::ZERO);
3661            let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
3662            let has_previous_epoch_data = store
3663                .perpetual_tables
3664                .executed_transaction_digests
3665                .safe_range_iter(start_key..end_key)
3666                .next()
3667                .is_some();
3668
3669            if !has_previous_epoch_data {
3670                panic!(
3671                    "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
3672                    This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
3673                    Please restore from a snapshot using the latest version of sui-tool.",
3674                    previous_epoch
3675                );
3676            }
3677        }
3678
3679        state
3680    }
3681
3682    async fn init_object_funds_checker(&self) {
3683        let epoch_store = self.epoch_store.load();
3684        if self.node_role(&epoch_store).runs_consensus()
3685            && epoch_store.protocol_config().enable_object_funds_withdraw()
3686        {
3687            if self.object_funds_checker.load().is_none() {
3688                let inner = self.get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID).map(|o| {
3689                    Arc::new(ObjectFundsChecker::new(
3690                        o.version(),
3691                        self.object_funds_checker_metrics.clone(),
3692                    ))
3693                });
3694                self.object_funds_checker.store(inner);
3695            }
3696        } else {
3697            self.object_funds_checker.store(None);
3698        }
3699    }
3700
3701    // TODO: Consolidate our traits to reduce the number of methods here.
3702    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3703        &self.execution_cache_trait_pointers.object_cache_reader
3704    }
3705
3706    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3707        &self.execution_cache_trait_pointers.transaction_cache_reader
3708    }
3709
3710    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3711        &self.execution_cache_trait_pointers.cache_writer
3712    }
3713
3714    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3715        &self.execution_cache_trait_pointers.backing_store
3716    }
3717
3718    pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3719        &self.execution_cache_trait_pointers.child_object_resolver
3720    }
3721
3722    pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
3723        &self.execution_cache_trait_pointers.account_funds_read
3724    }
3725
3726    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3727        &self.execution_cache_trait_pointers.backing_package_store
3728    }
3729
3730    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3731        &self.execution_cache_trait_pointers.object_store
3732    }
3733
3734    pub async fn await_post_processing(
3735        &self,
3736        tx_digest: &TransactionDigest,
3737    ) -> Option<PostProcessingOutput> {
3738        if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
3739            // Tx was executed and post-processing is in flight.
3740            rx.await.ok()
3741        } else {
3742            // Tx was already persisted or post-processing already completed.
3743            None
3744        }
3745    }
3746
3747    /// Await post-processing for a transaction and commit the index batch immediately.
3748    /// Used in test helpers where there is no CheckpointExecutor to collect and commit
3749    /// index batches at checkpoint boundaries.
3750    pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
3751        if let Some(indexes) = &self.indexes
3752            && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
3753        {
3754            let mut db_batch = indexes.new_db_batch();
3755            db_batch
3756                .concat(vec![raw_batch])
3757                .expect("failed to build index batch");
3758            indexes
3759                .commit_index_batch(db_batch, vec![cache_updates])
3760                .expect("failed to commit index batch");
3761        }
3762    }
3763
3764    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3765        &self.execution_cache_trait_pointers.reconfig_api
3766    }
3767
3768    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3769        &self.execution_cache_trait_pointers.global_state_hash_store
3770    }
3771
3772    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3773        &self.execution_cache_trait_pointers.checkpoint_cache
3774    }
3775
3776    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3777        &self.execution_cache_trait_pointers.state_sync_store
3778    }
3779
3780    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3781        &self.execution_cache_trait_pointers.cache_commit
3782    }
3783
3784    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3785        self.execution_cache_trait_pointers
3786            .testing_api
3787            .database_for_testing()
3788    }
3789
3790    /// Access to the underlying authority store for diagnostic tooling (db-shell).
3791    ///
3792    /// Goes through `testing_api` deliberately: db-shell is read-only diagnostics
3793    /// that bypasses the writeback cache, and we want the execution path to have
3794    /// no other route to the raw `AuthorityStore`. Using the testing API here
3795    /// keeps that invariant visible — diagnostic code is the only non-test caller.
3796    pub fn authority_store(&self) -> Arc<AuthorityStore> {
3797        self.execution_cache_trait_pointers
3798            .testing_api
3799            .database_for_testing()
3800    }
3801
3802    pub fn cache_for_testing(&self) -> &WritebackCache {
3803        self.execution_cache_trait_pointers
3804            .testing_api
3805            .cache_for_testing()
3806    }
3807
3808    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3809        &self,
3810        config: NodeConfig,
3811        metrics: Arc<AuthorityStorePruningMetrics>,
3812    ) -> anyhow::Result<()> {
3813        use crate::authority::authority_store_pruner::PrunerWatermarks;
3814        let watermarks = Arc::new(PrunerWatermarks::default());
3815        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3816            &self.database_for_testing().perpetual_tables,
3817            &self.checkpoint_store,
3818            self.rpc_index.as_deref(),
3819            None,
3820            config.authority_store_pruning_config,
3821            metrics,
3822            EPOCH_DURATION_MS_FOR_TESTING,
3823            &watermarks,
3824        )
3825        .await
3826    }
3827
3828    pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
3829        &self.execution_scheduler
3830    }
3831
3832    fn create_owner_index_if_empty(
3833        &self,
3834        genesis_objects: &[Object],
3835        epoch_store: &Arc<AuthorityPerEpochStore>,
3836    ) -> SuiResult {
3837        let Some(index_store) = &self.indexes else {
3838            return Ok(());
3839        };
3840        if !index_store.is_empty() {
3841            return Ok(());
3842        }
3843
3844        let mut new_owners = vec![];
3845        let mut new_dynamic_fields = vec![];
3846        let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3847            epoch_store.protocol_config(),
3848            Box::new(self.get_backing_package_store().as_ref()),
3849        );
3850        for o in genesis_objects.iter() {
3851            match o.owner {
3852                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3853                    new_owners.push((
3854                        (addr, o.id()),
3855                        ObjectInfo::new(&o.compute_object_reference(), o),
3856                    ))
3857                }
3858                Owner::ObjectOwner(object_id) => {
3859                    let id = o.id();
3860                    let Some(info) = Self::try_create_dynamic_field_info(
3861                        self.get_object_store(),
3862                        o,
3863                        &BTreeMap::new(),
3864                        layout_resolver.as_mut(),
3865                    )?
3866                    else {
3867                        continue;
3868                    };
3869                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3870                }
3871                _ => {}
3872            }
3873        }
3874
3875        index_store.insert_genesis_objects(ObjectIndexChanges {
3876            deleted_owners: vec![],
3877            deleted_dynamic_fields: vec![],
3878            new_owners,
3879            new_dynamic_fields,
3880        })
3881    }
3882
3883    /// Attempts to acquire execution lock for an executable transaction.
3884    /// Returns Some(lock) if the transaction is matching current executed epoch.
3885    /// Returns None if validator is halted at epoch end or epoch mismatch.
3886    pub fn execution_lock_for_executable_transaction(
3887        &self,
3888        transaction: &VerifiedExecutableTransaction,
3889    ) -> Option<ExecutionLockReadGuard<'_>> {
3890        let lock = self.execution_lock.try_read().ok()?;
3891        if *lock == transaction.auth_sig().epoch() {
3892            Some(lock)
3893        } else {
3894            // TODO: Can this still happen?
3895            None
3896        }
3897    }
3898
3899    /// Acquires the execution lock for the duration of transaction validation.
3900    /// This prevents reconfiguration from starting until we are finished validating the transaction.
3901    fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
3902        self.execution_lock
3903            .try_read()
3904            .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
3905    }
3906
3907    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3908        self.execution_lock.write().await
3909    }
3910
3911    #[instrument(level = "error", skip_all)]
3912    pub async fn reconfigure(
3913        &self,
3914        cur_epoch_store: &AuthorityPerEpochStore,
3915        supported_protocol_versions: SupportedProtocolVersions,
3916        new_committee: Committee,
3917        epoch_start_configuration: EpochStartConfiguration,
3918        state_hasher: Arc<GlobalStateHasher>,
3919        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3920        epoch_last_checkpoint: CheckpointSequenceNumber,
3921    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
3922        Self::check_protocol_version(
3923            supported_protocol_versions,
3924            epoch_start_configuration
3925                .epoch_start_state()
3926                .protocol_version(),
3927        );
3928
3929        self.committee_store.insert_new_committee(&new_committee)?;
3930
3931        // Wait until no transactions are being executed.
3932        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3933
3934        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
3935        cur_epoch_store.epoch_terminated().await;
3936
3937        // Safe to being reconfiguration now. No transactions are being executed,
3938        // and no epoch-specific tasks are running.
3939
3940        {
3941            let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
3942            if state.should_accept_user_certs() {
3943                // Need to change this so that consensus adapter do not accept certificates from user.
3944                // This can happen if our local validator did not initiate epoch change locally,
3945                // but 2f+1 nodes already concluded the epoch.
3946                //
3947                // This lock is essentially a barrier for
3948                // `epoch_store.pending_consensus_certificates` table we are reading on the line after this block
3949                cur_epoch_store.close_user_certs(state);
3950            }
3951            // lock is dropped here
3952        }
3953
3954        self.get_reconfig_api()
3955            .clear_state_end_of_epoch(&execution_lock);
3956        self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
3957        self.maybe_reaccumulate_state_hash(
3958            cur_epoch_store,
3959            epoch_start_configuration
3960                .epoch_start_state()
3961                .protocol_version(),
3962        );
3963        self.get_reconfig_api()
3964            .set_epoch_start_configuration(&epoch_start_configuration);
3965        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
3966            && self
3967                .db_checkpoint_config
3968                .perform_db_checkpoints_at_epoch_end
3969        {
3970            let checkpoint_indexes = self
3971                .db_checkpoint_config
3972                .perform_index_db_checkpoints_at_epoch_end
3973                .unwrap_or(false);
3974            let current_epoch = cur_epoch_store.epoch();
3975            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
3976            self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
3977        }
3978
3979        self.get_reconfig_api()
3980            .reconfigure_cache(&epoch_start_configuration)
3981            .await;
3982
3983        let new_epoch = new_committee.epoch;
3984        let new_epoch_store = self
3985            .reopen_epoch_db(
3986                cur_epoch_store,
3987                new_committee,
3988                epoch_start_configuration,
3989                expensive_safety_check_config,
3990                epoch_last_checkpoint,
3991            )
3992            .await?;
3993        assert_eq!(new_epoch_store.epoch(), new_epoch);
3994        self.execution_scheduler
3995            .reconfigure(&new_epoch_store, self.get_account_funds_read());
3996        self.init_object_funds_checker().await;
3997        *execution_lock = new_epoch;
3998
3999        self.notify_epoch(new_epoch);
4000        // drop execution_lock after epoch store was updated
4001        // see also assert in AuthorityState::process_certificate
4002        // on the epoch store and execution lock epoch match
4003        Ok(new_epoch_store)
4004    }
4005
4006    fn notify_epoch(&self, new_epoch: EpochId) {
4007        self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
4008    }
4009
4010    pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4011        let mut rx = self.notify_epoch.subscribe();
4012        loop {
4013            let epoch = *rx.borrow();
4014            if epoch >= target_epoch {
4015                return Ok(epoch);
4016            }
4017            rx.changed().await?;
4018        }
4019    }
4020
4021    /// Executes accumulator settlement for testing purposes.
4022    /// Returns a list of (transaction, execution_env) pairs that can be replayed on another
4023    /// AuthorityState (e.g., a fullnode) using `replay_settlement_for_testing`.
4024    pub async fn settle_accumulator_for_testing(
4025        &self,
4026        effects: &[TransactionEffects],
4027        checkpoint_seq: Option<u64>,
4028    ) -> Vec<(VerifiedExecutableTransaction, ExecutionEnv)> {
4029        let accumulator_version = self
4030            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
4031            .unwrap()
4032            .version();
4033        // Use provided checkpoint sequence, or fall back to accumulator version.
4034        let ckpt_seq = checkpoint_seq.unwrap_or_else(|| accumulator_version.value());
4035        let builder = AccumulatorSettlementTxBuilder::new(
4036            Some(self.get_transaction_cache_reader().as_ref()),
4037            effects,
4038            ckpt_seq,
4039            0,
4040        );
4041        let balance_changes = builder.collect_funds_changes();
4042        let epoch_store = self.epoch_store_for_testing();
4043        let epoch = epoch_store.epoch();
4044        let accumulator_root_obj_initial_shared_version = epoch_store
4045            .epoch_start_config()
4046            .accumulator_root_obj_initial_shared_version()
4047            .unwrap();
4048        let settlements = builder.build_tx(
4049            epoch_store.protocol_config(),
4050            epoch,
4051            accumulator_root_obj_initial_shared_version,
4052            ckpt_seq,
4053            ckpt_seq,
4054        );
4055
4056        let settlements: Vec<_> = settlements
4057            .into_iter()
4058            .map(|tx| {
4059                VerifiedExecutableTransaction::new_system(
4060                    VerifiedTransaction::new_system_transaction(tx),
4061                    epoch,
4062                )
4063            })
4064            .collect();
4065
4066        let assigned_versions = epoch_store
4067            .assign_shared_object_versions_for_tests(
4068                self.get_object_cache_reader().as_ref(),
4069                &settlements,
4070            )
4071            .unwrap();
4072        let version_map = assigned_versions.into_map();
4073
4074        let mut replay_txns = Vec::new();
4075        let mut settlement_effects = Vec::with_capacity(settlements.len());
4076        for tx in settlements {
4077            let assigned = version_map.get(&tx.key()).unwrap().clone();
4078            let env = ExecutionEnv::new().with_assigned_versions(assigned);
4079            let (effects, _) = self
4080                .try_execute_immediately(&tx.clone(), env.clone(), &epoch_store)
4081                .unwrap();
4082            assert!(effects.status().is_ok());
4083            replay_txns.push((tx, env));
4084            settlement_effects.push(effects);
4085        }
4086
4087        let barrier = accumulators::build_accumulator_barrier_tx(
4088            epoch,
4089            accumulator_root_obj_initial_shared_version,
4090            ckpt_seq,
4091            &settlement_effects,
4092        );
4093        let barrier = VerifiedExecutableTransaction::new_system(
4094            VerifiedTransaction::new_system_transaction(barrier),
4095            epoch,
4096        );
4097
4098        let assigned_versions = epoch_store
4099            .assign_shared_object_versions_for_tests(
4100                self.get_object_cache_reader().as_ref(),
4101                std::slice::from_ref(&barrier),
4102            )
4103            .unwrap();
4104        let version_map = assigned_versions.into_map();
4105
4106        let barrier_assigned = version_map.get(&barrier.key()).unwrap().clone();
4107        let env = ExecutionEnv::new().with_assigned_versions(barrier_assigned);
4108        let (effects, _) = self
4109            .try_execute_immediately(&barrier.clone(), env.clone(), &epoch_store)
4110            .unwrap();
4111        assert!(effects.status().is_ok());
4112        replay_txns.push((barrier, env));
4113
4114        let next_accumulator_version = accumulator_version.next();
4115        self.execution_scheduler
4116            .settle_address_funds(FundsSettlement {
4117                funds_changes: balance_changes,
4118                next_accumulator_version,
4119            });
4120        // object funds are settled while executing the barrier transaction
4121
4122        replay_txns
4123    }
4124
4125    /// Replays settlement transactions on this AuthorityState.
4126    /// Used to sync a fullnode with settlement transactions executed on a validator.
4127    pub async fn replay_settlement_for_testing(
4128        &self,
4129        txns: &[(VerifiedExecutableTransaction, ExecutionEnv)],
4130    ) {
4131        let epoch_store = self.epoch_store_for_testing();
4132        for (tx, env) in txns {
4133            let (effects, _) = self
4134                .try_execute_immediately(tx, env.clone(), &epoch_store)
4135                .unwrap();
4136            assert!(effects.status().is_ok());
4137        }
4138    }
4139
4140    /// Advance the epoch store to the next epoch for testing only.
4141    /// This only manually sets all the places where we have the epoch number.
4142    /// It doesn't properly reconfigure the node, hence should be only used for testing.
4143    pub async fn reconfigure_for_testing(&self) {
4144        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
4145        let epoch_store = self.epoch_store_for_testing().clone();
4146        let protocol_config = epoch_store.protocol_config().clone();
4147        // The current protocol config used in the epoch store may have been overridden and diverged from
4148        // the protocol config definitions. That override may have now been dropped when the initial guard was dropped.
4149        // We reapply the override before creating the new epoch store, to make sure that
4150        // the new epoch store has the same protocol config as the current one.
4151        // Since this is for testing only, we mostly like to keep the protocol config the same
4152        // across epochs.
4153        let _guard =
4154            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
4155        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
4156            self.get_backing_package_store().clone(),
4157            self.get_object_store().clone(),
4158            &self.config.expensive_safety_check_config,
4159            self.checkpoint_store
4160                .get_epoch_last_checkpoint(epoch_store.epoch())
4161                .unwrap()
4162                .map(|c| *c.sequence_number())
4163                .unwrap_or_default(),
4164        );
4165        self.execution_scheduler
4166            .reconfigure(&new_epoch_store, self.get_account_funds_read());
4167        let new_epoch = new_epoch_store.epoch();
4168        self.epoch_store.store(new_epoch_store);
4169        epoch_store.epoch_terminated().await;
4170
4171        *execution_lock = new_epoch;
4172    }
4173
4174    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
4175    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
4176    #[instrument(level = "error", skip_all)]
4177    fn maybe_reaccumulate_state_hash(
4178        &self,
4179        cur_epoch_store: &AuthorityPerEpochStore,
4180        new_protocol_version: ProtocolVersion,
4181    ) {
4182        self.get_reconfig_api()
4183            .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4184    }
4185
4186    #[instrument(level = "error", skip_all)]
4187    fn check_system_consistency(
4188        &self,
4189        cur_epoch_store: &AuthorityPerEpochStore,
4190        state_hasher: Arc<GlobalStateHasher>,
4191        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4192    ) {
4193        info!(
4194            "Performing sui conservation consistency check for epoch {}",
4195            cur_epoch_store.epoch()
4196        );
4197
4198        if let Err(err) = self
4199            .get_reconfig_api()
4200            .expensive_check_sui_conservation(cur_epoch_store)
4201        {
4202            if cfg!(debug_assertions) {
4203                panic!("{}", err);
4204            } else {
4205                // We cannot panic in production yet because it is known that there are some
4206                // inconsistencies in testnet. We will enable this once we make it balanced again in testnet.
4207                warn!("Sui conservation consistency check failed: {}", err);
4208            }
4209        } else {
4210            info!("Sui conservation consistency check passed");
4211        }
4212
4213        // check for root state hash consistency with live object set
4214        if expensive_safety_check_config.enable_state_consistency_check() {
4215            info!(
4216                "Performing state consistency check for epoch {}",
4217                cur_epoch_store.epoch()
4218            );
4219            self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4220        }
4221
4222        // Verify all checkpointed transactions are present in transactions_seq.
4223        // This catches any post-processing gaps that could occur if async
4224        // post-processing failed to complete before persistence.
4225        if expensive_safety_check_config.enable_secondary_index_checks()
4226            && let Some(indexes) = self.indexes.clone()
4227        {
4228            let epoch = cur_epoch_store.epoch();
4229            // Only verify the current epoch's checkpoints. Previous epoch contents
4230            // may have been pruned, and we only need to verify that this epoch's
4231            // async post-processing completed correctly.
4232            let first_checkpoint = if epoch == 0 {
4233                0
4234            } else {
4235                self.checkpoint_store
4236                    .get_epoch_last_checkpoint_seq_number(epoch - 1)
4237                    .expect("Failed to get previous epoch's last checkpoint")
4238                    .expect("Previous epoch's last checkpoint missing")
4239                    + 1
4240            };
4241            let highest_executed = self
4242                .checkpoint_store
4243                .get_highest_executed_checkpoint_seq_number()
4244                .expect("Failed to get highest executed checkpoint")
4245                .expect("No executed checkpoints");
4246
4247            info!(
4248                "Verifying checkpointed transactions are in transactions_seq \
4249                 (checkpoints {first_checkpoint}..={highest_executed})"
4250            );
4251            for seq in first_checkpoint..=highest_executed {
4252                let checkpoint = self
4253                    .checkpoint_store
4254                    .get_checkpoint_by_sequence_number(seq)
4255                    .expect("Failed to get checkpoint")
4256                    .expect("Checkpoint missing");
4257                let contents = self
4258                    .checkpoint_store
4259                    .get_checkpoint_contents(&checkpoint.content_digest)
4260                    .expect("Failed to get checkpoint contents")
4261                    .expect("Checkpoint contents missing");
4262                for digests in contents.iter() {
4263                    let tx_digest = digests.transaction;
4264                    assert!(
4265                        indexes
4266                            .get_transaction_seq(&tx_digest)
4267                            .expect("Failed to read transactions_seq")
4268                            .is_some(),
4269                        "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
4270                    );
4271                }
4272            }
4273            info!("All checkpointed transactions verified in transactions_seq");
4274        }
4275    }
4276
4277    fn expensive_check_is_consistent_state(
4278        &self,
4279        state_hasher: Arc<GlobalStateHasher>,
4280        cur_epoch_store: &AuthorityPerEpochStore,
4281    ) {
4282        let live_object_set_hash = state_hasher.digest_live_object_set(
4283            !cur_epoch_store
4284                .protocol_config()
4285                .simplified_unwrap_then_delete(),
4286        );
4287
4288        let root_state_hash: ECMHLiveObjectSetDigest = self
4289            .get_global_state_hash_store()
4290            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4291            .expect("Retrieving root state hash cannot fail")
4292            .expect("Root state hash for epoch must exist")
4293            .1
4294            .digest()
4295            .into();
4296
4297        let is_inconsistent = root_state_hash != live_object_set_hash;
4298        if is_inconsistent {
4299            debug_fatal!(
4300                "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4301                root_state_hash,
4302                live_object_set_hash
4303            );
4304        } else {
4305            info!("State consistency check passed");
4306        }
4307
4308        state_hasher.set_inconsistent_state(is_inconsistent);
4309    }
4310
4311    pub fn current_epoch_for_testing(&self) -> EpochId {
4312        self.epoch_store_for_testing().epoch()
4313    }
4314
4315    #[instrument(level = "error", skip_all)]
4316    pub fn checkpoint_all_dbs(
4317        &self,
4318        checkpoint_path: &Path,
4319        cur_epoch_store: &AuthorityPerEpochStore,
4320        checkpoint_indexes: bool,
4321    ) -> SuiResult {
4322        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4323        let current_epoch = cur_epoch_store.epoch();
4324
4325        if checkpoint_path.exists() {
4326            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4327            return Ok(());
4328        }
4329
4330        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4331        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4332
4333        if checkpoint_path_tmp.exists() {
4334            fs::remove_dir_all(&checkpoint_path_tmp)
4335                .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4336        }
4337
4338        fs::create_dir_all(&checkpoint_path_tmp)
4339            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4340        fs::create_dir(&store_checkpoint_path_tmp)
4341            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4342
4343        // NOTE: Do not change the order of invoking these checkpoint calls
4344        // We want to snapshot checkpoint db first to not race with state sync
4345        self.checkpoint_store
4346            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4347
4348        self.get_reconfig_api()
4349            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4350
4351        self.committee_store
4352            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4353
4354        if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4355            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4356        }
4357
4358        fs::rename(checkpoint_path_tmp, checkpoint_path)
4359            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4360        Ok(())
4361    }
4362
4363    /// Load the current epoch store. This can change during reconfiguration. To ensure that
4364    /// we never end up accessing different epoch stores in a single task, we need to make sure
4365    /// that this is called once per task. Each call needs to be carefully audited to ensure it is
4366    /// the case. This also means we should minimize the number of call-sites. Only call it when
4367    /// there is no way to obtain it from somewhere else.
4368    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4369        self.epoch_store.load()
4370    }
4371
4372    // Load the epoch store, should be used in tests only.
4373    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4374        self.load_epoch_store_one_call_per_task()
4375    }
4376
4377    pub fn clone_committee_for_testing(&self) -> Committee {
4378        Committee::clone(self.epoch_store_for_testing().committee())
4379    }
4380
4381    #[instrument(level = "trace", skip_all)]
4382    pub fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4383        self.get_object_store().get_object(object_id)
4384    }
4385
4386    pub fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4387        Ok(self
4388            .get_object(&SUI_SYSTEM_ADDRESS.into())
4389            .expect("framework object should always exist")
4390            .compute_object_reference())
4391    }
4392
4393    // This function is only used for testing.
4394    pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4395        self.get_object_cache_reader()
4396            .get_sui_system_state_object_unsafe()
4397    }
4398
4399    #[instrument(level = "trace", skip_all)]
4400    fn get_transaction_checkpoint_sequence(
4401        &self,
4402        digest: &TransactionDigest,
4403        epoch_store: &AuthorityPerEpochStore,
4404    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
4405        epoch_store.get_transaction_checkpoint(digest)
4406    }
4407
4408    #[instrument(level = "trace", skip_all)]
4409    pub fn get_checkpoint_by_sequence_number(
4410        &self,
4411        sequence_number: CheckpointSequenceNumber,
4412    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4413        Ok(self
4414            .checkpoint_store
4415            .get_checkpoint_by_sequence_number(sequence_number)?)
4416    }
4417
4418    #[instrument(level = "trace", skip_all)]
4419    pub fn get_transaction_checkpoint_for_tests(
4420        &self,
4421        digest: &TransactionDigest,
4422        epoch_store: &AuthorityPerEpochStore,
4423    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4424        let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4425        let Some(checkpoint) = checkpoint else {
4426            return Ok(None);
4427        };
4428        let checkpoint = self
4429            .checkpoint_store
4430            .get_checkpoint_by_sequence_number(checkpoint)?;
4431        Ok(checkpoint)
4432    }
4433
4434    #[instrument(level = "trace", skip_all)]
4435    pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4436        Ok(
4437            match self
4438                .get_object_cache_reader()
4439                .get_latest_object_or_tombstone(*object_id)
4440            {
4441                Some((_, ObjectOrTombstone::Object(object))) => {
4442                    let layout = self.get_object_layout(&object)?;
4443                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
4444                }
4445                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4446                None => ObjectRead::NotExists(*object_id),
4447            },
4448        )
4449    }
4450
4451    /// Chain Identifier is the digest of the genesis checkpoint.
4452    pub fn get_chain_identifier(&self) -> ChainIdentifier {
4453        self.chain_identifier
4454    }
4455
4456    /// This function aims to serve rpc reads on past objects and
4457    /// we don't expect it to be called for other purposes.
4458    /// Depending on the object pruning policies that will be enforced in the
4459    /// future there is no software-level guarantee/SLA to retrieve an object
4460    /// with an old version even if it exists/existed.
4461    #[instrument(level = "trace", skip_all)]
4462    pub fn get_past_object_read(
4463        &self,
4464        object_id: &ObjectID,
4465        version: SequenceNumber,
4466    ) -> SuiResult<PastObjectRead> {
4467        // Firstly we see if the object ever existed by getting its latest data
4468        let Some(obj_ref) = self
4469            .get_object_cache_reader()
4470            .get_latest_object_ref_or_tombstone(*object_id)
4471        else {
4472            return Ok(PastObjectRead::ObjectNotExists(*object_id));
4473        };
4474
4475        if version > obj_ref.1 {
4476            return Ok(PastObjectRead::VersionTooHigh {
4477                object_id: *object_id,
4478                asked_version: version,
4479                latest_version: obj_ref.1,
4480            });
4481        }
4482
4483        if version < obj_ref.1 {
4484            // Read past objects
4485            return Ok(match self.read_object_at_version(object_id, version)? {
4486                Some((object, layout)) => {
4487                    let obj_ref = object.compute_object_reference();
4488                    PastObjectRead::VersionFound(obj_ref, object, layout)
4489                }
4490
4491                None => PastObjectRead::VersionNotFound(*object_id, version),
4492            });
4493        }
4494
4495        if !obj_ref.2.is_alive() {
4496            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4497        }
4498
4499        match self.read_object_at_version(object_id, obj_ref.1)? {
4500            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4501            None => {
4502                debug_fatal!(
4503                    "Object with in parent_entry is missing from object store, datastore is \
4504                     inconsistent"
4505                );
4506                Err(UserInputError::ObjectNotFound {
4507                    object_id: *object_id,
4508                    version: Some(obj_ref.1),
4509                }
4510                .into())
4511            }
4512        }
4513    }
4514
4515    #[instrument(level = "trace", skip_all)]
4516    fn read_object_at_version(
4517        &self,
4518        object_id: &ObjectID,
4519        version: SequenceNumber,
4520    ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4521        let Some(object) = self
4522            .get_object_cache_reader()
4523            .get_object_by_key(object_id, version)
4524        else {
4525            return Ok(None);
4526        };
4527
4528        let layout = self.get_object_layout(&object)?;
4529        Ok(Some((object, layout)))
4530    }
4531
4532    pub fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4533        let layout = object
4534            .data
4535            .try_as_move()
4536            .map(|object| {
4537                let epoch_store = self.load_epoch_store_one_call_per_task();
4538                into_struct_layout(
4539                    epoch_store
4540                        .executor()
4541                        // TODO(cache) - must read through cache
4542                        .type_layout_resolver(
4543                            epoch_store.protocol_config(),
4544                            Box::new(self.get_backing_package_store().as_ref()),
4545                        )
4546                        .get_annotated_layout(&object.type_().clone().into())?,
4547                )
4548            })
4549            .transpose()?;
4550        Ok(layout)
4551    }
4552
4553    /// Returns a fake ObjectRef representing an address balance, along with the balance value
4554    /// and the previous transaction digest. The ObjectRef can be returned to JSON-RPC clients
4555    /// that don't understand address balances.
4556    #[instrument(level = "trace", skip_all)]
4557    pub fn get_address_balance_coin_info(
4558        &self,
4559        owner: SuiAddress,
4560        balance_type: TypeTag,
4561    ) -> SuiResult<Option<(ObjectRef, u64, TransactionDigest)>> {
4562        let accumulator_id = AccumulatorValue::get_field_id(owner, &balance_type)?;
4563        let accumulator_obj = AccumulatorValue::load_object_by_id(
4564            self.get_child_object_resolver().as_ref(),
4565            None,
4566            *accumulator_id.inner(),
4567        )?;
4568
4569        let Some(accumulator_obj) = accumulator_obj else {
4570            return Ok(None);
4571        };
4572
4573        // Extract the currency type from balance_type (e.g., SUI from Balance<SUI>).
4574        // get_balance expects the currency type, not the balance type.
4575        let currency_type =
4576            Balance::maybe_get_balance_type_param(&balance_type).unwrap_or(balance_type);
4577
4578        let balance = crate::accumulators::balances::get_balance(
4579            owner,
4580            self.get_child_object_resolver().as_ref(),
4581            currency_type,
4582        )?;
4583
4584        if balance == 0 {
4585            return Ok(None);
4586        };
4587
4588        let object_ref = coin_reservation::encode_object_ref(
4589            accumulator_obj.id(),
4590            accumulator_obj.version(),
4591            self.load_epoch_store_one_call_per_task().epoch(),
4592            balance,
4593            self.get_chain_identifier(),
4594        );
4595
4596        Ok(Some((
4597            object_ref,
4598            balance,
4599            accumulator_obj.previous_transaction,
4600        )))
4601    }
4602
4603    /// Returns fake ObjectRefs for all address balances of an owner, keyed by coin type string.
4604    /// Used by get_all_coins to include fake coins for each coin type.
4605    #[instrument(level = "trace", skip_all)]
4606    pub fn get_all_address_balance_coin_infos(
4607        &self,
4608        owner: SuiAddress,
4609    ) -> SuiResult<std::collections::HashMap<String, (ObjectRef, u64, TransactionDigest)>> {
4610        let indexes = self
4611            .indexes
4612            .as_ref()
4613            .ok_or(SuiErrorKind::IndexStoreNotAvailable)?;
4614
4615        let mut result = std::collections::HashMap::new();
4616        for currency_type in indexes.get_address_balance_coin_types_iter(owner) {
4617            let balance_type = sui_types::balance::Balance::type_tag(currency_type.clone());
4618            if let Some((obj_ref, balance, prev_tx)) =
4619                self.get_address_balance_coin_info(owner, balance_type)?
4620            {
4621                // Use currency_type.to_string() to match the format in CoinIndexKey2
4622                // (e.g., "0x2::sui::SUI", not "0x2::coin::Coin<0x2::sui::SUI>")
4623                result.insert(currency_type.to_string(), (obj_ref, balance, prev_tx));
4624            }
4625        }
4626        Ok(result)
4627    }
4628
4629    fn get_owner_at_version(
4630        object_store: &Arc<dyn ObjectStore + Send + Sync>,
4631        object_id: &ObjectID,
4632        version: SequenceNumber,
4633    ) -> SuiResult<Owner> {
4634        object_store
4635            .get_object_by_key(object_id, version)
4636            .ok_or_else(|| {
4637                SuiError::from(UserInputError::ObjectNotFound {
4638                    object_id: *object_id,
4639                    version: Some(version),
4640                })
4641            })
4642            .map(|o| o.owner.clone())
4643    }
4644
4645    #[instrument(level = "trace", skip_all)]
4646    pub fn get_owner_objects(
4647        &self,
4648        owner: SuiAddress,
4649        // If `Some`, the query will start from the next item after the specified cursor
4650        cursor: Option<ObjectID>,
4651        limit: usize,
4652        filter: Option<SuiObjectDataFilter>,
4653    ) -> SuiResult<Vec<ObjectInfo>> {
4654        if let Some(indexes) = &self.indexes {
4655            indexes.get_owner_objects(owner, cursor, limit, filter)
4656        } else {
4657            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4658        }
4659    }
4660
4661    #[instrument(level = "trace", skip_all)]
4662    pub fn get_owned_coins_iterator_with_cursor(
4663        &self,
4664        owner: SuiAddress,
4665        // If `Some`, the query will start from the next item after the specified cursor
4666        cursor: (String, u64, ObjectID),
4667        limit: usize,
4668        one_coin_type_only: bool,
4669    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4670        if let Some(indexes) = &self.indexes {
4671            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4672        } else {
4673            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4674        }
4675    }
4676
4677    #[instrument(level = "trace", skip_all)]
4678    pub fn get_owner_objects_iterator(
4679        &self,
4680        owner: SuiAddress,
4681        // If `Some`, the query will start from the next item after the specified cursor
4682        cursor: Option<ObjectID>,
4683        filter: Option<SuiObjectDataFilter>,
4684    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4685        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4686        if let Some(indexes) = &self.indexes {
4687            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4688        } else {
4689            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4690        }
4691    }
4692
4693    #[instrument(level = "trace", skip_all)]
4694    pub fn get_move_objects<T>(&self, owner: SuiAddress, type_: MoveObjectType) -> SuiResult<Vec<T>>
4695    where
4696        T: DeserializeOwned,
4697    {
4698        let object_ids = self
4699            .get_owner_objects_iterator(owner, None, None)?
4700            .filter(|o| match &o.type_ {
4701                ObjectType::Struct(s) => &type_ == s,
4702                ObjectType::Package => false,
4703            })
4704            .map(|info| ObjectKey(info.object_id, info.version))
4705            .collect::<Vec<_>>();
4706        let mut move_objects = vec![];
4707
4708        let objects = self
4709            .get_object_store()
4710            .multi_get_objects_by_key(&object_ids);
4711
4712        for (o, id) in objects.into_iter().zip_debug_eq(object_ids) {
4713            let object = o.ok_or_else(|| {
4714                SuiError::from(UserInputError::ObjectNotFound {
4715                    object_id: id.0,
4716                    version: Some(id.1),
4717                })
4718            })?;
4719            let move_object = object.data.try_as_move().ok_or_else(|| {
4720                SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4721            })?;
4722            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4723                SuiErrorKind::ObjectDeserializationError {
4724                    error: format!("{e}"),
4725                }
4726            })?);
4727        }
4728        Ok(move_objects)
4729    }
4730
4731    #[instrument(level = "trace", skip_all)]
4732    pub fn get_dynamic_fields(
4733        &self,
4734        owner: ObjectID,
4735        // If `Some`, the query will start from the next item after the specified cursor
4736        cursor: Option<ObjectID>,
4737        limit: usize,
4738    ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4739        Ok(self
4740            .get_dynamic_fields_iterator(owner, cursor)?
4741            .take(limit)
4742            .collect::<Result<Vec<_>, _>>()?)
4743    }
4744
4745    fn get_dynamic_fields_iterator(
4746        &self,
4747        owner: ObjectID,
4748        // If `Some`, the query will start from the next item after the specified cursor
4749        cursor: Option<ObjectID>,
4750    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4751    {
4752        if let Some(indexes) = &self.indexes {
4753            indexes.get_dynamic_fields_iterator(owner, cursor)
4754        } else {
4755            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4756        }
4757    }
4758
4759    #[instrument(level = "trace", skip_all)]
4760    pub fn get_dynamic_field_object_id(
4761        &self,
4762        owner: ObjectID,
4763        name_type: TypeTag,
4764        name_bcs_bytes: &[u8],
4765    ) -> SuiResult<Option<ObjectID>> {
4766        if let Some(indexes) = &self.indexes {
4767            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4768        } else {
4769            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4770        }
4771    }
4772
4773    #[instrument(level = "trace", skip_all)]
4774    pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
4775        Ok(self.get_indexes()?.next_sequence_number())
4776    }
4777
4778    #[instrument(level = "trace", skip_all)]
4779    pub async fn get_executed_transaction_and_effects(
4780        &self,
4781        digest: TransactionDigest,
4782        kv_store: Arc<TransactionKeyValueStore>,
4783    ) -> SuiResult<(Transaction, TransactionEffects)> {
4784        let transaction = kv_store.get_tx(digest).await?;
4785        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4786        Ok((transaction, effects))
4787    }
4788
4789    #[instrument(level = "trace", skip_all)]
4790    pub fn multi_get_checkpoint_by_sequence_number(
4791        &self,
4792        sequence_numbers: &[CheckpointSequenceNumber],
4793    ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
4794        Ok(self
4795            .checkpoint_store
4796            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4797    }
4798
4799    #[instrument(level = "trace", skip_all)]
4800    pub fn get_transaction_events(
4801        &self,
4802        digest: &TransactionDigest,
4803    ) -> SuiResult<TransactionEvents> {
4804        self.get_transaction_cache_reader()
4805            .get_events(digest)
4806            .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4807    }
4808
4809    pub fn get_transaction_input_objects(
4810        &self,
4811        effects: &TransactionEffects,
4812    ) -> SuiResult<Vec<Object>> {
4813        sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4814            .map_err(Into::into)
4815    }
4816
4817    pub fn get_transaction_output_objects(
4818        &self,
4819        effects: &TransactionEffects,
4820    ) -> SuiResult<Vec<Object>> {
4821        sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4822            .map_err(Into::into)
4823    }
4824
4825    fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4826        match &self.indexes {
4827            Some(i) => Ok(i.clone()),
4828            None => Err(SuiErrorKind::UnsupportedFeatureError {
4829                error: "extended object indexing is not enabled on this server".into(),
4830            }
4831            .into()),
4832        }
4833    }
4834
4835    pub async fn get_transactions_for_tests(
4836        self: &Arc<Self>,
4837        filter: Option<TransactionFilter>,
4838        cursor: Option<TransactionDigest>,
4839        limit: Option<usize>,
4840        reverse: bool,
4841    ) -> SuiResult<Vec<TransactionDigest>> {
4842        let metrics = KeyValueStoreMetrics::new_for_tests();
4843        let kv_store = Arc::new(TransactionKeyValueStore::new(
4844            "rocksdb",
4845            metrics,
4846            self.clone(),
4847        ));
4848        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4849            .await
4850    }
4851
4852    #[instrument(level = "trace", skip_all)]
4853    pub async fn get_transactions(
4854        &self,
4855        kv_store: &Arc<TransactionKeyValueStore>,
4856        filter: Option<TransactionFilter>,
4857        // If `Some`, the query will start from the next item after the specified cursor
4858        cursor: Option<TransactionDigest>,
4859        limit: Option<usize>,
4860        reverse: bool,
4861    ) -> SuiResult<Vec<TransactionDigest>> {
4862        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4863            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4864            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4865            if reverse {
4866                let iter = iter
4867                    .rev()
4868                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4869                    .skip(usize::from(cursor.is_some()));
4870                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4871            } else {
4872                let iter = iter
4873                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4874                    .skip(usize::from(cursor.is_some()));
4875                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4876            }
4877        }
4878        self.get_indexes()?
4879            .get_transactions(filter, cursor, limit, reverse)
4880    }
4881
4882    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4883        &self.checkpoint_store
4884    }
4885
4886    pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4887        self.get_checkpoint_store()
4888            .get_highest_executed_checkpoint_seq_number()?
4889            .ok_or(
4890                SuiErrorKind::UserInputError {
4891                    error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4892                }
4893                .into(),
4894            )
4895    }
4896
4897    #[cfg(msim)]
4898    pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
4899        self.database_for_testing()
4900            .perpetual_tables
4901            .get_highest_pruned_checkpoint()
4902            .map(|c| c.unwrap_or(0))
4903            .map_err(Into::into)
4904    }
4905
4906    #[instrument(level = "trace", skip_all)]
4907    pub fn get_checkpoint_summary_by_sequence_number(
4908        &self,
4909        sequence_number: CheckpointSequenceNumber,
4910    ) -> SuiResult<CheckpointSummary> {
4911        let verified_checkpoint = self
4912            .get_checkpoint_store()
4913            .get_checkpoint_by_sequence_number(sequence_number)?;
4914        match verified_checkpoint {
4915            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4916            None => Err(SuiErrorKind::UserInputError {
4917                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4918            }
4919            .into()),
4920        }
4921    }
4922
4923    #[instrument(level = "trace", skip_all)]
4924    pub fn get_checkpoint_summary_by_digest(
4925        &self,
4926        digest: CheckpointDigest,
4927    ) -> SuiResult<CheckpointSummary> {
4928        let verified_checkpoint = self
4929            .get_checkpoint_store()
4930            .get_checkpoint_by_digest(&digest)?;
4931        match verified_checkpoint {
4932            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4933            None => Err(SuiErrorKind::UserInputError {
4934                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4935            }
4936            .into()),
4937        }
4938    }
4939
4940    #[instrument(level = "trace", skip_all)]
4941    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
4942        if is_system_package(package_id) {
4943            return self.find_genesis_txn_digest();
4944        }
4945        Ok(self
4946            .get_object_read(&package_id)?
4947            .into_object()?
4948            .previous_transaction)
4949    }
4950
4951    #[instrument(level = "trace", skip_all)]
4952    pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
4953        let summary = self
4954            .get_verified_checkpoint_by_sequence_number(0)?
4955            .into_message();
4956        let content = self.get_checkpoint_contents(summary.content_digest)?;
4957        let genesis_transaction = content.enumerate_transactions(&summary).next();
4958        Ok(genesis_transaction
4959            .ok_or(SuiErrorKind::UserInputError {
4960                error: UserInputError::GenesisTransactionNotFound,
4961            })?
4962            .1
4963            .transaction)
4964    }
4965
4966    #[instrument(level = "trace", skip_all)]
4967    pub fn get_verified_checkpoint_by_sequence_number(
4968        &self,
4969        sequence_number: CheckpointSequenceNumber,
4970    ) -> SuiResult<VerifiedCheckpoint> {
4971        let verified_checkpoint = self
4972            .get_checkpoint_store()
4973            .get_checkpoint_by_sequence_number(sequence_number)?;
4974        match verified_checkpoint {
4975            Some(verified_checkpoint) => Ok(verified_checkpoint),
4976            None => Err(SuiErrorKind::UserInputError {
4977                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4978            }
4979            .into()),
4980        }
4981    }
4982
4983    #[instrument(level = "trace", skip_all)]
4984    pub fn get_verified_checkpoint_summary_by_digest(
4985        &self,
4986        digest: CheckpointDigest,
4987    ) -> SuiResult<VerifiedCheckpoint> {
4988        let verified_checkpoint = self
4989            .get_checkpoint_store()
4990            .get_checkpoint_by_digest(&digest)?;
4991        match verified_checkpoint {
4992            Some(verified_checkpoint) => Ok(verified_checkpoint),
4993            None => Err(SuiErrorKind::UserInputError {
4994                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4995            }
4996            .into()),
4997        }
4998    }
4999
5000    #[instrument(level = "trace", skip_all)]
5001    pub fn get_checkpoint_contents(
5002        &self,
5003        digest: CheckpointContentsDigest,
5004    ) -> SuiResult<CheckpointContents> {
5005        self.get_checkpoint_store()
5006            .get_checkpoint_contents(&digest)?
5007            .ok_or(
5008                SuiErrorKind::UserInputError {
5009                    error: UserInputError::CheckpointContentsNotFound(digest),
5010                }
5011                .into(),
5012            )
5013    }
5014
5015    #[instrument(level = "trace", skip_all)]
5016    pub fn get_checkpoint_contents_by_sequence_number(
5017        &self,
5018        sequence_number: CheckpointSequenceNumber,
5019    ) -> SuiResult<CheckpointContents> {
5020        let verified_checkpoint = self
5021            .get_checkpoint_store()
5022            .get_checkpoint_by_sequence_number(sequence_number)?;
5023        match verified_checkpoint {
5024            Some(verified_checkpoint) => {
5025                let content_digest = verified_checkpoint.into_inner().content_digest;
5026                self.get_checkpoint_contents(content_digest)
5027            }
5028            None => Err(SuiErrorKind::UserInputError {
5029                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5030            }
5031            .into()),
5032        }
5033    }
5034
5035    #[instrument(level = "trace", skip_all)]
5036    pub async fn query_events(
5037        &self,
5038        kv_store: &Arc<TransactionKeyValueStore>,
5039        query: EventFilter,
5040        // If `Some`, the query will start from the next item after the specified cursor
5041        cursor: Option<EventID>,
5042        limit: usize,
5043        descending: bool,
5044    ) -> SuiResult<Vec<SuiEvent>> {
5045        let index_store = self.get_indexes()?;
5046
5047        //Get the tx_num from tx_digest
5048        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
5049            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
5050                SuiErrorKind::TransactionNotFound {
5051                    digest: cursor.tx_digest,
5052                },
5053            )?;
5054            (tx_seq, cursor.event_seq as usize)
5055        } else if descending {
5056            (u64::MAX, usize::MAX)
5057        } else {
5058            (0, 0)
5059        };
5060
5061        let limit = limit + 1;
5062        let mut event_keys = match query {
5063            EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
5064            EventFilter::Transaction(digest) => {
5065                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
5066            }
5067            EventFilter::MoveModule { package, module } => {
5068                let module_id = ModuleId::new(package.into(), module);
5069                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
5070            }
5071            EventFilter::MoveEventType(struct_name) => index_store
5072                .events_by_move_event_struct_name(
5073                    &struct_name,
5074                    tx_num,
5075                    event_num,
5076                    limit,
5077                    descending,
5078                )?,
5079            EventFilter::Sender(sender) => {
5080                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
5081            }
5082            EventFilter::TimeRange {
5083                start_time,
5084                end_time,
5085            } => index_store
5086                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
5087            EventFilter::MoveEventModule { package, module } => index_store
5088                .events_by_move_event_module(
5089                    &ModuleId::new(package.into(), module),
5090                    tx_num,
5091                    event_num,
5092                    limit,
5093                    descending,
5094                )?,
5095            // not using "_ =>" because we want to make sure we remember to add new variants here
5096            EventFilter::Any(_) => {
5097                return Err(SuiErrorKind::UserInputError {
5098                    error: UserInputError::Unsupported(
5099                        "'Any' queries are not supported by the fullnode.".to_string(),
5100                    ),
5101                }
5102                .into());
5103            }
5104        };
5105
5106        // skip one event if exclusive cursor is provided,
5107        // otherwise truncate to the original limit.
5108        if cursor.is_some() {
5109            if !event_keys.is_empty() {
5110                event_keys.remove(0);
5111            }
5112        } else {
5113            event_keys.truncate(limit - 1);
5114        }
5115
5116        // get the unique set of digests from the event_keys
5117        let transaction_digests = event_keys
5118            .iter()
5119            .map(|(_, digest, _, _)| *digest)
5120            .collect::<HashSet<_>>()
5121            .into_iter()
5122            .collect::<Vec<_>>();
5123
5124        let events = kv_store
5125            .multi_get_events_by_tx_digests(&transaction_digests)
5126            .await?;
5127
5128        let events_map: HashMap<_, _> = transaction_digests
5129            .iter()
5130            .zip_debug_eq(events.into_iter())
5131            .collect();
5132
5133        let stored_events = event_keys
5134            .into_iter()
5135            .map(|k| {
5136                (
5137                    k,
5138                    events_map
5139                        .get(&k.1)
5140                        .expect("fetched digest is missing")
5141                        .clone()
5142                        .and_then(|e| e.data.get(k.2).cloned()),
5143                )
5144            })
5145            .map(
5146                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
5147                    event
5148                        .map(|e| (e, tx_digest, event_seq, timestamp))
5149                        .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
5150                },
5151            )
5152            .collect::<Result<Vec<_>, _>>()?;
5153
5154        let epoch_store = self.load_epoch_store_one_call_per_task();
5155        let backing_store = self.get_backing_package_store().as_ref();
5156        let mut layout_resolver = epoch_store
5157            .executor()
5158            .type_layout_resolver(epoch_store.protocol_config(), Box::new(backing_store));
5159        let mut events = vec![];
5160        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
5161            events.push(SuiEvent::try_from(
5162                e.clone(),
5163                tx_digest,
5164                event_seq as u64,
5165                Some(timestamp),
5166                layout_resolver.get_annotated_layout(&e.type_)?,
5167            )?)
5168        }
5169        Ok(events)
5170    }
5171
5172    pub fn insert_genesis_object(&self, object: Object) {
5173        self.get_reconfig_api().insert_genesis_object(object);
5174    }
5175
5176    pub fn insert_genesis_objects(&self, objects: &[Object]) {
5177        for o in objects {
5178            self.insert_genesis_object(o.clone());
5179        }
5180    }
5181
5182    /// Make a status response for a transaction
5183    #[instrument(level = "trace", skip_all)]
5184    pub fn get_transaction_status(
5185        &self,
5186        transaction_digest: &TransactionDigest,
5187        epoch_store: &Arc<AuthorityPerEpochStore>,
5188    ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
5189        // TODO: In the case of read path, we should not have to re-sign the effects.
5190        if let Some(effects) =
5191            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
5192        {
5193            if let Some(transaction) = self
5194                .get_transaction_cache_reader()
5195                .get_transaction_block(transaction_digest)
5196            {
5197                let events = if effects.events_digest().is_some() {
5198                    self.get_transaction_events(effects.transaction_digest())?
5199                } else {
5200                    TransactionEvents::default()
5201                };
5202                // The cert_sig slot is permanently None: validators no longer aggregate or
5203                // persist per-transaction quorum signatures.
5204                return Ok(Some((
5205                    (*transaction).clone().into_message(),
5206                    TransactionStatus::Executed(None, effects.into_inner(), events),
5207                )));
5208            } else {
5209                // The read of effects and read of transaction are not atomic. It's possible that we reverted
5210                // the transaction (during epoch change) in between the above two reads, and we end up
5211                // having effects but not transaction. In this case, we just fall through.
5212                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
5213            }
5214        }
5215        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
5216            self.metrics.tx_already_processed.inc();
5217            let (transaction, sig) = signed.into_inner().into_data_and_sig();
5218            Ok(Some((transaction, TransactionStatus::Signed(sig))))
5219        } else {
5220            Ok(None)
5221        }
5222    }
5223
5224    /// Get the signed effects of the given transaction. If the effects was signed in a previous
5225    /// epoch, re-sign it so that the caller is able to form a cert of the effects in the current
5226    /// epoch.
5227    #[instrument(level = "trace", skip_all)]
5228    pub fn get_signed_effects_and_maybe_resign(
5229        &self,
5230        transaction_digest: &TransactionDigest,
5231        epoch_store: &Arc<AuthorityPerEpochStore>,
5232    ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
5233        let effects = self
5234            .get_transaction_cache_reader()
5235            .get_executed_effects(transaction_digest);
5236        match effects {
5237            Some(effects) => {
5238                // If the transaction was executed in previous epochs, the validator will
5239                // re-sign the effects with new current epoch so that a client is always able to
5240                // obtain an effects certificate at the current epoch.
5241                //
5242                // Why is this necessary? Consider the following case:
5243                // - assume there are 4 validators
5244                // - Quorum driver gets 2 signed effects before reconfig halt
5245                // - The tx makes it into final checkpoint.
5246                // - 2 validators go away and are replaced in the new epoch.
5247                // - The new epoch begins.
5248                // - The quorum driver cannot complete the partial effects cert from the previous epoch,
5249                //   because it may not be able to reach either of the 2 former validators.
5250                // - But, if the 2 validators that stayed are willing to re-sign the effects in the new
5251                //   epoch, the QD can make a new effects cert and return it to the client.
5252                //
5253                // This is a considered a short-term workaround. Eventually, Quorum Driver should be able
5254                // to return either an effects certificate, -or- a proof of inclusion in a checkpoint. In
5255                // the case above, the Quorum Driver would return a proof of inclusion in the final
5256                // checkpoint, and this code would no longer be necessary.
5257                if effects.executed_epoch() != epoch_store.epoch() {
5258                    debug!(
5259                        tx_digest=?transaction_digest,
5260                        effects_epoch=?effects.executed_epoch(),
5261                        epoch=?epoch_store.epoch(),
5262                        "Re-signing the effects with the current epoch"
5263                    );
5264                }
5265                Ok(Some(self.sign_effects(effects, epoch_store)?))
5266            }
5267            None => Ok(None),
5268        }
5269    }
5270
5271    #[instrument(level = "trace", skip_all)]
5272    pub(crate) fn sign_effects(
5273        &self,
5274        effects: TransactionEffects,
5275        epoch_store: &Arc<AuthorityPerEpochStore>,
5276    ) -> SuiResult<VerifiedSignedTransactionEffects> {
5277        let tx_digest = *effects.transaction_digest();
5278        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5279            Some(sig) => {
5280                debug_assert!(sig.epoch == epoch_store.epoch());
5281                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5282            }
5283            _ => {
5284                let sig = AuthoritySignInfo::new(
5285                    epoch_store.epoch(),
5286                    &effects,
5287                    Intent::sui_app(IntentScope::TransactionEffects),
5288                    self.name,
5289                    &*self.secret,
5290                );
5291
5292                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5293
5294                epoch_store.insert_effects_digest_and_signature(
5295                    &tx_digest,
5296                    effects.digest(),
5297                    &sig,
5298                )?;
5299
5300                effects
5301            }
5302        };
5303
5304        Ok(VerifiedSignedTransactionEffects::new_unchecked(
5305            signed_effects,
5306        ))
5307    }
5308
5309    // Returns coin objects for indexing for fullnode if indexing is enabled.
5310    #[instrument(level = "trace", skip_all)]
5311    fn fullnode_only_get_tx_coins_for_indexing(
5312        name: AuthorityName,
5313        object_store: &Arc<dyn ObjectStore + Send + Sync>,
5314        effects: &TransactionEffects,
5315        inner_temporary_store: &InnerTemporaryStore,
5316        epoch_store: &Arc<AuthorityPerEpochStore>,
5317    ) -> Option<TxCoins> {
5318        if epoch_store.committee().authority_exists(&name) {
5319            return None;
5320        }
5321        let written_coin_objects = inner_temporary_store
5322            .written
5323            .iter()
5324            .filter_map(|(k, v)| {
5325                if v.is_coin() {
5326                    Some((*k, v.clone()))
5327                } else {
5328                    None
5329                }
5330            })
5331            .collect();
5332        let mut input_coin_objects = inner_temporary_store
5333            .input_objects
5334            .iter()
5335            .filter_map(|(k, v)| {
5336                if v.is_coin() {
5337                    Some((*k, v.clone()))
5338                } else {
5339                    None
5340                }
5341            })
5342            .collect::<ObjectMap>();
5343
5344        // Check for receiving objects that were actually used and modified during execution. Their
5345        // updated version will already showup in "written_coins" but their input isn't included in
5346        // the set of input objects in a inner_temporary_store.
5347        for (object_id, version) in effects.modified_at_versions() {
5348            if inner_temporary_store
5349                .loaded_runtime_objects
5350                .contains_key(&object_id)
5351                && let Some(object) = object_store.get_object_by_key(&object_id, version)
5352                && object.is_coin()
5353            {
5354                input_coin_objects.insert(object_id, object);
5355            }
5356        }
5357
5358        Some((input_coin_objects, written_coin_objects))
5359    }
5360
5361    /// Get the TransactionEnvelope that currently locks the given object, if any.
5362    /// Since object locks are only valid for one epoch, we also need the epoch_id in the query.
5363    /// Returns UserInputError::ObjectNotFound if no lock records for the given object can be found.
5364    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the object record is at a different version.
5365    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a certain transaction.
5366    /// Returns None if a lock record is initialized for the given ObjectRef but not yet locked by any transaction,
5367    ///     or cannot find the transaction in transaction table, because of data race etc.
5368    #[instrument(level = "trace", skip_all)]
5369    pub fn get_transaction_lock(
5370        &self,
5371        object_ref: &ObjectRef,
5372        epoch_store: &AuthorityPerEpochStore,
5373    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5374        let lock_info = self
5375            .get_object_cache_reader()
5376            .get_lock(*object_ref, epoch_store)?;
5377        let lock_info = match lock_info {
5378            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5379                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5380                    provided_obj_ref: *object_ref,
5381                    current_version: locked_ref.1,
5382                }
5383                .into());
5384            }
5385            ObjectLockStatus::Initialized => {
5386                return Ok(None);
5387            }
5388            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5389        };
5390
5391        epoch_store.get_signed_transaction(&lock_info.tx_digest)
5392    }
5393
5394    pub fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5395        self.get_object_cache_reader().get_objects(objects)
5396    }
5397
5398    pub fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5399        self.get_object_cache_reader()
5400            .get_latest_object_ref_or_tombstone(object_id)
5401    }
5402
5403    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
5404    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the upgrade.
5405    ///
5406    /// This method can be used to dynamic adjust the amount of buffer. If set to 0, the upgrade
5407    /// will go through with only 2f+1 votes.
5408    ///
5409    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all should have the same
5410    /// value), or you risk halting the chain.
5411    pub fn set_override_protocol_upgrade_buffer_stake(
5412        &self,
5413        expected_epoch: EpochId,
5414        buffer_stake_bps: u64,
5415    ) -> SuiResult {
5416        let epoch_store = self.load_epoch_store_one_call_per_task();
5417        let actual_epoch = epoch_store.epoch();
5418        if actual_epoch != expected_epoch {
5419            return Err(SuiErrorKind::WrongEpoch {
5420                expected_epoch,
5421                actual_epoch,
5422            }
5423            .into());
5424        }
5425
5426        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5427    }
5428
5429    pub fn clear_override_protocol_upgrade_buffer_stake(
5430        &self,
5431        expected_epoch: EpochId,
5432    ) -> SuiResult {
5433        let epoch_store = self.load_epoch_store_one_call_per_task();
5434        let actual_epoch = epoch_store.epoch();
5435        if actual_epoch != expected_epoch {
5436            return Err(SuiErrorKind::WrongEpoch {
5437                expected_epoch,
5438                actual_epoch,
5439            }
5440            .into());
5441        }
5442
5443        epoch_store.clear_override_protocol_upgrade_buffer_stake()
5444    }
5445
5446    /// Get the set of system packages that are compiled in to this build, if those packages are
5447    /// compatible with the current versions of those packages on-chain.
5448    pub async fn get_available_system_packages(
5449        &self,
5450        binary_config: &BinaryConfig,
5451    ) -> Vec<ObjectRef> {
5452        let mut results = vec![];
5453
5454        let system_packages = BuiltInFramework::iter_system_packages();
5455
5456        // Add extra framework packages during simtest
5457        #[cfg(msim)]
5458        let extra_packages = framework_injection::get_extra_packages(self.name);
5459        #[cfg(msim)]
5460        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
5461
5462        for system_package in system_packages {
5463            let modules = system_package.modules().to_vec();
5464            // In simtests, we could override the current built-in framework packages.
5465            #[cfg(msim)]
5466            let modules =
5467                match framework_injection::get_override_modules(&system_package.id, self.name) {
5468                    Some(overrides) if overrides.is_empty() => continue,
5469                    Some(overrides) => overrides,
5470                    None => modules,
5471                };
5472
5473            let Some(obj_ref) = sui_framework::compare_system_package(
5474                &self.get_object_store(),
5475                &system_package.id,
5476                &modules,
5477                system_package.dependencies.to_vec(),
5478                binary_config,
5479            )
5480            .await
5481            else {
5482                return vec![];
5483            };
5484            results.push(obj_ref);
5485        }
5486
5487        results
5488    }
5489
5490    /// Return the new versions, module bytes, and dependencies for the packages that have been
5491    /// committed to for a framework upgrade, in `system_packages`.  Loads the module contents from
5492    /// the binary, and performs the following checks:
5493    ///
5494    /// - Whether its contents matches what is on-chain already, in which case no upgrade is
5495    ///   required, and its contents are omitted from the output.
5496    /// - Whether the contents in the binary can form a package whose digest matches the input,
5497    ///   meaning the framework will be upgraded, and this authority can satisfy that upgrade, in
5498    ///   which case the contents are included in the output.
5499    ///
5500    /// If the current version of the framework can't be loaded, the binary does not contain the
5501    /// bytes for that framework ID, or the resulting package fails the digest check, `None` is
5502    /// returned indicating that this authority cannot run the upgrade that the network voted on.
5503    async fn get_system_package_bytes(
5504        &self,
5505        system_packages: Vec<ObjectRef>,
5506        binary_config: &BinaryConfig,
5507    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5508        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5509        let objects = self.get_objects(&ids);
5510
5511        let mut res = Vec::with_capacity(system_packages.len());
5512        for (system_package_ref, object) in system_packages.into_iter().zip_debug_eq(objects.iter())
5513        {
5514            let prev_transaction = match object {
5515                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5516                    // Skip this one because it doesn't need to be upgraded.
5517                    info!("Framework {} does not need updating", system_package_ref.0);
5518                    continue;
5519                }
5520
5521                Some(cur_object) => cur_object.previous_transaction,
5522                None => TransactionDigest::genesis_marker(),
5523            };
5524
5525            #[cfg(msim)]
5526            let SystemPackage {
5527                id: _,
5528                bytes,
5529                dependencies,
5530            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5531                .unwrap_or_else(|| {
5532                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5533                });
5534
5535            #[cfg(not(msim))]
5536            let SystemPackage {
5537                id: _,
5538                bytes,
5539                dependencies,
5540            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5541
5542            let modules: Vec<_> = bytes
5543                .iter()
5544                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5545                .collect();
5546
5547            let new_object = Object::new_system_package(
5548                &modules,
5549                system_package_ref.1,
5550                dependencies.clone(),
5551                prev_transaction,
5552            );
5553
5554            let new_ref = new_object.compute_object_reference();
5555            if new_ref != system_package_ref {
5556                if cfg!(msim) {
5557                    // debug_fatal required here for test_framework_upgrade_conflicting_versions to pass
5558                    debug_fatal!(
5559                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5560                    );
5561                } else {
5562                    error!(
5563                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5564                    );
5565                }
5566                return None;
5567            }
5568
5569            res.push((system_package_ref.1, bytes, dependencies));
5570        }
5571
5572        Some(res)
5573    }
5574
5575    fn is_protocol_version_supported_v2(
5576        current_protocol_version: ProtocolVersion,
5577        proposed_protocol_version: ProtocolVersion,
5578        protocol_config: &ProtocolConfig,
5579        committee: &Committee,
5580        capabilities: Vec<AuthorityCapabilitiesV2>,
5581        mut buffer_stake_bps: u64,
5582    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5583        if proposed_protocol_version > current_protocol_version + 1
5584            && !protocol_config.advance_to_highest_supported_protocol_version()
5585        {
5586            return None;
5587        }
5588
5589        if buffer_stake_bps > 10000 {
5590            warn!("clamping buffer_stake_bps to 10000");
5591            buffer_stake_bps = 10000;
5592        }
5593
5594        // For each validator, gather the protocol version and system packages that it would like
5595        // to upgrade to in the next epoch.
5596        let mut desired_upgrades: Vec<_> = capabilities
5597            .into_iter()
5598            .filter_map(|mut cap| {
5599                // A validator that lists no packages is voting against any change at all.
5600                if cap.available_system_packages.is_empty() {
5601                    return None;
5602                }
5603
5604                cap.available_system_packages.sort();
5605
5606                info!(
5607                    "validator {:?} supports {:?} with system packages: {:?}",
5608                    cap.authority.concise(),
5609                    cap.supported_protocol_versions,
5610                    cap.available_system_packages,
5611                );
5612
5613                // A validator that only supports the current protocol version is also voting
5614                // against any change, because framework upgrades always require a protocol version
5615                // bump.
5616                cap.supported_protocol_versions
5617                    .get_version_digest(proposed_protocol_version)
5618                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
5619            })
5620            .collect();
5621
5622        // There can only be one set of votes that have a majority, find one if it exists.
5623        desired_upgrades.sort();
5624        desired_upgrades
5625            .into_iter()
5626            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5627            .into_iter()
5628            .find_map(|((digest, packages), group)| {
5629                // should have been filtered out earlier.
5630                assert!(!packages.is_empty());
5631
5632                let mut stake_aggregator: StakeAggregator<(), true> =
5633                    StakeAggregator::new(Arc::new(committee.clone()));
5634
5635                for (_, _, authority) in group {
5636                    stake_aggregator.insert_generic(authority, ());
5637                }
5638
5639                let total_votes = stake_aggregator.total_votes();
5640                let quorum_threshold = committee.quorum_threshold();
5641                let f = committee.total_votes() - committee.quorum_threshold();
5642
5643                // multiple by buffer_stake_bps / 10000, rounded up.
5644                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5645                let effective_threshold = quorum_threshold + buffer_stake;
5646
5647                info!(
5648                    protocol_config_digest = ?digest,
5649                    ?total_votes,
5650                    ?quorum_threshold,
5651                    ?buffer_stake_bps,
5652                    ?effective_threshold,
5653                    ?proposed_protocol_version,
5654                    ?packages,
5655                    "support for upgrade"
5656                );
5657
5658                let has_support = total_votes >= effective_threshold;
5659                has_support.then_some((proposed_protocol_version, packages))
5660            })
5661    }
5662
5663    fn choose_protocol_version_and_system_packages_v2(
5664        current_protocol_version: ProtocolVersion,
5665        protocol_config: &ProtocolConfig,
5666        committee: &Committee,
5667        capabilities: Vec<AuthorityCapabilitiesV2>,
5668        buffer_stake_bps: u64,
5669    ) -> (ProtocolVersion, Vec<ObjectRef>) {
5670        assert!(protocol_config.authority_capabilities_v2());
5671        let mut next_protocol_version = current_protocol_version;
5672        let mut system_packages = vec![];
5673
5674        while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5675            current_protocol_version,
5676            next_protocol_version + 1,
5677            protocol_config,
5678            committee,
5679            capabilities.clone(),
5680            buffer_stake_bps,
5681        ) {
5682            next_protocol_version = version;
5683            system_packages = packages;
5684        }
5685
5686        (next_protocol_version, system_packages)
5687    }
5688
5689    #[instrument(level = "debug", skip_all)]
5690    fn create_authenticator_state_tx(
5691        &self,
5692        epoch_store: &Arc<AuthorityPerEpochStore>,
5693    ) -> Option<EndOfEpochTransactionKind> {
5694        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5695            info!("authenticator state transactions not enabled");
5696            return None;
5697        }
5698
5699        let authenticator_state_exists = epoch_store.authenticator_state_exists();
5700        let tx = if authenticator_state_exists {
5701            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5702            let min_epoch =
5703                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5704            let authenticator_obj_initial_shared_version = epoch_store
5705                .epoch_start_config()
5706                .authenticator_obj_initial_shared_version()
5707                .expect("initial version must exist");
5708
5709            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5710                min_epoch,
5711                authenticator_obj_initial_shared_version,
5712            );
5713
5714            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5715
5716            tx
5717        } else {
5718            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5719            info!("Creating AuthenticatorStateCreate tx");
5720            tx
5721        };
5722        Some(tx)
5723    }
5724
5725    #[instrument(level = "debug", skip_all)]
5726    fn create_randomness_state_tx(
5727        &self,
5728        epoch_store: &Arc<AuthorityPerEpochStore>,
5729    ) -> Option<EndOfEpochTransactionKind> {
5730        if !epoch_store.protocol_config().random_beacon() {
5731            info!("randomness state transactions not enabled");
5732            return None;
5733        }
5734
5735        if epoch_store.randomness_state_exists() {
5736            return None;
5737        }
5738
5739        let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5740        info!("Creating RandomnessStateCreate tx");
5741        Some(tx)
5742    }
5743
5744    #[instrument(level = "debug", skip_all)]
5745    fn create_accumulator_root_tx(
5746        &self,
5747        epoch_store: &Arc<AuthorityPerEpochStore>,
5748    ) -> Option<EndOfEpochTransactionKind> {
5749        if !epoch_store
5750            .protocol_config()
5751            .create_root_accumulator_object()
5752        {
5753            info!("accumulator root creation not enabled");
5754            return None;
5755        }
5756
5757        if epoch_store.accumulator_root_exists() {
5758            return None;
5759        }
5760
5761        let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5762        info!("Creating AccumulatorRootCreate tx");
5763        Some(tx)
5764    }
5765
5766    #[instrument(level = "debug", skip_all)]
5767    fn create_write_accumulator_storage_cost_tx(
5768        &self,
5769        epoch_store: &Arc<AuthorityPerEpochStore>,
5770    ) -> Option<EndOfEpochTransactionKind> {
5771        if !epoch_store.accumulator_root_exists() {
5772            info!("accumulator root does not exist yet");
5773            return None;
5774        }
5775        if !epoch_store.protocol_config().enable_accumulators() {
5776            info!("accumulators not enabled");
5777            return None;
5778        }
5779
5780        let object_store = self.get_object_store();
5781        let object_count =
5782            match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
5783                Ok(Some(count)) => count,
5784                Ok(None) => return None,
5785                Err(e) => {
5786                    fatal!("failed to read accumulator object count: {e}");
5787                }
5788            };
5789
5790        let storage_cost = object_count.saturating_mul(
5791            epoch_store
5792                .protocol_config()
5793                .accumulator_object_storage_cost(),
5794        );
5795
5796        let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
5797        info!(
5798            object_count,
5799            storage_cost, "Creating WriteAccumulatorStorageCost tx"
5800        );
5801        Some(tx)
5802    }
5803
5804    #[instrument(level = "debug", skip_all)]
5805    fn create_coin_registry_tx(
5806        &self,
5807        epoch_store: &Arc<AuthorityPerEpochStore>,
5808    ) -> Option<EndOfEpochTransactionKind> {
5809        if !epoch_store.protocol_config().enable_coin_registry() {
5810            info!("coin registry not enabled");
5811            return None;
5812        }
5813
5814        if epoch_store.coin_registry_exists() {
5815            return None;
5816        }
5817
5818        let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5819        info!("Creating CoinRegistryCreate tx");
5820        Some(tx)
5821    }
5822
5823    #[instrument(level = "debug", skip_all)]
5824    fn create_display_registry_tx(
5825        &self,
5826        epoch_store: &Arc<AuthorityPerEpochStore>,
5827    ) -> Option<EndOfEpochTransactionKind> {
5828        if !epoch_store.protocol_config().enable_display_registry() {
5829            info!("display registry not enabled");
5830            return None;
5831        }
5832
5833        if epoch_store.display_registry_exists() {
5834            return None;
5835        }
5836
5837        let tx = EndOfEpochTransactionKind::new_display_registry_create();
5838        info!("Creating DisplayRegistryCreate tx");
5839        Some(tx)
5840    }
5841
5842    #[instrument(level = "debug", skip_all)]
5843    fn create_bridge_tx(
5844        &self,
5845        epoch_store: &Arc<AuthorityPerEpochStore>,
5846    ) -> Option<EndOfEpochTransactionKind> {
5847        if !epoch_store.protocol_config().enable_bridge() {
5848            info!("bridge not enabled");
5849            return None;
5850        }
5851        if epoch_store.bridge_exists() {
5852            return None;
5853        }
5854        let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5855        info!("Creating Bridge Create tx");
5856        Some(tx)
5857    }
5858
5859    #[instrument(level = "debug", skip_all)]
5860    fn init_bridge_committee_tx(
5861        &self,
5862        epoch_store: &Arc<AuthorityPerEpochStore>,
5863    ) -> Option<EndOfEpochTransactionKind> {
5864        if !epoch_store.protocol_config().enable_bridge() {
5865            info!("bridge not enabled");
5866            return None;
5867        }
5868        if !epoch_store
5869            .protocol_config()
5870            .should_try_to_finalize_bridge_committee()
5871        {
5872            info!("should not try to finalize bridge committee yet");
5873            return None;
5874        }
5875        // Only create this transaction if bridge exists
5876        if !epoch_store.bridge_exists() {
5877            return None;
5878        }
5879
5880        if epoch_store.bridge_committee_initiated() {
5881            return None;
5882        }
5883
5884        let bridge_initial_shared_version = epoch_store
5885            .epoch_start_config()
5886            .bridge_obj_initial_shared_version()
5887            .expect("initial version must exist");
5888        let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5889        info!("Init Bridge committee tx");
5890        Some(tx)
5891    }
5892
5893    #[instrument(level = "debug", skip_all)]
5894    fn create_deny_list_state_tx(
5895        &self,
5896        epoch_store: &Arc<AuthorityPerEpochStore>,
5897    ) -> Option<EndOfEpochTransactionKind> {
5898        if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5899            return None;
5900        }
5901
5902        if epoch_store.coin_deny_list_state_exists() {
5903            return None;
5904        }
5905
5906        let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5907        info!("Creating DenyListStateCreate tx");
5908        Some(tx)
5909    }
5910
5911    #[instrument(level = "debug", skip_all)]
5912    fn create_address_alias_state_tx(
5913        &self,
5914        epoch_store: &Arc<AuthorityPerEpochStore>,
5915    ) -> Option<EndOfEpochTransactionKind> {
5916        if !epoch_store.protocol_config().address_aliases() {
5917            info!("address aliases not enabled");
5918            return None;
5919        }
5920
5921        if epoch_store.address_alias_state_exists() {
5922            return None;
5923        }
5924
5925        let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
5926        info!("Creating AddressAliasStateCreate tx");
5927        Some(tx)
5928    }
5929
    /// Builds the end-of-epoch transaction kind that stores execution time observations.
    ///
    /// Returns `None` unless the protocol config's per-object congestion control mode is
    /// `ExecutionTimeEstimate`. Otherwise:
    /// 1. Picks a checkpoint range ending at `last_checkpoint_before_end_of_epoch`,
    ///    clamped so it never starts before the first checkpoint of the current epoch.
    /// 2. Loads the contents of those checkpoints and the transactions they list.
    /// 3. Collects an `ExecutionTimeObservationKey` for every command of every
    ///    programmable transaction found, plus `end_of_epoch_observation_keys`.
    /// 4. Filters the epoch store's end-of-epoch observations down to that key set
    ///    (via `filter_and_sort_v1`, with `params.stored_observations_limit` as the
    ///    limit argument) and wraps the result in the transaction kind.
    ///
    /// # Panics
    /// Panics on store read failures, on missing checkpoints / contents / transactions
    /// in the selected range, and if `stored_observations_limit` does not convert to the
    /// type expected by `filter_and_sort_v1`.
    #[instrument(level = "debug", skip_all)]
    fn create_execution_time_observations_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
    ) -> Option<EndOfEpochTransactionKind> {
        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
            .protocol_config()
            .per_object_congestion_control_mode()
        else {
            return None;
        };

        // Load tx in the last N checkpoints before end-of-epoch, and save only the
        // execution time observations for commands in these checkpoints.
        let start_checkpoint = std::cmp::max(
            last_checkpoint_before_end_of_epoch
                .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
            // If we have <N checkpoints in the epoch, use all of them.
            // The lower bound is the checkpoint after the previous epoch's last one,
            // or 1 when there is no previous epoch.
            epoch_store
                .epoch()
                .checked_sub(1)
                .map(|prev_epoch| {
                    self.checkpoint_store
                        .get_epoch_last_checkpoint_seq_number(prev_epoch)
                        .expect("typed store must not fail")
                        .expect(
                            "sequence number of last checkpoint of preceding epoch must be saved",
                        )
                        + 1
                })
                .unwrap_or(1),
        );
        info!(
            "reading checkpoint range {:?}..={:?}",
            start_checkpoint, last_checkpoint_before_end_of_epoch
        );
        let sequence_numbers =
            (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
        // Resolve each sequence number in the range to its checkpoint content digest.
        let contents_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_locally_computed_checkpoints(&sequence_numbers)
            .expect("typed store must not fail")
            .into_iter()
            .zip_debug_eq(sequence_numbers)
            .map(|(maybe_checkpoint, sequence_number)| {
                if let Some(checkpoint) = maybe_checkpoint {
                    checkpoint.content_digest
                } else {
                    // If locally computed checkpoint summary was already pruned, load the
                    // certified checkpoint.
                    self.checkpoint_store
                        .get_checkpoint_by_sequence_number(sequence_number)
                        .expect("typed store must not fail")
                        .unwrap_or_else(|| {
                            fatal!("preceding checkpoints must exist by end of epoch")
                        })
                        .data()
                        .content_digest
                }
            })
            .collect();
        // Flatten the checkpoint contents into the digests of the transactions they list.
        let tx_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_checkpoint_content(&contents_digests)
            .expect("typed store must not fail")
            .into_iter()
            .flat_map(|maybe_contents| {
                maybe_contents
                    .expect("preceding checkpoint contents must exist by end of epoch")
                    .into_inner()
                    .into_iter()
                    .map(|digests| digests.transaction)
            })
            .collect();
        // One key per command of each programmable transaction; other transaction kinds
        // contribute nothing. The caller-supplied keys are appended at the end.
        let included_execution_time_observations: HashSet<_> = self
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(&tx_digests)
            .into_iter()
            .flat_map(|maybe_tx| {
                if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
                    .expect("preceding transaction must exist by end of epoch")
                    .transaction_data()
                    .kind()
                {
                    #[allow(clippy::unnecessary_to_owned)]
                    itertools::Either::Left(
                        ptb.commands
                            .to_owned()
                            .into_iter()
                            .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
                    )
                } else {
                    itertools::Either::Right(std::iter::empty())
                }
            })
            .chain(end_of_epoch_observation_keys.into_iter())
            .collect();

        // Keep only the stored observations whose key was collected above.
        let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
            epoch_store
                .get_end_of_epoch_execution_time_observations()
                .filter_and_sort_v1(
                    |(key, _)| included_execution_time_observations.contains(key),
                    params.stored_observations_limit.try_into().unwrap(),
                ),
        );
        info!("Creating StoreExecutionTimeObservations tx");
        Some(tx)
    }
6041
    /// Creates and executes the advance epoch transaction to effects without committing it to the database.
    /// The effects of the change epoch tx are only written to the database after a certified checkpoint has been
    /// formed and executed by CheckpointExecutor.
    ///
    /// Before the change-epoch kind itself, this collects every optional end-of-epoch
    /// transaction kind returned by the `create_*_tx` / `init_bridge_committee_tx` helpers.
    /// When the protocol config reports `end_of_epoch_transaction_supported()`, all kinds are
    /// bundled into a single end-of-epoch transaction; otherwise only a plain change-epoch
    /// transaction is built and the collected kinds are not used.
    ///
    /// When a framework upgrade has been decided on, but the validator does not have the new
    /// versions of the packages locally, the validator cannot form the ChangeEpochTx. In this case
    /// it returns Err, indicating that the checkpoint builder should give up trying to make the
    /// final checkpoint. As long as the network is able to create a certified checkpoint (which
    /// should be ensured by the capabilities vote), it will arrive via state sync and be executed
    /// by CheckpointExecutor.
    ///
    /// # Errors
    /// - `CheckpointBuilderError::SystemPackagesMissing` if the bytes of the next epoch's
    ///   system packages are not available locally.
    /// - `CheckpointBuilderError::ChangeEpochTxAlreadyExecuted` if the transaction is already
    ///   executed, or if no execution lock could be obtained for it.
    /// - Any error propagated from shared object version assignment or from reading the
    ///   input objects.
    ///
    /// # Panics
    /// Panics if capabilities cannot be read, if the result of `execute_certificate` cannot be
    /// unwrapped, if the written objects do not contain the system state, or if the resulting
    /// effects status is not ok.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
        // >1 checkpoint.
        last_checkpoint: CheckpointSequenceNumber,
    ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
        // Optional end-of-epoch transaction kinds, pushed in a fixed order. Each helper
        // returns `None` when its transaction is not needed.
        let mut txns = Vec::new();

        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_bridge_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_execution_time_observations_tx(
            epoch_store,
            end_of_epoch_observation_keys,
            last_checkpoint,
        ) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
            txns.push(tx);
        }

        if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_display_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();

        // Decide the next epoch's protocol version and system packages from the
        // capabilities recorded in the epoch store.
        let (next_epoch_protocol_version, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v2(
                epoch_store.protocol_version(),
                epoch_store.protocol_config(),
                epoch_store.committee(),
                epoch_store
                    .get_capabilities_v2()
                    .expect("read capabilities from db cannot fail"),
                buffer_stake_bps,
            );

        // since system packages are created during the current epoch, they should abide by the
        // rules of the current epoch, including the current epoch's max Move binary format version
        let config = epoch_store.protocol_config();
        let binary_config = config.binary_config(None);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            if next_epoch_protocol_version <= ProtocolVersion::MAX {
                // This case should only be hit if the validator supports the new protocol version,
                // but carries a different framework. The validator should still be able to
                // reconfigure, as the correct framework will be installed by the change epoch txn.
                debug_fatal!(
                    "upgraded system packages {:?} are not locally available, cannot create \
                    ChangeEpochTx. validator binary must be upgraded to the correct version!",
                    next_epoch_system_packages
                );
            } else {
                error!(
                    "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
                    next_epoch_protocol_version
                );
            }
            // the checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages locally,
            //   and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive it via
            //   state sync, and execute it. This will upgrade the framework packages, reconfigure,
            //   and most likely shut down in the new epoch (this validator likely doesn't support
            //   the new protocol version, or else it should have had the packages.)
            return Err(CheckpointBuilderError::SystemPackagesMissing);
        };

        // Either bundle the change-epoch kind with the kinds collected above, or fall back
        // to a standalone change-epoch transaction.
        let tx = if epoch_store
            .protocol_config()
            .end_of_epoch_transaction_supported()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));

            VerifiedTransaction::new_end_of_epoch_transaction(txns)
        } else {
            VerifiedTransaction::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            )
        };

        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        // Acquired before the already-executed check below.
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // The tx could have been executed by state sync already - if so simply return an error.
        // The checkpoint builder will shortly be terminated by reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .is_tx_already_executed(tx_digest)
        {
            warn!("change epoch tx has already been executed via state sync");
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        }

        let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
        else {
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        };

        // We must manually assign the shared object versions to the transaction before executing it.
        // This is because we do not sequence end-of-epoch transactions through consensus.
        let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::iter::once(&Schedulable::Transaction(&executable_tx)),
        )?;

        // Exactly one schedulable was passed in, so exactly one assignment is expected back.
        assert_eq!(assigned_versions.0.len(), 1);
        let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;

        let input_objects = self.read_objects_for_execution(
            &tx_lock,
            &executable_tx,
            &assigned_versions,
            epoch_store,
        )?;

        let (transaction_outputs, _timings, _execution_error_opt) = self
            .execute_certificate(
                &execution_guard,
                &executable_tx,
                input_objects,
                None,
                ExecutionEnv::default(),
                epoch_store,
            )
            .unwrap();
        let system_obj = get_sui_system_state(&transaction_outputs.written)
            .expect("change epoch tx must write to system object");

        let effects = transaction_outputs.effects;
        // We must write tx and effects to the state sync tables so that state sync is able to
        // deliver the transaction to CheckpointExecutor after it is included in a certified
        // checkpoint.
        self.get_state_sync_store()
            .insert_transaction_and_effects(&tx, &effects);

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_ok());
        Ok((system_obj, effects))
    }
6267
6268    #[instrument(level = "error", skip_all)]
6269    async fn reopen_epoch_db(
6270        &self,
6271        cur_epoch_store: &AuthorityPerEpochStore,
6272        new_committee: Committee,
6273        epoch_start_configuration: EpochStartConfiguration,
6274        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6275        epoch_last_checkpoint: CheckpointSequenceNumber,
6276    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6277        let new_epoch = new_committee.epoch;
6278        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6279        assert_eq!(
6280            epoch_start_configuration.epoch_start_state().epoch(),
6281            new_committee.epoch
6282        );
6283        fail_point!("before-open-new-epoch-store");
6284        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6285            self.name,
6286            new_committee,
6287            epoch_start_configuration,
6288            self.get_backing_package_store().clone(),
6289            self.get_object_store().clone(),
6290            expensive_safety_check_config,
6291            epoch_last_checkpoint,
6292            self.config.fullnode_sync_mode,
6293        )?;
6294        self.epoch_store.store(new_epoch_store.clone());
6295        Ok(new_epoch_store)
6296    }
6297
6298    #[cfg(test)]
6299    pub(crate) fn iter_live_object_set_for_testing(
6300        &self,
6301    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6302        let include_wrapped_object = !self
6303            .epoch_store_for_testing()
6304            .protocol_config()
6305            .simplified_unwrap_then_delete();
6306        self.get_global_state_hash_store()
6307            .iter_cached_live_object_set_for_testing(include_wrapped_object)
6308    }
6309
6310    #[cfg(test)]
6311    pub(crate) fn shutdown_execution_for_test(&self) {
6312        self.tx_execution_shutdown
6313            .lock()
6314            .take()
6315            .unwrap()
6316            .send(())
6317            .unwrap();
6318    }
6319
6320    /// NOTE: this function is only to be used for fuzzing and testing. Never use in prod
6321    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6322        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6323        self.get_object_cache_reader()
6324            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6325        self.get_reconfig_api()
6326            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6327        Ok(())
6328    }
6329}
6330
6331#[async_trait]
6332impl TransactionKeyValueStoreTrait for AuthorityState {
6333    #[instrument(skip(self))]
6334    async fn multi_get(
6335        &self,
6336        transactions: &[TransactionDigest],
6337        effects: &[TransactionDigest],
6338    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6339        let txns = if !transactions.is_empty() {
6340            self.get_transaction_cache_reader()
6341                .multi_get_transaction_blocks(transactions)
6342                .into_iter()
6343                .map(|t| t.map(|t| (*t).clone().into_inner()))
6344                .collect()
6345        } else {
6346            vec![]
6347        };
6348
6349        let fx = if !effects.is_empty() {
6350            self.get_transaction_cache_reader()
6351                .multi_get_executed_effects(effects)
6352        } else {
6353            vec![]
6354        };
6355
6356        Ok((txns, fx))
6357    }
6358
6359    #[instrument(skip(self))]
6360    async fn multi_get_checkpoints(
6361        &self,
6362        checkpoint_summaries: &[CheckpointSequenceNumber],
6363        checkpoint_contents: &[CheckpointSequenceNumber],
6364        checkpoint_summaries_by_digest: &[CheckpointDigest],
6365    ) -> SuiResult<(
6366        Vec<Option<CertifiedCheckpointSummary>>,
6367        Vec<Option<CheckpointContents>>,
6368        Vec<Option<CertifiedCheckpointSummary>>,
6369    )> {
6370        // TODO: use multi-get methods if it ever becomes important (unlikely)
6371        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6372        let store = self.get_checkpoint_store();
6373        for seq in checkpoint_summaries {
6374            let checkpoint = store
6375                .get_checkpoint_by_sequence_number(*seq)?
6376                .map(|c| c.into_inner());
6377
6378            summaries.push(checkpoint);
6379        }
6380
6381        let mut contents = Vec::with_capacity(checkpoint_contents.len());
6382        for seq in checkpoint_contents {
6383            let checkpoint = store
6384                .get_checkpoint_by_sequence_number(*seq)?
6385                .and_then(|summary| {
6386                    store
6387                        .get_checkpoint_contents(&summary.content_digest)
6388                        .expect("db read cannot fail")
6389                });
6390            contents.push(checkpoint);
6391        }
6392
6393        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6394        for digest in checkpoint_summaries_by_digest {
6395            let checkpoint = store
6396                .get_checkpoint_by_digest(digest)?
6397                .map(|c| c.into_inner());
6398            summaries_by_digest.push(checkpoint);
6399        }
6400        Ok((summaries, contents, summaries_by_digest))
6401    }
6402
6403    #[instrument(skip(self))]
6404    async fn deprecated_get_transaction_checkpoint(
6405        &self,
6406        digest: TransactionDigest,
6407    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6408        Ok(self
6409            .get_checkpoint_cache()
6410            .deprecated_get_transaction_checkpoint(&digest)
6411            .map(|(_epoch, checkpoint)| checkpoint))
6412    }
6413
6414    #[instrument(skip(self))]
6415    async fn get_object(
6416        &self,
6417        object_id: ObjectID,
6418        version: VersionNumber,
6419    ) -> SuiResult<Option<Object>> {
6420        Ok(self
6421            .get_object_cache_reader()
6422            .get_object_by_key(&object_id, version))
6423    }
6424
6425    #[instrument(skip_all)]
6426    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6427        Ok(self
6428            .get_object_cache_reader()
6429            .multi_get_objects_by_key(object_keys))
6430    }
6431
6432    #[instrument(skip(self))]
6433    async fn multi_get_transaction_checkpoint(
6434        &self,
6435        digests: &[TransactionDigest],
6436    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6437        let res = self
6438            .get_checkpoint_cache()
6439            .deprecated_multi_get_transaction_checkpoint(digests);
6440
6441        Ok(res
6442            .into_iter()
6443            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6444            .collect())
6445    }
6446
6447    #[instrument(skip(self))]
6448    async fn multi_get_events_by_tx_digests(
6449        &self,
6450        digests: &[TransactionDigest],
6451    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6452        if digests.is_empty() {
6453            return Ok(vec![]);
6454        }
6455
6456        Ok(self
6457            .get_transaction_cache_reader()
6458            .multi_get_events(digests))
6459    }
6460}
6461
/// Simulator-only (`cfg(msim)`) registry of framework package overrides, keyed by
/// package id and stored in a thread-local map.
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::collections::BTreeMap;
    use std::{cell::RefCell, collections::BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    /// Maps a package id to the override registered for it.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    /// The compiled modules that make up one package.
    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) override a package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package override is resolved.
    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback that receives the validator's name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module at its own version, preserving input order.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Registers `modules` as the override of `package_id` for all validators,
    /// replacing any previous entry for that id.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|overrides| {
            let entry = PackageOverrideConfig::Global(modules);
            overrides.borrow_mut().insert(package_id, entry);
        });
    }

    /// Registers a per-validator callback as the override of `package_id`,
    /// replacing any previous entry for that id.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|overrides| {
            let entry = PackageOverrideConfig::PerValidator(func);
            overrides.borrow_mut().insert(package_id, entry);
        });
    }

    /// Registers every package in `packages` as a global override, and registers an
    /// empty module list for each built-in package id that `packages` does not contain.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        OVERRIDE.with(|overrides| {
            // Start from all built-in ids and strike out the ones being provided.
            let mut omitted_built_ins: BTreeSet<_> =
                BuiltInFramework::all_package_ids().into_iter().collect();
            for provided in &packages {
                omitted_built_ins.remove(&provided.id);
            }

            let mut registry = overrides.borrow_mut();
            for provided in packages {
                registry.insert(
                    provided.id,
                    PackageOverrideConfig::Global(provided.modules()),
                );
            }
            for omitted_id in omitted_built_ins {
                registry.insert(omitted_id, PackageOverrideConfig::Global(vec![]));
            }
        });
    }

    /// Returns the serialized override modules of `package_id` for validator `name`.
    ///
    /// `None` when no override is registered, or when a per-validator callback
    /// returns `None` for this validator.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|overrides| {
            let registry = overrides.borrow();
            let entry = registry.get(package_id)?;
            match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(callback) => {
                    callback(name).map(|framework| compiled_modules_to_bytes(&framework))
                }
            }
        })
    }

    /// Returns the override modules of `package_id` for validator `name`, with the same
    /// `None` cases as `get_override_bytes`.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|overrides| {
            let registry = overrides.borrow();
            let entry = registry.get(package_id)?;
            match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(callback) => callback(name),
            }
        })
    }

    /// Builds a `SystemPackage` from the override of `package_id` for validator `name`,
    /// or returns `None` when there are no override bytes.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        get_override_bytes(package_id, name).map(|bytes| SystemPackage {
            id: *package_id,
            bytes,
            dependencies: if is_system_package(*package_id) {
                BuiltInFramework::get_package_by_id(package_id)
                    .dependencies
                    .to_vec()
            } else {
                // Assume that entirely new injected packages depend on all existing system packages.
                BuiltInFramework::all_package_ids()
            },
        })
    }

    /// Returns a `SystemPackage` for every registered override whose id is not a
    /// built-in package id, each depending on all built-in packages.
    ///
    /// # Panics
    /// Panics if such an override yields no bytes for validator `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in_ids: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        let extra_ids: Vec<ObjectID> = OVERRIDE.with(|overrides| {
            overrides
                .borrow()
                .keys()
                .filter(|id| !built_in_ids.contains(*id))
                .copied()
                .collect()
        });

        let mut extra_packages = Vec::with_capacity(extra_ids.len());
        for id in extra_ids {
            extra_packages.push(SystemPackage {
                id,
                bytes: get_override_bytes(&id, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        extra_packages
    }
}
6595
6596#[derive(Debug, Serialize, Deserialize, Clone)]
6597pub struct ObjDumpFormat {
6598    pub id: ObjectID,
6599    pub version: VersionNumber,
6600    pub digest: ObjectDigest,
6601    pub object: Object,
6602}
6603
6604impl ObjDumpFormat {
6605    fn new(object: Object) -> Self {
6606        let oref = object.compute_object_reference();
6607        Self {
6608            id: oref.0,
6609            version: oref.1,
6610            digest: oref.2,
6611            object,
6612        }
6613    }
6614}
6615
6616#[derive(Debug, Serialize, Deserialize, Clone)]
6617pub struct NodeStateDump {
6618    pub tx_digest: TransactionDigest,
6619    pub sender_signed_data: SenderSignedData,
6620    pub executed_epoch: u64,
6621    pub reference_gas_price: u64,
6622    pub protocol_version: u64,
6623    pub epoch_start_timestamp_ms: u64,
6624    pub computed_effects: TransactionEffects,
6625    pub expected_effects_digest: TransactionEffectsDigest,
6626    pub relevant_system_packages: Vec<ObjDumpFormat>,
6627    pub shared_objects: Vec<ObjDumpFormat>,
6628    pub loaded_child_objects: Vec<ObjDumpFormat>,
6629    pub modified_at_versions: Vec<ObjDumpFormat>,
6630    pub runtime_reads: Vec<ObjDumpFormat>,
6631    pub input_objects: Vec<ObjDumpFormat>,
6632}
6633
6634impl NodeStateDump {
6635    pub fn new(
6636        tx_digest: &TransactionDigest,
6637        effects: &TransactionEffects,
6638        expected_effects_digest: TransactionEffectsDigest,
6639        object_store: &dyn ObjectStore,
6640        epoch_store: &Arc<AuthorityPerEpochStore>,
6641        inner_temporary_store: &InnerTemporaryStore,
6642        certificate: &VerifiedExecutableTransaction,
6643    ) -> SuiResult<Self> {
6644        // Epoch info
6645        let executed_epoch = epoch_store.epoch();
6646        let reference_gas_price = epoch_store.reference_gas_price();
6647        let epoch_start_config = epoch_store.epoch_start_config();
6648        let protocol_version = epoch_store.protocol_version().as_u64();
6649        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6650
6651        // Record all system packages at this version
6652        let mut relevant_system_packages = Vec::new();
6653        for sys_package_id in BuiltInFramework::all_package_ids() {
6654            if let Some(w) = object_store.get_object(&sys_package_id) {
6655                relevant_system_packages.push(ObjDumpFormat::new(w))
6656            }
6657        }
6658
6659        // Record all the shared objects
6660        let mut shared_objects = Vec::new();
6661        for kind in effects.input_consensus_objects() {
6662            match kind {
6663                InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6664                    if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6665                        shared_objects.push(ObjDumpFormat::new(w))
6666                    }
6667                }
6668                InputConsensusObject::ReadConsensusStreamEnded(..)
6669                | InputConsensusObject::MutateConsensusStreamEnded(..)
6670                | InputConsensusObject::Cancelled(..) => (), // TODO: consider record congested objects.
6671            }
6672        }
6673
6674        // Record all loaded child objects
6675        // Child objects which are read but not mutated are not tracked anywhere else
6676        let mut loaded_child_objects = Vec::new();
6677        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6678            if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6679                loaded_child_objects.push(ObjDumpFormat::new(w))
6680            }
6681        }
6682
6683        // Record all modified objects
6684        let mut modified_at_versions = Vec::new();
6685        for (id, ver) in effects.modified_at_versions() {
6686            if let Some(w) = object_store.get_object_by_key(&id, ver) {
6687                modified_at_versions.push(ObjDumpFormat::new(w))
6688            }
6689        }
6690
6691        // Packages read at runtime, which were not previously loaded into the temoorary store
6692        // Some packages may be fetched at runtime and wont show up in input objects
6693        let mut runtime_reads = Vec::new();
6694        for obj in inner_temporary_store
6695            .runtime_packages_loaded_from_db
6696            .values()
6697        {
6698            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6699        }
6700
6701        // All other input objects should already be in `inner_temporary_store.objects`
6702
6703        Ok(Self {
6704            tx_digest: *tx_digest,
6705            executed_epoch,
6706            reference_gas_price,
6707            epoch_start_timestamp_ms,
6708            protocol_version,
6709            relevant_system_packages,
6710            shared_objects,
6711            loaded_child_objects,
6712            modified_at_versions,
6713            runtime_reads,
6714            sender_signed_data: certificate.clone().into_message(),
6715            input_objects: inner_temporary_store
6716                .input_objects
6717                .values()
6718                .map(|o| ObjDumpFormat::new(o.clone()))
6719                .collect(),
6720            computed_effects: effects.clone(),
6721            expected_effects_digest,
6722        })
6723    }
6724
6725    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6726        let mut objects = Vec::new();
6727        objects.extend(self.relevant_system_packages.clone());
6728        objects.extend(self.shared_objects.clone());
6729        objects.extend(self.loaded_child_objects.clone());
6730        objects.extend(self.modified_at_versions.clone());
6731        objects.extend(self.runtime_reads.clone());
6732        objects.extend(self.input_objects.clone());
6733        objects
6734    }
6735
6736    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6737        let file_name = format!(
6738            "{}_{}_NODE_DUMP.json",
6739            self.tx_digest,
6740            AuthorityState::unixtime_now_ms()
6741        );
6742        let mut path = path.to_path_buf();
6743        path.push(&file_name);
6744        let mut file = File::create(path.clone())?;
6745        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6746        Ok(path)
6747    }
6748
6749    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6750        let file = File::open(path)?;
6751        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6752    }
6753}