sui_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::accumulators::coin_reservations::CoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
8use crate::checkpoints::CheckpointBuilderError;
9use crate::checkpoints::CheckpointBuilderResult;
10use crate::congestion_tracker::CongestionTracker;
11use crate::consensus_adapter::ConsensusOverloadChecker;
12use crate::execution_cache::ExecutionCacheTraitPointers;
13use crate::execution_cache::TransactionCacheRead;
14use crate::execution_cache::writeback_cache::WritebackCache;
15use crate::execution_scheduler::ExecutionScheduler;
16use crate::execution_scheduler::SchedulingSource;
17use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
18use crate::jsonrpc_index::CoinIndexKey2;
19use crate::rpc_index::RpcIndexStore;
20use crate::traffic_controller::TrafficController;
21use crate::traffic_controller::metrics::TrafficControllerMetrics;
22use crate::transaction_outputs::TransactionOutputs;
23use crate::verify_indexes::{fix_indexes, verify_indexes};
24use arc_swap::{ArcSwap, Guard};
25use async_trait::async_trait;
26use authority_per_epoch_store::CertLockGuard;
27use fastcrypto::encoding::Base58;
28use fastcrypto::encoding::Encoding;
29use fastcrypto::hash::MultisetHash;
30use itertools::Itertools;
31use move_binary_format::CompiledModule;
32use move_binary_format::binary_config::BinaryConfig;
33use move_core_types::annotated_value::MoveStructLayout;
34use move_core_types::language_storage::ModuleId;
35use mysten_common::fatal;
36use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
37use parking_lot::Mutex;
38use prometheus::{
39    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
40    register_histogram_vec_with_registry, register_histogram_with_registry,
41    register_int_counter_vec_with_registry, register_int_counter_with_registry,
42    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
43};
44use serde::de::DeserializeOwned;
45use serde::{Deserialize, Serialize};
46use shared_object_version_manager::AssignedVersions;
47use shared_object_version_manager::Schedulable;
48use std::collections::BTreeMap;
49use std::collections::BTreeSet;
50use std::fs::File;
51use std::io::Write;
52use std::path::{Path, PathBuf};
53use std::sync::atomic::Ordering;
54use std::time::Duration;
55use std::time::Instant;
56use std::time::SystemTime;
57use std::time::UNIX_EPOCH;
58use std::{
59    collections::{HashMap, HashSet},
60    fs,
61    pin::Pin,
62    str::FromStr,
63    sync::Arc,
64    vec,
65};
66use sui_config::NodeConfig;
67use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
68use sui_protocol_config::PerObjectCongestionControlMode;
69use sui_types::crypto::RandomnessRound;
70use sui_types::dynamic_field::visitor as DFV;
71use sui_types::execution::ExecutionOutput;
72use sui_types::execution::ExecutionTimeObservationKey;
73use sui_types::execution::ExecutionTiming;
74use sui_types::execution_params::ExecutionOrEarlyError;
75use sui_types::execution_params::FundsWithdrawStatus;
76use sui_types::execution_params::get_early_execution_error;
77use sui_types::execution_status::ExecutionStatus;
78use sui_types::inner_temporary_store::PackageStoreWithFallback;
79use sui_types::layout_resolver::LayoutResolver;
80use sui_types::layout_resolver::into_struct_layout;
81use sui_types::messages_consensus::AuthorityCapabilitiesV2;
82use sui_types::object::bounded_visitor::BoundedVisitor;
83use sui_types::storage::ChildObjectResolver;
84use sui_types::storage::InputKey;
85use sui_types::storage::TrackingBackingStore;
86use sui_types::traffic_control::{
87    PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
88};
89use sui_types::transaction_executor::SimulateTransactionResult;
90use sui_types::transaction_executor::TransactionChecks;
91use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
92use tap::TapFallible;
93use tokio::sync::RwLock;
94use tokio::sync::mpsc::unbounded_channel;
95use tokio::sync::watch::error::RecvError;
96use tokio::sync::{mpsc, oneshot};
97use tokio::task::JoinHandle;
98use tokio::time::timeout;
99use tracing::trace;
100use tracing::{debug, error, info, instrument, warn};
101
102use self::authority_store::ExecutionLockWriteGuard;
103use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
104pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
105use mysten_metrics::{monitored_scope, spawn_monitored_task};
106
107use crate::jsonrpc_index::IndexStore;
108use crate::jsonrpc_index::{CoinInfo, ObjectIndexChanges};
109use mysten_common::debug_fatal;
110use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
111use sui_config::genesis::Genesis;
112use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
113use sui_framework::{BuiltInFramework, SystemPackage};
114use sui_json_rpc_types::{
115    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
116    SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
117    SuiTransactionBlockEvents, TransactionFilter,
118};
119use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
120use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
121use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
122use sui_types::authenticator_state::get_authenticator_state;
123use sui_types::committee::{EpochId, ProtocolVersion};
124use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
125use sui_types::deny_list_v1::check_coin_deny_list_v1;
126use sui_types::digests::ChainIdentifier;
127use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
128use sui_types::effects::{
129    InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
130    TransactionEvents, VerifiedSignedTransactionEffects,
131};
132use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
133use sui_types::event::{Event, EventID};
134use sui_types::executable_transaction::VerifiedExecutableTransaction;
135use sui_types::gas::{GasCostSummary, SuiGasStatus};
136use sui_types::inner_temporary_store::{
137    InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
138};
139use sui_types::message_envelope::Message;
140use sui_types::messages_checkpoint::{
141    CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
142    CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
143    CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
144    CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
145};
146use sui_types::messages_grpc::{
147    LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
148    TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
149};
150use sui_types::metrics::{BytecodeVerifierMetrics, LimitsMetrics};
151use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
152use sui_types::storage::{
153    BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
154};
155use sui_types::sui_system_state::SuiSystemStateTrait;
156use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
157use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
158use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
159use sui_types::{
160    SUI_SYSTEM_ADDRESS,
161    base_types::*,
162    committee::Committee,
163    crypto::AuthoritySignature,
164    error::{SuiError, SuiResult},
165    object::{Object, ObjectRead},
166    transaction::*,
167};
168use sui_types::{TypeTag, is_system_package};
169use typed_store::TypedStoreError;
170
171use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
172use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
173use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
174use crate::authority::authority_store_pruner::{
175    AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
176};
177use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
178use crate::authority::epoch_start_configuration::EpochStartConfiguration;
179use crate::checkpoints::CheckpointStore;
180use crate::epoch::committee_store::CommitteeStore;
181use crate::execution_cache::{
182    CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
183    ObjectCacheRead, StateSyncAPI,
184};
185use crate::execution_driver::execution_process;
186use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
187use crate::metrics::LatencyObserver;
188use crate::metrics::RateTracker;
189use crate::module_cache_metrics::ResolverMetrics;
190use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
191use crate::stake_aggregator::StakeAggregator;
192use crate::subscription_handler::SubscriptionHandler;
193use crate::transaction_input_loader::TransactionInputLoader;
194
195#[cfg(msim)]
196pub use crate::checkpoints::checkpoint_executor::utils::{
197    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
198};
199
200use crate::authority::authority_store_tables::AuthorityPrunerTables;
201#[cfg(msim)]
202use sui_types::committee::CommitteeTrait;
203use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
204
205#[cfg(test)]
206#[path = "unit_tests/authority_tests.rs"]
207pub mod authority_tests;
208
209#[cfg(test)]
210#[path = "unit_tests/transaction_tests.rs"]
211pub mod transaction_tests;
212
213#[cfg(test)]
214#[path = "unit_tests/batch_transaction_tests.rs"]
215mod batch_transaction_tests;
216
217#[cfg(test)]
218#[path = "unit_tests/move_integration_tests.rs"]
219pub mod move_integration_tests;
220
221#[cfg(test)]
222#[path = "unit_tests/gas_tests.rs"]
223mod gas_tests;
224
225#[cfg(test)]
226#[path = "unit_tests/batch_verification_tests.rs"]
227mod batch_verification_tests;
228
229#[cfg(test)]
230#[path = "unit_tests/coin_deny_list_tests.rs"]
231mod coin_deny_list_tests;
232
233#[cfg(test)]
234#[path = "unit_tests/mysticeti_fastpath_execution_tests.rs"]
235mod mysticeti_fastpath_execution_tests;
236
237#[cfg(test)]
238#[path = "unit_tests/auth_unit_test_utils.rs"]
239pub mod auth_unit_test_utils;
240
241pub mod authority_test_utils;
242
243pub mod authority_per_epoch_store;
244pub mod authority_per_epoch_store_pruner;
245
246pub mod authority_store_pruner;
247pub mod authority_store_tables;
248pub mod authority_store_types;
249pub mod consensus_tx_status_cache;
250pub mod epoch_start_configuration;
251pub mod execution_time_estimator;
252pub mod shared_object_congestion_tracker;
253pub mod shared_object_version_manager;
254pub mod submitted_transaction_cache;
255pub mod test_authority_builder;
256pub mod transaction_deferral;
257pub mod transaction_reject_reason_cache;
258mod weighted_moving_average;
259
260pub(crate) mod authority_store;
261pub mod backpressure;
262
263/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
264pub struct AuthorityMetrics {
265    tx_orders: IntCounter,
266    total_certs: IntCounter,
267    total_cert_attempts: IntCounter,
268    total_effects: IntCounter,
269    // TODO: this tracks consensus object tx, not just shared. Consider renaming.
270    pub shared_obj_tx: IntCounter,
271    sponsored_tx: IntCounter,
272    tx_already_processed: IntCounter,
273    num_input_objs: Histogram,
274    // TODO: this tracks consensus object count, not just shared. Consider renaming.
275    num_shared_objects: Histogram,
276    batch_size: Histogram,
277
278    authority_state_handle_vote_transaction_latency: Histogram,
279
280    execute_certificate_latency_single_writer: Histogram,
281    execute_certificate_latency_shared_object: Histogram,
282    await_transaction_latency: Histogram,
283
284    internal_execution_latency: Histogram,
285    execution_load_input_objects_latency: Histogram,
286    prepare_certificate_latency: Histogram,
287    commit_certificate_latency: Histogram,
288    db_checkpoint_latency: Histogram,
289
290    // TODO: Rename these metrics.
291    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
292    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
293    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
294    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
295
296    pub(crate) execution_driver_executed_transactions: IntCounter,
297    pub(crate) execution_driver_paused_transactions: IntCounter,
298    pub(crate) execution_driver_dispatch_queue: IntGauge,
299    pub(crate) execution_queueing_delay_s: Histogram,
300    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
301    pub(crate) execution_gas_latency_ratio: Histogram,
302
303    pub(crate) skipped_consensus_txns: IntCounter,
304    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
305
306    pub(crate) authority_overload_status: IntGauge,
307    pub(crate) authority_load_shedding_percentage: IntGauge,
308
309    pub(crate) transaction_overload_sources: IntCounterVec,
310
311    /// Post processing metrics
312    post_processing_total_events_emitted: IntCounter,
313    post_processing_total_tx_indexed: IntCounter,
314    post_processing_total_tx_had_event_processed: IntCounter,
315    post_processing_total_failures: IntCounter,
316
317    /// Consensus commit and transaction handler metrics
318    pub consensus_handler_processed: IntCounterVec,
319    pub consensus_handler_transaction_sizes: HistogramVec,
320    pub consensus_handler_num_low_scoring_authorities: IntGauge,
321    pub consensus_handler_scores: IntGaugeVec,
322    pub consensus_handler_deferred_transactions: IntCounter,
323    pub consensus_handler_congested_transactions: IntCounter,
324    pub consensus_handler_cancelled_transactions: IntCounter,
325    pub consensus_handler_max_object_costs: IntGaugeVec,
326    pub consensus_committed_subdags: IntCounterVec,
327    pub consensus_committed_messages: IntGaugeVec,
328    pub consensus_committed_user_transactions: IntGaugeVec,
329    pub consensus_finalized_user_transactions: IntGaugeVec,
330    pub consensus_rejected_user_transactions: IntGaugeVec,
331    pub consensus_calculated_throughput: IntGauge,
332    pub consensus_calculated_throughput_profile: IntGauge,
333    pub consensus_block_handler_block_processed: IntCounter,
334    pub consensus_block_handler_txn_processed: IntCounterVec,
335    pub consensus_block_handler_fastpath_executions: IntCounter,
336    pub consensus_timestamp_bias: Histogram,
337
338    pub limits_metrics: Arc<LimitsMetrics>,
339
340    /// bytecode verifier metrics for tracking timeouts
341    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
342
343    /// Count of zklogin signatures
344    pub zklogin_sig_count: IntCounter,
345    /// Count of multisig signatures
346    pub multisig_sig_count: IntCounter,
347
348    // Tracks recent average txn queueing delay between when it is ready for execution
349    // until it starts executing.
350    pub execution_queueing_latency: LatencyObserver,
351
352    // Tracks the rate of transactions become ready for execution in transaction manager.
353    // The need for the Mutex is that the tracker is updated in transaction manager and read
354    // in the overload_monitor. There should be low mutex contention because
355    // transaction manager is single threaded and the read rate in overload_monitor is
356    // low. In the case where transaction manager becomes multi-threaded, we can
357    // create one rate tracker per thread.
358    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
359
360    // Tracks the rate of transactions starts execution in execution driver.
361    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
362    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
363}
364
/// Histogram bucket bounds for positive counts, overriding the default Prometheus buckets.
/// Each decade is split at 1, 2, 5 and 7, from 1 up to 10 million.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 7.0, 10.0, 20.0, 50.0, 70.0, 100.0, 200.0, 500.0, 700.0, 1_000.0, 2_000.0,
    5_000.0, 7_000.0, 10_000.0, 20_000.0, 50_000.0, 70_000.0, 100_000.0, 200_000.0, 500_000.0,
    700_000.0, 1_000_000.0, 2_000_000.0, 5_000_000.0, 7_000_000.0, 10_000_000.0,
];
371
/// Histogram bucket bounds, in seconds, for general latency metrics (0.5ms up to 90s).
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
    8.0, 9.0, 10.0, 20.0, 30.0, 60.0, 90.0,
];
376
/// Histogram bucket bounds, in seconds, for low latency samples.
/// The smallest bound is 10us and the largest is 100s.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05,
    0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];
382
/// Histogram bucket bounds, in seconds, for the consensus timestamp bias.
/// The bias is expected to be 200-300ms, so the bounds are packed more densely around that
/// range. A negative lower bound is included as well.
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    -2.0, 0.0, 0.2, 0.22, 0.24, 0.26, 0.28, 0.3, 0.32, 0.34, 0.36, 0.38, 0.4, 0.45, 0.5, 0.8, 2.0,
    10.0, 30.0,
];
389
/// Histogram bucket bounds for gas-to-latency ratio metrics, from 10 up to 1 million.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1_000.0, 2_000.0,
    3_000.0, 4_000.0, 5_000.0, 6_000.0, 7_000.0, 8_000.0, 9_000.0, 10_000.0, 50_000.0,
    100_000.0, 1_000_000.0,
];
394
/// Fixed gas coin value of 10^18.
// NOTE(review): presumably the balance of the mock gas coin in dev-inspect — confirm at use sites.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 10u64.pow(18);
396
/// Timeout (2 seconds) for waiting on fastpath input objects.
///
/// The wait usually does not need to be long, because the transaction author should already
/// have observed the input objects as finalized output. A transaction submitted through
/// TransactionDriver is also retried quickly when this validator does not return.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_millis(2_000);
401
402impl AuthorityMetrics {
403    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
404        let execute_certificate_latency = register_histogram_vec_with_registry!(
405            "authority_state_execute_certificate_latency",
406            "Latency of executing certificates, including waiting for inputs",
407            &["tx_type"],
408            LATENCY_SEC_BUCKETS.to_vec(),
409            registry,
410        )
411        .unwrap();
412
413        let execute_certificate_latency_single_writer =
414            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
415        let execute_certificate_latency_shared_object =
416            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
417
418        Self {
419            tx_orders: register_int_counter_with_registry!(
420                "total_transaction_orders",
421                "Total number of transaction orders",
422                registry,
423            )
424            .unwrap(),
425            total_certs: register_int_counter_with_registry!(
426                "total_transaction_certificates",
427                "Total number of transaction certificates handled",
428                registry,
429            )
430            .unwrap(),
431            total_cert_attempts: register_int_counter_with_registry!(
432                "total_handle_certificate_attempts",
433                "Number of calls to handle_certificate",
434                registry,
435            )
436            .unwrap(),
437            // total_effects == total transactions finished
438            total_effects: register_int_counter_with_registry!(
439                "total_transaction_effects",
440                "Total number of transaction effects produced",
441                registry,
442            )
443            .unwrap(),
444
445            shared_obj_tx: register_int_counter_with_registry!(
446                "num_shared_obj_tx",
447                "Number of transactions involving shared objects",
448                registry,
449            )
450            .unwrap(),
451
452            sponsored_tx: register_int_counter_with_registry!(
453                "num_sponsored_tx",
454                "Number of sponsored transactions",
455                registry,
456            )
457            .unwrap(),
458
459            tx_already_processed: register_int_counter_with_registry!(
460                "num_tx_already_processed",
461                "Number of transaction orders already processed previously",
462                registry,
463            )
464            .unwrap(),
465            num_input_objs: register_histogram_with_registry!(
466                "num_input_objects",
467                "Distribution of number of input TX objects per TX",
468                POSITIVE_INT_BUCKETS.to_vec(),
469                registry,
470            )
471            .unwrap(),
472            num_shared_objects: register_histogram_with_registry!(
473                "num_shared_objects",
474                "Number of shared input objects per TX",
475                POSITIVE_INT_BUCKETS.to_vec(),
476                registry,
477            )
478            .unwrap(),
479            batch_size: register_histogram_with_registry!(
480                "batch_size",
481                "Distribution of size of transaction batch",
482                POSITIVE_INT_BUCKETS.to_vec(),
483                registry,
484            )
485            .unwrap(),
486            authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
487                "authority_state_handle_vote_transaction_latency",
488                "Latency of voting on transactions without signing",
489                LATENCY_SEC_BUCKETS.to_vec(),
490                registry,
491            )
492            .unwrap(),
493            execute_certificate_latency_single_writer,
494            execute_certificate_latency_shared_object,
495            await_transaction_latency: register_histogram_with_registry!(
496                "await_transaction_latency",
497                "Latency of awaiting user transaction execution, including waiting for inputs",
498                LATENCY_SEC_BUCKETS.to_vec(),
499                registry,
500            )
501            .unwrap(),
502            internal_execution_latency: register_histogram_with_registry!(
503                "authority_state_internal_execution_latency",
504                "Latency of actual certificate executions",
505                LATENCY_SEC_BUCKETS.to_vec(),
506                registry,
507            )
508            .unwrap(),
509            execution_load_input_objects_latency: register_histogram_with_registry!(
510                "authority_state_execution_load_input_objects_latency",
511                "Latency of loading input objects for execution",
512                LOW_LATENCY_SEC_BUCKETS.to_vec(),
513                registry,
514            )
515            .unwrap(),
516            prepare_certificate_latency: register_histogram_with_registry!(
517                "authority_state_prepare_certificate_latency",
518                "Latency of executing certificates, before committing the results",
519                LATENCY_SEC_BUCKETS.to_vec(),
520                registry,
521            )
522            .unwrap(),
523            commit_certificate_latency: register_histogram_with_registry!(
524                "authority_state_commit_certificate_latency",
525                "Latency of committing certificate execution results",
526                LATENCY_SEC_BUCKETS.to_vec(),
527                registry,
528            )
529            .unwrap(),
530            db_checkpoint_latency: register_histogram_with_registry!(
531                "db_checkpoint_latency",
532                "Latency of checkpointing dbs",
533                LATENCY_SEC_BUCKETS.to_vec(),
534                registry,
535            ).unwrap(),
536            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
537                "transaction_manager_num_enqueued_certificates",
538                "Current number of certificates enqueued to ExecutionScheduler",
539                &["result"],
540                registry,
541            )
542            .unwrap(),
543            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
544                "transaction_manager_num_pending_certificates",
545                "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
546                registry,
547            )
548            .unwrap(),
549            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
550                "transaction_manager_num_executing_certificates",
551                "Number of executing certificates, including queued and actually running certificates",
552                registry,
553            )
554            .unwrap(),
555            authority_overload_status: register_int_gauge_with_registry!(
556                "authority_overload_status",
557                "Whether authority is current experiencing overload and enters load shedding mode.",
558                registry)
559            .unwrap(),
560            authority_load_shedding_percentage: register_int_gauge_with_registry!(
561                "authority_load_shedding_percentage",
562                "The percentage of transactions is shed when the authority is in load shedding mode.",
563                registry)
564            .unwrap(),
565            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
566                "transaction_manager_transaction_queue_age_s",
567                "Time spent in waiting for transaction in the queue",
568                LATENCY_SEC_BUCKETS.to_vec(),
569                registry,
570            )
571            .unwrap(),
572            transaction_overload_sources: register_int_counter_vec_with_registry!(
573                "transaction_overload_sources",
574                "Number of times each source indicates transaction overload.",
575                &["source"],
576                registry)
577            .unwrap(),
578            execution_driver_executed_transactions: register_int_counter_with_registry!(
579                "execution_driver_executed_transactions",
580                "Cumulative number of transaction executed by execution driver",
581                registry,
582            )
583            .unwrap(),
584            execution_driver_paused_transactions: register_int_counter_with_registry!(
585                "execution_driver_paused_transactions",
586                "Cumulative number of transactions paused by execution driver",
587                registry,
588            )
589            .unwrap(),
590            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
591                "execution_driver_dispatch_queue",
592                "Number of transaction pending in execution driver dispatch queue",
593                registry,
594            )
595            .unwrap(),
596            execution_queueing_delay_s: register_histogram_with_registry!(
597                "execution_queueing_delay_s",
598                "Queueing delay between a transaction is ready for execution until it starts executing.",
599                LATENCY_SEC_BUCKETS.to_vec(),
600                registry
601            )
602            .unwrap(),
603            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
604                "prepare_cert_gas_latency_ratio",
605                "The ratio of computation gas divided by VM execution latency.",
606                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
607                registry
608            )
609            .unwrap(),
610            execution_gas_latency_ratio: register_histogram_with_registry!(
611                "execution_gas_latency_ratio",
612                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
613                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
614                registry
615            )
616            .unwrap(),
617            skipped_consensus_txns: register_int_counter_with_registry!(
618                "skipped_consensus_txns",
619                "Total number of consensus transactions skipped",
620                registry,
621            )
622            .unwrap(),
623            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
624                "skipped_consensus_txns_cache_hit",
625                "Total number of consensus transactions skipped because of local cache hit",
626                registry,
627            )
628            .unwrap(),
629            post_processing_total_events_emitted: register_int_counter_with_registry!(
630                "post_processing_total_events_emitted",
631                "Total number of events emitted in post processing",
632                registry,
633            )
634            .unwrap(),
635            post_processing_total_tx_indexed: register_int_counter_with_registry!(
636                "post_processing_total_tx_indexed",
637                "Total number of txes indexed in post processing",
638                registry,
639            )
640            .unwrap(),
641            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
642                "post_processing_total_tx_had_event_processed",
643                "Total number of txes finished event processing in post processing",
644                registry,
645            )
646            .unwrap(),
647            post_processing_total_failures: register_int_counter_with_registry!(
648                "post_processing_total_failures",
649                "Total number of failure in post processing",
650                registry,
651            )
652            .unwrap(),
653            consensus_handler_processed: register_int_counter_vec_with_registry!(
654                "consensus_handler_processed",
655                "Number of transactions processed by consensus handler",
656                &["class"],
657                registry
658            ).unwrap(),
659            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
660                "consensus_handler_transaction_sizes",
661                "Sizes of each type of transactions processed by consensus handler",
662                &["class"],
663                POSITIVE_INT_BUCKETS.to_vec(),
664                registry
665            ).unwrap(),
666            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
667                "consensus_handler_num_low_scoring_authorities",
668                "Number of low scoring authorities based on reputation scores from consensus",
669                registry
670            ).unwrap(),
671            consensus_handler_scores: register_int_gauge_vec_with_registry!(
672                "consensus_handler_scores",
673                "scores from consensus for each authority",
674                &["authority"],
675                registry,
676            ).unwrap(),
677            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
678                "consensus_handler_deferred_transactions",
679                "Number of transactions deferred by consensus handler",
680                registry,
681            ).unwrap(),
682            consensus_handler_congested_transactions: register_int_counter_with_registry!(
683                "consensus_handler_congested_transactions",
684                "Number of transactions deferred by consensus handler due to congestion",
685                registry,
686            ).unwrap(),
687            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
688                "consensus_handler_cancelled_transactions",
689                "Number of transactions cancelled by consensus handler",
690                registry,
691            ).unwrap(),
692            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
693                "consensus_handler_max_congestion_control_object_costs",
694                "Max object costs for congestion control in the current consensus commit",
695                &["commit_type"],
696                registry,
697            ).unwrap(),
698            consensus_committed_subdags: register_int_counter_vec_with_registry!(
699                "consensus_committed_subdags",
700                "Number of committed subdags, sliced by author",
701                &["authority"],
702                registry,
703            ).unwrap(),
704            consensus_committed_messages: register_int_gauge_vec_with_registry!(
705                "consensus_committed_messages",
706                "Total number of committed consensus messages, sliced by author",
707                &["authority"],
708                registry,
709            ).unwrap(),
710            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
711                "consensus_committed_user_transactions",
712                "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
713                &["authority"],
714                registry,
715            ).unwrap(),
716            consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
717                "consensus_finalized_user_transactions",
718                "Number of user transactions finalized, sliced by submitter",
719                &["authority"],
720                registry,
721            ).unwrap(),
722            consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
723                "consensus_rejected_user_transactions",
724                "Number of user transactions rejected, sliced by submitter",
725                &["authority"],
726                registry,
727            ).unwrap(),
728            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
729            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
730            zklogin_sig_count: register_int_counter_with_registry!(
731                "zklogin_sig_count",
732                "Count of zkLogin signatures",
733                registry,
734            )
735            .unwrap(),
736            multisig_sig_count: register_int_counter_with_registry!(
737                "multisig_sig_count",
738                "Count of zkLogin signatures",
739                registry,
740            )
741            .unwrap(),
742            consensus_calculated_throughput: register_int_gauge_with_registry!(
743                "consensus_calculated_throughput",
744                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
745                registry,
746            ).unwrap(),
747            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
748                "consensus_calculated_throughput_profile",
749                "The current active calculated throughput profile",
750                registry
751            ).unwrap(),
752            consensus_block_handler_block_processed: register_int_counter_with_registry!(
753                "consensus_block_handler_block_processed",
754                "Number of blocks processed by consensus block handler.",
755                registry
756            ).unwrap(),
757            consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
758                "consensus_block_handler_txn_processed",
759                "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
760                &["outcome"],
761                registry
762            ).unwrap(),
763            consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
764                "consensus_block_handler_fastpath_executions",
765                "Number of fastpath transactions sent for execution by consensus transaction handler",
766                registry,
767            ).unwrap(),
768            consensus_timestamp_bias: register_histogram_with_registry!(
769                "consensus_timestamp_bias",
770                "Bias/delay from consensus timestamp to system time",
771                TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
772                registry
773            ).unwrap(),
774            execution_queueing_latency: LatencyObserver::new(),
775            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
776            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
777        }
778    }
779}
780
/// A trait object for `Signer` that is:
/// - `Pin`, i.e. confined to one place in memory (we don't want to copy private keys).
/// - `Send + Sync`, i.e. can be safely sent to and shared between threads.
///
/// The alias is a pinned `Arc`, so it is typically instantiated with `Arc::pin(keypair)`
/// where `keypair` is a `KeyPair`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
788
/// Execution env contains the "environment" for the transaction to be executed in, that is,
/// all the information necessary for execution that is not specified by the transaction itself.
///
/// Every field is public; the `Default` implementation supplies the baseline values and the
/// `with_*` builder methods override individual fields.
#[derive(Debug, Clone)]
pub struct ExecutionEnv {
    /// The assigned version of each shared object for the transaction.
    pub assigned_versions: AssignedVersions,
    /// The expected digest of the effects of the transaction, if executing from checkpoint or
    /// other sources where the effects are known in advance. `None` when not known.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    /// The source of the scheduling of the transaction.
    pub scheduling_source: SchedulingSource,
    /// Status of the address funds withdraw scheduling of the transaction,
    /// including both address and object funds withdraws.
    pub funds_withdraw_status: FundsWithdrawStatus,
    /// Transactions that must finish before this transaction can be executed.
    /// Used to schedule barrier transactions after non-exclusive writes.
    pub barrier_dependencies: Vec<TransactionDigest>,
}
807
808impl Default for ExecutionEnv {
809    fn default() -> Self {
810        Self {
811            assigned_versions: Default::default(),
812            expected_effects_digest: None,
813            scheduling_source: SchedulingSource::NonFastPath,
814            funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
815            barrier_dependencies: Default::default(),
816        }
817    }
818}
819
820impl ExecutionEnv {
821    pub fn new() -> Self {
822        Default::default()
823    }
824
825    pub fn with_scheduling_source(mut self, scheduling_source: SchedulingSource) -> Self {
826        self.scheduling_source = scheduling_source;
827        self
828    }
829
830    pub fn with_expected_effects_digest(
831        mut self,
832        expected_effects_digest: TransactionEffectsDigest,
833    ) -> Self {
834        self.expected_effects_digest = Some(expected_effects_digest);
835        self
836    }
837
838    pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
839        self.assigned_versions = assigned_versions;
840        self
841    }
842
843    pub fn with_insufficient_funds(mut self) -> Self {
844        self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
845        self
846    }
847
848    pub fn with_barrier_dependencies(
849        mut self,
850        barrier_dependencies: BTreeSet<TransactionDigest>,
851    ) -> Self {
852        self.barrier_dependencies = barrier_dependencies.into_iter().collect();
853        self
854    }
855}
856
/// Holds the per-transaction effects digest overrides used for fork recovery.
///
/// The map is populated once in `ForkRecoveryState::new` from the node's fork recovery
/// configuration and is only read afterwards by the methods visible in this file section.
#[derive(Debug)]
pub struct ForkRecoveryState {
    /// Transaction digest to effects digest overrides
    transaction_overrides:
        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
}
863
864impl Default for ForkRecoveryState {
865    fn default() -> Self {
866        Self {
867            transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
868        }
869    }
870}
871
872impl ForkRecoveryState {
873    pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
874        let Some(config) = config else {
875            return Ok(Self::default());
876        };
877
878        let mut transaction_overrides = HashMap::new();
879        for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
880            let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
881                SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
882            })?;
883            let effects_digest =
884                TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
885                    SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
886                })?;
887            transaction_overrides.insert(tx_digest, effects_digest);
888        }
889
890        Ok(Self {
891            transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
892        })
893    }
894
895    pub fn get_transaction_override(
896        &self,
897        tx_digest: &TransactionDigest,
898    ) -> Option<TransactionEffectsDigest> {
899        self.transaction_overrides.read().get(tx_digest).copied()
900    }
901}
902
/// State owned by a single authority: its identity, storage and cache handles, the current
/// epoch store, execution scheduling, metrics, and node configuration.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads transaction input objects (used on the signing path via
    /// `read_objects_for_signing`).
    input_loader: TransactionInputLoader,
    /// Handles to the execution cache traits (e.g. `account_funds_read`).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    /// Passed to `process_funds_withdrawals_for_signing` during signing checks.
    coin_reservation_resolver: Arc<CoinReservationResolver>,

    /// The per-epoch store, held in an `ArcSwap`.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it until all writes are complete.
    /// Reconfiguration acquires write lock, changes the epoch and revert all transactions
    /// from previous epoch that are executed but did not make into checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional RPC index store.
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint storage; also serves `get_epoch_state_commitments`.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Committee storage, exposed through `committee_store()` / `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Schedules transaction execution.
    execution_scheduler: Arc<ExecutionScheduler>,

    /// Shuts down the execution task. Used only in testing.
    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Prometheus metrics for this authority.
    pub metrics: Arc<AuthorityMetrics>,
    // NOTE(review): the two pruners are presumably held only to keep them alive for the
    // lifetime of the state (leading underscore, not read in this section) — confirm.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration (deny config, verifier signing config, overload config, ...).
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// The chain identifier is derived from the digest of the genesis checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Traffic controller for Sui core servers (json-rpc, validator service)
    pub traffic_controller: Option<Arc<TrafficController>>,

    /// Fork recovery state for handling equivocation after forks
    fork_recovery_state: Option<ForkRecoveryState>,

    /// Notification channel for reconfiguration
    notify_epoch: tokio::sync::watch::Sender<EpochId>,
}
964
965/// The authority state encapsulates all state, drives execution, and ensures safety.
966///
967/// Note the authority operations can be accessed through a read ref (&) and do not
968/// require &mut. Internally a database is synchronized through a mutex lock.
969///
970/// Repeating valid commands should produce no changes and return no error.
971impl AuthorityState {
972    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
973        epoch_store.committee().authority_exists(&self.name)
974    }
975
976    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
977        !self.is_validator(epoch_store)
978    }
979
980    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
981        &self.committee_store
982    }
983
984    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
985        self.committee_store.clone()
986    }
987
988    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
989        &self.config.authority_overload_config
990    }
991
992    pub fn get_epoch_state_commitments(
993        &self,
994        epoch: EpochId,
995    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
996        self.checkpoint_store.get_epoch_state_commitments(epoch)
997    }
998
999    fn handle_transaction_deny_checks(
1000        &self,
1001        transaction: &VerifiedTransaction,
1002        epoch_store: &Arc<AuthorityPerEpochStore>,
1003    ) -> SuiResult<CheckedInputObjects> {
1004        let tx_digest = transaction.digest();
1005        let tx_data = transaction.data().transaction_data();
1006
1007        let input_object_kinds = tx_data.input_objects()?;
1008        let receiving_objects_refs = tx_data.receiving_objects();
1009
1010        // Note: the deny checks may do redundant package loads but:
1011        // - they only load packages when there is an active package deny map
1012        // - the loads are cached anyway
1013        sui_transaction_checks::deny::check_transaction_for_signing(
1014            tx_data,
1015            transaction.tx_signatures(),
1016            &input_object_kinds,
1017            &receiving_objects_refs,
1018            &self.config.transaction_deny_config,
1019            self.get_backing_package_store().as_ref(),
1020        )?;
1021
1022        let withdraws = tx_data.process_funds_withdrawals_for_signing(
1023            self.chain_identifier,
1024            self.coin_reservation_resolver.as_ref(),
1025        )?;
1026
1027        self.execution_cache_trait_pointers
1028            .account_funds_read
1029            .check_amounts_available(&withdraws)?;
1030
1031        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1032            Some(tx_digest),
1033            &input_object_kinds,
1034            &receiving_objects_refs,
1035            epoch_store.epoch(),
1036        )?;
1037
1038        let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1039            epoch_store.protocol_config(),
1040            epoch_store.reference_gas_price(),
1041            tx_data,
1042            input_objects,
1043            &receiving_objects,
1044            &self.metrics.bytecode_verifier_metrics,
1045            &self.config.verifier_signing_config,
1046        )?;
1047
1048        self.handle_coin_deny_list_checks(
1049            tx_data,
1050            &checked_input_objects,
1051            &receiving_objects,
1052            epoch_store,
1053        )?;
1054
1055        Ok(checked_input_objects)
1056    }
1057
1058    fn handle_coin_deny_list_checks(
1059        &self,
1060        tx_data: &TransactionData,
1061        checked_input_objects: &CheckedInputObjects,
1062        receiving_objects: &ReceivingObjects,
1063        epoch_store: &Arc<AuthorityPerEpochStore>,
1064    ) -> SuiResult<()> {
1065        let funds_withdraw_types = tx_data
1066            .get_funds_withdrawals()
1067            .into_iter()
1068            .filter_map(|withdraw| {
1069                withdraw
1070                    .type_arg
1071                    .get_balance_type_param()
1072                    .map(|ty| ty.to_canonical_string(false))
1073            })
1074            .collect::<BTreeSet<_>>();
1075
1076        if epoch_store.coin_deny_list_v1_enabled() {
1077            check_coin_deny_list_v1(
1078                tx_data.sender(),
1079                checked_input_objects,
1080                receiving_objects,
1081                funds_withdraw_types.clone(),
1082                &self.get_object_store(),
1083            )?;
1084        }
1085
1086        if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1087            check_coin_deny_list_v2_during_signing(
1088                tx_data.sender(),
1089                checked_input_objects,
1090                receiving_objects,
1091                funds_withdraw_types.clone(),
1092                &self.get_object_store(),
1093            )?;
1094        }
1095
1096        Ok(())
1097    }
1098
1099    /// This is a private method and should be kept that way. It doesn't check whether
1100    /// the provided transaction is a system transaction, and hence can only be called internally.
1101    #[instrument(level = "trace", skip_all)]
1102    fn handle_transaction_impl(
1103        &self,
1104        transaction: VerifiedTransaction,
1105        sign: bool,
1106        epoch_store: &Arc<AuthorityPerEpochStore>,
1107    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
1108        // Ensure that validator cannot reconfigure while we are signing the tx
1109        let _execution_lock = self.execution_lock_for_signing()?;
1110
1111        let checked_input_objects =
1112            self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1113
1114        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1115
1116        let tx_digest = *transaction.digest();
1117        let signed_transaction = if sign {
1118            Some(VerifiedSignedTransaction::new(
1119                epoch_store.epoch(),
1120                transaction,
1121                self.name,
1122                &*self.secret,
1123            ))
1124        } else {
1125            None
1126        };
1127
1128        if epoch_store.protocol_config().disable_preconsensus_locking() {
1129            // When preconsensus locking is disabled, validate owned object versions without
1130            // acquiring locks. Locking happens post-consensus in the consensus handler. Validation
1131            // still runs to prevent spam transactions with invalid object versions, and is necessary
1132            // to handle recently created objects.
1133            //
1134            // Note: No signed transaction or locks are stored, unlike acquire_transaction_locks().
1135            self.get_cache_writer()
1136                .validate_owned_object_versions(&owned_objects)?;
1137        } else {
1138            // Check and write locks and signed transaction, into the epoch store.
1139            // The call to acquire_transaction_locks() checks that the locks are not conflicting,
1140            // and returns ObjectLockConflict error in case there is a lock from a different
1141            // transaction on an object.
1142            self.get_cache_writer().acquire_transaction_locks(
1143                epoch_store,
1144                &owned_objects,
1145                tx_digest,
1146                signed_transaction.clone(),
1147            )?;
1148        }
1149
1150        Ok(signed_transaction)
1151    }
1152
1153    /// Vote for a transaction, either when validator receives a submit_transaction request,
1154    /// or sees a transaction from consensus. Performs the same types of checks as
1155    /// transaction signing, but does not explicitly sign the transaction.
1156    /// Note that if the transaction has been executed, we still go through the
1157    /// same checks. If the transaction has only been executed through mysticeti fastpath,
1158    /// but not yet finalized, the signing will still work since the objects are still available.
1159    /// But if the transaction has been finalized, the signing will fail.
1160    /// TODO(mysticeti-fastpath): Assess whether we want to optimize the case when the transaction
1161    /// has already been finalized executed.
1162    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1163    pub fn handle_vote_transaction(
1164        &self,
1165        epoch_store: &Arc<AuthorityPerEpochStore>,
1166        transaction: VerifiedTransaction,
1167    ) -> SuiResult<()> {
1168        debug!("handle_vote_transaction");
1169
1170        let _metrics_guard = self
1171            .metrics
1172            .authority_state_handle_vote_transaction_latency
1173            .start_timer();
1174        self.metrics.tx_orders.inc();
1175
1176        // The should_accept_user_certs check here is best effort, because
1177        // between a validator signs a tx and a cert is formed, the validator
1178        // could close the window.
1179        if !epoch_store
1180            .get_reconfig_state_read_lock_guard()
1181            .should_accept_user_certs()
1182        {
1183            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1184        }
1185
1186        // Accept finalized transactions, instead of voting to reject them.
1187        // Checking executed transactions is limited to the current epoch.
1188        // Otherwise there can be a race where the transaction is accepted because it has executed,
1189        // but the executed effects are pruned post consensus, leading to failures.
1190        let tx_digest = *transaction.digest();
1191        if epoch_store.is_recently_finalized(&tx_digest)
1192            || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
1193        {
1194            return Ok(());
1195        }
1196
1197        if self
1198            .get_transaction_cache_reader()
1199            .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
1200        {
1201            return Err(SuiErrorKind::TransactionAlreadyExecuted {
1202                digest: (*transaction.digest()),
1203            }
1204            .into());
1205        }
1206
1207        let result =
1208            self.handle_transaction_impl(transaction, false /* sign */, epoch_store)?;
1209        assert!(
1210            result.is_none(),
1211            "handle_transaction_impl should not return a signed transaction when sign is false"
1212        );
1213        Ok(())
1214    }
1215
1216    /// Used for early client validation check for transactions before submission to server.
1217    /// Performs the same validation checks as handle_vote_transaction without acquiring locks.
1218    /// This allows for fast failure feedback to clients for non-retriable errors.
1219    ///
1220    /// The key addition is checking that owned object versions match live object versions.
1221    /// This is necessary because handle_transaction_deny_checks fetches objects at their
1222    /// requested version (which may exist in storage as historical versions), whereas
1223    /// validators check against the live/current version during locking (verify_live_object).
1224    pub fn check_transaction_validity(
1225        &self,
1226        epoch_store: &Arc<AuthorityPerEpochStore>,
1227        transaction: &VerifiedTransaction,
1228        enforce_live_input_objects: bool,
1229    ) -> SuiResult<()> {
1230        if !epoch_store
1231            .get_reconfig_state_read_lock_guard()
1232            .should_accept_user_certs()
1233        {
1234            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1235        }
1236
1237        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1238
1239        let checked_input_objects =
1240            self.handle_transaction_deny_checks(transaction, epoch_store)?;
1241
1242        // Check that owned object versions match live objects
1243        // This closely mimics verify_live_object logic from acquire_transaction_locks
1244        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1245        let cache_reader = self.get_object_cache_reader();
1246
1247        for obj_ref in &owned_objects {
1248            if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1249                // Only reject if transaction references an old version. Allow newer
1250                // versions that may exist on validators but not yet synced to fullnode.
1251                if obj_ref.1 < live_object.version() {
1252                    return Err(SuiErrorKind::UserInputError {
1253                        error: UserInputError::ObjectVersionUnavailableForConsumption {
1254                            provided_obj_ref: *obj_ref,
1255                            current_version: live_object.version(),
1256                        },
1257                    }
1258                    .into());
1259                }
1260
1261                if enforce_live_input_objects && live_object.version() < obj_ref.1 {
1262                    // Returns a non-retriable error when the input object version is required to be live.
1263                    return Err(SuiErrorKind::UserInputError {
1264                        error: UserInputError::ObjectVersionUnavailableForConsumption {
1265                            provided_obj_ref: *obj_ref,
1266                            current_version: live_object.version(),
1267                        },
1268                    }
1269                    .into());
1270                }
1271
1272                // If version matches, verify digest also matches
1273                if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1274                    return Err(SuiErrorKind::UserInputError {
1275                        error: UserInputError::InvalidObjectDigest {
1276                            object_id: obj_ref.0,
1277                            expected_digest: live_object.digest(),
1278                        },
1279                    }
1280                    .into());
1281                }
1282            }
1283        }
1284
1285        Ok(())
1286    }
1287
1288    pub fn check_system_overload_at_signing(&self) -> bool {
1289        self.config
1290            .authority_overload_config
1291            .check_system_overload_at_signing
1292    }
1293
1294    pub fn check_system_overload_at_execution(&self) -> bool {
1295        self.config
1296            .authority_overload_config
1297            .check_system_overload_at_execution
1298    }
1299
1300    pub(crate) fn check_system_overload(
1301        &self,
1302        consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
1303        tx_data: &SenderSignedData,
1304        do_authority_overload_check: bool,
1305    ) -> SuiResult {
1306        if do_authority_overload_check {
1307            self.check_authority_overload(tx_data).tap_err(|_| {
1308                self.update_overload_metrics("execution_queue");
1309            })?;
1310        }
1311        self.execution_scheduler
1312            .check_execution_overload(self.overload_config(), tx_data)
1313            .tap_err(|_| {
1314                self.update_overload_metrics("execution_pending");
1315            })?;
1316        consensus_overload_checker
1317            .check_consensus_overload()
1318            .tap_err(|_| {
1319                self.update_overload_metrics("consensus");
1320            })?;
1321
1322        let pending_tx_count = self
1323            .get_cache_commit()
1324            .approximate_pending_transaction_count();
1325        if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1326            return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1327                retry_after_secs: 10,
1328            }
1329            .into());
1330        }
1331
1332        Ok(())
1333    }
1334
1335    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1336        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1337            return Ok(());
1338        }
1339
1340        let load_shedding_percentage = self
1341            .overload_info
1342            .load_shedding_percentage
1343            .load(Ordering::Relaxed);
1344        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1345    }
1346
1347    fn update_overload_metrics(&self, source: &str) {
1348        self.metrics
1349            .transaction_overload_sources
1350            .with_label_values(&[source])
1351            .inc();
1352    }
1353
1354    /// Waits for fastpath (owned, package) dependency objects to become available.
1355    /// Returns true if a chosen set of fastpath dependency objects are available,
1356    /// returns false otherwise after an internal timeout.
1357    pub(crate) async fn wait_for_fastpath_dependency_objects(
1358        &self,
1359        transaction: &VerifiedTransaction,
1360        epoch: EpochId,
1361    ) -> SuiResult<bool> {
1362        let txn_data = transaction.data().transaction_data();
1363        let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1364
1365        // Gather and filter input objects to wait for.
1366        let fastpath_dependency_objects: Vec<_> = move_objects
1367            .into_iter()
1368            .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1369            .chain(
1370                packages
1371                    .into_iter()
1372                    .map(|package_id| InputKey::Package { id: package_id }),
1373            )
1374            .collect();
1375        let receiving_keys: HashSet<_> = receiving_objects
1376            .into_iter()
1377            .filter_map(|receiving_obj_ref| {
1378                self.should_wait_for_dependency_object(receiving_obj_ref)
1379            })
1380            .collect();
1381        if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1382            return Ok(true);
1383        }
1384
1385        // Use shorter wait timeout in simtests to exercise server-side error paths and
1386        // client-side retry logic.
1387        let max_wait = if cfg!(msim) {
1388            Duration::from_millis(200)
1389        } else {
1390            WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1391        };
1392
1393        match timeout(
1394            max_wait,
1395            self.get_object_cache_reader().notify_read_input_objects(
1396                &fastpath_dependency_objects,
1397                &receiving_keys,
1398                epoch,
1399            ),
1400        )
1401        .await
1402        {
1403            Ok(()) => Ok(true),
1404            // Maybe return an error for unavailable input objects,
1405            // and allow the caller to skip the rest of input checks?
1406            Err(_) => Ok(false),
1407        }
1408    }
1409
1410    /// Returns Some(inputKey) if the object reference should be waited on until it is
1411    /// finalized, before proceeding to input checks.
1412    ///
1413    /// Incorrect decisions here should only affect user experience, not safety:
1414    /// - Waiting unnecessarily adds latency to transaction signing and submission.
1415    /// - Not waiting when needed may cause the transaction to be rejected because input object is unavailable.
1416    fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1417        let (obj_id, cur_version, _digest) = obj_ref;
1418        let Some(latest_obj_ref) = self
1419            .get_object_cache_reader()
1420            .get_latest_object_ref_or_tombstone(obj_id)
1421        else {
1422            // Object might not have been created.
1423            return Some(InputKey::VersionedObject {
1424                id: FullObjectID::new(obj_id, None),
1425                version: cur_version,
1426            });
1427        };
1428        let latest_digest = latest_obj_ref.2;
1429        if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1430            // Do not wait for deleted object and rely on input check instead.
1431            return None;
1432        }
1433        let latest_version = latest_obj_ref.1;
1434        if cur_version <= latest_version {
1435            // Do not wait for version that already exists or has been consumed.
1436            // Let the input check to handle them and return the proper error.
1437            return None;
1438        }
1439        // Wait for the object version to become available.
1440        Some(InputKey::VersionedObject {
1441            id: FullObjectID::new(obj_id, None),
1442            version: cur_version,
1443        })
1444    }
1445
1446    /// Wait for a certificate to be executed.
1447    /// For consensus transactions, it needs to be sequenced by the consensus.
1448    /// For owned object transactions, this function will enqueue the transaction for execution.
1449    // TODO: The next 3 functions are very similar. We should refactor them.
1450    #[instrument(level = "trace", skip_all)]
1451    pub async fn wait_for_certificate_execution(
1452        &self,
1453        certificate: &VerifiedCertificate,
1454        epoch_store: &Arc<AuthorityPerEpochStore>,
1455    ) -> SuiResult<TransactionEffects> {
1456        self.wait_for_transaction_execution(
1457            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1458            epoch_store,
1459        )
1460        .await
1461    }
1462
1463    /// Wait for a transaction to be executed.
1464    /// For consensus transactions, it needs to be sequenced by the consensus.
1465    /// For owned object transactions, this function will enqueue the transaction for execution.
1466    #[instrument(level = "trace", skip_all)]
1467    pub async fn wait_for_transaction_execution(
1468        &self,
1469        transaction: &VerifiedExecutableTransaction,
1470        epoch_store: &Arc<AuthorityPerEpochStore>,
1471    ) -> SuiResult<TransactionEffects> {
1472        let _metrics_guard = if transaction.is_consensus_tx() {
1473            self.metrics
1474                .execute_certificate_latency_shared_object
1475                .start_timer()
1476        } else {
1477            self.metrics
1478                .execute_certificate_latency_single_writer
1479                .start_timer()
1480        };
1481        trace!("execute_transaction");
1482
1483        self.metrics.total_cert_attempts.inc();
1484
1485        if !transaction.is_consensus_tx()
1486            && !epoch_store.protocol_config().disable_preconsensus_locking()
1487        {
1488            // Shared object transactions need to be sequenced by the consensus before enqueueing
1489            // for execution, done in AuthorityPerEpochStore::handle_consensus_transaction().
1490            //
1491            // For owned object transactions, they can be enqueued for execution immediately
1492            // ONLY when disable_preconsensus_locking is false (QD/original fastpath mode).
1493            // When disable_preconsensus_locking is true (MFP mode), all transactions including
1494            // owned object transactions must go through consensus before enqueuing for execution.
1495            self.execution_scheduler.enqueue(
1496                vec![(
1497                    Schedulable::Transaction(transaction.clone()),
1498                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1499                )],
1500                epoch_store,
1501            );
1502        }
1503
1504        // tx could be reverted when epoch ends, so we must be careful not to return a result
1505        // here after the epoch ends.
1506        epoch_store
1507            .within_alive_epoch(self.notify_read_effects(
1508                "AuthorityState::wait_for_transaction_execution",
1509                *transaction.digest(),
1510            ))
1511            .await
1512            .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1513            .and_then(|r| r)
1514    }
1515
1516    /// Awaits the effects of executing a user transaction.
1517    ///
1518    /// Relies on consensus to enqueue the transaction for execution.
1519    pub async fn await_transaction_effects(
1520        &self,
1521        digest: TransactionDigest,
1522        epoch_store: &Arc<AuthorityPerEpochStore>,
1523    ) -> SuiResult<TransactionEffects> {
1524        let _metrics_guard = self.metrics.await_transaction_latency.start_timer();
1525        debug!("await_transaction");
1526
1527        epoch_store
1528            .within_alive_epoch(
1529                self.notify_read_effects("AuthorityState::await_transaction_effects", digest),
1530            )
1531            .await
1532            .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1533            .and_then(|r| r)
1534    }
1535
1536    /// Internal logic to execute a certificate.
1537    ///
1538    /// Guarantees that
1539    /// - If input objects are available, return no permanent failure.
1540    /// - Execution and output commit are atomic. i.e. outputs are only written to storage,
1541    /// on successful execution; crashed execution has no observable effect and can be retried.
1542    ///
1543    /// It is caller's responsibility to ensure input objects are available and locks are set.
1544    /// If this cannot be satisfied by the caller, execute_certificate() should be called instead.
1545    ///
1546    /// Should only be called within sui-core.
1547    #[instrument(level = "trace", skip_all)]
1548    pub async fn try_execute_immediately(
1549        &self,
1550        certificate: &VerifiedExecutableTransaction,
1551        mut execution_env: ExecutionEnv,
1552        epoch_store: &Arc<AuthorityPerEpochStore>,
1553    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
1554        let _scope = monitored_scope("Execution::try_execute_immediately");
1555        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1556
1557        let tx_digest = certificate.digest();
1558
1559        if let Some(fork_recovery) = &self.fork_recovery_state
1560            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
1561        {
1562            warn!(
1563                ?tx_digest,
1564                original_digest = ?execution_env.expected_effects_digest,
1565                override_digest = ?override_digest,
1566                "Applying fork recovery override for transaction effects digest"
1567            );
1568            execution_env.expected_effects_digest = Some(override_digest);
1569        }
1570
1571        // prevent concurrent executions of the same tx.
1572        let tx_guard = epoch_store.acquire_tx_guard(certificate);
1573
1574        let tx_cache_reader = self.get_transaction_cache_reader();
1575        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
1576            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
1577                assert_eq!(
1578                    effects.digest(),
1579                    expected_effects_digest,
1580                    "Unexpected effects digest for transaction {:?}",
1581                    tx_digest
1582                );
1583            }
1584            tx_guard.release();
1585            return ExecutionOutput::Success((effects, None));
1586        }
1587
1588        let execution_start_time = Instant::now();
1589
1590        // Any caller that verifies the signatures on the certificate will have already checked the
1591        // epoch. But paths that don't verify sigs (e.g. execution from checkpoint, reading from db)
1592        // present the possibility of an epoch mismatch. If this cert is not finalzied in previous
1593        // epoch, then it's invalid.
1594        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
1595        else {
1596            tx_guard.release();
1597            return ExecutionOutput::EpochEnded;
1598        };
1599        // Since we obtain a reference to the epoch store before taking the execution lock, it's
1600        // possible that reconfiguration has happened and they no longer match.
1601        // TODO: We may not need the following check anymore since the scheduler
1602        // should have checked that the certificate is from the same epoch as epoch_store.
1603        if *execution_guard != epoch_store.epoch() {
1604            tx_guard.release();
1605            info!("The epoch of the execution_guard doesn't match the epoch store");
1606            return ExecutionOutput::EpochEnded;
1607        }
1608
1609        let scheduling_source = execution_env.scheduling_source;
1610        let accumulator_version = execution_env.assigned_versions.accumulator_version;
1611        let mysticeti_fp_outputs = if epoch_store.protocol_config().mysticeti_fastpath() {
1612            tx_cache_reader.get_mysticeti_fastpath_outputs(tx_digest)
1613        } else {
1614            None
1615        };
1616
1617        let (transaction_outputs, timings, execution_error_opt) = if let Some(outputs) =
1618            mysticeti_fp_outputs
1619        {
1620            assert!(
1621                !certificate.is_consensus_tx(),
1622                "Mysticeti fastpath executed transactions cannot be consensus transactions"
1623            );
1624            // If this transaction is not scheduled from fastpath, it must be either
1625            // from consensus or from checkpoint, i.e. it must be finalized.
1626            // To avoid re-executing fastpath transactions, we check if the outputs are already
1627            // in the mysticeti fastpath outputs cache. If so, we skip the execution and commit the transaction.
1628            debug!(
1629                ?tx_digest,
1630                "Mysticeti fastpath certified transaction outputs found in cache, skipping execution and committing"
1631            );
1632            (outputs, None, None)
1633        } else {
1634            let (transaction_outputs, timings, execution_error_opt) = match self
1635                .process_certificate(
1636                    &tx_guard,
1637                    &execution_guard,
1638                    certificate,
1639                    execution_env,
1640                    epoch_store,
1641                ) {
1642                ExecutionOutput::Success(result) => result,
1643                output => return output.unwrap_err(),
1644            };
1645            (
1646                Arc::new(transaction_outputs),
1647                Some(timings),
1648                execution_error_opt,
1649            )
1650        };
1651
1652        fail_point!("crash");
1653
1654        let effects = transaction_outputs.effects.clone();
1655        debug!(
1656            ?tx_digest,
1657            fx_digest=?effects.digest(),
1658            "process_certificate succeeded in {:.3}ms",
1659            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
1660        );
1661
1662        if scheduling_source == SchedulingSource::MysticetiFastPath {
1663            self.get_cache_writer()
1664                .write_fastpath_transaction_outputs(transaction_outputs);
1665        } else {
1666            let commit_result =
1667                self.commit_certificate(certificate, transaction_outputs, epoch_store);
1668            if let Err(err) = commit_result {
1669                error!(?tx_digest, "Error committing transaction: {err}");
1670                tx_guard.release();
1671                return ExecutionOutput::Fatal(err);
1672            }
1673        }
1674
1675        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
1676            certificate.data().transaction_data().kind()
1677        {
1678            if let Some(err) = &execution_error_opt {
1679                debug_fatal!("Authenticator state update failed: {:?}", err);
1680            }
1681            epoch_store.update_authenticator_state(auth_state);
1682
1683            // double check that the signature verifier always matches the authenticator state
1684            if cfg!(debug_assertions) {
1685                let authenticator_state = get_authenticator_state(self.get_object_store())
1686                    .expect("Read cannot fail")
1687                    .expect("Authenticator state must exist");
1688
1689                let mut sys_jwks: Vec<_> = authenticator_state
1690                    .active_jwks
1691                    .into_iter()
1692                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1693                    .collect();
1694                let mut active_jwks: Vec<_> = epoch_store
1695                    .signature_verifier
1696                    .get_jwks()
1697                    .into_iter()
1698                    .collect();
1699                sys_jwks.sort();
1700                active_jwks.sort();
1701
1702                assert_eq!(sys_jwks, active_jwks);
1703            }
1704        }
1705
1706        // TODO: We should also settle address funds during execution,
1707        // instead of from checkpoint builder. It will improve performance
1708        // since we will be settling as soon as we can.
1709        if certificate
1710            .transaction_data()
1711            .kind()
1712            .is_accumulator_barrier_settle_tx()
1713        {
1714            // unwrap safe because we assign accumulator version for every transaction
1715            // when accumulator is enabled.
1716            let next_accumulator_version = accumulator_version.unwrap().next();
1717            self.execution_scheduler
1718                .settle_object_funds(next_accumulator_version);
1719        }
1720
1721        tx_guard.commit_tx();
1722
1723        let elapsed = execution_start_time.elapsed();
1724        if let Some(timings) = timings {
1725            epoch_store.record_local_execution_time(
1726                certificate.data().transaction_data(),
1727                &effects,
1728                timings,
1729                elapsed,
1730            );
1731        }
1732
1733        let elapsed_us = elapsed.as_micros() as f64;
1734        if elapsed_us > 0.0 {
1735            self.metrics
1736                .execution_gas_latency_ratio
1737                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
1738        };
1739        ExecutionOutput::Success((effects, execution_error_opt))
1740    }
1741
1742    pub fn read_objects_for_execution(
1743        &self,
1744        tx_lock: &CertLockGuard,
1745        certificate: &VerifiedExecutableTransaction,
1746        assigned_shared_object_versions: &AssignedVersions,
1747        epoch_store: &Arc<AuthorityPerEpochStore>,
1748    ) -> SuiResult<InputObjects> {
1749        let _scope = monitored_scope("Execution::load_input_objects");
1750        let _metrics_guard = self
1751            .metrics
1752            .execution_load_input_objects_latency
1753            .start_timer();
1754        let input_objects = &certificate.data().transaction_data().input_objects()?;
1755        self.input_loader.read_objects_for_execution(
1756            &certificate.key(),
1757            tx_lock,
1758            input_objects,
1759            assigned_shared_object_versions,
1760            epoch_store.epoch(),
1761        )
1762    }
1763
1764    /// Test only wrapper for `try_execute_immediately()` above, useful for executing change epoch
1765    /// transactions. Assumes execution will always succeed.
1766    pub async fn try_execute_for_test(
1767        &self,
1768        certificate: &VerifiedCertificate,
1769        execution_env: ExecutionEnv,
1770    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1771        let epoch_store = self.epoch_store_for_testing();
1772        let (effects, execution_error_opt) = self
1773            .try_execute_immediately(
1774                &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1775                execution_env,
1776                &epoch_store,
1777            )
1778            .await
1779            .unwrap();
1780        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1781        (signed_effects, execution_error_opt)
1782    }
1783
1784    pub async fn try_execute_executable_for_test(
1785        &self,
1786        executable: &VerifiedExecutableTransaction,
1787        execution_env: ExecutionEnv,
1788    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1789        let epoch_store = self.epoch_store_for_testing();
1790        let (effects, execution_error_opt) = self
1791            .try_execute_immediately(executable, execution_env, &epoch_store)
1792            .await
1793            .unwrap();
1794        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1795        (signed_effects, execution_error_opt)
1796    }
1797
1798    pub async fn notify_read_effects(
1799        &self,
1800        task_name: &'static str,
1801        digest: TransactionDigest,
1802    ) -> SuiResult<TransactionEffects> {
1803        Ok(self
1804            .get_transaction_cache_reader()
1805            .notify_read_executed_effects(task_name, &[digest])
1806            .await
1807            .pop()
1808            .expect("must return correct number of effects"))
1809    }
1810
1811    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1812        self.get_object_cache_reader()
1813            .check_owned_objects_are_live(owned_object_refs)
1814    }
1815
1816    /// This function captures the required state to debug a forked transaction.
1817    /// The dump is written to a file in dir `path`, with name prefixed by the transaction digest.
1818    /// NOTE: Since this info escapes the validator context,
1819    /// make sure not to leak any private info here
1820    pub(crate) fn debug_dump_transaction_state(
1821        &self,
1822        tx_digest: &TransactionDigest,
1823        effects: &TransactionEffects,
1824        expected_effects_digest: TransactionEffectsDigest,
1825        inner_temporary_store: &InnerTemporaryStore,
1826        certificate: &VerifiedExecutableTransaction,
1827        debug_dump_config: &StateDebugDumpConfig,
1828    ) -> SuiResult<PathBuf> {
1829        let dump_dir = debug_dump_config
1830            .dump_file_directory
1831            .as_ref()
1832            .cloned()
1833            .unwrap_or(std::env::temp_dir());
1834        let epoch_store = self.load_epoch_store_one_call_per_task();
1835
1836        NodeStateDump::new(
1837            tx_digest,
1838            effects,
1839            expected_effects_digest,
1840            self.get_object_store().as_ref(),
1841            &epoch_store,
1842            inner_temporary_store,
1843            certificate,
1844        )?
1845        .write_to_file(&dump_dir)
1846        .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1847    }
1848
1849    #[instrument(level = "trace", skip_all)]
1850    pub(crate) fn process_certificate(
1851        &self,
1852        tx_guard: &CertTxGuard,
1853        execution_guard: &ExecutionLockReadGuard<'_>,
1854        certificate: &VerifiedExecutableTransaction,
1855        execution_env: ExecutionEnv,
1856        epoch_store: &Arc<AuthorityPerEpochStore>,
1857    ) -> ExecutionOutput<(
1858        TransactionOutputs,
1859        Vec<ExecutionTiming>,
1860        Option<ExecutionError>,
1861    )> {
1862        let _scope = monitored_scope("Execution::process_certificate");
1863        let tx_digest = *certificate.digest();
1864
1865        let input_objects = match self.read_objects_for_execution(
1866            tx_guard.as_lock_guard(),
1867            certificate,
1868            &execution_env.assigned_versions,
1869            epoch_store,
1870        ) {
1871            Ok(objects) => objects,
1872            Err(e) => return ExecutionOutput::Fatal(e),
1873        };
1874
1875        let expected_effects_digest = match execution_env.expected_effects_digest {
1876            Some(expected_effects_digest) => Some(expected_effects_digest),
1877            None => {
1878                // We could be re-executing a previously executed but uncommitted transaction, perhaps after
1879                // restarting with a new binary. In this situation, if we have published an effects signature,
1880                // we must be sure not to equivocate.
1881                // TODO: read from cache instead of DB
1882                match epoch_store.get_signed_effects_digest(&tx_digest) {
1883                    Ok(digest) => digest,
1884                    Err(e) => return ExecutionOutput::Fatal(e),
1885                }
1886            }
1887        };
1888
1889        fail_point_if!("correlated-crash-process-certificate", || {
1890            if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1891                sui_simulator::task::kill_current_node(None);
1892            }
1893        });
1894
1895        // Errors originating from prepare_certificate may be transient (failure to read locks) or
1896        // non-transient (transaction input is invalid, move vm errors). However, all errors from
1897        // this function occur before we have written anything to the db, so we commit the tx
1898        // guard and rely on the client to retry the tx (if it was transient).
1899        self.execute_certificate(
1900            execution_guard,
1901            certificate,
1902            input_objects,
1903            expected_effects_digest,
1904            execution_env,
1905            epoch_store,
1906        )
1907    }
1908
1909    pub async fn reconfigure_traffic_control(
1910        &self,
1911        params: TrafficControlReconfigParams,
1912    ) -> Result<TrafficControlReconfigParams, SuiError> {
1913        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1914            traffic_controller.admin_reconfigure(params).await
1915        } else {
1916            Err(SuiErrorKind::InvalidAdminRequest(
1917                "Traffic controller is not configured on this node".to_string(),
1918            )
1919            .into())
1920        }
1921    }
1922
1923    #[instrument(level = "trace", skip_all)]
1924    fn commit_certificate(
1925        &self,
1926        certificate: &VerifiedExecutableTransaction,
1927        transaction_outputs: Arc<TransactionOutputs>,
1928        epoch_store: &Arc<AuthorityPerEpochStore>,
1929    ) -> SuiResult {
1930        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
1931            monitored_scope("Execution::commit_certificate");
1932        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1933
1934        let tx_digest = certificate.digest();
1935
1936        // The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
1937        // we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
1938        epoch_store.insert_executed_in_epoch(tx_digest);
1939        let key = certificate.key();
1940        if !matches!(key, TransactionKey::Digest(_)) {
1941            epoch_store.insert_tx_key(key, *tx_digest)?;
1942        }
1943
1944        // Allow testing what happens if we crash here.
1945        fail_point!("crash");
1946
1947        self.get_cache_writer()
1948            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);
1949
1950        if certificate.transaction_data().is_end_of_epoch_tx() {
1951            // At the end of epoch, since system packages may have been upgraded, force
1952            // reload them in the cache.
1953            self.get_object_cache_reader()
1954                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1955        }
1956
1957        Ok(())
1958    }
1959
1960    fn update_metrics(
1961        &self,
1962        certificate: &VerifiedExecutableTransaction,
1963        inner_temporary_store: &InnerTemporaryStore,
1964        effects: &TransactionEffects,
1965    ) {
1966        // count signature by scheme, for zklogin and multisig
1967        if certificate.has_zklogin_sig() {
1968            self.metrics.zklogin_sig_count.inc();
1969        } else if certificate.has_upgraded_multisig() {
1970            self.metrics.multisig_sig_count.inc();
1971        }
1972
1973        self.metrics.total_effects.inc();
1974        self.metrics.total_certs.inc();
1975
1976        let consensus_object_count = effects.input_consensus_objects().len();
1977        if consensus_object_count > 0 {
1978            self.metrics.shared_obj_tx.inc();
1979        }
1980
1981        if certificate.is_sponsored_tx() {
1982            self.metrics.sponsored_tx.inc();
1983        }
1984
1985        let input_object_count = inner_temporary_store.input_objects.len();
1986        self.metrics
1987            .num_input_objs
1988            .observe(input_object_count as f64);
1989        self.metrics
1990            .num_shared_objects
1991            .observe(consensus_object_count as f64);
1992        self.metrics.batch_size.observe(
1993            certificate
1994                .data()
1995                .intent_message()
1996                .value
1997                .kind()
1998                .num_commands() as f64,
1999        );
2000    }
2001
2002    /// execute_certificate validates the transaction input, and executes the certificate,
2003    /// returning transaction outputs.
2004    ///
2005    /// It reads state from the db (both owned and shared locks), but it has no side effects.
2006    ///
2007    /// Executes a certificate and returns an ExecutionOutput.
2008    /// The function can fail with Fatal errors (e.g., the transaction input is invalid,
2009    /// locks are not held correctly, etc.) or transient errors (e.g., db read errors).
2010    #[instrument(level = "trace", skip_all)]
2011    fn execute_certificate(
2012        &self,
2013        _execution_guard: &ExecutionLockReadGuard<'_>,
2014        certificate: &VerifiedExecutableTransaction,
2015        input_objects: InputObjects,
2016        expected_effects_digest: Option<TransactionEffectsDigest>,
2017        execution_env: ExecutionEnv,
2018        epoch_store: &Arc<AuthorityPerEpochStore>,
2019    ) -> ExecutionOutput<(
2020        TransactionOutputs,
2021        Vec<ExecutionTiming>,
2022        Option<ExecutionError>,
2023    )> {
2024        let _scope = monitored_scope("Execution::prepare_certificate");
2025        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
2026        let prepare_certificate_start_time = tokio::time::Instant::now();
2027
2028        // TODO: We need to move this to a more appropriate place to avoid redundant checks.
2029        let tx_data = certificate.data().transaction_data();
2030
2031        if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
2032            return ExecutionOutput::Fatal(e);
2033        }
2034
2035        // The cost of partially re-auditing a transaction before execution is tolerated.
2036        // This step is required for correctness because, for example, ConsensusAddressOwner
2037        // object owner may have changed between signing and execution.
2038        let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
2039            certificate,
2040            input_objects,
2041            epoch_store.protocol_config(),
2042            epoch_store.reference_gas_price(),
2043        ) {
2044            Ok(result) => result,
2045            Err(e) => return ExecutionOutput::Fatal(e),
2046        };
2047
2048        let owned_object_refs = input_objects.inner().filter_owned_objects();
2049        if let Err(e) = self.check_owned_locks(&owned_object_refs) {
2050            return ExecutionOutput::Fatal(e);
2051        }
2052        let tx_digest = *certificate.digest();
2053        let protocol_config = epoch_store.protocol_config();
2054        let transaction_data = &certificate.data().intent_message().value;
2055        let (kind, signer, gas_data) = transaction_data.execution_parts();
2056        let early_execution_error = get_early_execution_error(
2057            &tx_digest,
2058            &input_objects,
2059            self.config.certificate_deny_config.certificate_deny_set(),
2060            &execution_env.funds_withdraw_status,
2061        );
2062        let execution_params = match early_execution_error {
2063            Some(error) => ExecutionOrEarlyError::Err(error),
2064            None => ExecutionOrEarlyError::Ok(()),
2065        };
2066
2067        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2068
2069        #[allow(unused_mut)]
2070        let (inner_temp_store, _, mut effects, timings, execution_error_opt) =
2071            epoch_store.executor().execute_transaction_to_effects(
2072                &tracking_store,
2073                protocol_config,
2074                self.metrics.limits_metrics.clone(),
2075                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
2076                // cyclic dependency w/ sui-adapter
2077                self.config
2078                    .expensive_safety_check_config
2079                    .enable_deep_per_tx_sui_conservation_check(),
2080                execution_params,
2081                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2082                epoch_store
2083                    .epoch_start_config()
2084                    .epoch_data()
2085                    .epoch_start_timestamp(),
2086                input_objects,
2087                gas_data,
2088                gas_status,
2089                kind,
2090                signer,
2091                tx_digest,
2092                &mut None,
2093            );
2094
2095        if !self
2096            .execution_scheduler
2097            .should_commit_object_funds_withdraws(
2098                certificate,
2099                &effects,
2100                &execution_env,
2101                epoch_store,
2102            )
2103        {
2104            return ExecutionOutput::RetryLater;
2105        }
2106
2107        if let Some(expected_effects_digest) = expected_effects_digest
2108            && effects.digest() != expected_effects_digest
2109        {
2110            // We dont want to mask the original error, so we log it and continue.
2111            match self.debug_dump_transaction_state(
2112                &tx_digest,
2113                &effects,
2114                expected_effects_digest,
2115                &inner_temp_store,
2116                certificate,
2117                &self.config.state_debug_dump_config,
2118            ) {
2119                Ok(out_path) => {
2120                    info!(
2121                        "Dumped node state for transaction {} to {}",
2122                        tx_digest,
2123                        out_path.as_path().display().to_string()
2124                    );
2125                }
2126                Err(e) => {
2127                    error!("Error dumping state for transaction {}: {e}", tx_digest);
2128                }
2129            }
2130            let expected_effects = self
2131                .get_transaction_cache_reader()
2132                .get_effects(&expected_effects_digest);
2133            error!(
2134                ?tx_digest,
2135                ?expected_effects_digest,
2136                actual_effects = ?effects,
2137                expected_effects = ?expected_effects,
2138                "fork detected!"
2139            );
2140            if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2141                tx_digest,
2142                expected_effects_digest,
2143                effects.digest(),
2144            ) {
2145                error!("Failed to record transaction fork: {e}");
2146            }
2147
2148            fail_point_if!("kill_transaction_fork_node", || {
2149                #[cfg(msim)]
2150                {
2151                    tracing::error!(
2152                        fatal = true,
2153                        "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2154                        tx_digest
2155                    );
2156                    sui_simulator::task::shutdown_current_node();
2157                }
2158            });
2159
2160            fatal!(
2161                "Transaction {} is expected to have effects digest {}, but got {}!",
2162                tx_digest,
2163                expected_effects_digest,
2164                effects.digest()
2165            );
2166        }
2167
2168        fail_point_arg!("simulate_fork_during_execution", |(
2169            forked_validators,
2170            full_halt,
2171            effects_overrides,
2172            fork_probability,
2173        ): (
2174            std::sync::Arc<
2175                std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2176            >,
2177            bool,
2178            std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2179            f32,
2180        )| {
2181            #[cfg(msim)]
2182            self.simulate_fork_during_execution(
2183                certificate,
2184                epoch_store,
2185                &mut effects,
2186                forked_validators,
2187                full_halt,
2188                effects_overrides,
2189                fork_probability,
2190            );
2191        });
2192
2193        let unchanged_loaded_runtime_objects =
2194            crate::transaction_outputs::unchanged_loaded_runtime_objects(
2195                certificate.transaction_data(),
2196                &effects,
2197                &tracking_store.into_read_objects(),
2198            );
2199
2200        // index certificate
2201        let _ = self
2202            .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2203            .tap_err(|e| {
2204                self.metrics.post_processing_total_failures.inc();
2205                error!(?tx_digest, "tx post processing failed: {e}");
2206            });
2207
2208        self.update_metrics(certificate, &inner_temp_store, &effects);
2209
2210        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2211            certificate.clone().into_unsigned(),
2212            effects,
2213            inner_temp_store,
2214            unchanged_loaded_runtime_objects,
2215        );
2216
2217        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2218        if elapsed > 0.0 {
2219            self.metrics.prepare_cert_gas_latency_ratio.observe(
2220                transaction_outputs
2221                    .effects
2222                    .gas_cost_summary()
2223                    .computation_cost as f64
2224                    / elapsed,
2225            );
2226        }
2227
2228        ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2229    }
2230
2231    pub fn prepare_certificate_for_benchmark(
2232        &self,
2233        certificate: &VerifiedExecutableTransaction,
2234        input_objects: InputObjects,
2235        epoch_store: &Arc<AuthorityPerEpochStore>,
2236    ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2237        let lock = RwLock::new(epoch_store.epoch());
2238        let execution_guard = lock.try_read().unwrap();
2239
2240        let (transaction_outputs, _timings, execution_error_opt) = self
2241            .execute_certificate(
2242                &execution_guard,
2243                certificate,
2244                input_objects,
2245                None,
2246                ExecutionEnv::default(),
2247                epoch_store,
2248            )
2249            .unwrap();
2250        Ok((transaction_outputs, execution_error_opt))
2251    }
2252
2253    #[instrument(skip_all)]
2254    #[allow(clippy::type_complexity)]
2255    pub async fn dry_exec_transaction(
2256        &self,
2257        transaction: TransactionData,
2258        transaction_digest: TransactionDigest,
2259    ) -> SuiResult<(
2260        DryRunTransactionBlockResponse,
2261        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2262        TransactionEffects,
2263        Option<ObjectID>,
2264    )> {
2265        let epoch_store = self.load_epoch_store_one_call_per_task();
2266        if !self.is_fullnode(&epoch_store) {
2267            return Err(SuiErrorKind::UnsupportedFeatureError {
2268                error: "dry-exec is only supported on fullnodes".to_string(),
2269            }
2270            .into());
2271        }
2272
2273        if transaction.kind().is_system_tx() {
2274            return Err(SuiErrorKind::UnsupportedFeatureError {
2275                error: "dry-exec does not support system transactions".to_string(),
2276            }
2277            .into());
2278        }
2279
2280        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2281    }
2282
2283    #[allow(clippy::type_complexity)]
2284    pub fn dry_exec_transaction_for_benchmark(
2285        &self,
2286        transaction: TransactionData,
2287        transaction_digest: TransactionDigest,
2288    ) -> SuiResult<(
2289        DryRunTransactionBlockResponse,
2290        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2291        TransactionEffects,
2292        Option<ObjectID>,
2293    )> {
2294        let epoch_store = self.load_epoch_store_one_call_per_task();
2295        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2296    }
2297
2298    #[allow(clippy::type_complexity)]
2299    fn dry_exec_transaction_impl(
2300        &self,
2301        epoch_store: &AuthorityPerEpochStore,
2302        transaction: TransactionData,
2303        transaction_digest: TransactionDigest,
2304    ) -> SuiResult<(
2305        DryRunTransactionBlockResponse,
2306        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2307        TransactionEffects,
2308        Option<ObjectID>,
2309    )> {
2310        // Cheap validity checks for a transaction, including input size limits.
2311        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2312
2313        let input_object_kinds = transaction.input_objects()?;
2314        let receiving_object_refs = transaction.receiving_objects();
2315
2316        sui_transaction_checks::deny::check_transaction_for_signing(
2317            &transaction,
2318            &[],
2319            &input_object_kinds,
2320            &receiving_object_refs,
2321            &self.config.transaction_deny_config,
2322            self.get_backing_package_store().as_ref(),
2323        )?;
2324
2325        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2326            // We don't want to cache this transaction since it's a dry run.
2327            None,
2328            &input_object_kinds,
2329            &receiving_object_refs,
2330            epoch_store.epoch(),
2331        )?;
2332
2333        // make a gas object if one was not provided
2334        let mut gas_data = transaction.gas_data().clone();
2335        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2336            let sender = transaction.sender();
2337            // use a 1B sui coin
2338            const MIST_TO_SUI: u64 = 1_000_000_000;
2339            const DRY_RUN_SUI: u64 = 1_000_000_000;
2340            let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
2341            let gas_object_id = ObjectID::random();
2342            let gas_object = Object::new_move(
2343                MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
2344                Owner::AddressOwner(sender),
2345                TransactionDigest::genesis_marker(),
2346            );
2347            let gas_object_ref = gas_object.compute_object_reference();
2348            gas_data.payment = vec![gas_object_ref];
2349            (
2350                sui_transaction_checks::check_transaction_input_with_given_gas(
2351                    epoch_store.protocol_config(),
2352                    epoch_store.reference_gas_price(),
2353                    &transaction,
2354                    input_objects,
2355                    receiving_objects,
2356                    gas_object,
2357                    &self.metrics.bytecode_verifier_metrics,
2358                    &self.config.verifier_signing_config,
2359                )?,
2360                Some(gas_object_id),
2361            )
2362        } else {
2363            (
2364                sui_transaction_checks::check_transaction_input(
2365                    epoch_store.protocol_config(),
2366                    epoch_store.reference_gas_price(),
2367                    &transaction,
2368                    input_objects,
2369                    &receiving_objects,
2370                    &self.metrics.bytecode_verifier_metrics,
2371                    &self.config.verifier_signing_config,
2372                )?,
2373                None,
2374            )
2375        };
2376
2377        let protocol_config = epoch_store.protocol_config();
2378        let (kind, signer, _) = transaction.execution_parts();
2379
2380        let silent = true;
2381        let executor = sui_execution::executor(protocol_config, silent)
2382            .expect("Creating an executor should not fail here");
2383
2384        let expensive_checks = false;
2385        let early_execution_error = get_early_execution_error(
2386            &transaction_digest,
2387            &checked_input_objects,
2388            self.config.certificate_deny_config.certificate_deny_set(),
2389            // TODO(address-balances): This does not currently support balance withdraws properly.
2390            // For address balance withdraws, this cannot detect insufficient balance. We need to
2391            // first check if the balance is sufficient, similar to how we schedule withdraws.
2392            // For object balance withdraws, we need to handle the case where object balance is
2393            // insufficient in the post-execution.
2394            &FundsWithdrawStatus::MaybeSufficient,
2395        );
2396        let execution_params = match early_execution_error {
2397            Some(error) => ExecutionOrEarlyError::Err(error),
2398            None => ExecutionOrEarlyError::Ok(()),
2399        };
2400        let (inner_temp_store, _, effects, _timings, execution_error) = executor
2401            .execute_transaction_to_effects(
2402                self.get_backing_store().as_ref(),
2403                protocol_config,
2404                self.metrics.limits_metrics.clone(),
2405                expensive_checks,
2406                execution_params,
2407                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2408                epoch_store
2409                    .epoch_start_config()
2410                    .epoch_data()
2411                    .epoch_start_timestamp(),
2412                checked_input_objects,
2413                gas_data,
2414                gas_status,
2415                kind,
2416                signer,
2417                transaction_digest,
2418                &mut None,
2419            );
2420        let tx_digest = *effects.transaction_digest();
2421
2422        let module_cache =
2423            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2424
2425        let mut layout_resolver =
2426            epoch_store
2427                .executor()
2428                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2429                    &inner_temp_store,
2430                    self.get_backing_package_store(),
2431                )));
2432        // Returning empty vector here because we recalculate changes in the rpc layer.
2433        let object_changes = Vec::new();
2434
2435        // Returning empty vector here because we recalculate changes in the rpc layer.
2436        let balance_changes = Vec::new();
2437
2438        let written_with_kind = effects
2439            .created()
2440            .into_iter()
2441            .map(|(oref, _)| (oref, WriteKind::Create))
2442            .chain(
2443                effects
2444                    .unwrapped()
2445                    .into_iter()
2446                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2447            )
2448            .chain(
2449                effects
2450                    .mutated()
2451                    .into_iter()
2452                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
2453            )
2454            .map(|(oref, kind)| {
2455                let obj = inner_temp_store.written.get(&oref.0).unwrap();
2456                // TODO: Avoid clones.
2457                (oref.0, (oref, obj.clone(), kind))
2458            })
2459            .collect();
2460
2461        let execution_error_source = execution_error
2462            .as_ref()
2463            .err()
2464            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2465
2466        Ok((
2467            DryRunTransactionBlockResponse {
2468                suggested_gas_price: self
2469                    .congestion_tracker
2470                    .get_suggested_gas_prices(&transaction),
2471                input: SuiTransactionBlockData::try_from_with_module_cache(
2472                    transaction,
2473                    &module_cache,
2474                )
2475                .map_err(|e| SuiErrorKind::TransactionSerializationError {
2476                    error: format!(
2477                        "Failed to convert transaction to SuiTransactionBlockData: {}",
2478                        e
2479                    ),
2480                })?, // TODO: replace the underlying try_from to SuiError. This one goes deep
2481                effects: effects.clone().try_into()?,
2482                events: SuiTransactionBlockEvents::try_from(
2483                    inner_temp_store.events.clone(),
2484                    tx_digest,
2485                    None,
2486                    layout_resolver.as_mut(),
2487                )?,
2488                object_changes,
2489                balance_changes,
2490                execution_error_source,
2491            },
2492            written_with_kind,
2493            effects,
2494            mock_gas,
2495        ))
2496    }
2497
2498    pub fn simulate_transaction(
2499        &self,
2500        mut transaction: TransactionData,
2501        checks: TransactionChecks,
2502        allow_mock_gas_coin: bool,
2503    ) -> SuiResult<SimulateTransactionResult> {
2504        if transaction.kind().is_system_tx() {
2505            return Err(SuiErrorKind::UnsupportedFeatureError {
2506                error: "simulate does not support system transactions".to_string(),
2507            }
2508            .into());
2509        }
2510
2511        let epoch_store = self.load_epoch_store_one_call_per_task();
2512        if !self.is_fullnode(&epoch_store) {
2513            return Err(SuiErrorKind::UnsupportedFeatureError {
2514                error: "simulate is only supported on fullnodes".to_string(),
2515            }
2516            .into());
2517        }
2518
2519        // Cheap validity checks for a transaction, including input size limits.
2520        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2521
2522        let input_object_kinds = transaction.input_objects()?;
2523        let receiving_object_refs = transaction.receiving_objects();
2524
2525        sui_transaction_checks::deny::check_transaction_for_signing(
2526            &transaction,
2527            &[],
2528            &input_object_kinds,
2529            &receiving_object_refs,
2530            &self.config.transaction_deny_config,
2531            self.get_backing_package_store().as_ref(),
2532        )?;
2533
2534        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2535            // We don't want to cache this transaction since it's a simulation.
2536            None,
2537            &input_object_kinds,
2538            &receiving_object_refs,
2539            epoch_store.epoch(),
2540        )?;
2541
2542        // mock a gas object if one was not provided
2543        let mock_gas_id = if allow_mock_gas_coin && transaction.gas().is_empty() {
2544            let mock_gas_object = Object::new_move(
2545                MoveObject::new_gas_coin(
2546                    OBJECT_START_VERSION,
2547                    ObjectID::MAX,
2548                    DEV_INSPECT_GAS_COIN_VALUE,
2549                ),
2550                Owner::AddressOwner(transaction.gas_data().owner),
2551                TransactionDigest::genesis_marker(),
2552            );
2553            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2554            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2555            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2556            Some(mock_gas_object.id())
2557        } else {
2558            None
2559        };
2560
2561        let protocol_config = epoch_store.protocol_config();
2562
2563        let (gas_status, checked_input_objects) = if checks.enabled() {
2564            sui_transaction_checks::check_transaction_input(
2565                epoch_store.protocol_config(),
2566                epoch_store.reference_gas_price(),
2567                &transaction,
2568                input_objects,
2569                &receiving_objects,
2570                &self.metrics.bytecode_verifier_metrics,
2571                &self.config.verifier_signing_config,
2572            )?
2573        } else {
2574            let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2575                protocol_config,
2576                transaction.kind(),
2577                input_objects,
2578                receiving_objects,
2579            )?;
2580            let gas_status = SuiGasStatus::new(
2581                transaction.gas_budget(),
2582                transaction.gas_price(),
2583                epoch_store.reference_gas_price(),
2584                protocol_config,
2585            )?;
2586
2587            (gas_status, checked_input_objects)
2588        };
2589
2590        // TODO see if we can spin up a VM once and reuse it
2591        let executor = sui_execution::executor(
2592            protocol_config,
2593            true, // silent
2594        )
2595        .expect("Creating an executor should not fail here");
2596
2597        let (kind, signer, gas_data) = transaction.execution_parts();
2598        let early_execution_error = get_early_execution_error(
2599            &transaction.digest(),
2600            &checked_input_objects,
2601            self.config.certificate_deny_config.certificate_deny_set(),
2602            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2603            &FundsWithdrawStatus::MaybeSufficient,
2604        );
2605        let execution_params = match early_execution_error {
2606            Some(error) => ExecutionOrEarlyError::Err(error),
2607            None => ExecutionOrEarlyError::Ok(()),
2608        };
2609
2610        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2611
2612        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2613            &tracking_store,
2614            protocol_config,
2615            self.metrics.limits_metrics.clone(),
2616            false, // expensive_checks
2617            execution_params,
2618            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2619            epoch_store
2620                .epoch_start_config()
2621                .epoch_data()
2622                .epoch_start_timestamp(),
2623            checked_input_objects,
2624            gas_data,
2625            gas_status,
2626            kind,
2627            signer,
2628            transaction.digest(),
2629            checks.disabled(),
2630        );
2631
2632        let loaded_runtime_objects = tracking_store.into_read_objects();
2633        let unchanged_loaded_runtime_objects =
2634            crate::transaction_outputs::unchanged_loaded_runtime_objects(
2635                &transaction,
2636                &effects,
2637                &loaded_runtime_objects,
2638            );
2639
2640        let object_set = {
2641            let objects = {
2642                let mut objects = loaded_runtime_objects;
2643
2644                for o in inner_temp_store
2645                    .input_objects
2646                    .into_values()
2647                    .chain(inner_temp_store.written.into_values())
2648                {
2649                    objects.insert(o);
2650                }
2651
2652                objects
2653            };
2654
2655            let object_keys = sui_types::storage::get_transaction_object_set(
2656                &transaction,
2657                &effects,
2658                &unchanged_loaded_runtime_objects,
2659            );
2660
2661            let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2662            for k in object_keys {
2663                if let Some(o) = objects.get(&k) {
2664                    set.insert(o.clone());
2665                }
2666            }
2667
2668            set
2669        };
2670
2671        Ok(SimulateTransactionResult {
2672            objects: object_set,
2673            events: effects.events_digest().map(|_| inner_temp_store.events),
2674            effects,
2675            execution_result,
2676            mock_gas_id,
2677            unchanged_loaded_runtime_objects,
2678        })
2679    }
2680
2681    /// The object ID for gas can be any object ID, even for an uncreated object
2682    #[allow(clippy::collapsible_else_if)]
2683    #[instrument(skip_all)]
2684    pub async fn dev_inspect_transaction_block(
2685        &self,
2686        sender: SuiAddress,
2687        transaction_kind: TransactionKind,
2688        gas_price: Option<u64>,
2689        gas_budget: Option<u64>,
2690        gas_sponsor: Option<SuiAddress>,
2691        gas_objects: Option<Vec<ObjectRef>>,
2692        show_raw_txn_data_and_effects: Option<bool>,
2693        skip_checks: Option<bool>,
2694    ) -> SuiResult<DevInspectResults> {
2695        let epoch_store = self.load_epoch_store_one_call_per_task();
2696
2697        if !self.is_fullnode(&epoch_store) {
2698            return Err(SuiErrorKind::UnsupportedFeatureError {
2699                error: "dev-inspect is only supported on fullnodes".to_string(),
2700            }
2701            .into());
2702        }
2703
2704        if transaction_kind.is_system_tx() {
2705            return Err(SuiErrorKind::UnsupportedFeatureError {
2706                error: "system transactions are not supported".to_string(),
2707            }
2708            .into());
2709        }
2710
2711        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2712        let skip_checks = skip_checks.unwrap_or(true);
2713        let reference_gas_price = epoch_store.reference_gas_price();
2714        let protocol_config = epoch_store.protocol_config();
2715        let max_tx_gas = protocol_config.max_tx_gas();
2716
2717        let price = gas_price.unwrap_or(reference_gas_price);
2718        let budget = gas_budget.unwrap_or(max_tx_gas);
2719        let owner = gas_sponsor.unwrap_or(sender);
2720        // Payment might be empty here, but it's fine we'll have to deal with it later after reading all the input objects.
2721        let payment = gas_objects.unwrap_or_default();
2722        let mut transaction = TransactionData::V1(TransactionDataV1 {
2723            kind: transaction_kind.clone(),
2724            sender,
2725            gas_data: GasData {
2726                payment,
2727                owner,
2728                price,
2729                budget,
2730            },
2731            expiration: TransactionExpiration::None,
2732        });
2733
2734        let raw_txn_data = if show_raw_txn_data_and_effects {
2735            bcs::to_bytes(&transaction).map_err(|_| {
2736                SuiErrorKind::TransactionSerializationError {
2737                    error: "Failed to serialize transaction during dev inspect".to_string(),
2738                }
2739            })?
2740        } else {
2741            vec![]
2742        };
2743
2744        transaction.validity_check_no_gas_check(protocol_config)?;
2745
2746        let input_object_kinds = transaction.input_objects()?;
2747        let receiving_object_refs = transaction.receiving_objects();
2748
2749        sui_transaction_checks::deny::check_transaction_for_signing(
2750            &transaction,
2751            &[],
2752            &input_object_kinds,
2753            &receiving_object_refs,
2754            &self.config.transaction_deny_config,
2755            self.get_backing_package_store().as_ref(),
2756        )?;
2757
2758        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2759            // We don't want to cache this transaction since it's a dev inspect.
2760            None,
2761            &input_object_kinds,
2762            &receiving_object_refs,
2763            epoch_store.epoch(),
2764        )?;
2765
2766        let (gas_status, checked_input_objects) = if skip_checks {
2767            // If we are skipping checks, then we call the check_dev_inspect_input function which will perform
2768            // only lightweight checks on the transaction input. And if the gas field is empty, that means we will
2769            // use the dummy gas object so we need to add it to the input objects vector.
2770            if transaction.gas().is_empty() {
2771                // Create and use a dummy gas object if there is no gas object provided.
2772                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2773                    DEV_INSPECT_GAS_COIN_VALUE,
2774                    transaction.gas_owner(),
2775                );
2776                let gas_object_ref = dummy_gas_object.compute_object_reference();
2777                transaction.gas_data_mut().payment = vec![gas_object_ref];
2778                input_objects.push(ObjectReadResult::new(
2779                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2780                    dummy_gas_object.into(),
2781                ));
2782            }
2783            let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2784                protocol_config,
2785                &transaction_kind,
2786                input_objects,
2787                receiving_objects,
2788            )?;
2789            let gas_status = SuiGasStatus::new(
2790                max_tx_gas,
2791                transaction.gas_price(),
2792                reference_gas_price,
2793                protocol_config,
2794            )?;
2795
2796            (gas_status, checked_input_objects)
2797        } else {
2798            // If we are not skipping checks, then we call the check_transaction_input function and its dummy gas
2799            // variant which will perform full fledged checks just like a real transaction execution.
2800            if transaction.gas().is_empty() {
2801                // Create and use a dummy gas object if there is no gas object provided.
2802                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2803                    DEV_INSPECT_GAS_COIN_VALUE,
2804                    transaction.gas_owner(),
2805                );
2806                let gas_object_ref = dummy_gas_object.compute_object_reference();
2807                transaction.gas_data_mut().payment = vec![gas_object_ref];
2808                sui_transaction_checks::check_transaction_input_with_given_gas(
2809                    epoch_store.protocol_config(),
2810                    epoch_store.reference_gas_price(),
2811                    &transaction,
2812                    input_objects,
2813                    receiving_objects,
2814                    dummy_gas_object,
2815                    &self.metrics.bytecode_verifier_metrics,
2816                    &self.config.verifier_signing_config,
2817                )?
2818            } else {
2819                sui_transaction_checks::check_transaction_input(
2820                    epoch_store.protocol_config(),
2821                    epoch_store.reference_gas_price(),
2822                    &transaction,
2823                    input_objects,
2824                    &receiving_objects,
2825                    &self.metrics.bytecode_verifier_metrics,
2826                    &self.config.verifier_signing_config,
2827                )?
2828            }
2829        };
2830
2831        let executor = sui_execution::executor(protocol_config, /* silent */ true)
2832            .expect("Creating an executor should not fail here");
2833        let gas_data = transaction.gas_data().clone();
2834        let intent_msg = IntentMessage::new(
2835            Intent {
2836                version: IntentVersion::V0,
2837                scope: IntentScope::TransactionData,
2838                app_id: AppId::Sui,
2839            },
2840            transaction,
2841        );
2842        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2843        let early_execution_error = get_early_execution_error(
2844            &transaction_digest,
2845            &checked_input_objects,
2846            self.config.certificate_deny_config.certificate_deny_set(),
2847            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2848            &FundsWithdrawStatus::MaybeSufficient,
2849        );
2850        let execution_params = match early_execution_error {
2851            Some(error) => ExecutionOrEarlyError::Err(error),
2852            None => ExecutionOrEarlyError::Ok(()),
2853        };
2854        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2855            self.get_backing_store().as_ref(),
2856            protocol_config,
2857            self.metrics.limits_metrics.clone(),
2858            /* expensive checks */ false,
2859            execution_params,
2860            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2861            epoch_store
2862                .epoch_start_config()
2863                .epoch_data()
2864                .epoch_start_timestamp(),
2865            checked_input_objects,
2866            gas_data,
2867            gas_status,
2868            transaction_kind,
2869            sender,
2870            transaction_digest,
2871            skip_checks,
2872        );
2873
2874        let raw_effects = if show_raw_txn_data_and_effects {
2875            bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2876                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2877            })?
2878        } else {
2879            vec![]
2880        };
2881
2882        let mut layout_resolver =
2883            epoch_store
2884                .executor()
2885                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2886                    &inner_temp_store,
2887                    self.get_backing_package_store(),
2888                )));
2889
2890        DevInspectResults::new(
2891            effects,
2892            inner_temp_store.events.clone(),
2893            execution_result,
2894            raw_txn_data,
2895            raw_effects,
2896            layout_resolver.as_mut(),
2897        )
2898    }
2899
2900    // Only used for testing because of how epoch store is loaded.
2901    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2902        let epoch_store = self.epoch_store_for_testing();
2903        Ok(epoch_store.reference_gas_price())
2904    }
2905
2906    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2907        self.get_transaction_cache_reader()
2908            .is_tx_already_executed(digest)
2909    }
2910
2911    #[instrument(level = "debug", skip_all, err(level = "debug"))]
2912    fn index_tx(
2913        &self,
2914        indexes: &IndexStore,
2915        digest: &TransactionDigest,
2916        // TODO: index_tx really just need the transaction data here.
2917        cert: &VerifiedExecutableTransaction,
2918        effects: &TransactionEffects,
2919        events: &TransactionEvents,
2920        timestamp_ms: u64,
2921        tx_coins: Option<TxCoins>,
2922        written: &WrittenObjects,
2923        inner_temporary_store: &InnerTemporaryStore,
2924        epoch_store: &Arc<AuthorityPerEpochStore>,
2925    ) -> SuiResult<u64> {
2926        let changes = self
2927            .process_object_index(effects, written, inner_temporary_store, epoch_store)
2928            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2929
2930        indexes.index_tx(
2931            cert.data().intent_message().value.sender(),
2932            cert.data()
2933                .intent_message()
2934                .value
2935                .input_objects()?
2936                .iter()
2937                .map(|o| o.object_id()),
2938            effects
2939                .all_changed_objects()
2940                .into_iter()
2941                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2942            cert.data()
2943                .intent_message()
2944                .value
2945                .move_calls()
2946                .into_iter()
2947                .map(|(_cmd_idx, package, module, function)| {
2948                    (*package, module.to_owned(), function.to_owned())
2949                }),
2950            events,
2951            changes,
2952            digest,
2953            timestamp_ms,
2954            tx_coins,
2955            effects.accumulator_events(),
2956        )
2957    }
2958
/// Simulator-only (`msim`) hook that perturbs `effects` to imitate an execution fork.
///
/// System transactions and validators with zero stake in the current committee
/// are left untouched. Otherwise this validator may be added to
/// `forked_validators` (subject to the stake budget tracked in the thread-local
/// `TOTAL_FAILING_STAKE`), and, once it is in that set, selected transactions get
/// their computation cost bumped by one while the pre-mutation effects digest is
/// recorded in `effects_overrides` under the transaction digest.
#[cfg(msim)]
fn simulate_fork_during_execution(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
    forked_validators: std::sync::Arc<
        std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
    >,
    full_halt: bool,
    effects_overrides: std::sync::Arc<
        std::sync::Mutex<std::collections::BTreeMap<String, String>>,
    >,
    fork_probability: f32,
) {
    use std::cell::RefCell;
    thread_local! {
        static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
    }

    // System transactions are never forked.
    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    // A validator without stake cannot contribute to the failing stake.
    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake == 0 {
        return;
    }

    TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
        // A poisoned lock is treated as "not forked yet".
        let already_forked = match forked_validators.lock() {
            Ok(set) => set.contains(&self.name),
            Err(_) => false,
        };

        if !already_forked {
            let threshold = committee.validity_threshold();
            let should_fork = if full_halt {
                // For full halt, fork enough nodes to reach validity threshold
                *total_stake <= threshold
            } else {
                // For partial fork, stay strictly below validity threshold
                *total_stake + cur_stake < threshold
            };

            if should_fork {
                *total_stake += cur_stake;

                if let Ok(mut external_set) = forked_validators.lock() {
                    external_set.insert(self.name);
                    info!("forked_validators: {:?}", external_set);
                }
            }
        }

        // From here on the `forked_validators` guard stays held until the closure ends.
        let Ok(external_set) = forked_validators.lock() else {
            return;
        };
        if !external_set.contains(&self.name) {
            return;
        }

        let tx_digest = certificate.digest().to_string();
        let Ok(mut overrides) = effects_overrides.lock() else {
            return;
        };

        // Fork this transaction when either
        //  * `effects_overrides` already has an entry for its digest, or
        //  * `effects_overrides` is still empty and `deterministic_probability`
        //    accepts this digest for the configured `fork_probability`.
        let selected = overrides.contains_key(&tx_digest)
            || (overrides.is_empty()
                && sui_simulator::random::deterministic_probability(
                    &tx_digest,
                    fork_probability,
                ));
        if selected {
            // Record the digest of the effects before they are altered.
            let original_effects_digest = effects.digest().to_string();
            overrides.insert(tx_digest.clone(), original_effects_digest.clone());
            info!(
                ?tx_digest,
                ?original_effects_digest,
                "Captured forked effects digest for transaction"
            );
            effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
        }
    });
}
3042
3043    fn process_object_index(
3044        &self,
3045        effects: &TransactionEffects,
3046        written: &WrittenObjects,
3047        inner_temporary_store: &InnerTemporaryStore,
3048        epoch_store: &Arc<AuthorityPerEpochStore>,
3049    ) -> SuiResult<ObjectIndexChanges> {
3050        let mut layout_resolver =
3051            epoch_store
3052                .executor()
3053                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3054                    inner_temporary_store,
3055                    self.get_backing_package_store(),
3056                )));
3057
3058        let modified_at_version = effects
3059            .modified_at_versions()
3060            .into_iter()
3061            .collect::<HashMap<_, _>>();
3062
3063        let tx_digest = effects.transaction_digest();
3064        let mut deleted_owners = vec![];
3065        let mut deleted_dynamic_fields = vec![];
3066        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
3067            let old_version = modified_at_version.get(&id).unwrap();
3068            // When we process the index, the latest object hasn't been written yet so
3069            // the old object must be present.
3070            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
3071                |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
3072            ) {
3073                Owner::AddressOwner(addr)
3074                | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
3075                Owner::ObjectOwner(object_id) => {
3076                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
3077                }
3078                _ => {}
3079            }
3080        }
3081
3082        let mut new_owners = vec![];
3083        let mut new_dynamic_fields = vec![];
3084
3085        for (oref, owner, kind) in effects.all_changed_objects() {
3086            let id = &oref.0;
3087            // For mutated objects, retrieve old owner and delete old index if there is a owner change.
3088            if let WriteKind::Mutate = kind {
3089                let Some(old_version) = modified_at_version.get(id) else {
3090                    panic!(
3091                        "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
3092                        tx_digest
3093                    );
3094                };
3095                // When we process the index, the latest object hasn't been written yet so
3096                // the old object must be present.
3097                let Some(old_object) = self.get_object_store().get_object_by_key(id, *old_version)
3098                else {
3099                    panic!(
3100                        "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
3101                        tx_digest, id, old_version
3102                    );
3103                };
3104                if old_object.owner != owner {
3105                    match old_object.owner {
3106                        Owner::AddressOwner(addr)
3107                        | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3108                            deleted_owners.push((addr, *id));
3109                        }
3110                        Owner::ObjectOwner(object_id) => {
3111                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
3112                        }
3113                        _ => {}
3114                    }
3115                }
3116            }
3117
3118            match owner {
3119                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3120                    // TODO: We can remove the object fetching after we added ObjectType to TransactionEffects
3121                    let new_object = written.get(id).unwrap_or_else(
3122                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3123                    );
3124                    assert_eq!(
3125                        new_object.version(),
3126                        oref.1,
3127                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3128                        tx_digest,
3129                        id,
3130                        new_object.version(),
3131                        oref.1
3132                    );
3133
3134                    let type_ = new_object
3135                        .type_()
3136                        .map(|type_| ObjectType::Struct(type_.clone()))
3137                        .unwrap_or(ObjectType::Package);
3138
3139                    new_owners.push((
3140                        (addr, *id),
3141                        ObjectInfo {
3142                            object_id: *id,
3143                            version: oref.1,
3144                            digest: oref.2,
3145                            type_,
3146                            owner,
3147                            previous_transaction: *effects.transaction_digest(),
3148                        },
3149                    ));
3150                }
3151                Owner::ObjectOwner(owner) => {
3152                    let new_object = written.get(id).unwrap_or_else(
3153                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3154                    );
3155                    assert_eq!(
3156                        new_object.version(),
3157                        oref.1,
3158                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3159                        tx_digest,
3160                        id,
3161                        new_object.version(),
3162                        oref.1
3163                    );
3164
3165                    let Some(df_info) = self
3166                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
3167                        .unwrap_or_else(|e| {
3168                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
3169                            None
3170                        }
3171                    )
3172                        else {
3173                            // Skip indexing for non dynamic field objects.
3174                            continue;
3175                        };
3176                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3177                }
3178                _ => {}
3179            }
3180        }
3181
3182        Ok(ObjectIndexChanges {
3183            deleted_owners,
3184            deleted_dynamic_fields,
3185            new_owners,
3186            new_dynamic_fields,
3187        })
3188    }
3189
3190    fn try_create_dynamic_field_info(
3191        &self,
3192        o: &Object,
3193        written: &WrittenObjects,
3194        resolver: &mut dyn LayoutResolver,
3195    ) -> SuiResult<Option<DynamicFieldInfo>> {
3196        // Skip if not a move object
3197        let Some(move_object) = o.data.try_as_move().cloned() else {
3198            return Ok(None);
3199        };
3200
3201        // We only index dynamic field objects
3202        if !move_object.type_().is_dynamic_field() {
3203            return Ok(None);
3204        }
3205
3206        let layout = resolver
3207            .get_annotated_layout(&move_object.type_().clone().into())?
3208            .into_layout();
3209
3210        let field =
3211            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3212                SuiErrorKind::ObjectDeserializationError {
3213                    error: e.to_string(),
3214                }
3215            })?;
3216
3217        let type_ = field.kind;
3218        let name_type: TypeTag = field.name_layout.into();
3219        let bcs_name = field.name_bytes.to_owned();
3220
3221        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3222            .map_err(|e| {
3223                warn!("{e}");
3224                SuiErrorKind::ObjectDeserializationError {
3225                    error: e.to_string(),
3226                }
3227            })?;
3228
3229        let name = DynamicFieldName {
3230            type_: name_type,
3231            value: SuiMoveValue::from(name_value).to_json_value(),
3232        };
3233
3234        let value_metadata = field.value_metadata().map_err(|e| {
3235            warn!("{e}");
3236            SuiErrorKind::ObjectDeserializationError {
3237                error: e.to_string(),
3238            }
3239        })?;
3240
3241        Ok(Some(match value_metadata {
3242            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3243                name,
3244                bcs_name,
3245                type_,
3246                object_type: object_type.to_canonical_string(/* with_prefix */ true),
3247                object_id: o.id(),
3248                version: o.version(),
3249                digest: o.digest(),
3250            },
3251
3252            DFV::ValueMetadata::DynamicObjectField(object_id) => {
3253                // Find the actual object from storage using the object id obtained from the wrapper.
3254
3255                // Try to find the object in the written objects first.
3256                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3257                    let version = object.version();
3258                    let digest = object.digest();
3259                    let object_type = object.data.type_().unwrap().clone();
3260                    (version, digest, object_type)
3261                } else {
3262                    // If not found, try to find it in the database.
3263                    let object = self
3264                        .get_object_store()
3265                        .get_object_by_key(&object_id, o.version())
3266                        .ok_or_else(|| UserInputError::ObjectNotFound {
3267                            object_id,
3268                            version: Some(o.version()),
3269                        })?;
3270                    let version = object.version();
3271                    let digest = object.digest();
3272                    let object_type = object.data.type_().unwrap().clone();
3273                    (version, digest, object_type)
3274                };
3275
3276                DynamicFieldInfo {
3277                    name,
3278                    bcs_name,
3279                    type_,
3280                    object_type: object_type.to_string(),
3281                    object_id,
3282                    version,
3283                    digest,
3284                }
3285            }
3286        }))
3287    }
3288
3289    #[instrument(level = "trace", skip_all, err(level = "debug"))]
3290    fn post_process_one_tx(
3291        &self,
3292        certificate: &VerifiedExecutableTransaction,
3293        effects: &TransactionEffects,
3294        inner_temporary_store: &InnerTemporaryStore,
3295        epoch_store: &Arc<AuthorityPerEpochStore>,
3296    ) -> SuiResult {
3297        if self.indexes.is_none() {
3298            return Ok(());
3299        }
3300
3301        let _scope = monitored_scope("Execution::post_process_one_tx");
3302
3303        let tx_digest = certificate.digest();
3304        let timestamp_ms = Self::unixtime_now_ms();
3305        let events = &inner_temporary_store.events;
3306        let written = &inner_temporary_store.written;
3307        let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
3308            effects,
3309            inner_temporary_store,
3310            epoch_store,
3311        );
3312
3313        // Index tx
3314        if let Some(indexes) = &self.indexes {
3315            let _ = self
3316                .index_tx(
3317                    indexes.as_ref(),
3318                    tx_digest,
3319                    certificate,
3320                    effects,
3321                    events,
3322                    timestamp_ms,
3323                    tx_coins,
3324                    written,
3325                    inner_temporary_store,
3326                    epoch_store,
3327                )
3328                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
3329                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3330                .expect("Indexing tx should not fail");
3331
3332            let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3333            let events = self.make_transaction_block_events(
3334                events.clone(),
3335                *tx_digest,
3336                timestamp_ms,
3337                epoch_store,
3338                inner_temporary_store,
3339            )?;
3340            // Emit events
3341            self.subscription_handler
3342                .process_tx(certificate.data().transaction_data(), &effects, &events)
3343                .tap_ok(|_| {
3344                    self.metrics
3345                        .post_processing_total_tx_had_event_processed
3346                        .inc()
3347                })
3348                .tap_err(|e| {
3349                    warn!(
3350                        ?tx_digest,
3351                        "Post processing - Couldn't process events for tx: {}", e
3352                    )
3353                })?;
3354
3355            self.metrics
3356                .post_processing_total_events_emitted
3357                .inc_by(events.data.len() as u64);
3358        };
3359        Ok(())
3360    }
3361
3362    fn make_transaction_block_events(
3363        &self,
3364        transaction_events: TransactionEvents,
3365        digest: TransactionDigest,
3366        timestamp_ms: u64,
3367        epoch_store: &Arc<AuthorityPerEpochStore>,
3368        inner_temporary_store: &InnerTemporaryStore,
3369    ) -> SuiResult<SuiTransactionBlockEvents> {
3370        let mut layout_resolver =
3371            epoch_store
3372                .executor()
3373                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3374                    inner_temporary_store,
3375                    self.get_backing_package_store(),
3376                )));
3377        SuiTransactionBlockEvents::try_from(
3378            transaction_events,
3379            digest,
3380            Some(timestamp_ms),
3381            layout_resolver.as_mut(),
3382        )
3383    }
3384
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock is before the Unix epoch or if the millisecond
/// count does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis: u128 = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
3392
3393    // TODO(fastpath): update this handler for Mysticeti fastpath.
3394    // There will no longer be validator quorum signed transactions or effects.
3395    // The proof of finality needs to come from checkpoints.
3396    #[instrument(level = "trace", skip_all)]
3397    pub async fn handle_transaction_info_request(
3398        &self,
3399        request: TransactionInfoRequest,
3400    ) -> SuiResult<TransactionInfoResponse> {
3401        let epoch_store = self.load_epoch_store_one_call_per_task();
3402        let (transaction, status) = self
3403            .get_transaction_status(&request.transaction_digest, &epoch_store)?
3404            .ok_or(SuiErrorKind::TransactionNotFound {
3405                digest: request.transaction_digest,
3406            })?;
3407        Ok(TransactionInfoResponse {
3408            transaction,
3409            status,
3410        })
3411    }
3412
3413    #[instrument(level = "trace", skip_all)]
3414    pub async fn handle_object_info_request(
3415        &self,
3416        request: ObjectInfoRequest,
3417    ) -> SuiResult<ObjectInfoResponse> {
3418        let epoch_store = self.load_epoch_store_one_call_per_task();
3419
3420        let requested_object_seq = match request.request_kind {
3421            ObjectInfoRequestKind::LatestObjectInfo => {
3422                let (_, seq, _) = self
3423                    .get_object_or_tombstone(request.object_id)
3424                    .await
3425                    .ok_or_else(|| {
3426                        SuiError::from(UserInputError::ObjectNotFound {
3427                            object_id: request.object_id,
3428                            version: None,
3429                        })
3430                    })?;
3431                seq
3432            }
3433            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3434        };
3435
3436        let object = self
3437            .get_object_store()
3438            .get_object_by_key(&request.object_id, requested_object_seq)
3439            .ok_or_else(|| {
3440                SuiError::from(UserInputError::ObjectNotFound {
3441                    object_id: request.object_id,
3442                    version: Some(requested_object_seq),
3443                })
3444            })?;
3445
3446        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3447            (request.generate_layout, object.data.try_as_move())
3448        {
3449            Some(into_struct_layout(
3450                self.load_epoch_store_one_call_per_task()
3451                    .executor()
3452                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3453                    .get_annotated_layout(&move_obj.type_().clone().into())?,
3454            )?)
3455        } else {
3456            None
3457        };
3458
3459        let lock = if !object.is_address_owned() {
3460            // Only address owned objects have locks.
3461            None
3462        } else {
3463            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3464                .await?
3465                .map(|s| s.into_inner())
3466        };
3467
3468        Ok(ObjectInfoResponse {
3469            object,
3470            layout,
3471            lock_for_debugging: lock,
3472        })
3473    }
3474
3475    #[instrument(level = "trace", skip_all)]
3476    pub fn handle_checkpoint_request(
3477        &self,
3478        request: &CheckpointRequest,
3479    ) -> SuiResult<CheckpointResponse> {
3480        let summary = match request.sequence_number {
3481            Some(seq) => self
3482                .checkpoint_store
3483                .get_checkpoint_by_sequence_number(seq)?,
3484            None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3485        }
3486        .map(|v| v.into_inner());
3487        let contents = match &summary {
3488            Some(s) => self
3489                .checkpoint_store
3490                .get_checkpoint_contents(&s.content_digest)?,
3491            None => None,
3492        };
3493        Ok(CheckpointResponse {
3494            checkpoint: summary,
3495            contents,
3496        })
3497    }
3498
3499    #[instrument(level = "trace", skip_all)]
3500    pub fn handle_checkpoint_request_v2(
3501        &self,
3502        request: &CheckpointRequestV2,
3503    ) -> SuiResult<CheckpointResponseV2> {
3504        let summary = if request.certified {
3505            let summary = match request.sequence_number {
3506                Some(seq) => self
3507                    .checkpoint_store
3508                    .get_checkpoint_by_sequence_number(seq)?,
3509                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3510            }
3511            .map(|v| v.into_inner());
3512            summary.map(CheckpointSummaryResponse::Certified)
3513        } else {
3514            let summary = match request.sequence_number {
3515                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3516                None => self
3517                    .checkpoint_store
3518                    .get_latest_locally_computed_checkpoint()?,
3519            };
3520            summary.map(CheckpointSummaryResponse::Pending)
3521        };
3522        let contents = match &summary {
3523            Some(s) => self
3524                .checkpoint_store
3525                .get_checkpoint_contents(&s.content_digest())?,
3526            None => None,
3527        };
3528        Ok(CheckpointResponseV2 {
3529            checkpoint: summary,
3530            contents,
3531        })
3532    }
3533
3534    fn check_protocol_version(
3535        supported_protocol_versions: SupportedProtocolVersions,
3536        current_version: ProtocolVersion,
3537    ) {
3538        info!("current protocol version is now {:?}", current_version);
3539        info!("supported versions are: {:?}", supported_protocol_versions);
3540        if !supported_protocol_versions.is_version_supported(current_version) {
3541            let msg = format!(
3542                "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3543                current_version, supported_protocol_versions,
3544            );
3545
3546            error!("{}", msg);
3547            eprintln!("{}", msg);
3548
3549            #[cfg(not(msim))]
3550            std::process::exit(1);
3551
3552            #[cfg(msim)]
3553            sui_simulator::task::shutdown_current_node();
3554        }
3555    }
3556
3557    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
3558    #[allow(clippy::too_many_arguments)]
3559    pub async fn new(
3560        name: AuthorityName,
3561        secret: StableSyncAuthoritySigner,
3562        supported_protocol_versions: SupportedProtocolVersions,
3563        store: Arc<AuthorityStore>,
3564        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3565        epoch_store: Arc<AuthorityPerEpochStore>,
3566        committee_store: Arc<CommitteeStore>,
3567        indexes: Option<Arc<IndexStore>>,
3568        rpc_index: Option<Arc<RpcIndexStore>>,
3569        checkpoint_store: Arc<CheckpointStore>,
3570        prometheus_registry: &Registry,
3571        genesis_objects: &[Object],
3572        db_checkpoint_config: &DBCheckpointConfig,
3573        config: NodeConfig,
3574        chain_identifier: ChainIdentifier,
3575        pruner_db: Option<Arc<AuthorityPrunerTables>>,
3576        policy_config: Option<PolicyConfig>,
3577        firewall_config: Option<RemoteFirewallConfig>,
3578        pruner_watermarks: Arc<PrunerWatermarks>,
3579    ) -> Arc<Self> {
3580        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3581
3582        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3583        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3584        let execution_scheduler = Arc::new(ExecutionScheduler::new(
3585            execution_cache_trait_pointers.object_cache_reader.clone(),
3586            execution_cache_trait_pointers.account_funds_read.clone(),
3587            execution_cache_trait_pointers
3588                .transaction_cache_reader
3589                .clone(),
3590            tx_ready_certificates,
3591            &epoch_store,
3592            metrics.clone(),
3593        ));
3594        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3595
3596        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3597            epoch_store.get_parent_path(),
3598            &config.authority_store_pruning_config,
3599        );
3600        let _pruner = AuthorityStorePruner::new(
3601            store.perpetual_tables.clone(),
3602            checkpoint_store.clone(),
3603            rpc_index.clone(),
3604            indexes.clone(),
3605            config.authority_store_pruning_config.clone(),
3606            epoch_store.committee().authority_exists(&name),
3607            epoch_store.epoch_start_state().epoch_duration_ms(),
3608            prometheus_registry,
3609            pruner_db,
3610            pruner_watermarks,
3611        );
3612        let input_loader =
3613            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3614        let epoch = epoch_store.epoch();
3615        let traffic_controller_metrics =
3616            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3617        let traffic_controller = if let Some(policy_config) = policy_config {
3618            Some(Arc::new(
3619                TrafficController::init(
3620                    policy_config,
3621                    traffic_controller_metrics,
3622                    firewall_config.clone(),
3623                )
3624                .await,
3625            ))
3626        } else {
3627            None
3628        };
3629
3630        let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3631            ForkRecoveryState::new(Some(fork_config))
3632                .expect("Failed to initialize fork recovery state")
3633        });
3634
3635        let coin_reservation_resolver = Arc::new(CoinReservationResolver::new(
3636            execution_cache_trait_pointers.child_object_resolver.clone(),
3637        ));
3638
3639        let state = Arc::new(AuthorityState {
3640            name,
3641            secret,
3642            execution_lock: RwLock::new(epoch),
3643            epoch_store: ArcSwap::new(epoch_store.clone()),
3644            input_loader,
3645            execution_cache_trait_pointers,
3646            coin_reservation_resolver,
3647            indexes,
3648            rpc_index,
3649            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3650            checkpoint_store,
3651            committee_store,
3652            execution_scheduler,
3653            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3654            metrics,
3655            _pruner,
3656            _authority_per_epoch_pruner,
3657            db_checkpoint_config: db_checkpoint_config.clone(),
3658            config,
3659            overload_info: AuthorityOverloadInfo::default(),
3660            chain_identifier,
3661            congestion_tracker: Arc::new(CongestionTracker::new()),
3662            traffic_controller,
3663            fork_recovery_state,
3664            notify_epoch: tokio::sync::watch::channel(epoch).0,
3665        });
3666
3667        let state_clone = Arc::downgrade(&state);
3668        spawn_monitored_task!(fix_indexes(state_clone));
3669        // Start a task to execute ready certificates.
3670        let authority_state = Arc::downgrade(&state);
3671        spawn_monitored_task!(execution_process(
3672            authority_state,
3673            rx_ready_certificates,
3674            rx_execution_shutdown,
3675        ));
3676        // TODO: This doesn't belong to the constructor of AuthorityState.
3677        state
3678            .create_owner_index_if_empty(genesis_objects, &epoch_store)
3679            .expect("Error indexing genesis objects.");
3680
3681        if epoch_store
3682            .protocol_config()
3683            .enable_multi_epoch_transaction_expiration()
3684            && epoch_store.epoch() > 0
3685        {
3686            use typed_store::Map;
3687            let previous_epoch = epoch_store.epoch() - 1;
3688            let start_key = (previous_epoch, TransactionDigest::ZERO);
3689            let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
3690            let has_previous_epoch_data = store
3691                .perpetual_tables
3692                .executed_transaction_digests
3693                .safe_range_iter(start_key..end_key)
3694                .next()
3695                .is_some();
3696
3697            if !has_previous_epoch_data {
3698                panic!(
3699                    "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
3700                    This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
3701                    Please restore from a snapshot using the latest version of sui-tool.",
3702                    previous_epoch
3703                );
3704            }
3705        }
3706
3707        state
3708    }
3709
3710    // TODO: Consolidate our traits to reduce the number of methods here.
3711    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3712        &self.execution_cache_trait_pointers.object_cache_reader
3713    }
3714
3715    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3716        &self.execution_cache_trait_pointers.transaction_cache_reader
3717    }
3718
3719    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3720        &self.execution_cache_trait_pointers.cache_writer
3721    }
3722
3723    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3724        &self.execution_cache_trait_pointers.backing_store
3725    }
3726
3727    pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3728        &self.execution_cache_trait_pointers.child_object_resolver
3729    }
3730
3731    pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
3732        &self.execution_cache_trait_pointers.account_funds_read
3733    }
3734
3735    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3736        &self.execution_cache_trait_pointers.backing_package_store
3737    }
3738
3739    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3740        &self.execution_cache_trait_pointers.object_store
3741    }
3742
3743    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3744        &self.execution_cache_trait_pointers.reconfig_api
3745    }
3746
3747    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3748        &self.execution_cache_trait_pointers.global_state_hash_store
3749    }
3750
3751    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3752        &self.execution_cache_trait_pointers.checkpoint_cache
3753    }
3754
3755    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3756        &self.execution_cache_trait_pointers.state_sync_store
3757    }
3758
3759    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3760        &self.execution_cache_trait_pointers.cache_commit
3761    }
3762
3763    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3764        self.execution_cache_trait_pointers
3765            .testing_api
3766            .database_for_testing()
3767    }
3768
3769    pub fn cache_for_testing(&self) -> &WritebackCache {
3770        self.execution_cache_trait_pointers
3771            .testing_api
3772            .cache_for_testing()
3773    }
3774
3775    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3776        &self,
3777        config: NodeConfig,
3778        metrics: Arc<AuthorityStorePruningMetrics>,
3779    ) -> anyhow::Result<()> {
3780        use crate::authority::authority_store_pruner::PrunerWatermarks;
3781        let watermarks = Arc::new(PrunerWatermarks::default());
3782        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3783            &self.database_for_testing().perpetual_tables,
3784            &self.checkpoint_store,
3785            self.rpc_index.as_deref(),
3786            None,
3787            config.authority_store_pruning_config,
3788            metrics,
3789            EPOCH_DURATION_MS_FOR_TESTING,
3790            &watermarks,
3791        )
3792        .await
3793    }
3794
3795    pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
3796        &self.execution_scheduler
3797    }
3798
3799    fn create_owner_index_if_empty(
3800        &self,
3801        genesis_objects: &[Object],
3802        epoch_store: &Arc<AuthorityPerEpochStore>,
3803    ) -> SuiResult {
3804        let Some(index_store) = &self.indexes else {
3805            return Ok(());
3806        };
3807        if !index_store.is_empty() {
3808            return Ok(());
3809        }
3810
3811        let mut new_owners = vec![];
3812        let mut new_dynamic_fields = vec![];
3813        let mut layout_resolver = epoch_store
3814            .executor()
3815            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3816        for o in genesis_objects.iter() {
3817            match o.owner {
3818                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3819                    new_owners.push((
3820                        (addr, o.id()),
3821                        ObjectInfo::new(&o.compute_object_reference(), o),
3822                    ))
3823                }
3824                Owner::ObjectOwner(object_id) => {
3825                    let id = o.id();
3826                    let Some(info) = self.try_create_dynamic_field_info(
3827                        o,
3828                        &BTreeMap::new(),
3829                        layout_resolver.as_mut(),
3830                    )?
3831                    else {
3832                        continue;
3833                    };
3834                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3835                }
3836                _ => {}
3837            }
3838        }
3839
3840        index_store.insert_genesis_objects(ObjectIndexChanges {
3841            deleted_owners: vec![],
3842            deleted_dynamic_fields: vec![],
3843            new_owners,
3844            new_dynamic_fields,
3845        })
3846    }
3847
3848    /// Attempts to acquire execution lock for an executable transaction.
3849    /// Returns Some(lock) if the transaction is matching current executed epoch.
3850    /// Returns None if validator is halted at epoch end or epoch mismatch.
3851    pub fn execution_lock_for_executable_transaction(
3852        &self,
3853        transaction: &VerifiedExecutableTransaction,
3854    ) -> Option<ExecutionLockReadGuard<'_>> {
3855        let lock = self.execution_lock.try_read().ok()?;
3856        if *lock == transaction.auth_sig().epoch() {
3857            Some(lock)
3858        } else {
3859            // TODO: Can this still happen?
3860            None
3861        }
3862    }
3863
3864    /// Acquires the execution lock for the duration of a transaction signing request.
3865    /// This prevents reconfiguration from starting until we are finished handling the signing request.
3866    /// Otherwise, in-memory lock state could be cleared (by `ObjectLocks::clear_cached_locks`)
3867    /// while we are attempting to acquire locks for the transaction.
3868    pub fn execution_lock_for_signing(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
3869        self.execution_lock
3870            .try_read()
3871            .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
3872    }
3873
3874    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3875        self.execution_lock.write().await
3876    }
3877
    /// Moves this authority from the epoch of `cur_epoch_store` to the epoch described by
    /// `new_committee` and `epoch_start_configuration`, returning the new epoch store.
    ///
    /// The steps run in a fixed order while the execution write lock is held:
    /// 1. check the new protocol version against `supported_protocol_versions` and record
    ///    the new committee;
    /// 2. take the execution write lock and terminate the current epoch's tasks;
    /// 3. close user certificates if they are still being accepted;
    /// 4. clear end-of-epoch cache state, run the consistency checks, and persist the new
    ///    epoch start configuration;
    /// 5. optionally checkpoint all databases (when enabled in `db_checkpoint_config`);
    /// 6. reconfigure the cache, reopen the epoch DB, reconfigure the execution scheduler;
    /// 7. store the new epoch in the execution lock and notify epoch waiters.
    ///
    /// # Errors
    /// Propagates failures from `insert_new_committee`, `checkpoint_all_dbs` and
    /// `reopen_epoch_db`.
    ///
    /// # Panics
    /// Panics if the reopened epoch store's epoch differs from `new_committee.epoch`.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        // Validate the upcoming protocol version before touching any state.
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );

        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // Safe to begin reconfiguration now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        {
            let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                // Need to change this so that consensus adapter do not accept certificates from user.
                // This can happen if our local validator did not initiate epoch change locally,
                // but 2f+1 nodes already concluded the epoch.
                //
                // This lock is essentially a barrier for
                // `epoch_store.pending_consensus_certificates` table we are reading on the line after this block
                cur_epoch_store.close_user_certs(state);
            }
            // lock is dropped here
        }

        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
        self.maybe_reaccumulate_state_hash(
            cur_epoch_store,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.get_reconfig_api()
            .set_epoch_start_configuration(&epoch_start_configuration);
        // DB checkpoints are taken only when a checkpoint path is configured AND
        // end-of-epoch checkpoints are enabled.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
            && self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
        {
            // Index DB checkpoints are opt-in; an unset option means "no".
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            // Each epoch gets its own `epoch_<n>` directory under the configured path.
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
            self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        // Read the epoch before `new_committee` is moved into `reopen_epoch_db`.
        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        // Record the new epoch in the value guarded by the execution lock.
        *execution_lock = new_epoch;

        // Wake up anyone blocked in `wait_for_epoch`.
        self.notify_epoch(new_epoch);
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3971
3972    fn notify_epoch(&self, new_epoch: EpochId) {
3973        self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
3974    }
3975
3976    pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
3977        let mut rx = self.notify_epoch.subscribe();
3978        loop {
3979            let epoch = *rx.borrow();
3980            if epoch >= target_epoch {
3981                return Ok(epoch);
3982            }
3983            rx.changed().await?;
3984        }
3985    }
3986
    /// Test-only helper that settles accumulator changes implied by `effects`.
    ///
    /// It builds the settlement transactions with `AccumulatorSettlementTxBuilder`,
    /// executes each of them immediately, then builds and executes the accumulator
    /// barrier transaction, and finally hands the collected funds changes to the
    /// execution scheduler.
    ///
    /// # Panics
    /// Panics (via `unwrap`/`assert!`) if the accumulator root object is missing, if the
    /// epoch start config has no accumulator root initial shared version, if version
    /// assignment or execution fails, or if any executed transaction has a non-ok status.
    pub async fn settle_accumulator_for_testing(&self, effects: &[TransactionEffects]) {
        let accumulator_version = self
            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
            .await
            .unwrap()
            .version();
        // Use the current accumulator version as the checkpoint sequence number.
        // This is a convenient hack to keep the checkpoint version unique for each settlement and incrementing.
        let ckpt_seq = accumulator_version.value();
        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.get_transaction_cache_reader().as_ref()),
            effects,
            ckpt_seq,
            0,
        );
        // Collected up front; handed to the scheduler at the very end of this function.
        let balance_changes = builder.collect_funds_changes();
        let epoch_store = self.epoch_store_for_testing();
        let epoch = epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .unwrap();
        let settlements = builder.build_tx(
            epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            ckpt_seq,
        );

        // Wrap every settlement as a system transaction executable in the current epoch.
        let settlements: Vec<_> = settlements
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    epoch,
                )
            })
            .collect();

        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                &settlements,
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        // Execute the settlements one by one, keeping their effects for the barrier.
        let mut settlement_effects = Vec::with_capacity(settlements.len());
        for tx in settlements {
            let env = ExecutionEnv::new()
                .with_assigned_versions(version_map.get(&tx.key()).unwrap().clone());
            let (effects, _) = self
                .try_execute_immediately(&tx, env, &epoch_store)
                .await
                .unwrap();
            assert!(effects.status().is_ok());
            settlement_effects.push(effects);
        }

        // The barrier transaction is built from the effects of all settlements.
        let barrier = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            &settlement_effects,
        );
        let barrier = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier),
            epoch,
        );

        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                std::slice::from_ref(&barrier),
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        let env = ExecutionEnv::new()
            .with_assigned_versions(version_map.get(&barrier.key()).unwrap().clone());
        let (effects, _) = self
            .try_execute_immediately(&barrier, env, &epoch_store)
            .await
            .unwrap();
        assert!(effects.status().is_ok());

        // Settle address funds against the version following the one read at the start.
        let next_accumulator_version = accumulator_version.next();
        self.execution_scheduler
            .settle_address_funds(FundsSettlement {
                funds_changes: balance_changes,
                next_accumulator_version,
            });
        // object funds are settled while executing the barrier transaction
    }
4082
4083    /// Advance the epoch store to the next epoch for testing only.
4084    /// This only manually sets all the places where we have the epoch number.
4085    /// It doesn't properly reconfigure the node, hence should be only used for testing.
4086    pub async fn reconfigure_for_testing(&self) {
4087        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
4088        let epoch_store = self.epoch_store_for_testing().clone();
4089        let protocol_config = epoch_store.protocol_config().clone();
4090        // The current protocol config used in the epoch store may have been overridden and diverged from
4091        // the protocol config definitions. That override may have now been dropped when the initial guard was dropped.
4092        // We reapply the override before creating the new epoch store, to make sure that
4093        // the new epoch store has the same protocol config as the current one.
4094        // Since this is for testing only, we mostly like to keep the protocol config the same
4095        // across epochs.
4096        let _guard =
4097            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
4098        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
4099            self.get_backing_package_store().clone(),
4100            self.get_object_store().clone(),
4101            &self.config.expensive_safety_check_config,
4102            self.checkpoint_store
4103                .get_epoch_last_checkpoint(epoch_store.epoch())
4104                .unwrap()
4105                .map(|c| *c.sequence_number())
4106                .unwrap_or_default(),
4107        );
4108        self.execution_scheduler
4109            .reconfigure(&new_epoch_store, self.get_account_funds_read());
4110        let new_epoch = new_epoch_store.epoch();
4111        self.epoch_store.store(new_epoch_store);
4112        epoch_store.epoch_terminated().await;
4113
4114        *execution_lock = new_epoch;
4115    }
4116
4117    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
4118    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
4119    #[instrument(level = "error", skip_all)]
4120    fn maybe_reaccumulate_state_hash(
4121        &self,
4122        cur_epoch_store: &AuthorityPerEpochStore,
4123        new_protocol_version: ProtocolVersion,
4124    ) {
4125        self.get_reconfig_api()
4126            .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4127    }
4128
4129    #[instrument(level = "error", skip_all)]
4130    fn check_system_consistency(
4131        &self,
4132        cur_epoch_store: &AuthorityPerEpochStore,
4133        state_hasher: Arc<GlobalStateHasher>,
4134        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4135    ) {
4136        info!(
4137            "Performing sui conservation consistency check for epoch {}",
4138            cur_epoch_store.epoch()
4139        );
4140
4141        if let Err(err) = self
4142            .get_reconfig_api()
4143            .expensive_check_sui_conservation(cur_epoch_store)
4144        {
4145            if cfg!(debug_assertions) {
4146                panic!("{}", err);
4147            } else {
4148                // We cannot panic in production yet because it is known that there are some
4149                // inconsistencies in testnet. We will enable this once we make it balanced again in testnet.
4150                warn!("Sui conservation consistency check failed: {}", err);
4151            }
4152        } else {
4153            info!("Sui conservation consistency check passed");
4154        }
4155
4156        // check for root state hash consistency with live object set
4157        if expensive_safety_check_config.enable_state_consistency_check() {
4158            info!(
4159                "Performing state consistency check for epoch {}",
4160                cur_epoch_store.epoch()
4161            );
4162            self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4163        }
4164
4165        if expensive_safety_check_config.enable_secondary_index_checks()
4166            && let Some(indexes) = self.indexes.clone()
4167        {
4168            verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
4169                .expect("secondary indexes are inconsistent");
4170        }
4171    }
4172
4173    fn expensive_check_is_consistent_state(
4174        &self,
4175        state_hasher: Arc<GlobalStateHasher>,
4176        cur_epoch_store: &AuthorityPerEpochStore,
4177    ) {
4178        let live_object_set_hash = state_hasher.digest_live_object_set(
4179            !cur_epoch_store
4180                .protocol_config()
4181                .simplified_unwrap_then_delete(),
4182        );
4183
4184        let root_state_hash: ECMHLiveObjectSetDigest = self
4185            .get_global_state_hash_store()
4186            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4187            .expect("Retrieving root state hash cannot fail")
4188            .expect("Root state hash for epoch must exist")
4189            .1
4190            .digest()
4191            .into();
4192
4193        let is_inconsistent = root_state_hash != live_object_set_hash;
4194        if is_inconsistent {
4195            debug_fatal!(
4196                "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4197                root_state_hash,
4198                live_object_set_hash
4199            );
4200        } else {
4201            info!("State consistency check passed");
4202        }
4203
4204        state_hasher.set_inconsistent_state(is_inconsistent);
4205    }
4206
4207    pub fn current_epoch_for_testing(&self) -> EpochId {
4208        self.epoch_store_for_testing().epoch()
4209    }
4210
4211    #[instrument(level = "error", skip_all)]
4212    pub fn checkpoint_all_dbs(
4213        &self,
4214        checkpoint_path: &Path,
4215        cur_epoch_store: &AuthorityPerEpochStore,
4216        checkpoint_indexes: bool,
4217    ) -> SuiResult {
4218        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4219        let current_epoch = cur_epoch_store.epoch();
4220
4221        if checkpoint_path.exists() {
4222            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4223            return Ok(());
4224        }
4225
4226        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4227        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4228
4229        if checkpoint_path_tmp.exists() {
4230            fs::remove_dir_all(&checkpoint_path_tmp)
4231                .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4232        }
4233
4234        fs::create_dir_all(&checkpoint_path_tmp)
4235            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4236        fs::create_dir(&store_checkpoint_path_tmp)
4237            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4238
4239        // NOTE: Do not change the order of invoking these checkpoint calls
4240        // We want to snapshot checkpoint db first to not race with state sync
4241        self.checkpoint_store
4242            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4243
4244        self.get_reconfig_api()
4245            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4246
4247        self.committee_store
4248            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4249
4250        if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4251            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4252        }
4253
4254        fs::rename(checkpoint_path_tmp, checkpoint_path)
4255            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4256        Ok(())
4257    }
4258
4259    /// Load the current epoch store. This can change during reconfiguration. To ensure that
4260    /// we never end up accessing different epoch stores in a single task, we need to make sure
4261    /// that this is called once per task. Each call needs to be carefully audited to ensure it is
4262    /// the case. This also means we should minimize the number of call-sites. Only call it when
4263    /// there is no way to obtain it from somewhere else.
4264    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4265        self.epoch_store.load()
4266    }
4267
4268    // Load the epoch store, should be used in tests only.
4269    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4270        self.load_epoch_store_one_call_per_task()
4271    }
4272
4273    pub fn clone_committee_for_testing(&self) -> Committee {
4274        Committee::clone(self.epoch_store_for_testing().committee())
4275    }
4276
4277    #[instrument(level = "trace", skip_all)]
4278    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4279        self.get_object_store().get_object(object_id)
4280    }
4281
4282    pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4283        Ok(self
4284            .get_object(&SUI_SYSTEM_ADDRESS.into())
4285            .await
4286            .expect("framework object should always exist")
4287            .compute_object_reference())
4288    }
4289
4290    // This function is only used for testing.
4291    pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4292        self.get_object_cache_reader()
4293            .get_sui_system_state_object_unsafe()
4294    }
4295
4296    #[instrument(level = "trace", skip_all)]
4297    fn get_transaction_checkpoint_sequence(
4298        &self,
4299        digest: &TransactionDigest,
4300        epoch_store: &AuthorityPerEpochStore,
4301    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
4302        epoch_store.get_transaction_checkpoint(digest)
4303    }
4304
4305    #[instrument(level = "trace", skip_all)]
4306    pub fn get_checkpoint_by_sequence_number(
4307        &self,
4308        sequence_number: CheckpointSequenceNumber,
4309    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4310        Ok(self
4311            .checkpoint_store
4312            .get_checkpoint_by_sequence_number(sequence_number)?)
4313    }
4314
4315    #[instrument(level = "trace", skip_all)]
4316    pub fn get_transaction_checkpoint_for_tests(
4317        &self,
4318        digest: &TransactionDigest,
4319        epoch_store: &AuthorityPerEpochStore,
4320    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4321        let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4322        let Some(checkpoint) = checkpoint else {
4323            return Ok(None);
4324        };
4325        let checkpoint = self
4326            .checkpoint_store
4327            .get_checkpoint_by_sequence_number(checkpoint)?;
4328        Ok(checkpoint)
4329    }
4330
4331    #[instrument(level = "trace", skip_all)]
4332    pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4333        Ok(
4334            match self
4335                .get_object_cache_reader()
4336                .get_latest_object_or_tombstone(*object_id)
4337            {
4338                Some((_, ObjectOrTombstone::Object(object))) => {
4339                    let layout = self.get_object_layout(&object)?;
4340                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
4341                }
4342                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4343                None => ObjectRead::NotExists(*object_id),
4344            },
4345        )
4346    }
4347
    /// Chain Identifier is the digest of the genesis checkpoint.
    ///
    /// Returns the `chain_identifier` value held by this authority state.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
4352
    /// Returns a reference to the fork recovery state held by this authority state, or
    /// `None` when no such state is set.
    pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
        self.fork_recovery_state.as_ref()
    }
4356
4357    #[instrument(level = "trace", skip_all)]
4358    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4359    where
4360        T: DeserializeOwned,
4361    {
4362        let o = self.get_object_read(object_id)?.into_object()?;
4363        if let Some(move_object) = o.data.try_as_move() {
4364            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4365                SuiErrorKind::ObjectDeserializationError {
4366                    error: format!("{e}"),
4367                }
4368            })?)
4369        } else {
4370            Err(SuiErrorKind::ObjectDeserializationError {
4371                error: format!("Provided object : [{object_id}] is not a Move object."),
4372            }
4373            .into())
4374        }
4375    }
4376
4377    /// This function aims to serve rpc reads on past objects and
4378    /// we don't expect it to be called for other purposes.
4379    /// Depending on the object pruning policies that will be enforced in the
4380    /// future there is no software-level guarantee/SLA to retrieve an object
4381    /// with an old version even if it exists/existed.
4382    #[instrument(level = "trace", skip_all)]
4383    pub fn get_past_object_read(
4384        &self,
4385        object_id: &ObjectID,
4386        version: SequenceNumber,
4387    ) -> SuiResult<PastObjectRead> {
4388        // Firstly we see if the object ever existed by getting its latest data
4389        let Some(obj_ref) = self
4390            .get_object_cache_reader()
4391            .get_latest_object_ref_or_tombstone(*object_id)
4392        else {
4393            return Ok(PastObjectRead::ObjectNotExists(*object_id));
4394        };
4395
4396        if version > obj_ref.1 {
4397            return Ok(PastObjectRead::VersionTooHigh {
4398                object_id: *object_id,
4399                asked_version: version,
4400                latest_version: obj_ref.1,
4401            });
4402        }
4403
4404        if version < obj_ref.1 {
4405            // Read past objects
4406            return Ok(match self.read_object_at_version(object_id, version)? {
4407                Some((object, layout)) => {
4408                    let obj_ref = object.compute_object_reference();
4409                    PastObjectRead::VersionFound(obj_ref, object, layout)
4410                }
4411
4412                None => PastObjectRead::VersionNotFound(*object_id, version),
4413            });
4414        }
4415
4416        if !obj_ref.2.is_alive() {
4417            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4418        }
4419
4420        match self.read_object_at_version(object_id, obj_ref.1)? {
4421            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4422            None => {
4423                debug_fatal!(
4424                    "Object with in parent_entry is missing from object store, datastore is \
4425                     inconsistent"
4426                );
4427                Err(UserInputError::ObjectNotFound {
4428                    object_id: *object_id,
4429                    version: Some(obj_ref.1),
4430                }
4431                .into())
4432            }
4433        }
4434    }
4435
4436    #[instrument(level = "trace", skip_all)]
4437    fn read_object_at_version(
4438        &self,
4439        object_id: &ObjectID,
4440        version: SequenceNumber,
4441    ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4442        let Some(object) = self
4443            .get_object_cache_reader()
4444            .get_object_by_key(object_id, version)
4445        else {
4446            return Ok(None);
4447        };
4448
4449        let layout = self.get_object_layout(&object)?;
4450        Ok(Some((object, layout)))
4451    }
4452
4453    fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4454        let layout = object
4455            .data
4456            .try_as_move()
4457            .map(|object| {
4458                into_struct_layout(
4459                    self.load_epoch_store_one_call_per_task()
4460                        .executor()
4461                        // TODO(cache) - must read through cache
4462                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
4463                        .get_annotated_layout(&object.type_().clone().into())?,
4464                )
4465            })
4466            .transpose()?;
4467        Ok(layout)
4468    }
4469
4470    fn get_owner_at_version(
4471        &self,
4472        object_id: &ObjectID,
4473        version: SequenceNumber,
4474    ) -> SuiResult<Owner> {
4475        self.get_object_store()
4476            .get_object_by_key(object_id, version)
4477            .ok_or_else(|| {
4478                SuiError::from(UserInputError::ObjectNotFound {
4479                    object_id: *object_id,
4480                    version: Some(version),
4481                })
4482            })
4483            .map(|o| o.owner.clone())
4484    }
4485
4486    #[instrument(level = "trace", skip_all)]
4487    pub fn get_owner_objects(
4488        &self,
4489        owner: SuiAddress,
4490        // If `Some`, the query will start from the next item after the specified cursor
4491        cursor: Option<ObjectID>,
4492        limit: usize,
4493        filter: Option<SuiObjectDataFilter>,
4494    ) -> SuiResult<Vec<ObjectInfo>> {
4495        if let Some(indexes) = &self.indexes {
4496            indexes.get_owner_objects(owner, cursor, limit, filter)
4497        } else {
4498            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4499        }
4500    }
4501
4502    #[instrument(level = "trace", skip_all)]
4503    pub fn get_owned_coins_iterator_with_cursor(
4504        &self,
4505        owner: SuiAddress,
4506        // If `Some`, the query will start from the next item after the specified cursor
4507        cursor: (String, u64, ObjectID),
4508        limit: usize,
4509        one_coin_type_only: bool,
4510    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4511        if let Some(indexes) = &self.indexes {
4512            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4513        } else {
4514            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4515        }
4516    }
4517
4518    #[instrument(level = "trace", skip_all)]
4519    pub fn get_owner_objects_iterator(
4520        &self,
4521        owner: SuiAddress,
4522        // If `Some`, the query will start from the next item after the specified cursor
4523        cursor: Option<ObjectID>,
4524        filter: Option<SuiObjectDataFilter>,
4525    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4526        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4527        if let Some(indexes) = &self.indexes {
4528            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4529        } else {
4530            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4531        }
4532    }
4533
4534    #[instrument(level = "trace", skip_all)]
4535    pub async fn get_move_objects<T>(
4536        &self,
4537        owner: SuiAddress,
4538        type_: MoveObjectType,
4539    ) -> SuiResult<Vec<T>>
4540    where
4541        T: DeserializeOwned,
4542    {
4543        let object_ids = self
4544            .get_owner_objects_iterator(owner, None, None)?
4545            .filter(|o| match &o.type_ {
4546                ObjectType::Struct(s) => &type_ == s,
4547                ObjectType::Package => false,
4548            })
4549            .map(|info| ObjectKey(info.object_id, info.version))
4550            .collect::<Vec<_>>();
4551        let mut move_objects = vec![];
4552
4553        let objects = self
4554            .get_object_store()
4555            .multi_get_objects_by_key(&object_ids);
4556
4557        for (o, id) in objects.into_iter().zip(object_ids) {
4558            let object = o.ok_or_else(|| {
4559                SuiError::from(UserInputError::ObjectNotFound {
4560                    object_id: id.0,
4561                    version: Some(id.1),
4562                })
4563            })?;
4564            let move_object = object.data.try_as_move().ok_or_else(|| {
4565                SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4566            })?;
4567            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4568                SuiErrorKind::ObjectDeserializationError {
4569                    error: format!("{e}"),
4570                }
4571            })?);
4572        }
4573        Ok(move_objects)
4574    }
4575
4576    #[instrument(level = "trace", skip_all)]
4577    pub fn get_dynamic_fields(
4578        &self,
4579        owner: ObjectID,
4580        // If `Some`, the query will start from the next item after the specified cursor
4581        cursor: Option<ObjectID>,
4582        limit: usize,
4583    ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4584        Ok(self
4585            .get_dynamic_fields_iterator(owner, cursor)?
4586            .take(limit)
4587            .collect::<Result<Vec<_>, _>>()?)
4588    }
4589
4590    fn get_dynamic_fields_iterator(
4591        &self,
4592        owner: ObjectID,
4593        // If `Some`, the query will start from the next item after the specified cursor
4594        cursor: Option<ObjectID>,
4595    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4596    {
4597        if let Some(indexes) = &self.indexes {
4598            indexes.get_dynamic_fields_iterator(owner, cursor)
4599        } else {
4600            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4601        }
4602    }
4603
4604    #[instrument(level = "trace", skip_all)]
4605    pub fn get_dynamic_field_object_id(
4606        &self,
4607        owner: ObjectID,
4608        name_type: TypeTag,
4609        name_bcs_bytes: &[u8],
4610    ) -> SuiResult<Option<ObjectID>> {
4611        if let Some(indexes) = &self.indexes {
4612            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4613        } else {
4614            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4615        }
4616    }
4617
4618    #[instrument(level = "trace", skip_all)]
4619    pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
4620        Ok(self.get_indexes()?.next_sequence_number())
4621    }
4622
4623    #[instrument(level = "trace", skip_all)]
4624    pub async fn get_executed_transaction_and_effects(
4625        &self,
4626        digest: TransactionDigest,
4627        kv_store: Arc<TransactionKeyValueStore>,
4628    ) -> SuiResult<(Transaction, TransactionEffects)> {
4629        let transaction = kv_store.get_tx(digest).await?;
4630        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4631        Ok((transaction, effects))
4632    }
4633
    /// Batch lookup of checkpoints by sequence number in the checkpoint store.
    ///
    /// The store's result vector is returned as-is; a store error is converted into this
    /// function's error type by `?`.
    #[instrument(level = "trace", skip_all)]
    pub fn multi_get_checkpoint_by_sequence_number(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
        Ok(self
            .checkpoint_store
            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
    }
4643
4644    #[instrument(level = "trace", skip_all)]
4645    pub fn get_transaction_events(
4646        &self,
4647        digest: &TransactionDigest,
4648    ) -> SuiResult<TransactionEvents> {
4649        self.get_transaction_cache_reader()
4650            .get_events(digest)
4651            .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4652    }
4653
    /// Loads the input objects of the transaction described by `effects` from the object
    /// store, delegating to `sui_types::storage::get_transaction_input_objects` and
    /// converting its error type.
    pub fn get_transaction_input_objects(
        &self,
        effects: &TransactionEffects,
    ) -> SuiResult<Vec<Object>> {
        sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
            .map_err(Into::into)
    }
4661
    /// Loads the output objects of the transaction described by `effects` from the
    /// object store, delegating to `sui_types::storage::get_transaction_output_objects`
    /// and converting its error type.
    pub fn get_transaction_output_objects(
        &self,
        effects: &TransactionEffects,
    ) -> SuiResult<Vec<Object>> {
        sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
            .map_err(Into::into)
    }
4669
4670    fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4671        match &self.indexes {
4672            Some(i) => Ok(i.clone()),
4673            None => Err(SuiErrorKind::UnsupportedFeatureError {
4674                error: "extended object indexing is not enabled on this server".into(),
4675            }
4676            .into()),
4677        }
4678    }
4679
4680    pub async fn get_transactions_for_tests(
4681        self: &Arc<Self>,
4682        filter: Option<TransactionFilter>,
4683        cursor: Option<TransactionDigest>,
4684        limit: Option<usize>,
4685        reverse: bool,
4686    ) -> SuiResult<Vec<TransactionDigest>> {
4687        let metrics = KeyValueStoreMetrics::new_for_tests();
4688        let kv_store = Arc::new(TransactionKeyValueStore::new(
4689            "rocksdb",
4690            metrics,
4691            self.clone(),
4692        ));
4693        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4694            .await
4695    }
4696
4697    #[instrument(level = "trace", skip_all)]
4698    pub async fn get_transactions(
4699        &self,
4700        kv_store: &Arc<TransactionKeyValueStore>,
4701        filter: Option<TransactionFilter>,
4702        // If `Some`, the query will start from the next item after the specified cursor
4703        cursor: Option<TransactionDigest>,
4704        limit: Option<usize>,
4705        reverse: bool,
4706    ) -> SuiResult<Vec<TransactionDigest>> {
4707        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4708            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4709            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4710            if reverse {
4711                let iter = iter
4712                    .rev()
4713                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4714                    .skip(usize::from(cursor.is_some()));
4715                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4716            } else {
4717                let iter = iter
4718                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4719                    .skip(usize::from(cursor.is_some()));
4720                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4721            }
4722        }
4723        self.get_indexes()?
4724            .get_transactions(filter, cursor, limit, reverse)
4725    }
4726
    /// Returns a reference to the shared checkpoint store handle held by this authority
    /// state.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
4730
4731    pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4732        self.get_checkpoint_store()
4733            .get_highest_executed_checkpoint_seq_number()?
4734            .ok_or(
4735                SuiErrorKind::UserInputError {
4736                    error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4737                }
4738                .into(),
4739            )
4740    }
4741
    /// Test helper, compiled only under `cfg(msim)`: reads the highest pruned checkpoint
    /// from the perpetual tables of the testing database.
    ///
    /// A missing value is reported as `0`; store errors are converted into this
    /// function's error type.
    #[cfg(msim)]
    pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
        self.database_for_testing()
            .perpetual_tables
            .get_highest_pruned_checkpoint()
            .map(|c| c.unwrap_or(0))
            .map_err(Into::into)
    }
4750
4751    #[instrument(level = "trace", skip_all)]
4752    pub fn get_checkpoint_summary_by_sequence_number(
4753        &self,
4754        sequence_number: CheckpointSequenceNumber,
4755    ) -> SuiResult<CheckpointSummary> {
4756        let verified_checkpoint = self
4757            .get_checkpoint_store()
4758            .get_checkpoint_by_sequence_number(sequence_number)?;
4759        match verified_checkpoint {
4760            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4761            None => Err(SuiErrorKind::UserInputError {
4762                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4763            }
4764            .into()),
4765        }
4766    }
4767
4768    #[instrument(level = "trace", skip_all)]
4769    pub fn get_checkpoint_summary_by_digest(
4770        &self,
4771        digest: CheckpointDigest,
4772    ) -> SuiResult<CheckpointSummary> {
4773        let verified_checkpoint = self
4774            .get_checkpoint_store()
4775            .get_checkpoint_by_digest(&digest)?;
4776        match verified_checkpoint {
4777            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4778            None => Err(SuiErrorKind::UserInputError {
4779                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4780            }
4781            .into()),
4782        }
4783    }
4784
4785    #[instrument(level = "trace", skip_all)]
4786    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
4787        if is_system_package(package_id) {
4788            return self.find_genesis_txn_digest();
4789        }
4790        Ok(self
4791            .get_object_read(&package_id)?
4792            .into_object()?
4793            .previous_transaction)
4794    }
4795
4796    #[instrument(level = "trace", skip_all)]
4797    pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
4798        let summary = self
4799            .get_verified_checkpoint_by_sequence_number(0)?
4800            .into_message();
4801        let content = self.get_checkpoint_contents(summary.content_digest)?;
4802        let genesis_transaction = content.enumerate_transactions(&summary).next();
4803        Ok(genesis_transaction
4804            .ok_or(SuiErrorKind::UserInputError {
4805                error: UserInputError::GenesisTransactionNotFound,
4806            })?
4807            .1
4808            .transaction)
4809    }
4810
4811    #[instrument(level = "trace", skip_all)]
4812    pub fn get_verified_checkpoint_by_sequence_number(
4813        &self,
4814        sequence_number: CheckpointSequenceNumber,
4815    ) -> SuiResult<VerifiedCheckpoint> {
4816        let verified_checkpoint = self
4817            .get_checkpoint_store()
4818            .get_checkpoint_by_sequence_number(sequence_number)?;
4819        match verified_checkpoint {
4820            Some(verified_checkpoint) => Ok(verified_checkpoint),
4821            None => Err(SuiErrorKind::UserInputError {
4822                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4823            }
4824            .into()),
4825        }
4826    }
4827
4828    #[instrument(level = "trace", skip_all)]
4829    pub fn get_verified_checkpoint_summary_by_digest(
4830        &self,
4831        digest: CheckpointDigest,
4832    ) -> SuiResult<VerifiedCheckpoint> {
4833        let verified_checkpoint = self
4834            .get_checkpoint_store()
4835            .get_checkpoint_by_digest(&digest)?;
4836        match verified_checkpoint {
4837            Some(verified_checkpoint) => Ok(verified_checkpoint),
4838            None => Err(SuiErrorKind::UserInputError {
4839                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4840            }
4841            .into()),
4842        }
4843    }
4844
4845    #[instrument(level = "trace", skip_all)]
4846    pub fn get_checkpoint_contents(
4847        &self,
4848        digest: CheckpointContentsDigest,
4849    ) -> SuiResult<CheckpointContents> {
4850        self.get_checkpoint_store()
4851            .get_checkpoint_contents(&digest)?
4852            .ok_or(
4853                SuiErrorKind::UserInputError {
4854                    error: UserInputError::CheckpointContentsNotFound(digest),
4855                }
4856                .into(),
4857            )
4858    }
4859
4860    #[instrument(level = "trace", skip_all)]
4861    pub fn get_checkpoint_contents_by_sequence_number(
4862        &self,
4863        sequence_number: CheckpointSequenceNumber,
4864    ) -> SuiResult<CheckpointContents> {
4865        let verified_checkpoint = self
4866            .get_checkpoint_store()
4867            .get_checkpoint_by_sequence_number(sequence_number)?;
4868        match verified_checkpoint {
4869            Some(verified_checkpoint) => {
4870                let content_digest = verified_checkpoint.into_inner().content_digest;
4871                self.get_checkpoint_contents(content_digest)
4872            }
4873            None => Err(SuiErrorKind::UserInputError {
4874                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4875            }
4876            .into()),
4877        }
4878    }
4879
4880    #[instrument(level = "trace", skip_all)]
4881    pub async fn query_events(
4882        &self,
4883        kv_store: &Arc<TransactionKeyValueStore>,
4884        query: EventFilter,
4885        // If `Some`, the query will start from the next item after the specified cursor
4886        cursor: Option<EventID>,
4887        limit: usize,
4888        descending: bool,
4889    ) -> SuiResult<Vec<SuiEvent>> {
4890        let index_store = self.get_indexes()?;
4891
4892        //Get the tx_num from tx_digest
4893        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4894            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4895                SuiErrorKind::TransactionNotFound {
4896                    digest: cursor.tx_digest,
4897                },
4898            )?;
4899            (tx_seq, cursor.event_seq as usize)
4900        } else if descending {
4901            (u64::MAX, usize::MAX)
4902        } else {
4903            (0, 0)
4904        };
4905
4906        let limit = limit + 1;
4907        let mut event_keys = match query {
4908            EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
4909            EventFilter::Transaction(digest) => {
4910                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4911            }
4912            EventFilter::MoveModule { package, module } => {
4913                let module_id = ModuleId::new(package.into(), module);
4914                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4915            }
4916            EventFilter::MoveEventType(struct_name) => index_store
4917                .events_by_move_event_struct_name(
4918                    &struct_name,
4919                    tx_num,
4920                    event_num,
4921                    limit,
4922                    descending,
4923                )?,
4924            EventFilter::Sender(sender) => {
4925                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4926            }
4927            EventFilter::TimeRange {
4928                start_time,
4929                end_time,
4930            } => index_store
4931                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4932            EventFilter::MoveEventModule { package, module } => index_store
4933                .events_by_move_event_module(
4934                    &ModuleId::new(package.into(), module),
4935                    tx_num,
4936                    event_num,
4937                    limit,
4938                    descending,
4939                )?,
4940            // not using "_ =>" because we want to make sure we remember to add new variants here
4941            EventFilter::Any(_) => {
4942                return Err(SuiErrorKind::UserInputError {
4943                    error: UserInputError::Unsupported(
4944                        "'Any' queries are not supported by the fullnode.".to_string(),
4945                    ),
4946                }
4947                .into());
4948            }
4949        };
4950
4951        // skip one event if exclusive cursor is provided,
4952        // otherwise truncate to the original limit.
4953        if cursor.is_some() {
4954            if !event_keys.is_empty() {
4955                event_keys.remove(0);
4956            }
4957        } else {
4958            event_keys.truncate(limit - 1);
4959        }
4960
4961        // get the unique set of digests from the event_keys
4962        let transaction_digests = event_keys
4963            .iter()
4964            .map(|(_, digest, _, _)| *digest)
4965            .collect::<HashSet<_>>()
4966            .into_iter()
4967            .collect::<Vec<_>>();
4968
4969        let events = kv_store
4970            .multi_get_events_by_tx_digests(&transaction_digests)
4971            .await?;
4972
4973        let events_map: HashMap<_, _> =
4974            transaction_digests.iter().zip(events.into_iter()).collect();
4975
4976        let stored_events = event_keys
4977            .into_iter()
4978            .map(|k| {
4979                (
4980                    k,
4981                    events_map
4982                        .get(&k.1)
4983                        .expect("fetched digest is missing")
4984                        .clone()
4985                        .and_then(|e| e.data.get(k.2).cloned()),
4986                )
4987            })
4988            .map(
4989                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4990                    event
4991                        .map(|e| (e, tx_digest, event_seq, timestamp))
4992                        .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
4993                },
4994            )
4995            .collect::<Result<Vec<_>, _>>()?;
4996
4997        let epoch_store = self.load_epoch_store_one_call_per_task();
4998        let backing_store = self.get_backing_package_store().as_ref();
4999        let mut layout_resolver = epoch_store
5000            .executor()
5001            .type_layout_resolver(Box::new(backing_store));
5002        let mut events = vec![];
5003        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
5004            events.push(SuiEvent::try_from(
5005                e.clone(),
5006                tx_digest,
5007                event_seq as u64,
5008                Some(timestamp),
5009                layout_resolver.get_annotated_layout(&e.type_)?,
5010            )?)
5011        }
5012        Ok(events)
5013    }
5014
    /// Inserts a single genesis object through the reconfig API.
    ///
    /// Declared `async`, but the body contains no `.await`.
    pub async fn insert_genesis_object(&self, object: Object) {
        self.get_reconfig_api().insert_genesis_object(object);
    }
5018
5019    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
5020        futures::future::join_all(
5021            objects
5022                .iter()
5023                .map(|o| self.insert_genesis_object(o.clone())),
5024        )
5025        .await;
5026    }
5027
5028    /// Make a status response for a transaction
5029    #[instrument(level = "trace", skip_all)]
5030    pub fn get_transaction_status(
5031        &self,
5032        transaction_digest: &TransactionDigest,
5033        epoch_store: &Arc<AuthorityPerEpochStore>,
5034    ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
5035        // TODO: In the case of read path, we should not have to re-sign the effects.
5036        if let Some(effects) =
5037            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
5038        {
5039            if let Some(transaction) = self
5040                .get_transaction_cache_reader()
5041                .get_transaction_block(transaction_digest)
5042            {
5043                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
5044                let events = if effects.events_digest().is_some() {
5045                    self.get_transaction_events(effects.transaction_digest())?
5046                } else {
5047                    TransactionEvents::default()
5048                };
5049                return Ok(Some((
5050                    (*transaction).clone().into_message(),
5051                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
5052                )));
5053            } else {
5054                // The read of effects and read of transaction are not atomic. It's possible that we reverted
5055                // the transaction (during epoch change) in between the above two reads, and we end up
5056                // having effects but not transaction. In this case, we just fall through.
5057                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
5058            }
5059        }
5060        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
5061            self.metrics.tx_already_processed.inc();
5062            let (transaction, sig) = signed.into_inner().into_data_and_sig();
5063            Ok(Some((transaction, TransactionStatus::Signed(sig))))
5064        } else {
5065            Ok(None)
5066        }
5067    }
5068
5069    /// Get the signed effects of the given transaction. If the effects was signed in a previous
5070    /// epoch, re-sign it so that the caller is able to form a cert of the effects in the current
5071    /// epoch.
5072    #[instrument(level = "trace", skip_all)]
5073    pub fn get_signed_effects_and_maybe_resign(
5074        &self,
5075        transaction_digest: &TransactionDigest,
5076        epoch_store: &Arc<AuthorityPerEpochStore>,
5077    ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
5078        let effects = self
5079            .get_transaction_cache_reader()
5080            .get_executed_effects(transaction_digest);
5081        match effects {
5082            Some(effects) => {
5083                // If the transaction was executed in previous epochs, the validator will
5084                // re-sign the effects with new current epoch so that a client is always able to
5085                // obtain an effects certificate at the current epoch.
5086                //
5087                // Why is this necessary? Consider the following case:
5088                // - assume there are 4 validators
5089                // - Quorum driver gets 2 signed effects before reconfig halt
5090                // - The tx makes it into final checkpoint.
5091                // - 2 validators go away and are replaced in the new epoch.
5092                // - The new epoch begins.
5093                // - The quorum driver cannot complete the partial effects cert from the previous epoch,
5094                //   because it may not be able to reach either of the 2 former validators.
5095                // - But, if the 2 validators that stayed are willing to re-sign the effects in the new
5096                //   epoch, the QD can make a new effects cert and return it to the client.
5097                //
5098                // This is a considered a short-term workaround. Eventually, Quorum Driver should be able
5099                // to return either an effects certificate, -or- a proof of inclusion in a checkpoint. In
5100                // the case above, the Quorum Driver would return a proof of inclusion in the final
5101                // checkpoint, and this code would no longer be necessary.
5102                if effects.executed_epoch() != epoch_store.epoch() {
5103                    debug!(
5104                        tx_digest=?transaction_digest,
5105                        effects_epoch=?effects.executed_epoch(),
5106                        epoch=?epoch_store.epoch(),
5107                        "Re-signing the effects with the current epoch"
5108                    );
5109                }
5110                Ok(Some(self.sign_effects(effects, epoch_store)?))
5111            }
5112            None => Ok(None),
5113        }
5114    }
5115
5116    #[instrument(level = "trace", skip_all)]
5117    pub(crate) fn sign_effects(
5118        &self,
5119        effects: TransactionEffects,
5120        epoch_store: &Arc<AuthorityPerEpochStore>,
5121    ) -> SuiResult<VerifiedSignedTransactionEffects> {
5122        let tx_digest = *effects.transaction_digest();
5123        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5124            Some(sig) => {
5125                debug_assert!(sig.epoch == epoch_store.epoch());
5126                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5127            }
5128            _ => {
5129                let sig = AuthoritySignInfo::new(
5130                    epoch_store.epoch(),
5131                    &effects,
5132                    Intent::sui_app(IntentScope::TransactionEffects),
5133                    self.name,
5134                    &*self.secret,
5135                );
5136
5137                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5138
5139                epoch_store.insert_effects_digest_and_signature(
5140                    &tx_digest,
5141                    effects.digest(),
5142                    &sig,
5143                )?;
5144
5145                effects
5146            }
5147        };
5148
5149        Ok(VerifiedSignedTransactionEffects::new_unchecked(
5150            signed_effects,
5151        ))
5152    }
5153
5154    // Returns coin objects for indexing for fullnode if indexing is enabled.
5155    #[instrument(level = "trace", skip_all)]
5156    fn fullnode_only_get_tx_coins_for_indexing(
5157        &self,
5158        effects: &TransactionEffects,
5159        inner_temporary_store: &InnerTemporaryStore,
5160        epoch_store: &Arc<AuthorityPerEpochStore>,
5161    ) -> Option<TxCoins> {
5162        if self.indexes.is_none() || self.is_validator(epoch_store) {
5163            return None;
5164        }
5165        let written_coin_objects = inner_temporary_store
5166            .written
5167            .iter()
5168            .filter_map(|(k, v)| {
5169                if v.is_coin() {
5170                    Some((*k, v.clone()))
5171                } else {
5172                    None
5173                }
5174            })
5175            .collect();
5176        let mut input_coin_objects = inner_temporary_store
5177            .input_objects
5178            .iter()
5179            .filter_map(|(k, v)| {
5180                if v.is_coin() {
5181                    Some((*k, v.clone()))
5182                } else {
5183                    None
5184                }
5185            })
5186            .collect::<ObjectMap>();
5187
5188        // Check for receiving objects that were actually used and modified during execution. Their
5189        // updated version will already showup in "written_coins" but their input isn't included in
5190        // the set of input objects in a inner_temporary_store.
5191        for (object_id, version) in effects.modified_at_versions() {
5192            if inner_temporary_store
5193                .loaded_runtime_objects
5194                .contains_key(&object_id)
5195                && let Some(object) = self
5196                    .get_object_store()
5197                    .get_object_by_key(&object_id, version)
5198                && object.is_coin()
5199            {
5200                input_coin_objects.insert(object_id, object);
5201            }
5202        }
5203
5204        Some((input_coin_objects, written_coin_objects))
5205    }
5206
5207    /// Get the TransactionEnvelope that currently locks the given object, if any.
5208    /// Since object locks are only valid for one epoch, we also need the epoch_id in the query.
5209    /// Returns UserInputError::ObjectNotFound if no lock records for the given object can be found.
5210    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the object record is at a different version.
5211    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a certain transaction.
5212    /// Returns None if a lock record is initialized for the given ObjectRef but not yet locked by any transaction,
5213    ///     or cannot find the transaction in transaction table, because of data race etc.
5214    #[instrument(level = "trace", skip_all)]
5215    pub async fn get_transaction_lock(
5216        &self,
5217        object_ref: &ObjectRef,
5218        epoch_store: &AuthorityPerEpochStore,
5219    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5220        let lock_info = self
5221            .get_object_cache_reader()
5222            .get_lock(*object_ref, epoch_store)?;
5223        let lock_info = match lock_info {
5224            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5225                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5226                    provided_obj_ref: *object_ref,
5227                    current_version: locked_ref.1,
5228                }
5229                .into());
5230            }
5231            ObjectLockStatus::Initialized => {
5232                return Ok(None);
5233            }
5234            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5235        };
5236
5237        epoch_store.get_signed_transaction(&lock_info.tx_digest)
5238    }
5239
5240    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5241        self.get_object_cache_reader().get_objects(objects)
5242    }
5243
5244    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5245        self.get_object_cache_reader()
5246            .get_latest_object_ref_or_tombstone(object_id)
5247    }
5248
5249    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
5250    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the upgrade.
5251    ///
5252    /// This method can be used to dynamic adjust the amount of buffer. If set to 0, the upgrade
5253    /// will go through with only 2f+1 votes.
5254    ///
5255    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all should have the same
5256    /// value), or you risk halting the chain.
5257    pub fn set_override_protocol_upgrade_buffer_stake(
5258        &self,
5259        expected_epoch: EpochId,
5260        buffer_stake_bps: u64,
5261    ) -> SuiResult {
5262        let epoch_store = self.load_epoch_store_one_call_per_task();
5263        let actual_epoch = epoch_store.epoch();
5264        if actual_epoch != expected_epoch {
5265            return Err(SuiErrorKind::WrongEpoch {
5266                expected_epoch,
5267                actual_epoch,
5268            }
5269            .into());
5270        }
5271
5272        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5273    }
5274
5275    pub fn clear_override_protocol_upgrade_buffer_stake(
5276        &self,
5277        expected_epoch: EpochId,
5278    ) -> SuiResult {
5279        let epoch_store = self.load_epoch_store_one_call_per_task();
5280        let actual_epoch = epoch_store.epoch();
5281        if actual_epoch != expected_epoch {
5282            return Err(SuiErrorKind::WrongEpoch {
5283                expected_epoch,
5284                actual_epoch,
5285            }
5286            .into());
5287        }
5288
5289        epoch_store.clear_override_protocol_upgrade_buffer_stake()
5290    }
5291
5292    /// Get the set of system packages that are compiled in to this build, if those packages are
5293    /// compatible with the current versions of those packages on-chain.
5294    pub async fn get_available_system_packages(
5295        &self,
5296        binary_config: &BinaryConfig,
5297    ) -> Vec<ObjectRef> {
5298        let mut results = vec![];
5299
5300        let system_packages = BuiltInFramework::iter_system_packages();
5301
5302        // Add extra framework packages during simtest
5303        #[cfg(msim)]
5304        let extra_packages = framework_injection::get_extra_packages(self.name);
5305        #[cfg(msim)]
5306        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
5307
5308        for system_package in system_packages {
5309            let modules = system_package.modules().to_vec();
5310            // In simtests, we could override the current built-in framework packages.
5311            #[cfg(msim)]
5312            let modules =
5313                match framework_injection::get_override_modules(&system_package.id, self.name) {
5314                    Some(overrides) if overrides.is_empty() => continue,
5315                    Some(overrides) => overrides,
5316                    None => modules,
5317                };
5318
5319            let Some(obj_ref) = sui_framework::compare_system_package(
5320                &self.get_object_store(),
5321                &system_package.id,
5322                &modules,
5323                system_package.dependencies.to_vec(),
5324                binary_config,
5325            )
5326            .await
5327            else {
5328                return vec![];
5329            };
5330            results.push(obj_ref);
5331        }
5332
5333        results
5334    }
5335
5336    /// Return the new versions, module bytes, and dependencies for the packages that have been
5337    /// committed to for a framework upgrade, in `system_packages`.  Loads the module contents from
5338    /// the binary, and performs the following checks:
5339    ///
5340    /// - Whether its contents matches what is on-chain already, in which case no upgrade is
5341    ///   required, and its contents are omitted from the output.
5342    /// - Whether the contents in the binary can form a package whose digest matches the input,
5343    ///   meaning the framework will be upgraded, and this authority can satisfy that upgrade, in
5344    ///   which case the contents are included in the output.
5345    ///
5346    /// If the current version of the framework can't be loaded, the binary does not contain the
5347    /// bytes for that framework ID, or the resulting package fails the digest check, `None` is
5348    /// returned indicating that this authority cannot run the upgrade that the network voted on.
5349    async fn get_system_package_bytes(
5350        &self,
5351        system_packages: Vec<ObjectRef>,
5352        binary_config: &BinaryConfig,
5353    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5354        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5355        let objects = self.get_objects(&ids).await;
5356
5357        let mut res = Vec::with_capacity(system_packages.len());
5358        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
5359            let prev_transaction = match object {
5360                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5361                    // Skip this one because it doesn't need to be upgraded.
5362                    info!("Framework {} does not need updating", system_package_ref.0);
5363                    continue;
5364                }
5365
5366                Some(cur_object) => cur_object.previous_transaction,
5367                None => TransactionDigest::genesis_marker(),
5368            };
5369
5370            #[cfg(msim)]
5371            let SystemPackage {
5372                id: _,
5373                bytes,
5374                dependencies,
5375            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5376                .unwrap_or_else(|| {
5377                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5378                });
5379
5380            #[cfg(not(msim))]
5381            let SystemPackage {
5382                id: _,
5383                bytes,
5384                dependencies,
5385            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5386
5387            let modules: Vec<_> = bytes
5388                .iter()
5389                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5390                .collect();
5391
5392            let new_object = Object::new_system_package(
5393                &modules,
5394                system_package_ref.1,
5395                dependencies.clone(),
5396                prev_transaction,
5397            );
5398
5399            let new_ref = new_object.compute_object_reference();
5400            if new_ref != system_package_ref {
5401                if cfg!(msim) {
5402                    // debug_fatal required here for test_framework_upgrade_conflicting_versions to pass
5403                    debug_fatal!(
5404                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5405                    );
5406                } else {
5407                    error!(
5408                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5409                    );
5410                }
5411                return None;
5412            }
5413
5414            res.push((system_package_ref.1, bytes, dependencies));
5415        }
5416
5417        Some(res)
5418    }
5419
5420    fn is_protocol_version_supported_v2(
5421        current_protocol_version: ProtocolVersion,
5422        proposed_protocol_version: ProtocolVersion,
5423        protocol_config: &ProtocolConfig,
5424        committee: &Committee,
5425        capabilities: Vec<AuthorityCapabilitiesV2>,
5426        mut buffer_stake_bps: u64,
5427    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5428        if proposed_protocol_version > current_protocol_version + 1
5429            && !protocol_config.advance_to_highest_supported_protocol_version()
5430        {
5431            return None;
5432        }
5433
5434        if buffer_stake_bps > 10000 {
5435            warn!("clamping buffer_stake_bps to 10000");
5436            buffer_stake_bps = 10000;
5437        }
5438
5439        // For each validator, gather the protocol version and system packages that it would like
5440        // to upgrade to in the next epoch.
5441        let mut desired_upgrades: Vec<_> = capabilities
5442            .into_iter()
5443            .filter_map(|mut cap| {
5444                // A validator that lists no packages is voting against any change at all.
5445                if cap.available_system_packages.is_empty() {
5446                    return None;
5447                }
5448
5449                cap.available_system_packages.sort();
5450
5451                info!(
5452                    "validator {:?} supports {:?} with system packages: {:?}",
5453                    cap.authority.concise(),
5454                    cap.supported_protocol_versions,
5455                    cap.available_system_packages,
5456                );
5457
5458                // A validator that only supports the current protocol version is also voting
5459                // against any change, because framework upgrades always require a protocol version
5460                // bump.
5461                cap.supported_protocol_versions
5462                    .get_version_digest(proposed_protocol_version)
5463                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
5464            })
5465            .collect();
5466
5467        // There can only be one set of votes that have a majority, find one if it exists.
5468        desired_upgrades.sort();
5469        desired_upgrades
5470            .into_iter()
5471            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5472            .into_iter()
5473            .find_map(|((digest, packages), group)| {
5474                // should have been filtered out earlier.
5475                assert!(!packages.is_empty());
5476
5477                let mut stake_aggregator: StakeAggregator<(), true> =
5478                    StakeAggregator::new(Arc::new(committee.clone()));
5479
5480                for (_, _, authority) in group {
5481                    stake_aggregator.insert_generic(authority, ());
5482                }
5483
5484                let total_votes = stake_aggregator.total_votes();
5485                let quorum_threshold = committee.quorum_threshold();
5486                let f = committee.total_votes() - committee.quorum_threshold();
5487
5488                // multiple by buffer_stake_bps / 10000, rounded up.
5489                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5490                let effective_threshold = quorum_threshold + buffer_stake;
5491
5492                info!(
5493                    protocol_config_digest = ?digest,
5494                    ?total_votes,
5495                    ?quorum_threshold,
5496                    ?buffer_stake_bps,
5497                    ?effective_threshold,
5498                    ?proposed_protocol_version,
5499                    ?packages,
5500                    "support for upgrade"
5501                );
5502
5503                let has_support = total_votes >= effective_threshold;
5504                has_support.then_some((proposed_protocol_version, packages))
5505            })
5506    }
5507
5508    fn choose_protocol_version_and_system_packages_v2(
5509        current_protocol_version: ProtocolVersion,
5510        protocol_config: &ProtocolConfig,
5511        committee: &Committee,
5512        capabilities: Vec<AuthorityCapabilitiesV2>,
5513        buffer_stake_bps: u64,
5514    ) -> (ProtocolVersion, Vec<ObjectRef>) {
5515        assert!(protocol_config.authority_capabilities_v2());
5516        let mut next_protocol_version = current_protocol_version;
5517        let mut system_packages = vec![];
5518
5519        while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5520            current_protocol_version,
5521            next_protocol_version + 1,
5522            protocol_config,
5523            committee,
5524            capabilities.clone(),
5525            buffer_stake_bps,
5526        ) {
5527            next_protocol_version = version;
5528            system_packages = packages;
5529        }
5530
5531        (next_protocol_version, system_packages)
5532    }
5533
5534    #[instrument(level = "debug", skip_all)]
5535    fn create_authenticator_state_tx(
5536        &self,
5537        epoch_store: &Arc<AuthorityPerEpochStore>,
5538    ) -> Option<EndOfEpochTransactionKind> {
5539        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5540            info!("authenticator state transactions not enabled");
5541            return None;
5542        }
5543
5544        let authenticator_state_exists = epoch_store.authenticator_state_exists();
5545        let tx = if authenticator_state_exists {
5546            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5547            let min_epoch =
5548                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5549            let authenticator_obj_initial_shared_version = epoch_store
5550                .epoch_start_config()
5551                .authenticator_obj_initial_shared_version()
5552                .expect("initial version must exist");
5553
5554            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5555                min_epoch,
5556                authenticator_obj_initial_shared_version,
5557            );
5558
5559            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5560
5561            tx
5562        } else {
5563            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5564            info!("Creating AuthenticatorStateCreate tx");
5565            tx
5566        };
5567        Some(tx)
5568    }
5569
5570    #[instrument(level = "debug", skip_all)]
5571    fn create_randomness_state_tx(
5572        &self,
5573        epoch_store: &Arc<AuthorityPerEpochStore>,
5574    ) -> Option<EndOfEpochTransactionKind> {
5575        if !epoch_store.protocol_config().random_beacon() {
5576            info!("randomness state transactions not enabled");
5577            return None;
5578        }
5579
5580        if epoch_store.randomness_state_exists() {
5581            return None;
5582        }
5583
5584        let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5585        info!("Creating RandomnessStateCreate tx");
5586        Some(tx)
5587    }
5588
5589    #[instrument(level = "debug", skip_all)]
5590    fn create_accumulator_root_tx(
5591        &self,
5592        epoch_store: &Arc<AuthorityPerEpochStore>,
5593    ) -> Option<EndOfEpochTransactionKind> {
5594        if !epoch_store
5595            .protocol_config()
5596            .create_root_accumulator_object()
5597        {
5598            info!("accumulator root creation not enabled");
5599            return None;
5600        }
5601
5602        if epoch_store.accumulator_root_exists() {
5603            return None;
5604        }
5605
5606        let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5607        info!("Creating AccumulatorRootCreate tx");
5608        Some(tx)
5609    }
5610
5611    #[instrument(level = "debug", skip_all)]
5612    fn create_write_accumulator_storage_cost_tx(
5613        &self,
5614        epoch_store: &Arc<AuthorityPerEpochStore>,
5615    ) -> Option<EndOfEpochTransactionKind> {
5616        if !epoch_store.accumulator_root_exists() {
5617            info!("accumulator root does not exist yet");
5618            return None;
5619        }
5620        if !epoch_store.protocol_config().enable_accumulators() {
5621            info!("accumulators not enabled");
5622            return None;
5623        }
5624
5625        let object_store = self.get_object_store();
5626        let object_count =
5627            match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
5628                Ok(Some(count)) => count,
5629                Ok(None) => return None,
5630                Err(e) => {
5631                    fatal!("failed to read accumulator object count: {e}");
5632                }
5633            };
5634
5635        let storage_cost = object_count.saturating_mul(
5636            epoch_store
5637                .protocol_config()
5638                .accumulator_object_storage_cost(),
5639        );
5640
5641        let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
5642        info!(
5643            object_count,
5644            storage_cost, "Creating WriteAccumulatorStorageCost tx"
5645        );
5646        Some(tx)
5647    }
5648
5649    #[instrument(level = "debug", skip_all)]
5650    fn create_coin_registry_tx(
5651        &self,
5652        epoch_store: &Arc<AuthorityPerEpochStore>,
5653    ) -> Option<EndOfEpochTransactionKind> {
5654        if !epoch_store.protocol_config().enable_coin_registry() {
5655            info!("coin registry not enabled");
5656            return None;
5657        }
5658
5659        if epoch_store.coin_registry_exists() {
5660            return None;
5661        }
5662
5663        let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5664        info!("Creating CoinRegistryCreate tx");
5665        Some(tx)
5666    }
5667
5668    #[instrument(level = "debug", skip_all)]
5669    fn create_display_registry_tx(
5670        &self,
5671        epoch_store: &Arc<AuthorityPerEpochStore>,
5672    ) -> Option<EndOfEpochTransactionKind> {
5673        if !epoch_store.protocol_config().enable_display_registry() {
5674            info!("display registry not enabled");
5675            return None;
5676        }
5677
5678        if epoch_store.display_registry_exists() {
5679            return None;
5680        }
5681
5682        let tx = EndOfEpochTransactionKind::new_display_registry_create();
5683        info!("Creating DisplayRegistryCreate tx");
5684        Some(tx)
5685    }
5686
5687    #[instrument(level = "debug", skip_all)]
5688    fn create_bridge_tx(
5689        &self,
5690        epoch_store: &Arc<AuthorityPerEpochStore>,
5691    ) -> Option<EndOfEpochTransactionKind> {
5692        if !epoch_store.protocol_config().enable_bridge() {
5693            info!("bridge not enabled");
5694            return None;
5695        }
5696        if epoch_store.bridge_exists() {
5697            return None;
5698        }
5699        let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5700        info!("Creating Bridge Create tx");
5701        Some(tx)
5702    }
5703
5704    #[instrument(level = "debug", skip_all)]
5705    fn init_bridge_committee_tx(
5706        &self,
5707        epoch_store: &Arc<AuthorityPerEpochStore>,
5708    ) -> Option<EndOfEpochTransactionKind> {
5709        if !epoch_store.protocol_config().enable_bridge() {
5710            info!("bridge not enabled");
5711            return None;
5712        }
5713        if !epoch_store
5714            .protocol_config()
5715            .should_try_to_finalize_bridge_committee()
5716        {
5717            info!("should not try to finalize bridge committee yet");
5718            return None;
5719        }
5720        // Only create this transaction if bridge exists
5721        if !epoch_store.bridge_exists() {
5722            return None;
5723        }
5724
5725        if epoch_store.bridge_committee_initiated() {
5726            return None;
5727        }
5728
5729        let bridge_initial_shared_version = epoch_store
5730            .epoch_start_config()
5731            .bridge_obj_initial_shared_version()
5732            .expect("initial version must exist");
5733        let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5734        info!("Init Bridge committee tx");
5735        Some(tx)
5736    }
5737
5738    #[instrument(level = "debug", skip_all)]
5739    fn create_deny_list_state_tx(
5740        &self,
5741        epoch_store: &Arc<AuthorityPerEpochStore>,
5742    ) -> Option<EndOfEpochTransactionKind> {
5743        if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5744            return None;
5745        }
5746
5747        if epoch_store.coin_deny_list_state_exists() {
5748            return None;
5749        }
5750
5751        let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5752        info!("Creating DenyListStateCreate tx");
5753        Some(tx)
5754    }
5755
5756    #[instrument(level = "debug", skip_all)]
5757    fn create_address_alias_state_tx(
5758        &self,
5759        epoch_store: &Arc<AuthorityPerEpochStore>,
5760    ) -> Option<EndOfEpochTransactionKind> {
5761        if !epoch_store.protocol_config().address_aliases() {
5762            info!("address aliases not enabled");
5763            return None;
5764        }
5765
5766        if epoch_store.address_alias_state_exists() {
5767            return None;
5768        }
5769
5770        let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
5771        info!("Creating AddressAliasStateCreate tx");
5772        Some(tx)
5773    }
5774
    /// Builds the end-of-epoch transaction that stores execution time observations.
    ///
    /// Returns `None` unless the per-object congestion control mode is `ExecutionTimeEstimate`.
    ///
    /// The observations kept are those whose key was either produced by a programmable
    /// transaction command in the last checkpoints of the epoch, or passed in via
    /// `end_of_epoch_observation_keys`. The result is further limited by
    /// `params.stored_observations_limit`.
    ///
    /// Panics (or hits `fatal!`) if a checkpoint, its contents, or one of its transactions in
    /// the inspected range cannot be loaded, or if a checkpoint store read fails.
    #[instrument(level = "debug", skip_all)]
    fn create_execution_time_observations_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
    ) -> Option<EndOfEpochTransactionKind> {
        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
            .protocol_config()
            .per_object_congestion_control_mode()
        else {
            return None;
        };

        // Load tx in the last N checkpoints before end-of-epoch, and save only the
        // execution time observations for commands in these checkpoints.
        // NOTE(review): the `- 1` below would underflow if
        // `stored_observations_num_included_checkpoints` were 0 -- confirm it is always >= 1.
        let start_checkpoint = std::cmp::max(
            last_checkpoint_before_end_of_epoch
                .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
            // If we have <N checkpoints in the epoch, use all of them.
            // The lower bound is the checkpoint right after the previous epoch's last one, or
            // checkpoint 1 when there is no previous epoch.
            epoch_store
                .epoch()
                .checked_sub(1)
                .map(|prev_epoch| {
                    self.checkpoint_store
                        .get_epoch_last_checkpoint_seq_number(prev_epoch)
                        .expect("typed store must not fail")
                        .expect(
                            "sequence number of last checkpoint of preceding epoch must be saved",
                        )
                        + 1
                })
                .unwrap_or(1),
        );
        info!(
            "reading checkpoint range {:?}..={:?}",
            start_checkpoint, last_checkpoint_before_end_of_epoch
        );
        let sequence_numbers =
            (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
        // Resolve the content digest of every checkpoint in the range, preferring the locally
        // computed summary.
        let contents_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_locally_computed_checkpoints(&sequence_numbers)
            .expect("typed store must not fail")
            .into_iter()
            .zip(sequence_numbers)
            .map(|(maybe_checkpoint, sequence_number)| {
                if let Some(checkpoint) = maybe_checkpoint {
                    checkpoint.content_digest
                } else {
                    // If locally computed checkpoint summary was already pruned, load the
                    // certified checkpoint.
                    self.checkpoint_store
                        .get_checkpoint_by_sequence_number(sequence_number)
                        .expect("typed store must not fail")
                        .unwrap_or_else(|| {
                            fatal!("preceding checkpoints must exist by end of epoch")
                        })
                        .data()
                        .content_digest
                }
            })
            .collect();
        // Flatten the checkpoint contents into the digests of all transactions they include.
        let tx_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_checkpoint_content(&contents_digests)
            .expect("typed store must not fail")
            .into_iter()
            .flat_map(|maybe_contents| {
                maybe_contents
                    .expect("preceding checkpoint contents must exist by end of epoch")
                    .into_inner()
                    .into_iter()
                    .map(|digests| digests.transaction)
            })
            .collect();
        // Keys of every command in the programmable transactions found above, plus the keys
        // supplied by the caller. Non-programmable transactions contribute nothing.
        let included_execution_time_observations: HashSet<_> = self
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(&tx_digests)
            .into_iter()
            .flat_map(|maybe_tx| {
                if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
                    .expect("preceding transaction must exist by end of epoch")
                    .transaction_data()
                    .kind()
                {
                    // The commands are cloned so the returned iterator owns its data rather
                    // than borrowing from the transaction loaded in this closure.
                    #[allow(clippy::unnecessary_to_owned)]
                    itertools::Either::Left(
                        ptb.commands
                            .to_owned()
                            .into_iter()
                            .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
                    )
                } else {
                    itertools::Either::Right(std::iter::empty())
                }
            })
            .chain(end_of_epoch_observation_keys.into_iter())
            .collect();

        // Keep only the end-of-epoch observations whose key is in the set built above, capped
        // at the configured limit.
        let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
            epoch_store
                .get_end_of_epoch_execution_time_observations()
                .filter_and_sort_v1(
                    |(key, _)| included_execution_time_observations.contains(key),
                    params.stored_observations_limit.try_into().unwrap(),
                ),
        );
        info!("Creating StoreExecutionTimeObservations tx");
        Some(tx)
    }
5886
5887    /// Creates and execute the advance epoch transaction to effects without committing it to the database.
5888    /// The effects of the change epoch tx are only written to the database after a certified checkpoint has been
5889    /// formed and executed by CheckpointExecutor.
5890    ///
5891    /// When a framework upgraded has been decided on, but the validator does not have the new
5892    /// versions of the packages locally, the validator cannot form the ChangeEpochTx. In this case
5893    /// it returns Err, indicating that the checkpoint builder should give up trying to make the
5894    /// final checkpoint. As long as the network is able to create a certified checkpoint (which
5895    /// should be ensured by the capabilities vote), it will arrive via state sync and be executed
5896    /// by CheckpointExecutor.
5897    #[instrument(level = "error", skip_all)]
5898    pub async fn create_and_execute_advance_epoch_tx(
5899        &self,
5900        epoch_store: &Arc<AuthorityPerEpochStore>,
5901        gas_cost_summary: &GasCostSummary,
5902        checkpoint: CheckpointSequenceNumber,
5903        epoch_start_timestamp_ms: CheckpointTimestamp,
5904        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5905        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
5906        // >1 checkpoint.
5907        last_checkpoint: CheckpointSequenceNumber,
5908    ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
5909        let mut txns = Vec::new();
5910
5911        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
5912            txns.push(tx);
5913        }
5914        if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
5915            txns.push(tx);
5916        }
5917        if let Some(tx) = self.create_bridge_tx(epoch_store) {
5918            txns.push(tx);
5919        }
5920        if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
5921            txns.push(tx);
5922        }
5923        if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
5924            txns.push(tx);
5925        }
5926        if let Some(tx) = self.create_execution_time_observations_tx(
5927            epoch_store,
5928            end_of_epoch_observation_keys,
5929            last_checkpoint,
5930        ) {
5931            txns.push(tx);
5932        }
5933        if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
5934            txns.push(tx);
5935        }
5936
5937        if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
5938            txns.push(tx);
5939        }
5940        if let Some(tx) = self.create_display_registry_tx(epoch_store) {
5941            txns.push(tx);
5942        }
5943        if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
5944            txns.push(tx);
5945        }
5946        if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
5947            txns.push(tx);
5948        }
5949
5950        let next_epoch = epoch_store.epoch() + 1;
5951
5952        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5953
5954        let (next_epoch_protocol_version, next_epoch_system_packages) =
5955            Self::choose_protocol_version_and_system_packages_v2(
5956                epoch_store.protocol_version(),
5957                epoch_store.protocol_config(),
5958                epoch_store.committee(),
5959                epoch_store
5960                    .get_capabilities_v2()
5961                    .expect("read capabilities from db cannot fail"),
5962                buffer_stake_bps,
5963            );
5964
5965        // since system packages are created during the current epoch, they should abide by the
5966        // rules of the current epoch, including the current epoch's max Move binary format version
5967        let config = epoch_store.protocol_config();
5968        let binary_config = config.binary_config(None);
5969        let Some(next_epoch_system_package_bytes) = self
5970            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5971            .await
5972        else {
5973            if next_epoch_protocol_version <= ProtocolVersion::MAX {
5974                // This case should only be hit if the validator supports the new protocol version,
5975                // but carries a different framework. The validator should still be able to
5976                // reconfigure, as the correct framework will be installed by the change epoch txn.
5977                debug_fatal!(
5978                    "upgraded system packages {:?} are not locally available, cannot create \
5979                    ChangeEpochTx. validator binary must be upgraded to the correct version!",
5980                    next_epoch_system_packages
5981                );
5982            } else {
5983                error!(
5984                    "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
5985                    next_epoch_protocol_version
5986                );
5987            }
5988            // the checkpoint builder will keep retrying forever when it hits this error.
5989            // Eventually, one of two things will happen:
5990            // - The operator will upgrade this binary to one that has the new packages locally,
5991            //   and this function will succeed.
5992            // - The final checkpoint will be certified by other validators, we will receive it via
5993            //   state sync, and execute it. This will upgrade the framework packages, reconfigure,
5994            //   and most likely shut down in the new epoch (this validator likely doesn't support
5995            //   the new protocol version, or else it should have had the packages.)
5996            return Err(CheckpointBuilderError::SystemPackagesMissing);
5997        };
5998
5999        let tx = if epoch_store
6000            .protocol_config()
6001            .end_of_epoch_transaction_supported()
6002        {
6003            txns.push(EndOfEpochTransactionKind::new_change_epoch(
6004                next_epoch,
6005                next_epoch_protocol_version,
6006                gas_cost_summary.storage_cost,
6007                gas_cost_summary.computation_cost,
6008                gas_cost_summary.storage_rebate,
6009                gas_cost_summary.non_refundable_storage_fee,
6010                epoch_start_timestamp_ms,
6011                next_epoch_system_package_bytes,
6012            ));
6013
6014            VerifiedTransaction::new_end_of_epoch_transaction(txns)
6015        } else {
6016            VerifiedTransaction::new_change_epoch(
6017                next_epoch,
6018                next_epoch_protocol_version,
6019                gas_cost_summary.storage_cost,
6020                gas_cost_summary.computation_cost,
6021                gas_cost_summary.storage_rebate,
6022                gas_cost_summary.non_refundable_storage_fee,
6023                epoch_start_timestamp_ms,
6024                next_epoch_system_package_bytes,
6025            )
6026        };
6027
6028        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
6029            tx.clone(),
6030            epoch_store.epoch(),
6031            checkpoint,
6032        );
6033
6034        let tx_digest = executable_tx.digest();
6035
6036        info!(
6037            ?next_epoch,
6038            ?next_epoch_protocol_version,
6039            ?next_epoch_system_packages,
6040            computation_cost=?gas_cost_summary.computation_cost,
6041            storage_cost=?gas_cost_summary.storage_cost,
6042            storage_rebate=?gas_cost_summary.storage_rebate,
6043            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
6044            ?tx_digest,
6045            "Creating advance epoch transaction"
6046        );
6047
6048        fail_point_async!("change_epoch_tx_delay");
6049        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
6050
6051        // The tx could have been executed by state sync already - if so simply return an error.
6052        // The checkpoint builder will shortly be terminated by reconfiguration anyway.
6053        if self
6054            .get_transaction_cache_reader()
6055            .is_tx_already_executed(tx_digest)
6056        {
6057            warn!("change epoch tx has already been executed via state sync");
6058            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6059        }
6060
6061        let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
6062        else {
6063            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6064        };
6065
6066        // We must manually assign the shared object versions to the transaction before executing it.
6067        // This is because we do not sequence end-of-epoch transactions through consensus.
6068        let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
6069            self.get_object_cache_reader().as_ref(),
6070            std::iter::once(&Schedulable::Transaction(&executable_tx)),
6071        )?;
6072
6073        assert_eq!(assigned_versions.0.len(), 1);
6074        let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;
6075
6076        let input_objects = self.read_objects_for_execution(
6077            &tx_lock,
6078            &executable_tx,
6079            &assigned_versions,
6080            epoch_store,
6081        )?;
6082
6083        let (transaction_outputs, _timings, _execution_error_opt) = self
6084            .execute_certificate(
6085                &execution_guard,
6086                &executable_tx,
6087                input_objects,
6088                None,
6089                ExecutionEnv::default(),
6090                epoch_store,
6091            )
6092            .unwrap();
6093        let system_obj = get_sui_system_state(&transaction_outputs.written)
6094            .expect("change epoch tx must write to system object");
6095
6096        let effects = transaction_outputs.effects;
6097        // We must write tx and effects to the state sync tables so that state sync is able to
6098        // deliver to the transaction to CheckpointExecutor after it is included in a certified
6099        // checkpoint.
6100        self.get_state_sync_store()
6101            .insert_transaction_and_effects(&tx, &effects);
6102
6103        info!(
6104            "Effects summary of the change epoch transaction: {:?}",
6105            effects.summary_for_debug()
6106        );
6107        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
6108        // The change epoch transaction cannot fail to execute.
6109        assert!(effects.status().is_ok());
6110        Ok((system_obj, effects))
6111    }
6112
6113    #[instrument(level = "error", skip_all)]
6114    async fn reopen_epoch_db(
6115        &self,
6116        cur_epoch_store: &AuthorityPerEpochStore,
6117        new_committee: Committee,
6118        epoch_start_configuration: EpochStartConfiguration,
6119        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6120        epoch_last_checkpoint: CheckpointSequenceNumber,
6121    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6122        let new_epoch = new_committee.epoch;
6123        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6124        assert_eq!(
6125            epoch_start_configuration.epoch_start_state().epoch(),
6126            new_committee.epoch
6127        );
6128        fail_point!("before-open-new-epoch-store");
6129        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6130            self.name,
6131            new_committee,
6132            epoch_start_configuration,
6133            self.get_backing_package_store().clone(),
6134            self.get_object_store().clone(),
6135            expensive_safety_check_config,
6136            epoch_last_checkpoint,
6137        )?;
6138        self.epoch_store.store(new_epoch_store.clone());
6139        Ok(new_epoch_store)
6140    }
6141
6142    #[cfg(test)]
6143    pub(crate) fn iter_live_object_set_for_testing(
6144        &self,
6145    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6146        let include_wrapped_object = !self
6147            .epoch_store_for_testing()
6148            .protocol_config()
6149            .simplified_unwrap_then_delete();
6150        self.get_global_state_hash_store()
6151            .iter_cached_live_object_set_for_testing(include_wrapped_object)
6152    }
6153
6154    #[cfg(test)]
6155    pub(crate) fn shutdown_execution_for_test(&self) {
6156        self.tx_execution_shutdown
6157            .lock()
6158            .take()
6159            .unwrap()
6160            .send(())
6161            .unwrap();
6162    }
6163
6164    /// NOTE: this function is only to be used for fuzzing and testing. Never use in prod
6165    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6166        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6167        self.get_object_cache_reader()
6168            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6169        self.get_reconfig_api()
6170            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6171        Ok(())
6172    }
6173}
6174
6175pub struct RandomnessRoundReceiver {
6176    authority_state: Arc<AuthorityState>,
6177    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6178}
6179
6180impl RandomnessRoundReceiver {
6181    pub fn spawn(
6182        authority_state: Arc<AuthorityState>,
6183        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6184    ) -> JoinHandle<()> {
6185        let rrr = RandomnessRoundReceiver {
6186            authority_state,
6187            randomness_rx,
6188        };
6189        spawn_monitored_task!(rrr.run())
6190    }
6191
6192    async fn run(mut self) {
6193        info!("RandomnessRoundReceiver event loop started");
6194
6195        loop {
6196            tokio::select! {
6197                maybe_recv = self.randomness_rx.recv() => {
6198                    if let Some((epoch, round, bytes)) = maybe_recv {
6199                        self.handle_new_randomness(epoch, round, bytes).await;
6200                    } else {
6201                        break;
6202                    }
6203                },
6204            }
6205        }
6206
6207        info!("RandomnessRoundReceiver event loop ended");
6208    }
6209
6210    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
6211    async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
6212        fail_point_async!("randomness-delay");
6213
6214        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
6215        if epoch_store.epoch() != epoch {
6216            warn!(
6217                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
6218                epoch_store.epoch()
6219            );
6220            return;
6221        }
6222        let key = TransactionKey::RandomnessRound(epoch, round);
6223        let transaction = VerifiedTransaction::new_randomness_state_update(
6224            epoch,
6225            round,
6226            bytes,
6227            epoch_store
6228                .epoch_start_config()
6229                .randomness_obj_initial_shared_version()
6230                .expect("randomness state obj must exist"),
6231        );
6232        debug!(
6233            "created randomness state update transaction with digest: {:?}",
6234            transaction.digest()
6235        );
6236        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
6237        let digest = *transaction.digest();
6238
6239        // Randomness state updates contain the full bls signature for the random round,
6240        // which cannot necessarily be reconstructed again later. Therefore we must immediately
6241        // persist this transaction. If we crash before its outputs are committed, this
6242        // ensures we will be able to re-execute it.
6243        self.authority_state
6244            .get_cache_commit()
6245            .persist_transaction(&transaction);
6246
6247        // Notify the scheduler that the transaction key now has a known digest
6248        if epoch_store.insert_tx_key(key, digest).is_err() {
6249            warn!("epoch ended while handling new randomness");
6250        }
6251
6252        let authority_state = self.authority_state.clone();
6253        spawn_monitored_task!(async move {
6254            // Wait for transaction execution in a separate task, to avoid deadlock in case of
6255            // out-of-order randomness generation. (Each RandomnessStateUpdate depends on the
6256            // output of the RandomnessStateUpdate from the previous round.)
6257            //
6258            // We set a very long timeout so that in case this gets stuck for some reason, the
6259            // validator will eventually crash rather than continuing in a zombie mode.
6260            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
6261            let result = tokio::time::timeout(
6262                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
6263                authority_state
6264                    .get_transaction_cache_reader()
6265                    .notify_read_executed_effects(
6266                        "RandomnessRoundReceiver::notify_read_executed_effects_first",
6267                        &[digest],
6268                    ),
6269            )
6270            .await;
6271            let mut effects = match result {
6272                Ok(result) => result,
6273                Err(_) => {
6274                    if cfg!(debug_assertions) {
6275                        // Crash on randomness update execution timeout in debug builds.
6276                        panic!(
6277                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6278                        );
6279                    }
6280                    warn!(
6281                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6282                    );
6283                    // Continue waiting as long as necessary in non-debug builds.
6284                    authority_state
6285                        .get_transaction_cache_reader()
6286                        .notify_read_executed_effects(
6287                            "RandomnessRoundReceiver::notify_read_executed_effects_second",
6288                            &[digest],
6289                        )
6290                        .await
6291                }
6292            };
6293
6294            let effects = effects.pop().expect("should return effects");
6295            if *effects.status() != ExecutionStatus::Success {
6296                fatal!(
6297                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
6298                );
6299            }
6300            debug!(
6301                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
6302            );
6303        });
6304    }
6305}
6306
6307#[async_trait]
6308impl TransactionKeyValueStoreTrait for AuthorityState {
6309    #[instrument(skip(self))]
6310    async fn multi_get(
6311        &self,
6312        transactions: &[TransactionDigest],
6313        effects: &[TransactionDigest],
6314    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6315        let txns = if !transactions.is_empty() {
6316            self.get_transaction_cache_reader()
6317                .multi_get_transaction_blocks(transactions)
6318                .into_iter()
6319                .map(|t| t.map(|t| (*t).clone().into_inner()))
6320                .collect()
6321        } else {
6322            vec![]
6323        };
6324
6325        let fx = if !effects.is_empty() {
6326            self.get_transaction_cache_reader()
6327                .multi_get_executed_effects(effects)
6328        } else {
6329            vec![]
6330        };
6331
6332        Ok((txns, fx))
6333    }
6334
6335    #[instrument(skip(self))]
6336    async fn multi_get_checkpoints(
6337        &self,
6338        checkpoint_summaries: &[CheckpointSequenceNumber],
6339        checkpoint_contents: &[CheckpointSequenceNumber],
6340        checkpoint_summaries_by_digest: &[CheckpointDigest],
6341    ) -> SuiResult<(
6342        Vec<Option<CertifiedCheckpointSummary>>,
6343        Vec<Option<CheckpointContents>>,
6344        Vec<Option<CertifiedCheckpointSummary>>,
6345    )> {
6346        // TODO: use multi-get methods if it ever becomes important (unlikely)
6347        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6348        let store = self.get_checkpoint_store();
6349        for seq in checkpoint_summaries {
6350            let checkpoint = store
6351                .get_checkpoint_by_sequence_number(*seq)?
6352                .map(|c| c.into_inner());
6353
6354            summaries.push(checkpoint);
6355        }
6356
6357        let mut contents = Vec::with_capacity(checkpoint_contents.len());
6358        for seq in checkpoint_contents {
6359            let checkpoint = store
6360                .get_checkpoint_by_sequence_number(*seq)?
6361                .and_then(|summary| {
6362                    store
6363                        .get_checkpoint_contents(&summary.content_digest)
6364                        .expect("db read cannot fail")
6365                });
6366            contents.push(checkpoint);
6367        }
6368
6369        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6370        for digest in checkpoint_summaries_by_digest {
6371            let checkpoint = store
6372                .get_checkpoint_by_digest(digest)?
6373                .map(|c| c.into_inner());
6374            summaries_by_digest.push(checkpoint);
6375        }
6376        Ok((summaries, contents, summaries_by_digest))
6377    }
6378
6379    #[instrument(skip(self))]
6380    async fn deprecated_get_transaction_checkpoint(
6381        &self,
6382        digest: TransactionDigest,
6383    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6384        Ok(self
6385            .get_checkpoint_cache()
6386            .deprecated_get_transaction_checkpoint(&digest)
6387            .map(|(_epoch, checkpoint)| checkpoint))
6388    }
6389
6390    #[instrument(skip(self))]
6391    async fn get_object(
6392        &self,
6393        object_id: ObjectID,
6394        version: VersionNumber,
6395    ) -> SuiResult<Option<Object>> {
6396        Ok(self
6397            .get_object_cache_reader()
6398            .get_object_by_key(&object_id, version))
6399    }
6400
6401    #[instrument(skip_all)]
6402    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6403        Ok(self
6404            .get_object_cache_reader()
6405            .multi_get_objects_by_key(object_keys))
6406    }
6407
6408    #[instrument(skip(self))]
6409    async fn multi_get_transaction_checkpoint(
6410        &self,
6411        digests: &[TransactionDigest],
6412    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6413        let res = self
6414            .get_checkpoint_cache()
6415            .deprecated_multi_get_transaction_checkpoint(digests);
6416
6417        Ok(res
6418            .into_iter()
6419            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6420            .collect())
6421    }
6422
6423    #[instrument(skip(self))]
6424    async fn multi_get_events_by_tx_digests(
6425        &self,
6426        digests: &[TransactionDigest],
6427    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6428        if digests.is_empty() {
6429            return Ok(vec![]);
6430        }
6431
6432        Ok(self
6433            .get_transaction_cache_reader()
6434            .multi_get_events(digests))
6435    }
6436}
6437
/// Simulator-only (`cfg(msim)`) registry of package overrides, keyed by package id. An override
/// is either one set of modules for every validator or a callback that decides per validator.
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::collections::BTreeMap;
    use std::{cell::RefCell, collections::BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback that returns the override modules for a given validator, or `None`.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a single package id is overridden.
    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback, per validator.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module with its own `version`. Panics if serialization fails.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Registers `modules` as the override for `package_id` on every validator, replacing any
    /// previous entry for that id.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|cfg| {
            cfg.borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules));
        });
    }

    /// Registers a per-validator callback as the override for `package_id`, replacing any
    /// previous entry for that id.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|cfg| {
            cfg.borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func));
        });
    }

    /// Installs a global override for every package in `packages`, and an empty global override
    /// for every built-in package id that is not among them.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        OVERRIDE.with(|cfg| {
            // Built-in ids that the caller did not supply; these get an empty module list.
            let provided: BTreeSet<ObjectID> = packages.iter().map(|pkg| pkg.id).collect();
            let absent: Vec<ObjectID> = BuiltInFramework::all_package_ids()
                .into_iter()
                .filter(|id| !provided.contains(id))
                .collect();

            let mut overrides = cfg.borrow_mut();
            for pkg in packages {
                overrides.insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
            }
            for id in absent {
                overrides.insert(id, PackageOverrideConfig::Global(vec![]));
            }
        });
    }

    /// Returns the serialized override modules for `package_id` as seen by validator `name`, or
    /// `None` when there is no entry or the per-validator callback declines.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            match overrides.get(package_id) {
                Some(PackageOverrideConfig::Global(framework)) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                Some(PackageOverrideConfig::PerValidator(func)) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
                None => None,
            }
        })
    }

    /// Same lookup as `get_override_bytes`, but returns the compiled modules themselves.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            match overrides.get(package_id) {
                Some(PackageOverrideConfig::Global(framework)) => Some(framework.clone()),
                Some(PackageOverrideConfig::PerValidator(func)) => func(name),
                None => None,
            }
        })
    }

    /// Builds a `SystemPackage` from the override for `package_id`, if one applies to `name`.
    /// System packages reuse the dependency list of the built-in package with the same id.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        get_override_bytes(package_id, name).map(|bytes| {
            let dependencies = if is_system_package(*package_id) {
                BuiltInFramework::get_package_by_id(package_id)
                    .dependencies
                    .to_vec()
            } else {
                // Assume that entirely new injected packages depend on all existing system packages.
                BuiltInFramework::all_package_ids()
            };
            SystemPackage {
                id: *package_id,
                bytes,
                dependencies,
            }
        })
    }

    /// Returns a `SystemPackage` for every registered override whose id is not a built-in
    /// package id, each depending on all built-in packages.
    ///
    /// Panics if such an override yields no bytes for validator `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();

        // Collect the ids first so the registry borrow is released before each lookup below.
        let extra_ids: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        let mut extra_packages = Vec::with_capacity(extra_ids.len());
        for id in extra_ids {
            extra_packages.push(SystemPackage {
                id,
                bytes: get_override_bytes(&id, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        extra_packages
    }
}
6571
6572#[derive(Debug, Serialize, Deserialize, Clone)]
6573pub struct ObjDumpFormat {
6574    pub id: ObjectID,
6575    pub version: VersionNumber,
6576    pub digest: ObjectDigest,
6577    pub object: Object,
6578}
6579
6580impl ObjDumpFormat {
6581    fn new(object: Object) -> Self {
6582        let oref = object.compute_object_reference();
6583        Self {
6584            id: oref.0,
6585            version: oref.1,
6586            digest: oref.2,
6587            object,
6588        }
6589    }
6590}
6591
/// Serializable snapshot built by `NodeStateDump::new` from a transaction digest, its computed
/// effects, an expected effects digest, the epoch store, an object store and the transaction's
/// temporary store.
// NOTE(review): field-level meanings below are partly inferred from the visible start of
// `NodeStateDump::new`; confirm against the full constructor.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the transaction this dump describes (presumably; confirm in `new`).
    pub tx_digest: TransactionDigest,
    pub sender_signed_data: SenderSignedData,
    /// `new` reads a value of this name from `epoch_store.epoch()`.
    pub executed_epoch: u64,
    /// `new` reads a value of this name from `epoch_store.reference_gas_price()`.
    pub reference_gas_price: u64,
    /// `new` reads a value of this name from `epoch_store.protocol_version().as_u64()`.
    pub protocol_version: u64,
    /// `new` reads a value of this name from the epoch start config's epoch data.
    pub epoch_start_timestamp_ms: u64,
    pub computed_effects: TransactionEffects,
    pub expected_effects_digest: TransactionEffectsDigest,
    /// `new` collects the built-in framework packages found in the object store under this name.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// `new` collects the mutated and read-only consensus input objects from the effects that
    /// are found in the object store under this name.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// `new` collects the temporary store's loaded runtime objects found in the object store
    /// under this name.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    pub modified_at_versions: Vec<ObjDumpFormat>,
    pub runtime_reads: Vec<ObjDumpFormat>,
    pub input_objects: Vec<ObjDumpFormat>,
}
6609
6610impl NodeStateDump {
6611    pub fn new(
6612        tx_digest: &TransactionDigest,
6613        effects: &TransactionEffects,
6614        expected_effects_digest: TransactionEffectsDigest,
6615        object_store: &dyn ObjectStore,
6616        epoch_store: &Arc<AuthorityPerEpochStore>,
6617        inner_temporary_store: &InnerTemporaryStore,
6618        certificate: &VerifiedExecutableTransaction,
6619    ) -> SuiResult<Self> {
6620        // Epoch info
6621        let executed_epoch = epoch_store.epoch();
6622        let reference_gas_price = epoch_store.reference_gas_price();
6623        let epoch_start_config = epoch_store.epoch_start_config();
6624        let protocol_version = epoch_store.protocol_version().as_u64();
6625        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6626
6627        // Record all system packages at this version
6628        let mut relevant_system_packages = Vec::new();
6629        for sys_package_id in BuiltInFramework::all_package_ids() {
6630            if let Some(w) = object_store.get_object(&sys_package_id) {
6631                relevant_system_packages.push(ObjDumpFormat::new(w))
6632            }
6633        }
6634
6635        // Record all the shared objects
6636        let mut shared_objects = Vec::new();
6637        for kind in effects.input_consensus_objects() {
6638            match kind {
6639                InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6640                    if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6641                        shared_objects.push(ObjDumpFormat::new(w))
6642                    }
6643                }
6644                InputConsensusObject::ReadConsensusStreamEnded(..)
6645                | InputConsensusObject::MutateConsensusStreamEnded(..)
6646                | InputConsensusObject::Cancelled(..) => (), // TODO: consider record congested objects.
6647            }
6648        }
6649
6650        // Record all loaded child objects
6651        // Child objects which are read but not mutated are not tracked anywhere else
6652        let mut loaded_child_objects = Vec::new();
6653        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6654            if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6655                loaded_child_objects.push(ObjDumpFormat::new(w))
6656            }
6657        }
6658
6659        // Record all modified objects
6660        let mut modified_at_versions = Vec::new();
6661        for (id, ver) in effects.modified_at_versions() {
6662            if let Some(w) = object_store.get_object_by_key(&id, ver) {
6663                modified_at_versions.push(ObjDumpFormat::new(w))
6664            }
6665        }
6666
6667        // Packages read at runtime, which were not previously loaded into the temoorary store
6668        // Some packages may be fetched at runtime and wont show up in input objects
6669        let mut runtime_reads = Vec::new();
6670        for obj in inner_temporary_store
6671            .runtime_packages_loaded_from_db
6672            .values()
6673        {
6674            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6675        }
6676
6677        // All other input objects should already be in `inner_temporary_store.objects`
6678
6679        Ok(Self {
6680            tx_digest: *tx_digest,
6681            executed_epoch,
6682            reference_gas_price,
6683            epoch_start_timestamp_ms,
6684            protocol_version,
6685            relevant_system_packages,
6686            shared_objects,
6687            loaded_child_objects,
6688            modified_at_versions,
6689            runtime_reads,
6690            sender_signed_data: certificate.clone().into_message(),
6691            input_objects: inner_temporary_store
6692                .input_objects
6693                .values()
6694                .map(|o| ObjDumpFormat::new(o.clone()))
6695                .collect(),
6696            computed_effects: effects.clone(),
6697            expected_effects_digest,
6698        })
6699    }
6700
6701    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6702        let mut objects = Vec::new();
6703        objects.extend(self.relevant_system_packages.clone());
6704        objects.extend(self.shared_objects.clone());
6705        objects.extend(self.loaded_child_objects.clone());
6706        objects.extend(self.modified_at_versions.clone());
6707        objects.extend(self.runtime_reads.clone());
6708        objects.extend(self.input_objects.clone());
6709        objects
6710    }
6711
6712    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6713        let file_name = format!(
6714            "{}_{}_NODE_DUMP.json",
6715            self.tx_digest,
6716            AuthorityState::unixtime_now_ms()
6717        );
6718        let mut path = path.to_path_buf();
6719        path.push(&file_name);
6720        let mut file = File::create(path.clone())?;
6721        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6722        Ok(path)
6723    }
6724
6725    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6726        let file = File::open(path)?;
6727        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6728    }
6729}