sui_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::accumulators::AccumulatorSettlementTxBuilder;
6use crate::accumulators::balance_read::AccountBalanceRead;
7use crate::checkpoints::CheckpointBuilderError;
8use crate::checkpoints::CheckpointBuilderResult;
9use crate::congestion_tracker::CongestionTracker;
10use crate::consensus_adapter::ConsensusOverloadChecker;
11use crate::execution_cache::ExecutionCacheTraitPointers;
12use crate::execution_cache::TransactionCacheRead;
13use crate::execution_scheduler::ExecutionScheduler;
14use crate::execution_scheduler::SchedulingSource;
15use crate::jsonrpc_index::CoinIndexKey2;
16use crate::rpc_index::RpcIndexStore;
17use crate::traffic_controller::TrafficController;
18use crate::traffic_controller::metrics::TrafficControllerMetrics;
19use crate::transaction_outputs::TransactionOutputs;
20use crate::verify_indexes::{fix_indexes, verify_indexes};
21use arc_swap::{ArcSwap, Guard};
22use async_trait::async_trait;
23use authority_per_epoch_store::CertLockGuard;
24use fastcrypto::encoding::Base58;
25use fastcrypto::encoding::Encoding;
26use fastcrypto::hash::MultisetHash;
27use itertools::Itertools;
28use move_binary_format::CompiledModule;
29use move_binary_format::binary_config::BinaryConfig;
30use move_core_types::annotated_value::MoveStructLayout;
31use move_core_types::language_storage::ModuleId;
32use mysten_common::fatal;
33use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
34use parking_lot::Mutex;
35use prometheus::{
36    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
37    register_histogram_vec_with_registry, register_histogram_with_registry,
38    register_int_counter_vec_with_registry, register_int_counter_with_registry,
39    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
40};
41use serde::de::DeserializeOwned;
42use serde::{Deserialize, Serialize};
43use shared_object_version_manager::AssignedVersions;
44use shared_object_version_manager::Schedulable;
45use std::collections::BTreeMap;
46use std::collections::BTreeSet;
47use std::fs::File;
48use std::io::Write;
49use std::path::{Path, PathBuf};
50use std::sync::atomic::Ordering;
51use std::time::Duration;
52use std::time::Instant;
53use std::time::SystemTime;
54use std::time::UNIX_EPOCH;
55use std::{
56    collections::{HashMap, HashSet},
57    fs,
58    pin::Pin,
59    str::FromStr,
60    sync::Arc,
61    vec,
62};
63use sui_config::NodeConfig;
64use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
65use sui_protocol_config::PerObjectCongestionControlMode;
66use sui_types::crypto::RandomnessRound;
67use sui_types::dynamic_field::visitor as DFV;
68use sui_types::execution::ExecutionOutput;
69use sui_types::execution::ExecutionTimeObservationKey;
70use sui_types::execution::ExecutionTiming;
71use sui_types::execution_params::BalanceWithdrawStatus;
72use sui_types::execution_params::ExecutionOrEarlyError;
73use sui_types::execution_params::get_early_execution_error;
74use sui_types::execution_status::ExecutionStatus;
75use sui_types::inner_temporary_store::PackageStoreWithFallback;
76use sui_types::layout_resolver::LayoutResolver;
77use sui_types::layout_resolver::into_struct_layout;
78use sui_types::messages_consensus::{AuthorityCapabilitiesV1, AuthorityCapabilitiesV2};
79use sui_types::object::bounded_visitor::BoundedVisitor;
80use sui_types::storage::ChildObjectResolver;
81use sui_types::storage::InputKey;
82use sui_types::storage::TrackingBackingStore;
83use sui_types::traffic_control::{
84    PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
85};
86use sui_types::transaction_executor::SimulateTransactionResult;
87use sui_types::transaction_executor::TransactionChecks;
88use tap::TapFallible;
89use tokio::sync::RwLock;
90use tokio::sync::mpsc::unbounded_channel;
91use tokio::sync::{mpsc, oneshot};
92use tokio::task::JoinHandle;
93use tokio::time::timeout;
94use tracing::trace;
95use tracing::{debug, error, info, instrument, warn};
96
97use self::authority_store::ExecutionLockWriteGuard;
98use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
99pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
100use mysten_metrics::{monitored_scope, spawn_monitored_task};
101
102use crate::jsonrpc_index::IndexStore;
103use crate::jsonrpc_index::{CoinInfo, ObjectIndexChanges};
104use mysten_common::debug_fatal;
105use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
106use sui_config::genesis::Genesis;
107use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
108use sui_framework::{BuiltInFramework, SystemPackage};
109use sui_json_rpc_types::{
110    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
111    SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
112    SuiTransactionBlockEvents, TransactionFilter,
113};
114use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
115use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
116use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
117use sui_types::authenticator_state::get_authenticator_state;
118use sui_types::committee::{EpochId, ProtocolVersion};
119use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
120use sui_types::deny_list_v1::check_coin_deny_list_v1;
121use sui_types::digests::ChainIdentifier;
122use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
123use sui_types::effects::{
124    InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
125    TransactionEvents, VerifiedSignedTransactionEffects,
126};
127use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
128use sui_types::event::{Event, EventID};
129use sui_types::executable_transaction::VerifiedExecutableTransaction;
130use sui_types::gas::{GasCostSummary, SuiGasStatus};
131use sui_types::inner_temporary_store::{
132    InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
133};
134use sui_types::message_envelope::Message;
135use sui_types::messages_checkpoint::{
136    CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
137    CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
138    CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
139    CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
140};
141use sui_types::messages_grpc::{
142    HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind,
143    ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
144};
145use sui_types::metrics::{BytecodeVerifierMetrics, LimitsMetrics};
146use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
147use sui_types::storage::{
148    BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
149};
150use sui_types::sui_system_state::SuiSystemStateTrait;
151use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
152use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
153use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
154use sui_types::{
155    SUI_SYSTEM_ADDRESS,
156    base_types::*,
157    committee::Committee,
158    crypto::AuthoritySignature,
159    error::{SuiError, SuiResult},
160    object::{Object, ObjectRead},
161    transaction::*,
162};
163use sui_types::{TypeTag, is_system_package};
164use typed_store::TypedStoreError;
165
166use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
167use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
168use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
169use crate::authority::authority_store_pruner::{
170    AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
171};
172use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
173use crate::authority::epoch_start_configuration::EpochStartConfiguration;
174use crate::checkpoints::CheckpointStore;
175use crate::epoch::committee_store::CommitteeStore;
176use crate::execution_cache::{
177    CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
178    ObjectCacheRead, StateSyncAPI,
179};
180use crate::execution_driver::execution_process;
181use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
182use crate::metrics::LatencyObserver;
183use crate::metrics::RateTracker;
184use crate::module_cache_metrics::ResolverMetrics;
185use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
186use crate::stake_aggregator::StakeAggregator;
187use crate::subscription_handler::SubscriptionHandler;
188use crate::transaction_input_loader::TransactionInputLoader;
189
190#[cfg(msim)]
191pub use crate::checkpoints::checkpoint_executor::utils::{
192    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
193};
194
195use crate::authority::authority_store_tables::AuthorityPrunerTables;
196use crate::authority_client::NetworkAuthorityClient;
197use crate::validator_tx_finalizer::ValidatorTxFinalizer;
198#[cfg(msim)]
199use sui_types::committee::CommitteeTrait;
200use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
201
202#[cfg(test)]
203#[path = "unit_tests/authority_tests.rs"]
204pub mod authority_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/transaction_tests.rs"]
208pub mod transaction_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/batch_transaction_tests.rs"]
212mod batch_transaction_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/move_integration_tests.rs"]
216pub mod move_integration_tests;
217
218#[cfg(test)]
219#[path = "unit_tests/gas_tests.rs"]
220mod gas_tests;
221
222#[cfg(test)]
223#[path = "unit_tests/batch_verification_tests.rs"]
224mod batch_verification_tests;
225
226#[cfg(test)]
227#[path = "unit_tests/coin_deny_list_tests.rs"]
228mod coin_deny_list_tests;
229
230#[cfg(test)]
231#[path = "unit_tests/mysticeti_fastpath_execution_tests.rs"]
232mod mysticeti_fastpath_execution_tests;
233
234#[cfg(test)]
235#[path = "unit_tests/auth_unit_test_utils.rs"]
236pub mod auth_unit_test_utils;
237
238pub mod authority_test_utils;
239
240pub mod authority_per_epoch_store;
241pub mod authority_per_epoch_store_pruner;
242
243pub mod authority_store_pruner;
244pub mod authority_store_tables;
245pub mod authority_store_types;
246pub mod consensus_tx_status_cache;
247pub mod epoch_start_configuration;
248pub mod execution_time_estimator;
249pub mod shared_object_congestion_tracker;
250pub mod shared_object_version_manager;
251pub mod submitted_transaction_cache;
252pub mod test_authority_builder;
253pub mod transaction_deferral;
254pub mod transaction_reject_reason_cache;
255mod weighted_moving_average;
256
257pub(crate) mod authority_store;
258pub mod backpressure;
259
260/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
261pub struct AuthorityMetrics {
262    tx_orders: IntCounter,
263    total_certs: IntCounter,
264    total_cert_attempts: IntCounter,
265    total_effects: IntCounter,
266    // TODO: this tracks consensus object tx, not just shared. Consider renaming.
267    pub shared_obj_tx: IntCounter,
268    sponsored_tx: IntCounter,
269    tx_already_processed: IntCounter,
270    num_input_objs: Histogram,
271    // TODO: this tracks consensus object count, not just shared. Consider renaming.
272    num_shared_objects: Histogram,
273    batch_size: Histogram,
274
275    authority_state_handle_transaction_latency: Histogram,
276    authority_state_handle_vote_transaction_latency: Histogram,
277
278    execute_certificate_latency_single_writer: Histogram,
279    execute_certificate_latency_shared_object: Histogram,
280    await_transaction_latency: Histogram,
281
282    internal_execution_latency: Histogram,
283    execution_load_input_objects_latency: Histogram,
284    prepare_certificate_latency: Histogram,
285    commit_certificate_latency: Histogram,
286    db_checkpoint_latency: Histogram,
287
288    // TODO: Rename these metrics.
289    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
290    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
291    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
292    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
293
294    pub(crate) execution_driver_executed_transactions: IntCounter,
295    pub(crate) execution_driver_paused_transactions: IntCounter,
296    pub(crate) execution_driver_dispatch_queue: IntGauge,
297    pub(crate) execution_queueing_delay_s: Histogram,
298    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
299    pub(crate) execution_gas_latency_ratio: Histogram,
300
301    pub(crate) skipped_consensus_txns: IntCounter,
302    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
303
304    pub(crate) authority_overload_status: IntGauge,
305    pub(crate) authority_load_shedding_percentage: IntGauge,
306
307    pub(crate) transaction_overload_sources: IntCounterVec,
308
309    /// Post processing metrics
310    post_processing_total_events_emitted: IntCounter,
311    post_processing_total_tx_indexed: IntCounter,
312    post_processing_total_tx_had_event_processed: IntCounter,
313    post_processing_total_failures: IntCounter,
314
315    /// Consensus commit and transaction handler metrics
316    pub consensus_handler_processed: IntCounterVec,
317    pub consensus_handler_transaction_sizes: HistogramVec,
318    pub consensus_handler_num_low_scoring_authorities: IntGauge,
319    pub consensus_handler_scores: IntGaugeVec,
320    pub consensus_handler_deferred_transactions: IntCounter,
321    pub consensus_handler_congested_transactions: IntCounter,
322    pub consensus_handler_cancelled_transactions: IntCounter,
323    pub consensus_handler_max_object_costs: IntGaugeVec,
324    pub consensus_committed_subdags: IntCounterVec,
325    pub consensus_committed_messages: IntGaugeVec,
326    pub consensus_committed_user_transactions: IntGaugeVec,
327    pub consensus_finalized_user_transactions: IntGaugeVec,
328    pub consensus_rejected_user_transactions: IntGaugeVec,
329    pub consensus_calculated_throughput: IntGauge,
330    pub consensus_calculated_throughput_profile: IntGauge,
331    pub consensus_block_handler_block_processed: IntCounter,
332    pub consensus_block_handler_txn_processed: IntCounterVec,
333    pub consensus_block_handler_fastpath_executions: IntCounter,
334    pub consensus_timestamp_bias: Histogram,
335
336    pub limits_metrics: Arc<LimitsMetrics>,
337
338    /// bytecode verifier metrics for tracking timeouts
339    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
340
341    /// Count of zklogin signatures
342    pub zklogin_sig_count: IntCounter,
343    /// Count of multisig signatures
344    pub multisig_sig_count: IntCounter,
345
346    // Tracks recent average txn queueing delay between when it is ready for execution
347    // until it starts executing.
348    pub execution_queueing_latency: LatencyObserver,
349
350    // Tracks the rate of transactions become ready for execution in transaction manager.
351    // The need for the Mutex is that the tracker is updated in transaction manager and read
352    // in the overload_monitor. There should be low mutex contention because
353    // transaction manager is single threaded and the read rate in overload_monitor is
354    // low. In the case where transaction manager becomes multi-threaded, we can
355    // create one rate tracker per thread.
356    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
357
358    // Tracks the rate of transactions starts execution in execution driver.
359    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
360    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
361}
362
363// Override default Prom buckets for positive numbers in 0-10M range
364const POSITIVE_INT_BUCKETS: &[f64] = &[
365    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
366    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
367    7000000., 10000000.,
368];
369
370const LATENCY_SEC_BUCKETS: &[f64] = &[
371    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
372    10., 20., 30., 60., 90.,
373];
374
375// Buckets for low latency samples. Starts from 10us.
376const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
377    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
378    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
379];
380
381// Buckets for consensus timestamp bias in seconds.
382// We expect 200-300ms, so we cluster the buckets more tightly in that range.
383const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
384    -2.0, 0.0, 0.2, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40, 0.45, 0.50, 0.80,
385    2.0, 10.0, 30.0,
386];
387
388const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
389    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
390    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
391];
392
393pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;
394
395// Transaction author should have observed the input objects as finalized output,
396// so usually the wait does not need to be long.
397// When submitted by TransactionDriver, it will retry quickly if there is no return from this validator too.
398pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_secs(2);
399
400impl AuthorityMetrics {
401    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
402        let execute_certificate_latency = register_histogram_vec_with_registry!(
403            "authority_state_execute_certificate_latency",
404            "Latency of executing certificates, including waiting for inputs",
405            &["tx_type"],
406            LATENCY_SEC_BUCKETS.to_vec(),
407            registry,
408        )
409        .unwrap();
410
411        let execute_certificate_latency_single_writer =
412            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
413        let execute_certificate_latency_shared_object =
414            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
415
416        Self {
417            tx_orders: register_int_counter_with_registry!(
418                "total_transaction_orders",
419                "Total number of transaction orders",
420                registry,
421            )
422            .unwrap(),
423            total_certs: register_int_counter_with_registry!(
424                "total_transaction_certificates",
425                "Total number of transaction certificates handled",
426                registry,
427            )
428            .unwrap(),
429            total_cert_attempts: register_int_counter_with_registry!(
430                "total_handle_certificate_attempts",
431                "Number of calls to handle_certificate",
432                registry,
433            )
434            .unwrap(),
435            // total_effects == total transactions finished
436            total_effects: register_int_counter_with_registry!(
437                "total_transaction_effects",
438                "Total number of transaction effects produced",
439                registry,
440            )
441            .unwrap(),
442
443            shared_obj_tx: register_int_counter_with_registry!(
444                "num_shared_obj_tx",
445                "Number of transactions involving shared objects",
446                registry,
447            )
448            .unwrap(),
449
450            sponsored_tx: register_int_counter_with_registry!(
451                "num_sponsored_tx",
452                "Number of sponsored transactions",
453                registry,
454            )
455            .unwrap(),
456
457            tx_already_processed: register_int_counter_with_registry!(
458                "num_tx_already_processed",
459                "Number of transaction orders already processed previously",
460                registry,
461            )
462            .unwrap(),
463            num_input_objs: register_histogram_with_registry!(
464                "num_input_objects",
465                "Distribution of number of input TX objects per TX",
466                POSITIVE_INT_BUCKETS.to_vec(),
467                registry,
468            )
469            .unwrap(),
470            num_shared_objects: register_histogram_with_registry!(
471                "num_shared_objects",
472                "Number of shared input objects per TX",
473                POSITIVE_INT_BUCKETS.to_vec(),
474                registry,
475            )
476            .unwrap(),
477            batch_size: register_histogram_with_registry!(
478                "batch_size",
479                "Distribution of size of transaction batch",
480                POSITIVE_INT_BUCKETS.to_vec(),
481                registry,
482            )
483            .unwrap(),
484            authority_state_handle_transaction_latency: register_histogram_with_registry!(
485                "authority_state_handle_transaction_latency",
486                "Latency of handling transactions",
487                LATENCY_SEC_BUCKETS.to_vec(),
488                registry,
489            )
490            .unwrap(),
491            authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
492                "authority_state_handle_vote_transaction_latency",
493                "Latency of voting on transactions without signing",
494                LATENCY_SEC_BUCKETS.to_vec(),
495                registry,
496            )
497            .unwrap(),
498            execute_certificate_latency_single_writer,
499            execute_certificate_latency_shared_object,
500            await_transaction_latency: register_histogram_with_registry!(
501                "await_transaction_latency",
502                "Latency of awaiting user transaction execution, including waiting for inputs",
503                LATENCY_SEC_BUCKETS.to_vec(),
504                registry,
505            )
506            .unwrap(),
507            internal_execution_latency: register_histogram_with_registry!(
508                "authority_state_internal_execution_latency",
509                "Latency of actual certificate executions",
510                LATENCY_SEC_BUCKETS.to_vec(),
511                registry,
512            )
513            .unwrap(),
514            execution_load_input_objects_latency: register_histogram_with_registry!(
515                "authority_state_execution_load_input_objects_latency",
516                "Latency of loading input objects for execution",
517                LOW_LATENCY_SEC_BUCKETS.to_vec(),
518                registry,
519            )
520            .unwrap(),
521            prepare_certificate_latency: register_histogram_with_registry!(
522                "authority_state_prepare_certificate_latency",
523                "Latency of executing certificates, before committing the results",
524                LATENCY_SEC_BUCKETS.to_vec(),
525                registry,
526            )
527            .unwrap(),
528            commit_certificate_latency: register_histogram_with_registry!(
529                "authority_state_commit_certificate_latency",
530                "Latency of committing certificate execution results",
531                LATENCY_SEC_BUCKETS.to_vec(),
532                registry,
533            )
534            .unwrap(),
535            db_checkpoint_latency: register_histogram_with_registry!(
536                "db_checkpoint_latency",
537                "Latency of checkpointing dbs",
538                LATENCY_SEC_BUCKETS.to_vec(),
539                registry,
540            ).unwrap(),
541            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
542                "transaction_manager_num_enqueued_certificates",
543                "Current number of certificates enqueued to ExecutionScheduler",
544                &["result"],
545                registry,
546            )
547            .unwrap(),
548            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
549                "transaction_manager_num_pending_certificates",
550                "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
551                registry,
552            )
553            .unwrap(),
554            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
555                "transaction_manager_num_executing_certificates",
556                "Number of executing certificates, including queued and actually running certificates",
557                registry,
558            )
559            .unwrap(),
560            authority_overload_status: register_int_gauge_with_registry!(
561                "authority_overload_status",
562                "Whether authority is current experiencing overload and enters load shedding mode.",
563                registry)
564            .unwrap(),
565            authority_load_shedding_percentage: register_int_gauge_with_registry!(
566                "authority_load_shedding_percentage",
567                "The percentage of transactions is shed when the authority is in load shedding mode.",
568                registry)
569            .unwrap(),
570            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
571                "transaction_manager_transaction_queue_age_s",
572                "Time spent in waiting for transaction in the queue",
573                LATENCY_SEC_BUCKETS.to_vec(),
574                registry,
575            )
576            .unwrap(),
577            transaction_overload_sources: register_int_counter_vec_with_registry!(
578                "transaction_overload_sources",
579                "Number of times each source indicates transaction overload.",
580                &["source"],
581                registry)
582            .unwrap(),
583            execution_driver_executed_transactions: register_int_counter_with_registry!(
584                "execution_driver_executed_transactions",
585                "Cumulative number of transaction executed by execution driver",
586                registry,
587            )
588            .unwrap(),
589            execution_driver_paused_transactions: register_int_counter_with_registry!(
590                "execution_driver_paused_transactions",
591                "Cumulative number of transactions paused by execution driver",
592                registry,
593            )
594            .unwrap(),
595            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
596                "execution_driver_dispatch_queue",
597                "Number of transaction pending in execution driver dispatch queue",
598                registry,
599            )
600            .unwrap(),
601            execution_queueing_delay_s: register_histogram_with_registry!(
602                "execution_queueing_delay_s",
603                "Queueing delay between a transaction is ready for execution until it starts executing.",
604                LATENCY_SEC_BUCKETS.to_vec(),
605                registry
606            )
607            .unwrap(),
608            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
609                "prepare_cert_gas_latency_ratio",
610                "The ratio of computation gas divided by VM execution latency.",
611                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
612                registry
613            )
614            .unwrap(),
615            execution_gas_latency_ratio: register_histogram_with_registry!(
616                "execution_gas_latency_ratio",
617                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
618                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
619                registry
620            )
621            .unwrap(),
622            skipped_consensus_txns: register_int_counter_with_registry!(
623                "skipped_consensus_txns",
624                "Total number of consensus transactions skipped",
625                registry,
626            )
627            .unwrap(),
628            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
629                "skipped_consensus_txns_cache_hit",
630                "Total number of consensus transactions skipped because of local cache hit",
631                registry,
632            )
633            .unwrap(),
634            post_processing_total_events_emitted: register_int_counter_with_registry!(
635                "post_processing_total_events_emitted",
636                "Total number of events emitted in post processing",
637                registry,
638            )
639            .unwrap(),
640            post_processing_total_tx_indexed: register_int_counter_with_registry!(
641                "post_processing_total_tx_indexed",
642                "Total number of txes indexed in post processing",
643                registry,
644            )
645            .unwrap(),
646            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
647                "post_processing_total_tx_had_event_processed",
648                "Total number of txes finished event processing in post processing",
649                registry,
650            )
651            .unwrap(),
652            post_processing_total_failures: register_int_counter_with_registry!(
653                "post_processing_total_failures",
654                "Total number of failure in post processing",
655                registry,
656            )
657            .unwrap(),
658            consensus_handler_processed: register_int_counter_vec_with_registry!(
659                "consensus_handler_processed",
660                "Number of transactions processed by consensus handler",
661                &["class"],
662                registry
663            ).unwrap(),
664            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
665                "consensus_handler_transaction_sizes",
666                "Sizes of each type of transactions processed by consensus handler",
667                &["class"],
668                POSITIVE_INT_BUCKETS.to_vec(),
669                registry
670            ).unwrap(),
671            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
672                "consensus_handler_num_low_scoring_authorities",
673                "Number of low scoring authorities based on reputation scores from consensus",
674                registry
675            ).unwrap(),
676            consensus_handler_scores: register_int_gauge_vec_with_registry!(
677                "consensus_handler_scores",
678                "scores from consensus for each authority",
679                &["authority"],
680                registry,
681            ).unwrap(),
682            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
683                "consensus_handler_deferred_transactions",
684                "Number of transactions deferred by consensus handler",
685                registry,
686            ).unwrap(),
687            consensus_handler_congested_transactions: register_int_counter_with_registry!(
688                "consensus_handler_congested_transactions",
689                "Number of transactions deferred by consensus handler due to congestion",
690                registry,
691            ).unwrap(),
692            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
693                "consensus_handler_cancelled_transactions",
694                "Number of transactions cancelled by consensus handler",
695                registry,
696            ).unwrap(),
697            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
698                "consensus_handler_max_congestion_control_object_costs",
699                "Max object costs for congestion control in the current consensus commit",
700                &["commit_type"],
701                registry,
702            ).unwrap(),
703            consensus_committed_subdags: register_int_counter_vec_with_registry!(
704                "consensus_committed_subdags",
705                "Number of committed subdags, sliced by author",
706                &["authority"],
707                registry,
708            ).unwrap(),
709            consensus_committed_messages: register_int_gauge_vec_with_registry!(
710                "consensus_committed_messages",
711                "Total number of committed consensus messages, sliced by author",
712                &["authority"],
713                registry,
714            ).unwrap(),
715            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
716                "consensus_committed_user_transactions",
717                "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
718                &["authority"],
719                registry,
720            ).unwrap(),
721            consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
722                "consensus_finalized_user_transactions",
723                "Number of user transactions finalized, sliced by submitter",
724                &["authority"],
725                registry,
726            ).unwrap(),
727            consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
728                "consensus_rejected_user_transactions",
729                "Number of user transactions rejected, sliced by submitter",
730                &["authority"],
731                registry,
732            ).unwrap(),
733            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
734            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
735            zklogin_sig_count: register_int_counter_with_registry!(
736                "zklogin_sig_count",
737                "Count of zkLogin signatures",
738                registry,
739            )
740            .unwrap(),
741            multisig_sig_count: register_int_counter_with_registry!(
742                "multisig_sig_count",
743                "Count of zkLogin signatures",
744                registry,
745            )
746            .unwrap(),
747            consensus_calculated_throughput: register_int_gauge_with_registry!(
748                "consensus_calculated_throughput",
749                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
750                registry,
751            ).unwrap(),
752            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
753                "consensus_calculated_throughput_profile",
754                "The current active calculated throughput profile",
755                registry
756            ).unwrap(),
757            consensus_block_handler_block_processed: register_int_counter_with_registry!(
758                "consensus_block_handler_block_processed",
759                "Number of blocks processed by consensus block handler.",
760                registry
761            ).unwrap(),
762            consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
763                "consensus_block_handler_txn_processed",
764                "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
765                &["outcome"],
766                registry
767            ).unwrap(),
768            consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
769                "consensus_block_handler_fastpath_executions",
770                "Number of fastpath transactions sent for execution by consensus transaction handler",
771                registry,
772            ).unwrap(),
773            consensus_timestamp_bias: register_histogram_with_registry!(
774                "consensus_timestamp_bias",
775                "Bias/delay from consensus timestamp to system time",
776                TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
777                registry
778            ).unwrap(),
779            execution_queueing_latency: LatencyObserver::new(),
780            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
781            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
782        }
783    }
784}
785
786/// a Trait object for `Signer` that is:
787/// - Pin, i.e. confined to one place in memory (we don't want to copy private keys).
788/// - Sync, i.e. can be safely shared between threads.
789///
790/// Typically instantiated with Box::pin(keypair) where keypair is a `KeyPair`
791///
792pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
793
794/// Execution env contains the "environment" for the transaction to be executed in, that is,
795/// all the information necessary for execution that is not specified by the transaction itself.
796#[derive(Debug, Clone)]
797pub struct ExecutionEnv {
798    /// The assigned version of each shared object for the transaction.
799    pub assigned_versions: AssignedVersions,
800    /// The expected digest of the effects of the transaction, if executing from checkpoint or
801    /// other sources where the effects are known in advance.
802    pub expected_effects_digest: Option<TransactionEffectsDigest>,
803    /// The source of the scheduling of the transaction.
804    pub scheduling_source: SchedulingSource,
805    /// Status of the balance withdraw scheduling of the transaction.
806    pub withdraw_status: BalanceWithdrawStatus,
807    /// Transactions that must finish before this transaction can be executed.
808    /// Used to schedule barrier transactions after non-exclusive writes.
809    pub barrier_dependencies: Vec<TransactionDigest>,
810}
811
812impl Default for ExecutionEnv {
813    fn default() -> Self {
814        Self {
815            assigned_versions: Default::default(),
816            expected_effects_digest: None,
817            scheduling_source: SchedulingSource::NonFastPath,
818            withdraw_status: BalanceWithdrawStatus::NoWithdraw,
819            barrier_dependencies: Default::default(),
820        }
821    }
822}
823
824impl ExecutionEnv {
825    pub fn new() -> Self {
826        Default::default()
827    }
828
829    pub fn with_scheduling_source(mut self, scheduling_source: SchedulingSource) -> Self {
830        self.scheduling_source = scheduling_source;
831        self
832    }
833
834    pub fn with_expected_effects_digest(
835        mut self,
836        expected_effects_digest: TransactionEffectsDigest,
837    ) -> Self {
838        self.expected_effects_digest = Some(expected_effects_digest);
839        self
840    }
841
842    pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
843        self.assigned_versions = assigned_versions;
844        self
845    }
846
847    pub fn with_sufficient_balance(mut self) -> Self {
848        self.withdraw_status = BalanceWithdrawStatus::SufficientBalance;
849        self
850    }
851
852    pub fn with_insufficient_balance(mut self) -> Self {
853        self.withdraw_status = BalanceWithdrawStatus::InsufficientBalance;
854        self
855    }
856
857    pub fn with_barrier_dependencies(
858        mut self,
859        barrier_dependencies: BTreeSet<TransactionDigest>,
860    ) -> Self {
861        self.barrier_dependencies = barrier_dependencies.into_iter().collect();
862        self
863    }
864}
865
866#[derive(Debug)]
867pub struct ForkRecoveryState {
868    /// Transaction digest to effects digest overrides
869    transaction_overrides:
870        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
871}
872
873impl Default for ForkRecoveryState {
874    fn default() -> Self {
875        Self {
876            transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
877        }
878    }
879}
880
881impl ForkRecoveryState {
882    pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
883        let Some(config) = config else {
884            return Ok(Self::default());
885        };
886
887        let mut transaction_overrides = HashMap::new();
888        for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
889            let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
890                SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
891            })?;
892            let effects_digest =
893                TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
894                    SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
895                })?;
896            transaction_overrides.insert(tx_digest, effects_digest);
897        }
898
899        Ok(Self {
900            transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
901        })
902    }
903
904    pub fn get_transaction_override(
905        &self,
906        tx_digest: &TransactionDigest,
907    ) -> Option<TransactionEffectsDigest> {
908        self.transaction_overrides.read().get(tx_digest).copied()
909    }
910}
911
912pub struct AuthorityState {
913    // Fixed size, static, identity of the authority
914    /// The name of this authority.
915    pub name: AuthorityName,
916    /// The signature key of the authority.
917    pub secret: StableSyncAuthoritySigner,
918
919    /// The database
920    input_loader: TransactionInputLoader,
921    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
922
923    epoch_store: ArcSwap<AuthorityPerEpochStore>,
924
925    /// This lock denotes current 'execution epoch'.
926    /// Execution acquires read lock, checks certificate epoch and holds it until all writes are complete.
927    /// Reconfiguration acquires write lock, changes the epoch and revert all transactions
928    /// from previous epoch that are executed but did not make into checkpoint.
929    execution_lock: RwLock<EpochId>,
930
931    pub indexes: Option<Arc<IndexStore>>,
932    pub rpc_index: Option<Arc<RpcIndexStore>>,
933
934    pub subscription_handler: Arc<SubscriptionHandler>,
935    pub checkpoint_store: Arc<CheckpointStore>,
936
937    committee_store: Arc<CommitteeStore>,
938
939    /// Schedules transaction execution.
940    execution_scheduler: Arc<ExecutionScheduler>,
941
942    /// Shuts down the execution task. Used only in testing.
943    #[allow(unused)]
944    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
945
946    pub metrics: Arc<AuthorityMetrics>,
947    _pruner: AuthorityStorePruner,
948    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
949
950    /// Take db checkpoints of different dbs
951    db_checkpoint_config: DBCheckpointConfig,
952
953    pub config: NodeConfig,
954
955    /// Current overload status in this authority. Updated periodically.
956    pub overload_info: AuthorityOverloadInfo,
957
958    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
959
960    /// The chain identifier is derived from the digest of the genesis checkpoint.
961    chain_identifier: ChainIdentifier,
962
963    pub(crate) congestion_tracker: Arc<CongestionTracker>,
964
965    /// Traffic controller for Sui core servers (json-rpc, validator service)
966    pub traffic_controller: Option<Arc<TrafficController>>,
967
968    /// Fork recovery state for handling equivocation after forks
969    fork_recovery_state: Option<ForkRecoveryState>,
970}
971
972/// The authority state encapsulates all state, drives execution, and ensures safety.
973///
974/// Note the authority operations can be accessed through a read ref (&) and do not
975/// require &mut. Internally a database is synchronized through a mutex lock.
976///
977/// Repeating valid commands should produce no changes and return no error.
978impl AuthorityState {
979    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
980        epoch_store.committee().authority_exists(&self.name)
981    }
982
983    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
984        !self.is_validator(epoch_store)
985    }
986
987    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
988        &self.committee_store
989    }
990
991    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
992        self.committee_store.clone()
993    }
994
995    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
996        &self.config.authority_overload_config
997    }
998
999    pub fn get_epoch_state_commitments(
1000        &self,
1001        epoch: EpochId,
1002    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1003        self.checkpoint_store.get_epoch_state_commitments(epoch)
1004    }
1005
1006    fn handle_transaction_deny_checks(
1007        &self,
1008        transaction: &VerifiedTransaction,
1009        epoch_store: &Arc<AuthorityPerEpochStore>,
1010    ) -> SuiResult<CheckedInputObjects> {
1011        let tx_digest = transaction.digest();
1012        let tx_data = transaction.data().transaction_data();
1013
1014        let input_object_kinds = tx_data.input_objects()?;
1015        let receiving_objects_refs = tx_data.receiving_objects();
1016
1017        // Note: the deny checks may do redundant package loads but:
1018        // - they only load packages when there is an active package deny map
1019        // - the loads are cached anyway
1020        sui_transaction_checks::deny::check_transaction_for_signing(
1021            tx_data,
1022            transaction.tx_signatures(),
1023            &input_object_kinds,
1024            &receiving_objects_refs,
1025            &self.config.transaction_deny_config,
1026            self.get_backing_package_store().as_ref(),
1027        )?;
1028
1029        let withdraws = tx_data.process_funds_withdrawals_for_signing()?;
1030
1031        self.execution_cache_trait_pointers
1032            .child_object_resolver
1033            .check_balances_available(&withdraws)?;
1034
1035        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1036            Some(tx_digest),
1037            &input_object_kinds,
1038            &receiving_objects_refs,
1039            epoch_store.epoch(),
1040        )?;
1041
1042        let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1043            epoch_store.protocol_config(),
1044            epoch_store.reference_gas_price(),
1045            tx_data,
1046            input_objects,
1047            &receiving_objects,
1048            &self.metrics.bytecode_verifier_metrics,
1049            &self.config.verifier_signing_config,
1050        )?;
1051
1052        self.handle_coin_deny_list_checks(
1053            tx_data,
1054            &checked_input_objects,
1055            &receiving_objects,
1056            epoch_store,
1057        )?;
1058
1059        Ok(checked_input_objects)
1060    }
1061
1062    fn handle_coin_deny_list_checks(
1063        &self,
1064        tx_data: &TransactionData,
1065        checked_input_objects: &CheckedInputObjects,
1066        receiving_objects: &ReceivingObjects,
1067        epoch_store: &Arc<AuthorityPerEpochStore>,
1068    ) -> SuiResult<()> {
1069        let funds_withdraw_types = tx_data
1070            .get_funds_withdrawals()
1071            .into_iter()
1072            .filter_map(|withdraw| {
1073                withdraw
1074                    .type_arg
1075                    .get_balance_type_param()
1076                    // unwrap safe because we already verified the transaction.
1077                    .unwrap()
1078                    .map(|ty| ty.to_canonical_string(false))
1079            })
1080            .collect::<BTreeSet<_>>();
1081
1082        if epoch_store.coin_deny_list_v1_enabled() {
1083            check_coin_deny_list_v1(
1084                tx_data.sender(),
1085                checked_input_objects,
1086                receiving_objects,
1087                funds_withdraw_types.clone(),
1088                &self.get_object_store(),
1089            )?;
1090        }
1091
1092        if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1093            check_coin_deny_list_v2_during_signing(
1094                tx_data.sender(),
1095                checked_input_objects,
1096                receiving_objects,
1097                funds_withdraw_types.clone(),
1098                &self.get_object_store(),
1099            )?;
1100        }
1101
1102        Ok(())
1103    }
1104
1105    /// This is a private method and should be kept that way. It doesn't check whether
1106    /// the provided transaction is a system transaction, and hence can only be called internally.
1107    #[instrument(level = "trace", skip_all)]
1108    fn handle_transaction_impl(
1109        &self,
1110        transaction: VerifiedTransaction,
1111        sign: bool,
1112        epoch_store: &Arc<AuthorityPerEpochStore>,
1113    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
1114        // Ensure that validator cannot reconfigure while we are signing the tx
1115        let _execution_lock = self.execution_lock_for_signing()?;
1116
1117        let checked_input_objects =
1118            self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1119
1120        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1121
1122        let tx_digest = *transaction.digest();
1123        let signed_transaction = if sign {
1124            Some(VerifiedSignedTransaction::new(
1125                epoch_store.epoch(),
1126                transaction,
1127                self.name,
1128                &*self.secret,
1129            ))
1130        } else {
1131            None
1132        };
1133
1134        // Check and write locks, to signed transaction, into the database
1135        // The call to self.set_transaction_lock checks the lock is not conflicting,
1136        // and returns ConflictingTransaction error in case there is a lock on a different
1137        // existing transaction.
1138        self.get_cache_writer().acquire_transaction_locks(
1139            epoch_store,
1140            &owned_objects,
1141            tx_digest,
1142            signed_transaction.clone(),
1143        )?;
1144
1145        Ok(signed_transaction)
1146    }
1147
1148    /// Initiate a new transaction.
1149    #[instrument(level = "trace", skip_all)]
1150    pub async fn handle_transaction(
1151        &self,
1152        epoch_store: &Arc<AuthorityPerEpochStore>,
1153        transaction: VerifiedTransaction,
1154    ) -> SuiResult<HandleTransactionResponse> {
1155        let tx_digest = *transaction.digest();
1156        debug!("handle_transaction");
1157
1158        // Ensure an idempotent answer.
1159        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1160            return Ok(HandleTransactionResponse { status });
1161        }
1162
1163        let _metrics_guard = self
1164            .metrics
1165            .authority_state_handle_transaction_latency
1166            .start_timer();
1167        self.metrics.tx_orders.inc();
1168
1169        if epoch_store.protocol_config().mysticeti_fastpath()
1170            && !self
1171                .wait_for_fastpath_dependency_objects(&transaction, epoch_store.epoch())
1172                .await?
1173        {
1174            debug!("fastpath input objects are still unavailable after waiting");
1175            // Proceed with input checks to generate a proper error.
1176        }
1177
1178        self.handle_sign_transaction(epoch_store, transaction).await
1179    }
1180
1181    /// Signs a transaction. Exposed for testing.
1182    pub async fn handle_sign_transaction(
1183        &self,
1184        epoch_store: &Arc<AuthorityPerEpochStore>,
1185        transaction: VerifiedTransaction,
1186    ) -> SuiResult<HandleTransactionResponse> {
1187        let tx_digest = *transaction.digest();
1188        let signed = self.handle_transaction_impl(transaction, /* sign */ true, epoch_store);
1189        match signed {
1190            Ok(Some(s)) => {
1191                if self.is_validator(epoch_store)
1192                    && let Some(validator_tx_finalizer) = &self.validator_tx_finalizer
1193                {
1194                    let tx = s.clone();
1195                    let validator_tx_finalizer = validator_tx_finalizer.clone();
1196                    let cache_reader = self.get_transaction_cache_reader().clone();
1197                    let epoch_store = epoch_store.clone();
1198                    spawn_monitored_task!(epoch_store.within_alive_epoch(
1199                        validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1200                    ));
1201                }
1202                Ok(HandleTransactionResponse {
1203                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
1204                })
1205            }
1206            Ok(None) => panic!("handle_transaction_impl should return a signed transaction"),
1207            // It happens frequently that while we are checking the validity of the transaction, it
1208            // has just been executed.
1209            // In that case, we could still return Ok to avoid showing confusing errors.
1210            Err(err) => Ok(HandleTransactionResponse {
1211                status: self
1212                    .get_transaction_status(&tx_digest, epoch_store)?
1213                    .ok_or(err)?
1214                    .1,
1215            }),
1216        }
1217    }
1218
1219    /// Vote for a transaction, either when validator receives a submit_transaction request,
1220    /// or sees a transaction from consensus. Performs the same types of checks as
1221    /// transaction signing, but does not explicitly sign the transaction.
1222    /// Note that if the transaction has been executed, we still go through the
1223    /// same checks. If the transaction has only been executed through mysticeti fastpath,
1224    /// but not yet finalized, the signing will still work since the objects are still available.
1225    /// But if the transaction has been finalized, the signing will fail.
1226    /// TODO(mysticeti-fastpath): Assess whether we want to optimize the case when the transaction
1227    /// has already been finalized executed.
1228    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1229    pub(crate) fn handle_vote_transaction(
1230        &self,
1231        epoch_store: &Arc<AuthorityPerEpochStore>,
1232        transaction: VerifiedTransaction,
1233    ) -> SuiResult<()> {
1234        debug!("handle_vote_transaction");
1235
1236        let _metrics_guard = self
1237            .metrics
1238            .authority_state_handle_vote_transaction_latency
1239            .start_timer();
1240        self.metrics.tx_orders.inc();
1241
1242        // The should_accept_user_certs check here is best effort, because
1243        // between a validator signs a tx and a cert is formed, the validator
1244        // could close the window.
1245        if !epoch_store
1246            .get_reconfig_state_read_lock_guard()
1247            .should_accept_user_certs()
1248        {
1249            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1250        }
1251
1252        // Accept finalized transactions, instead of voting to reject them.
1253        // Checking executed transactions is limited to the current epoch.
1254        // Otherwise there can be a race where the transaction is accepted because it has executed,
1255        // but the executed effects are pruned post consensus, leading to failures.
1256        let tx_digest = *transaction.digest();
1257        if epoch_store.is_recently_finalized(&tx_digest)
1258            || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
1259        {
1260            return Ok(());
1261        }
1262
1263        let result =
1264            self.handle_transaction_impl(transaction, false /* sign */, epoch_store)?;
1265        assert!(
1266            result.is_none(),
1267            "handle_transaction_impl should not return a signed transaction when sign is false"
1268        );
1269        Ok(())
1270    }
1271
1272    /// Used for early client validation check for transactions before submission to server.
1273    /// Performs the same validation checks as handle_vote_transaction without acquiring locks.
1274    /// This allows for fast failure feedback to clients for non-retriable errors.
1275    ///
1276    /// The key addition is checking that owned object versions match live object versions.
1277    /// This is necessary because handle_transaction_deny_checks fetches objects at their
1278    /// requested version (which may exist in storage as historical versions), whereas
1279    /// validators check against the live/current version during locking (verify_live_object).
1280    pub fn check_transaction_validity(
1281        &self,
1282        epoch_store: &Arc<AuthorityPerEpochStore>,
1283        transaction: &VerifiedTransaction,
1284    ) -> SuiResult<()> {
1285        if !epoch_store
1286            .get_reconfig_state_read_lock_guard()
1287            .should_accept_user_certs()
1288        {
1289            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1290        }
1291
1292        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1293
1294        let checked_input_objects =
1295            self.handle_transaction_deny_checks(transaction, epoch_store)?;
1296
1297        // Check that owned object versions match live objects
1298        // This closely mimics verify_live_object logic from acquire_transaction_locks
1299        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1300        let cache_reader = self.get_object_cache_reader();
1301
1302        for obj_ref in &owned_objects {
1303            if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1304                // Only reject if transaction references an old version. Allow newer
1305                // versions that may exist on validators but not yet synced to fullnode.
1306                if obj_ref.1 < live_object.version() {
1307                    return Err(SuiErrorKind::UserInputError {
1308                        error: UserInputError::ObjectVersionUnavailableForConsumption {
1309                            provided_obj_ref: *obj_ref,
1310                            current_version: live_object.version(),
1311                        },
1312                    }
1313                    .into());
1314                }
1315
1316                // If version matches, verify digest also matches
1317                if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1318                    return Err(SuiErrorKind::UserInputError {
1319                        error: UserInputError::InvalidObjectDigest {
1320                            object_id: obj_ref.0,
1321                            expected_digest: live_object.digest(),
1322                        },
1323                    }
1324                    .into());
1325                }
1326            }
1327        }
1328
1329        Ok(())
1330    }
1331
1332    pub fn check_system_overload_at_signing(&self) -> bool {
1333        self.config
1334            .authority_overload_config
1335            .check_system_overload_at_signing
1336    }
1337
1338    pub fn check_system_overload_at_execution(&self) -> bool {
1339        self.config
1340            .authority_overload_config
1341            .check_system_overload_at_execution
1342    }
1343
1344    pub(crate) fn check_system_overload(
1345        &self,
1346        consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
1347        tx_data: &SenderSignedData,
1348        do_authority_overload_check: bool,
1349    ) -> SuiResult {
1350        if do_authority_overload_check {
1351            self.check_authority_overload(tx_data).tap_err(|_| {
1352                self.update_overload_metrics("execution_queue");
1353            })?;
1354        }
1355        self.execution_scheduler
1356            .check_execution_overload(self.overload_config(), tx_data)
1357            .tap_err(|_| {
1358                self.update_overload_metrics("execution_pending");
1359            })?;
1360        consensus_overload_checker
1361            .check_consensus_overload()
1362            .tap_err(|_| {
1363                self.update_overload_metrics("consensus");
1364            })?;
1365
1366        let pending_tx_count = self
1367            .get_cache_commit()
1368            .approximate_pending_transaction_count();
1369        if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1370            return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1371                retry_after_secs: 10,
1372            }
1373            .into());
1374        }
1375
1376        Ok(())
1377    }
1378
1379    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1380        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1381            return Ok(());
1382        }
1383
1384        let load_shedding_percentage = self
1385            .overload_info
1386            .load_shedding_percentage
1387            .load(Ordering::Relaxed);
1388        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1389    }
1390
1391    fn update_overload_metrics(&self, source: &str) {
1392        self.metrics
1393            .transaction_overload_sources
1394            .with_label_values(&[source])
1395            .inc();
1396    }
1397
1398    /// Waits for fastpath (owned, package) dependency objects to become available.
1399    /// Returns true if a chosen set of fastpath dependency objects are available,
1400    /// returns false otherwise after an internal timeout.
1401    pub(crate) async fn wait_for_fastpath_dependency_objects(
1402        &self,
1403        transaction: &VerifiedTransaction,
1404        epoch: EpochId,
1405    ) -> SuiResult<bool> {
1406        let txn_data = transaction.data().transaction_data();
1407        let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1408
1409        // Gather and filter input objects to wait for.
1410        let fastpath_dependency_objects: Vec<_> = move_objects
1411            .into_iter()
1412            .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1413            .chain(
1414                packages
1415                    .into_iter()
1416                    .map(|package_id| InputKey::Package { id: package_id }),
1417            )
1418            .collect();
1419        let receiving_keys: HashSet<_> = receiving_objects
1420            .into_iter()
1421            .filter_map(|receiving_obj_ref| {
1422                self.should_wait_for_dependency_object(receiving_obj_ref)
1423            })
1424            .collect();
1425        if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1426            return Ok(true);
1427        }
1428
1429        // Use shorter wait timeout in simtests to exercise server-side error paths and
1430        // client-side retry logic.
1431        let max_wait = if cfg!(msim) {
1432            Duration::from_millis(200)
1433        } else {
1434            WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1435        };
1436
1437        match timeout(
1438            max_wait,
1439            self.get_object_cache_reader().notify_read_input_objects(
1440                &fastpath_dependency_objects,
1441                &receiving_keys,
1442                epoch,
1443            ),
1444        )
1445        .await
1446        {
1447            Ok(()) => Ok(true),
1448            // Maybe return an error for unavailable input objects,
1449            // and allow the caller to skip the rest of input checks?
1450            Err(_) => Ok(false),
1451        }
1452    }
1453
1454    /// Returns Some(inputKey) if the object reference should be waited on until it is
1455    /// finalized, before proceeding to input checks.
1456    ///
1457    /// Incorrect decisions here should only affect user experience, not safety:
1458    /// - Waiting unnecessarily adds latency to transaction signing and submission.
1459    /// - Not waiting when needed may cause the transaction to be rejected because input object is unavailable.
1460    fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1461        let (obj_id, cur_version, _digest) = obj_ref;
1462        let Some(latest_obj_ref) = self
1463            .get_object_cache_reader()
1464            .get_latest_object_ref_or_tombstone(obj_id)
1465        else {
1466            // Object might not have been created.
1467            return Some(InputKey::VersionedObject {
1468                id: FullObjectID::new(obj_id, None),
1469                version: cur_version,
1470            });
1471        };
1472        let latest_digest = latest_obj_ref.2;
1473        if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1474            // Do not wait for deleted object and rely on input check instead.
1475            return None;
1476        }
1477        let latest_version = latest_obj_ref.1;
1478        if cur_version <= latest_version {
1479            // Do not wait for version that already exists or has been consumed.
1480            // Let the input check to handle them and return the proper error.
1481            return None;
1482        }
1483        // Wait for the object version to become available.
1484        Some(InputKey::VersionedObject {
1485            id: FullObjectID::new(obj_id, None),
1486            version: cur_version,
1487        })
1488    }
1489
1490    /// Wait for a certificate to be executed.
1491    /// For consensus transactions, it needs to be sequenced by the consensus.
1492    /// For owned object transactions, this function will enqueue the transaction for execution.
1493    // TODO: The next 3 functions are very similar. We should refactor them.
1494    #[instrument(level = "trace", skip_all)]
1495    pub async fn wait_for_certificate_execution(
1496        &self,
1497        certificate: &VerifiedCertificate,
1498        epoch_store: &Arc<AuthorityPerEpochStore>,
1499    ) -> SuiResult<TransactionEffects> {
1500        self.wait_for_transaction_execution(
1501            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1502            epoch_store,
1503        )
1504        .await
1505    }
1506
1507    /// Wait for a transaction to be executed.
1508    /// For consensus transactions, it needs to be sequenced by the consensus.
1509    /// For owned object transactions, this function will enqueue the transaction for execution.
1510    #[instrument(level = "trace", skip_all)]
1511    pub async fn wait_for_transaction_execution(
1512        &self,
1513        transaction: &VerifiedExecutableTransaction,
1514        epoch_store: &Arc<AuthorityPerEpochStore>,
1515    ) -> SuiResult<TransactionEffects> {
1516        let _metrics_guard = if transaction.is_consensus_tx() {
1517            self.metrics
1518                .execute_certificate_latency_shared_object
1519                .start_timer()
1520        } else {
1521            self.metrics
1522                .execute_certificate_latency_single_writer
1523                .start_timer()
1524        };
1525        trace!("execute_transaction");
1526
1527        self.metrics.total_cert_attempts.inc();
1528
1529        if !transaction.is_consensus_tx() {
1530            // Shared object transactions need to be sequenced by the consensus before enqueueing
1531            // for execution, done in AuthorityPerEpochStore::handle_consensus_transaction().
1532            // For owned object transactions, they can be enqueued for execution immediately.
1533            self.execution_scheduler.enqueue(
1534                vec![(
1535                    Schedulable::Transaction(transaction.clone()),
1536                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1537                )],
1538                epoch_store,
1539            );
1540        }
1541
1542        // tx could be reverted when epoch ends, so we must be careful not to return a result
1543        // here after the epoch ends.
1544        epoch_store
1545            .within_alive_epoch(self.notify_read_effects(
1546                "AuthorityState::wait_for_transaction_execution",
1547                *transaction.digest(),
1548            ))
1549            .await
1550            .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1551            .and_then(|r| r)
1552    }
1553
1554    /// Awaits the effects of executing a user transaction.
1555    ///
1556    /// Relies on consensus to enqueue the transaction for execution.
1557    pub async fn await_transaction_effects(
1558        &self,
1559        digest: TransactionDigest,
1560        epoch_store: &Arc<AuthorityPerEpochStore>,
1561    ) -> SuiResult<TransactionEffects> {
1562        let _metrics_guard = self.metrics.await_transaction_latency.start_timer();
1563        debug!("await_transaction");
1564
1565        epoch_store
1566            .within_alive_epoch(
1567                self.notify_read_effects("AuthorityState::await_transaction_effects", digest),
1568            )
1569            .await
1570            .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1571            .and_then(|r| r)
1572    }
1573
1574    /// Internal logic to execute a certificate.
1575    ///
1576    /// Guarantees that
1577    /// - If input objects are available, return no permanent failure.
1578    /// - Execution and output commit are atomic. i.e. outputs are only written to storage,
1579    /// on successful execution; crashed execution has no observable effect and can be retried.
1580    ///
1581    /// It is caller's responsibility to ensure input objects are available and locks are set.
1582    /// If this cannot be satisfied by the caller, execute_certificate() should be called instead.
1583    ///
1584    /// Should only be called within sui-core.
1585    #[instrument(level = "trace", skip_all)]
1586    pub async fn try_execute_immediately(
1587        &self,
1588        certificate: &VerifiedExecutableTransaction,
1589        mut execution_env: ExecutionEnv,
1590        epoch_store: &Arc<AuthorityPerEpochStore>,
1591    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
1592        let _scope = monitored_scope("Execution::try_execute_immediately");
1593        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1594
1595        let tx_digest = certificate.digest();
1596
1597        if let Some(fork_recovery) = &self.fork_recovery_state
1598            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
1599        {
1600            warn!(
1601                ?tx_digest,
1602                original_digest = ?execution_env.expected_effects_digest,
1603                override_digest = ?override_digest,
1604                "Applying fork recovery override for transaction effects digest"
1605            );
1606            execution_env.expected_effects_digest = Some(override_digest);
1607        }
1608
1609        // prevent concurrent executions of the same tx.
1610        let tx_guard = epoch_store.acquire_tx_guard(certificate);
1611
1612        let tx_cache_reader = self.get_transaction_cache_reader();
1613        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
1614            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
1615                assert_eq!(
1616                    effects.digest(),
1617                    expected_effects_digest,
1618                    "Unexpected effects digest for transaction {:?}",
1619                    tx_digest
1620                );
1621            }
1622            tx_guard.release();
1623            return ExecutionOutput::Success((effects, None));
1624        }
1625
1626        let execution_start_time = Instant::now();
1627
1628        // Any caller that verifies the signatures on the certificate will have already checked the
1629        // epoch. But paths that don't verify sigs (e.g. execution from checkpoint, reading from db)
1630        // present the possibility of an epoch mismatch. If this cert is not finalzied in previous
1631        // epoch, then it's invalid.
1632        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
1633        else {
1634            tx_guard.release();
1635            return ExecutionOutput::EpochEnded;
1636        };
1637        // Since we obtain a reference to the epoch store before taking the execution lock, it's
1638        // possible that reconfiguration has happened and they no longer match.
1639        // TODO: We may not need the following check anymore since the scheduler
1640        // should have checked that the certificate is from the same epoch as epoch_store.
1641        if *execution_guard != epoch_store.epoch() {
1642            tx_guard.release();
1643            info!("The epoch of the execution_guard doesn't match the epoch store");
1644            return ExecutionOutput::EpochEnded;
1645        }
1646
1647        let scheduling_source = execution_env.scheduling_source;
1648        let mysticeti_fp_outputs = if epoch_store.protocol_config().mysticeti_fastpath() {
1649            tx_cache_reader.get_mysticeti_fastpath_outputs(tx_digest)
1650        } else {
1651            None
1652        };
1653
1654        let (transaction_outputs, timings, execution_error_opt) = if let Some(outputs) =
1655            mysticeti_fp_outputs
1656        {
1657            assert!(
1658                !certificate.is_consensus_tx(),
1659                "Mysticeti fastpath executed transactions cannot be consensus transactions"
1660            );
1661            // If this transaction is not scheduled from fastpath, it must be either
1662            // from consensus or from checkpoint, i.e. it must be finalized.
1663            // To avoid re-executing fastpath transactions, we check if the outputs are already
1664            // in the mysticeti fastpath outputs cache. If so, we skip the execution and commit the transaction.
1665            debug!(
1666                ?tx_digest,
1667                "Mysticeti fastpath certified transaction outputs found in cache, skipping execution and committing"
1668            );
1669            (outputs, None, None)
1670        } else {
1671            let (transaction_outputs, timings, execution_error_opt) = match self
1672                .process_certificate(
1673                    &tx_guard,
1674                    &execution_guard,
1675                    certificate,
1676                    execution_env,
1677                    epoch_store,
1678                ) {
1679                ExecutionOutput::Success(result) => result,
1680                output => return output.unwrap_err(),
1681            };
1682            (
1683                Arc::new(transaction_outputs),
1684                Some(timings),
1685                execution_error_opt,
1686            )
1687        };
1688
1689        fail_point!("crash");
1690
1691        let effects = transaction_outputs.effects.clone();
1692        debug!(
1693            ?tx_digest,
1694            fx_digest=?effects.digest(),
1695            "process_certificate succeeded in {:.3}ms",
1696            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
1697        );
1698
1699        if scheduling_source == SchedulingSource::MysticetiFastPath {
1700            self.get_cache_writer()
1701                .write_fastpath_transaction_outputs(transaction_outputs);
1702        } else {
1703            let commit_result =
1704                self.commit_certificate(certificate, transaction_outputs, epoch_store);
1705            if let Err(err) = commit_result {
1706                error!(?tx_digest, "Error committing transaction: {err}");
1707                tx_guard.release();
1708                return ExecutionOutput::Fatal(err);
1709            }
1710        }
1711
1712        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
1713            certificate.data().transaction_data().kind()
1714        {
1715            if let Some(err) = &execution_error_opt {
1716                debug_fatal!("Authenticator state update failed: {:?}", err);
1717            }
1718            epoch_store.update_authenticator_state(auth_state);
1719
1720            // double check that the signature verifier always matches the authenticator state
1721            if cfg!(debug_assertions) {
1722                let authenticator_state = get_authenticator_state(self.get_object_store())
1723                    .expect("Read cannot fail")
1724                    .expect("Authenticator state must exist");
1725
1726                let mut sys_jwks: Vec<_> = authenticator_state
1727                    .active_jwks
1728                    .into_iter()
1729                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1730                    .collect();
1731                let mut active_jwks: Vec<_> = epoch_store
1732                    .signature_verifier
1733                    .get_jwks()
1734                    .into_iter()
1735                    .collect();
1736                sys_jwks.sort();
1737                active_jwks.sort();
1738
1739                assert_eq!(sys_jwks, active_jwks);
1740            }
1741        }
1742
1743        tx_guard.commit_tx();
1744
1745        let elapsed = execution_start_time.elapsed();
1746        if let Some(timings) = timings {
1747            epoch_store.record_local_execution_time(
1748                certificate.data().transaction_data(),
1749                &effects,
1750                timings,
1751                elapsed,
1752            );
1753        }
1754
1755        let elapsed_us = elapsed.as_micros() as f64;
1756        if elapsed_us > 0.0 {
1757            self.metrics
1758                .execution_gas_latency_ratio
1759                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
1760        };
1761        ExecutionOutput::Success((effects, execution_error_opt))
1762    }
1763
1764    pub fn read_objects_for_execution(
1765        &self,
1766        tx_lock: &CertLockGuard,
1767        certificate: &VerifiedExecutableTransaction,
1768        assigned_shared_object_versions: AssignedVersions,
1769        epoch_store: &Arc<AuthorityPerEpochStore>,
1770    ) -> SuiResult<InputObjects> {
1771        let _scope = monitored_scope("Execution::load_input_objects");
1772        let _metrics_guard = self
1773            .metrics
1774            .execution_load_input_objects_latency
1775            .start_timer();
1776        let input_objects = &certificate.data().transaction_data().input_objects()?;
1777        self.input_loader.read_objects_for_execution(
1778            &certificate.key(),
1779            tx_lock,
1780            input_objects,
1781            &assigned_shared_object_versions,
1782            epoch_store.epoch(),
1783        )
1784    }
1785
1786    /// Test only wrapper for `try_execute_immediately()` above, useful for executing change epoch
1787    /// transactions. Assumes execution will always succeed.
1788    pub async fn try_execute_for_test(
1789        &self,
1790        certificate: &VerifiedCertificate,
1791        execution_env: ExecutionEnv,
1792    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1793        let epoch_store = self.epoch_store_for_testing();
1794        let (effects, execution_error_opt) = self
1795            .try_execute_immediately(
1796                &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1797                execution_env,
1798                &epoch_store,
1799            )
1800            .await
1801            .unwrap();
1802        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1803        (signed_effects, execution_error_opt)
1804    }
1805
1806    pub async fn notify_read_effects(
1807        &self,
1808        task_name: &'static str,
1809        digest: TransactionDigest,
1810    ) -> SuiResult<TransactionEffects> {
1811        Ok(self
1812            .get_transaction_cache_reader()
1813            .notify_read_executed_effects(task_name, &[digest])
1814            .await
1815            .pop()
1816            .expect("must return correct number of effects"))
1817    }
1818
1819    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1820        self.get_object_cache_reader()
1821            .check_owned_objects_are_live(owned_object_refs)
1822    }
1823
1824    /// This function captures the required state to debug a forked transaction.
1825    /// The dump is written to a file in dir `path`, with name prefixed by the transaction digest.
1826    /// NOTE: Since this info escapes the validator context,
1827    /// make sure not to leak any private info here
1828    pub(crate) fn debug_dump_transaction_state(
1829        &self,
1830        tx_digest: &TransactionDigest,
1831        effects: &TransactionEffects,
1832        expected_effects_digest: TransactionEffectsDigest,
1833        inner_temporary_store: &InnerTemporaryStore,
1834        certificate: &VerifiedExecutableTransaction,
1835        debug_dump_config: &StateDebugDumpConfig,
1836    ) -> SuiResult<PathBuf> {
1837        let dump_dir = debug_dump_config
1838            .dump_file_directory
1839            .as_ref()
1840            .cloned()
1841            .unwrap_or(std::env::temp_dir());
1842        let epoch_store = self.load_epoch_store_one_call_per_task();
1843
1844        NodeStateDump::new(
1845            tx_digest,
1846            effects,
1847            expected_effects_digest,
1848            self.get_object_store().as_ref(),
1849            &epoch_store,
1850            inner_temporary_store,
1851            certificate,
1852        )?
1853        .write_to_file(&dump_dir)
1854        .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1855    }
1856
1857    #[instrument(level = "trace", skip_all)]
1858    pub(crate) fn process_certificate(
1859        &self,
1860        tx_guard: &CertTxGuard,
1861        execution_guard: &ExecutionLockReadGuard<'_>,
1862        certificate: &VerifiedExecutableTransaction,
1863        execution_env: ExecutionEnv,
1864        epoch_store: &Arc<AuthorityPerEpochStore>,
1865    ) -> ExecutionOutput<(
1866        TransactionOutputs,
1867        Vec<ExecutionTiming>,
1868        Option<ExecutionError>,
1869    )> {
1870        let _scope = monitored_scope("Execution::process_certificate");
1871        let tx_digest = *certificate.digest();
1872
1873        let input_objects = match self.read_objects_for_execution(
1874            tx_guard.as_lock_guard(),
1875            certificate,
1876            execution_env.assigned_versions,
1877            epoch_store,
1878        ) {
1879            Ok(objects) => objects,
1880            Err(e) => return ExecutionOutput::Fatal(e),
1881        };
1882
1883        let expected_effects_digest = match execution_env.expected_effects_digest {
1884            Some(expected_effects_digest) => Some(expected_effects_digest),
1885            None => {
1886                // We could be re-executing a previously executed but uncommitted transaction, perhaps after
1887                // restarting with a new binary. In this situation, if we have published an effects signature,
1888                // we must be sure not to equivocate.
1889                // TODO: read from cache instead of DB
1890                match epoch_store.get_signed_effects_digest(&tx_digest) {
1891                    Ok(digest) => digest,
1892                    Err(e) => return ExecutionOutput::Fatal(e),
1893                }
1894            }
1895        };
1896
1897        fail_point_if!("correlated-crash-process-certificate", || {
1898            if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1899                sui_simulator::task::kill_current_node(None);
1900            }
1901        });
1902
1903        // Errors originating from prepare_certificate may be transient (failure to read locks) or
1904        // non-transient (transaction input is invalid, move vm errors). However, all errors from
1905        // this function occur before we have written anything to the db, so we commit the tx
1906        // guard and rely on the client to retry the tx (if it was transient).
1907        self.execute_certificate(
1908            execution_guard,
1909            certificate,
1910            input_objects,
1911            expected_effects_digest,
1912            execution_env.withdraw_status,
1913            epoch_store,
1914        )
1915    }
1916
1917    pub async fn reconfigure_traffic_control(
1918        &self,
1919        params: TrafficControlReconfigParams,
1920    ) -> Result<TrafficControlReconfigParams, SuiError> {
1921        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1922            traffic_controller.admin_reconfigure(params).await
1923        } else {
1924            Err(SuiErrorKind::InvalidAdminRequest(
1925                "Traffic controller is not configured on this node".to_string(),
1926            )
1927            .into())
1928        }
1929    }
1930
1931    #[instrument(level = "trace", skip_all)]
1932    fn commit_certificate(
1933        &self,
1934        certificate: &VerifiedExecutableTransaction,
1935        transaction_outputs: Arc<TransactionOutputs>,
1936        epoch_store: &Arc<AuthorityPerEpochStore>,
1937    ) -> SuiResult {
1938        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
1939            monitored_scope("Execution::commit_certificate");
1940        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1941
1942        let tx_digest = certificate.digest();
1943
1944        // The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
1945        // we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
1946        epoch_store.insert_executed_in_epoch(tx_digest);
1947        let key = certificate.key();
1948        if !matches!(key, TransactionKey::Digest(_)) {
1949            epoch_store.insert_tx_key(key, *tx_digest)?;
1950        }
1951
1952        // Allow testing what happens if we crash here.
1953        fail_point!("crash");
1954
1955        self.get_cache_writer()
1956            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);
1957
1958        if certificate.transaction_data().is_end_of_epoch_tx() {
1959            // At the end of epoch, since system packages may have been upgraded, force
1960            // reload them in the cache.
1961            self.get_object_cache_reader()
1962                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1963        }
1964
1965        Ok(())
1966    }
1967
1968    fn update_metrics(
1969        &self,
1970        certificate: &VerifiedExecutableTransaction,
1971        inner_temporary_store: &InnerTemporaryStore,
1972        effects: &TransactionEffects,
1973    ) {
1974        // count signature by scheme, for zklogin and multisig
1975        if certificate.has_zklogin_sig() {
1976            self.metrics.zklogin_sig_count.inc();
1977        } else if certificate.has_upgraded_multisig() {
1978            self.metrics.multisig_sig_count.inc();
1979        }
1980
1981        self.metrics.total_effects.inc();
1982        self.metrics.total_certs.inc();
1983
1984        let consensus_object_count = effects.input_consensus_objects().len();
1985        if consensus_object_count > 0 {
1986            self.metrics.shared_obj_tx.inc();
1987        }
1988
1989        if certificate.is_sponsored_tx() {
1990            self.metrics.sponsored_tx.inc();
1991        }
1992
1993        let input_object_count = inner_temporary_store.input_objects.len();
1994        self.metrics
1995            .num_input_objs
1996            .observe(input_object_count as f64);
1997        self.metrics
1998            .num_shared_objects
1999            .observe(consensus_object_count as f64);
2000        self.metrics.batch_size.observe(
2001            certificate
2002                .data()
2003                .intent_message()
2004                .value
2005                .kind()
2006                .num_commands() as f64,
2007        );
2008    }
2009
2010    /// execute_certificate validates the transaction input, and executes the certificate,
2011    /// returning transaction outputs.
2012    ///
2013    /// It reads state from the db (both owned and shared locks), but it has no side effects.
2014    ///
2015    /// Executes a certificate and returns an ExecutionOutput.
2016    /// The function can fail with Fatal errors (e.g., the transaction input is invalid,
2017    /// locks are not held correctly, etc.) or transient errors (e.g., db read errors).
2018    #[instrument(level = "trace", skip_all)]
2019    fn execute_certificate(
2020        &self,
2021        _execution_guard: &ExecutionLockReadGuard<'_>,
2022        certificate: &VerifiedExecutableTransaction,
2023        input_objects: InputObjects,
2024        expected_effects_digest: Option<TransactionEffectsDigest>,
2025        withdraw_status: BalanceWithdrawStatus,
2026        epoch_store: &Arc<AuthorityPerEpochStore>,
2027    ) -> ExecutionOutput<(
2028        TransactionOutputs,
2029        Vec<ExecutionTiming>,
2030        Option<ExecutionError>,
2031    )> {
2032        let _scope = monitored_scope("Execution::prepare_certificate");
2033        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
2034        let prepare_certificate_start_time = tokio::time::Instant::now();
2035
2036        // TODO: We need to move this to a more appropriate place to avoid redundant checks.
2037        let tx_data = certificate.data().transaction_data();
2038        if let Err(e) = tx_data.validity_check(epoch_store.protocol_config()) {
2039            return ExecutionOutput::Fatal(e.into());
2040        }
2041
2042        // The cost of partially re-auditing a transaction before execution is tolerated.
2043        // This step is required for correctness because, for example, ConsensusAddressOwner
2044        // object owner may have changed between signing and execution.
2045        let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
2046            certificate,
2047            input_objects,
2048            epoch_store.protocol_config(),
2049            epoch_store.reference_gas_price(),
2050        ) {
2051            Ok(result) => result,
2052            Err(e) => return ExecutionOutput::Fatal(e),
2053        };
2054
2055        let owned_object_refs = input_objects.inner().filter_owned_objects();
2056        if let Err(e) = self.check_owned_locks(&owned_object_refs) {
2057            return ExecutionOutput::Fatal(e);
2058        }
2059        let tx_digest = *certificate.digest();
2060        let protocol_config = epoch_store.protocol_config();
2061        let transaction_data = &certificate.data().intent_message().value;
2062        let (kind, signer, gas_data) = transaction_data.execution_parts();
2063        let early_execution_error = get_early_execution_error(
2064            &tx_digest,
2065            &input_objects,
2066            self.config.certificate_deny_config.certificate_deny_set(),
2067            &withdraw_status,
2068        );
2069        let execution_params = match early_execution_error {
2070            Some(error) => ExecutionOrEarlyError::Err(error),
2071            None => ExecutionOrEarlyError::Ok(()),
2072        };
2073
2074        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2075
2076        #[allow(unused_mut)]
2077        let (inner_temp_store, _, mut effects, timings, execution_error_opt) =
2078            epoch_store.executor().execute_transaction_to_effects(
2079                &tracking_store,
2080                protocol_config,
2081                self.metrics.limits_metrics.clone(),
2082                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
2083                // cyclic dependency w/ sui-adapter
2084                self.config
2085                    .expensive_safety_check_config
2086                    .enable_deep_per_tx_sui_conservation_check(),
2087                execution_params,
2088                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2089                epoch_store
2090                    .epoch_start_config()
2091                    .epoch_data()
2092                    .epoch_start_timestamp(),
2093                input_objects,
2094                gas_data,
2095                gas_status,
2096                kind,
2097                signer,
2098                tx_digest,
2099                &mut None,
2100            );
2101
2102        if let Some(expected_effects_digest) = expected_effects_digest
2103            && effects.digest() != expected_effects_digest
2104        {
2105            // We dont want to mask the original error, so we log it and continue.
2106            match self.debug_dump_transaction_state(
2107                &tx_digest,
2108                &effects,
2109                expected_effects_digest,
2110                &inner_temp_store,
2111                certificate,
2112                &self.config.state_debug_dump_config,
2113            ) {
2114                Ok(out_path) => {
2115                    info!(
2116                        "Dumped node state for transaction {} to {}",
2117                        tx_digest,
2118                        out_path.as_path().display().to_string()
2119                    );
2120                }
2121                Err(e) => {
2122                    error!("Error dumping state for transaction {}: {e}", tx_digest);
2123                }
2124            }
2125            let expected_effects = self
2126                .get_transaction_cache_reader()
2127                .get_effects(&expected_effects_digest);
2128            error!(
2129                ?tx_digest,
2130                ?expected_effects_digest,
2131                actual_effects = ?effects,
2132                expected_effects = ?expected_effects,
2133                "fork detected!"
2134            );
2135            if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2136                tx_digest,
2137                expected_effects_digest,
2138                effects.digest(),
2139            ) {
2140                error!("Failed to record transaction fork: {e}");
2141            }
2142
2143            fail_point_if!("kill_transaction_fork_node", || {
2144                #[cfg(msim)]
2145                {
2146                    tracing::error!(
2147                        fatal = true,
2148                        "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2149                        tx_digest
2150                    );
2151                    sui_simulator::task::shutdown_current_node();
2152                }
2153            });
2154
2155            fatal!(
2156                "Transaction {} is expected to have effects digest {}, but got {}!",
2157                tx_digest,
2158                expected_effects_digest,
2159                effects.digest()
2160            );
2161        }
2162
2163        fail_point_arg!("simulate_fork_during_execution", |(
2164            forked_validators,
2165            full_halt,
2166            effects_overrides,
2167            fork_probability,
2168        ): (
2169            std::sync::Arc<
2170                std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2171            >,
2172            bool,
2173            std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2174            f32,
2175        )| {
2176            #[cfg(msim)]
2177            self.simulate_fork_during_execution(
2178                certificate,
2179                epoch_store,
2180                &mut effects,
2181                forked_validators,
2182                full_halt,
2183                effects_overrides,
2184                fork_probability,
2185            );
2186        });
2187
2188        let unchanged_loaded_runtime_objects =
2189            crate::transaction_outputs::unchanged_loaded_runtime_objects(
2190                certificate.transaction_data(),
2191                &effects,
2192                &tracking_store.into_read_objects(),
2193            );
2194
2195        // index certificate
2196        let _ = self
2197            .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2198            .tap_err(|e| {
2199                self.metrics.post_processing_total_failures.inc();
2200                error!(?tx_digest, "tx post processing failed: {e}");
2201            });
2202
2203        self.update_metrics(certificate, &inner_temp_store, &effects);
2204
2205        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2206            certificate.clone().into_unsigned(),
2207            effects,
2208            inner_temp_store,
2209            unchanged_loaded_runtime_objects,
2210        );
2211
2212        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2213        if elapsed > 0.0 {
2214            self.metrics.prepare_cert_gas_latency_ratio.observe(
2215                transaction_outputs
2216                    .effects
2217                    .gas_cost_summary()
2218                    .computation_cost as f64
2219                    / elapsed,
2220            );
2221        }
2222
2223        ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2224    }
2225
2226    pub fn prepare_certificate_for_benchmark(
2227        &self,
2228        certificate: &VerifiedExecutableTransaction,
2229        input_objects: InputObjects,
2230        epoch_store: &Arc<AuthorityPerEpochStore>,
2231    ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2232        let lock = RwLock::new(epoch_store.epoch());
2233        let execution_guard = lock.try_read().unwrap();
2234
2235        let (transaction_outputs, _timings, execution_error_opt) = self
2236            .execute_certificate(
2237                &execution_guard,
2238                certificate,
2239                input_objects,
2240                None,
2241                BalanceWithdrawStatus::NoWithdraw,
2242                epoch_store,
2243            )
2244            .unwrap();
2245        Ok((transaction_outputs, execution_error_opt))
2246    }
2247
2248    #[instrument(skip_all)]
2249    #[allow(clippy::type_complexity)]
2250    pub async fn dry_exec_transaction(
2251        &self,
2252        transaction: TransactionData,
2253        transaction_digest: TransactionDigest,
2254    ) -> SuiResult<(
2255        DryRunTransactionBlockResponse,
2256        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2257        TransactionEffects,
2258        Option<ObjectID>,
2259    )> {
2260        let epoch_store = self.load_epoch_store_one_call_per_task();
2261        if !self.is_fullnode(&epoch_store) {
2262            return Err(SuiErrorKind::UnsupportedFeatureError {
2263                error: "dry-exec is only supported on fullnodes".to_string(),
2264            }
2265            .into());
2266        }
2267
2268        if transaction.kind().is_system_tx() {
2269            return Err(SuiErrorKind::UnsupportedFeatureError {
2270                error: "dry-exec does not support system transactions".to_string(),
2271            }
2272            .into());
2273        }
2274
2275        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2276    }
2277
2278    #[allow(clippy::type_complexity)]
2279    pub fn dry_exec_transaction_for_benchmark(
2280        &self,
2281        transaction: TransactionData,
2282        transaction_digest: TransactionDigest,
2283    ) -> SuiResult<(
2284        DryRunTransactionBlockResponse,
2285        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2286        TransactionEffects,
2287        Option<ObjectID>,
2288    )> {
2289        let epoch_store = self.load_epoch_store_one_call_per_task();
2290        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2291    }
2292
2293    #[allow(clippy::type_complexity)]
2294    fn dry_exec_transaction_impl(
2295        &self,
2296        epoch_store: &AuthorityPerEpochStore,
2297        transaction: TransactionData,
2298        transaction_digest: TransactionDigest,
2299    ) -> SuiResult<(
2300        DryRunTransactionBlockResponse,
2301        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2302        TransactionEffects,
2303        Option<ObjectID>,
2304    )> {
2305        // Cheap validity checks for a transaction, including input size limits.
2306        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2307
2308        let input_object_kinds = transaction.input_objects()?;
2309        let receiving_object_refs = transaction.receiving_objects();
2310
2311        sui_transaction_checks::deny::check_transaction_for_signing(
2312            &transaction,
2313            &[],
2314            &input_object_kinds,
2315            &receiving_object_refs,
2316            &self.config.transaction_deny_config,
2317            self.get_backing_package_store().as_ref(),
2318        )?;
2319
2320        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2321            // We don't want to cache this transaction since it's a dry run.
2322            None,
2323            &input_object_kinds,
2324            &receiving_object_refs,
2325            epoch_store.epoch(),
2326        )?;
2327
2328        // make a gas object if one was not provided
2329        let mut gas_data = transaction.gas_data().clone();
2330        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2331            let sender = transaction.sender();
2332            // use a 1B sui coin
2333            const MIST_TO_SUI: u64 = 1_000_000_000;
2334            const DRY_RUN_SUI: u64 = 1_000_000_000;
2335            let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
2336            let gas_object_id = ObjectID::random();
2337            let gas_object = Object::new_move(
2338                MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
2339                Owner::AddressOwner(sender),
2340                TransactionDigest::genesis_marker(),
2341            );
2342            let gas_object_ref = gas_object.compute_object_reference();
2343            gas_data.payment = vec![gas_object_ref];
2344            (
2345                sui_transaction_checks::check_transaction_input_with_given_gas(
2346                    epoch_store.protocol_config(),
2347                    epoch_store.reference_gas_price(),
2348                    &transaction,
2349                    input_objects,
2350                    receiving_objects,
2351                    gas_object,
2352                    &self.metrics.bytecode_verifier_metrics,
2353                    &self.config.verifier_signing_config,
2354                )?,
2355                Some(gas_object_id),
2356            )
2357        } else {
2358            (
2359                sui_transaction_checks::check_transaction_input(
2360                    epoch_store.protocol_config(),
2361                    epoch_store.reference_gas_price(),
2362                    &transaction,
2363                    input_objects,
2364                    &receiving_objects,
2365                    &self.metrics.bytecode_verifier_metrics,
2366                    &self.config.verifier_signing_config,
2367                )?,
2368                None,
2369            )
2370        };
2371
2372        let protocol_config = epoch_store.protocol_config();
2373        let (kind, signer, _) = transaction.execution_parts();
2374
2375        let silent = true;
2376        let executor = sui_execution::executor(protocol_config, silent)
2377            .expect("Creating an executor should not fail here");
2378
2379        let expensive_checks = false;
2380        let early_execution_error = get_early_execution_error(
2381            &transaction_digest,
2382            &checked_input_objects,
2383            self.config.certificate_deny_config.certificate_deny_set(),
2384            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2385            &BalanceWithdrawStatus::NoWithdraw,
2386        );
2387        let execution_params = match early_execution_error {
2388            Some(error) => ExecutionOrEarlyError::Err(error),
2389            None => ExecutionOrEarlyError::Ok(()),
2390        };
2391        let (inner_temp_store, _, effects, _timings, execution_error) = executor
2392            .execute_transaction_to_effects(
2393                self.get_backing_store().as_ref(),
2394                protocol_config,
2395                self.metrics.limits_metrics.clone(),
2396                expensive_checks,
2397                execution_params,
2398                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2399                epoch_store
2400                    .epoch_start_config()
2401                    .epoch_data()
2402                    .epoch_start_timestamp(),
2403                checked_input_objects,
2404                gas_data,
2405                gas_status,
2406                kind,
2407                signer,
2408                transaction_digest,
2409                &mut None,
2410            );
2411        let tx_digest = *effects.transaction_digest();
2412
2413        let module_cache =
2414            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2415
2416        let mut layout_resolver =
2417            epoch_store
2418                .executor()
2419                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2420                    &inner_temp_store,
2421                    self.get_backing_package_store(),
2422                )));
2423        // Returning empty vector here because we recalculate changes in the rpc layer.
2424        let object_changes = Vec::new();
2425
2426        // Returning empty vector here because we recalculate changes in the rpc layer.
2427        let balance_changes = Vec::new();
2428
2429        let written_with_kind = effects
2430            .created()
2431            .into_iter()
2432            .map(|(oref, _)| (oref, WriteKind::Create))
2433            .chain(
2434                effects
2435                    .unwrapped()
2436                    .into_iter()
2437                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2438            )
2439            .chain(
2440                effects
2441                    .mutated()
2442                    .into_iter()
2443                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
2444            )
2445            .map(|(oref, kind)| {
2446                let obj = inner_temp_store.written.get(&oref.0).unwrap();
2447                // TODO: Avoid clones.
2448                (oref.0, (oref, obj.clone(), kind))
2449            })
2450            .collect();
2451
2452        let execution_error_source = execution_error
2453            .as_ref()
2454            .err()
2455            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2456
2457        Ok((
2458            DryRunTransactionBlockResponse {
2459                suggested_gas_price: self
2460                    .congestion_tracker
2461                    .get_suggested_gas_prices(&transaction),
2462                input: SuiTransactionBlockData::try_from_with_module_cache(
2463                    transaction,
2464                    &module_cache,
2465                )
2466                .map_err(|e| SuiErrorKind::TransactionSerializationError {
2467                    error: format!(
2468                        "Failed to convert transaction to SuiTransactionBlockData: {}",
2469                        e
2470                    ),
2471                })?, // TODO: replace the underlying try_from to SuiError. This one goes deep
2472                effects: effects.clone().try_into()?,
2473                events: SuiTransactionBlockEvents::try_from(
2474                    inner_temp_store.events.clone(),
2475                    tx_digest,
2476                    None,
2477                    layout_resolver.as_mut(),
2478                )?,
2479                object_changes,
2480                balance_changes,
2481                execution_error_source,
2482            },
2483            written_with_kind,
2484            effects,
2485            mock_gas,
2486        ))
2487    }
2488
2489    pub fn simulate_transaction(
2490        &self,
2491        mut transaction: TransactionData,
2492        checks: TransactionChecks,
2493    ) -> SuiResult<SimulateTransactionResult> {
2494        if transaction.kind().is_system_tx() {
2495            return Err(SuiErrorKind::UnsupportedFeatureError {
2496                error: "simulate does not support system transactions".to_string(),
2497            }
2498            .into());
2499        }
2500
2501        let epoch_store = self.load_epoch_store_one_call_per_task();
2502        if !self.is_fullnode(&epoch_store) {
2503            return Err(SuiErrorKind::UnsupportedFeatureError {
2504                error: "simulate is only supported on fullnodes".to_string(),
2505            }
2506            .into());
2507        }
2508
2509        // Cheap validity checks for a transaction, including input size limits.
2510        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2511
2512        let input_object_kinds = transaction.input_objects()?;
2513        let receiving_object_refs = transaction.receiving_objects();
2514
2515        sui_transaction_checks::deny::check_transaction_for_signing(
2516            &transaction,
2517            &[],
2518            &input_object_kinds,
2519            &receiving_object_refs,
2520            &self.config.transaction_deny_config,
2521            self.get_backing_package_store().as_ref(),
2522        )?;
2523
2524        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2525            // We don't want to cache this transaction since it's a simulation.
2526            None,
2527            &input_object_kinds,
2528            &receiving_object_refs,
2529            epoch_store.epoch(),
2530        )?;
2531
2532        // mock a gas object if one was not provided
2533        let mock_gas_id = if transaction.gas().is_empty() {
2534            let mock_gas_object = Object::new_move(
2535                MoveObject::new_gas_coin(
2536                    OBJECT_START_VERSION,
2537                    ObjectID::MAX,
2538                    DEV_INSPECT_GAS_COIN_VALUE,
2539                ),
2540                Owner::AddressOwner(transaction.gas_data().owner),
2541                TransactionDigest::genesis_marker(),
2542            );
2543            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2544            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2545            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2546            Some(mock_gas_object.id())
2547        } else {
2548            None
2549        };
2550
2551        let protocol_config = epoch_store.protocol_config();
2552
2553        let (gas_status, checked_input_objects) = if checks.enabled() {
2554            sui_transaction_checks::check_transaction_input(
2555                epoch_store.protocol_config(),
2556                epoch_store.reference_gas_price(),
2557                &transaction,
2558                input_objects,
2559                &receiving_objects,
2560                &self.metrics.bytecode_verifier_metrics,
2561                &self.config.verifier_signing_config,
2562            )?
2563        } else {
2564            let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2565                protocol_config,
2566                transaction.kind(),
2567                input_objects,
2568                receiving_objects,
2569            )?;
2570            let gas_status = SuiGasStatus::new(
2571                transaction.gas_budget(),
2572                transaction.gas_price(),
2573                epoch_store.reference_gas_price(),
2574                protocol_config,
2575            )?;
2576
2577            (gas_status, checked_input_objects)
2578        };
2579
2580        // TODO see if we can spin up a VM once and reuse it
2581        let executor = sui_execution::executor(
2582            protocol_config,
2583            true, // silent
2584        )
2585        .expect("Creating an executor should not fail here");
2586
2587        let (kind, signer, gas_data) = transaction.execution_parts();
2588        let early_execution_error = get_early_execution_error(
2589            &transaction.digest(),
2590            &checked_input_objects,
2591            self.config.certificate_deny_config.certificate_deny_set(),
2592            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2593            &BalanceWithdrawStatus::NoWithdraw,
2594        );
2595        let execution_params = match early_execution_error {
2596            Some(error) => ExecutionOrEarlyError::Err(error),
2597            None => ExecutionOrEarlyError::Ok(()),
2598        };
2599
2600        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2601
2602        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2603            &tracking_store,
2604            protocol_config,
2605            self.metrics.limits_metrics.clone(),
2606            false, // expensive_checks
2607            execution_params,
2608            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2609            epoch_store
2610                .epoch_start_config()
2611                .epoch_data()
2612                .epoch_start_timestamp(),
2613            checked_input_objects,
2614            gas_data,
2615            gas_status,
2616            kind,
2617            signer,
2618            transaction.digest(),
2619            checks.disabled(),
2620        );
2621
2622        let loaded_runtime_objects = tracking_store.into_read_objects();
2623        let unchanged_loaded_runtime_objects =
2624            crate::transaction_outputs::unchanged_loaded_runtime_objects(
2625                &transaction,
2626                &effects,
2627                &loaded_runtime_objects,
2628            );
2629
2630        let object_set = {
2631            let objects = {
2632                let mut objects = loaded_runtime_objects;
2633
2634                for o in inner_temp_store
2635                    .input_objects
2636                    .into_values()
2637                    .chain(inner_temp_store.written.into_values())
2638                {
2639                    objects.insert(o);
2640                }
2641
2642                objects
2643            };
2644
2645            let object_keys = sui_types::storage::get_transaction_object_set(
2646                &transaction,
2647                &effects,
2648                &unchanged_loaded_runtime_objects,
2649            );
2650
2651            let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2652            for k in object_keys {
2653                if let Some(o) = objects.get(&k) {
2654                    set.insert(o.clone());
2655                }
2656            }
2657
2658            set
2659        };
2660
2661        Ok(SimulateTransactionResult {
2662            objects: object_set,
2663            events: effects.events_digest().map(|_| inner_temp_store.events),
2664            effects,
2665            execution_result,
2666            mock_gas_id,
2667            unchanged_loaded_runtime_objects,
2668        })
2669    }
2670
2671    /// The object ID for gas can be any object ID, even for an uncreated object
2672    #[allow(clippy::collapsible_else_if)]
2673    #[instrument(skip_all)]
2674    pub async fn dev_inspect_transaction_block(
2675        &self,
2676        sender: SuiAddress,
2677        transaction_kind: TransactionKind,
2678        gas_price: Option<u64>,
2679        gas_budget: Option<u64>,
2680        gas_sponsor: Option<SuiAddress>,
2681        gas_objects: Option<Vec<ObjectRef>>,
2682        show_raw_txn_data_and_effects: Option<bool>,
2683        skip_checks: Option<bool>,
2684    ) -> SuiResult<DevInspectResults> {
2685        let epoch_store = self.load_epoch_store_one_call_per_task();
2686
2687        if !self.is_fullnode(&epoch_store) {
2688            return Err(SuiErrorKind::UnsupportedFeatureError {
2689                error: "dev-inspect is only supported on fullnodes".to_string(),
2690            }
2691            .into());
2692        }
2693
2694        if transaction_kind.is_system_tx() {
2695            return Err(SuiErrorKind::UnsupportedFeatureError {
2696                error: "system transactions are not supported".to_string(),
2697            }
2698            .into());
2699        }
2700
2701        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2702        let skip_checks = skip_checks.unwrap_or(true);
2703        let reference_gas_price = epoch_store.reference_gas_price();
2704        let protocol_config = epoch_store.protocol_config();
2705        let max_tx_gas = protocol_config.max_tx_gas();
2706
2707        let price = gas_price.unwrap_or(reference_gas_price);
2708        let budget = gas_budget.unwrap_or(max_tx_gas);
2709        let owner = gas_sponsor.unwrap_or(sender);
2710        // Payment might be empty here, but it's fine we'll have to deal with it later after reading all the input objects.
2711        let payment = gas_objects.unwrap_or_default();
2712        let mut transaction = TransactionData::V1(TransactionDataV1 {
2713            kind: transaction_kind.clone(),
2714            sender,
2715            gas_data: GasData {
2716                payment,
2717                owner,
2718                price,
2719                budget,
2720            },
2721            expiration: TransactionExpiration::None,
2722        });
2723
2724        let raw_txn_data = if show_raw_txn_data_and_effects {
2725            bcs::to_bytes(&transaction).map_err(|_| {
2726                SuiErrorKind::TransactionSerializationError {
2727                    error: "Failed to serialize transaction during dev inspect".to_string(),
2728                }
2729            })?
2730        } else {
2731            vec![]
2732        };
2733
2734        transaction.validity_check_no_gas_check(protocol_config)?;
2735
2736        let input_object_kinds = transaction.input_objects()?;
2737        let receiving_object_refs = transaction.receiving_objects();
2738
2739        sui_transaction_checks::deny::check_transaction_for_signing(
2740            &transaction,
2741            &[],
2742            &input_object_kinds,
2743            &receiving_object_refs,
2744            &self.config.transaction_deny_config,
2745            self.get_backing_package_store().as_ref(),
2746        )?;
2747
2748        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2749            // We don't want to cache this transaction since it's a dev inspect.
2750            None,
2751            &input_object_kinds,
2752            &receiving_object_refs,
2753            epoch_store.epoch(),
2754        )?;
2755
2756        let (gas_status, checked_input_objects) = if skip_checks {
2757            // If we are skipping checks, then we call the check_dev_inspect_input function which will perform
2758            // only lightweight checks on the transaction input. And if the gas field is empty, that means we will
2759            // use the dummy gas object so we need to add it to the input objects vector.
2760            if transaction.gas().is_empty() {
2761                // Create and use a dummy gas object if there is no gas object provided.
2762                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2763                    DEV_INSPECT_GAS_COIN_VALUE,
2764                    transaction.gas_owner(),
2765                );
2766                let gas_object_ref = dummy_gas_object.compute_object_reference();
2767                transaction.gas_data_mut().payment = vec![gas_object_ref];
2768                input_objects.push(ObjectReadResult::new(
2769                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2770                    dummy_gas_object.into(),
2771                ));
2772            }
2773            let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2774                protocol_config,
2775                &transaction_kind,
2776                input_objects,
2777                receiving_objects,
2778            )?;
2779            let gas_status = SuiGasStatus::new(
2780                max_tx_gas,
2781                transaction.gas_price(),
2782                reference_gas_price,
2783                protocol_config,
2784            )?;
2785
2786            (gas_status, checked_input_objects)
2787        } else {
2788            // If we are not skipping checks, then we call the check_transaction_input function and its dummy gas
2789            // variant which will perform full fledged checks just like a real transaction execution.
2790            if transaction.gas().is_empty() {
2791                // Create and use a dummy gas object if there is no gas object provided.
2792                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2793                    DEV_INSPECT_GAS_COIN_VALUE,
2794                    transaction.gas_owner(),
2795                );
2796                let gas_object_ref = dummy_gas_object.compute_object_reference();
2797                transaction.gas_data_mut().payment = vec![gas_object_ref];
2798                sui_transaction_checks::check_transaction_input_with_given_gas(
2799                    epoch_store.protocol_config(),
2800                    epoch_store.reference_gas_price(),
2801                    &transaction,
2802                    input_objects,
2803                    receiving_objects,
2804                    dummy_gas_object,
2805                    &self.metrics.bytecode_verifier_metrics,
2806                    &self.config.verifier_signing_config,
2807                )?
2808            } else {
2809                sui_transaction_checks::check_transaction_input(
2810                    epoch_store.protocol_config(),
2811                    epoch_store.reference_gas_price(),
2812                    &transaction,
2813                    input_objects,
2814                    &receiving_objects,
2815                    &self.metrics.bytecode_verifier_metrics,
2816                    &self.config.verifier_signing_config,
2817                )?
2818            }
2819        };
2820
2821        let executor = sui_execution::executor(protocol_config, /* silent */ true)
2822            .expect("Creating an executor should not fail here");
2823        let gas_data = transaction.gas_data().clone();
2824        let intent_msg = IntentMessage::new(
2825            Intent {
2826                version: IntentVersion::V0,
2827                scope: IntentScope::TransactionData,
2828                app_id: AppId::Sui,
2829            },
2830            transaction,
2831        );
2832        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2833        let early_execution_error = get_early_execution_error(
2834            &transaction_digest,
2835            &checked_input_objects,
2836            self.config.certificate_deny_config.certificate_deny_set(),
2837            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2838            &BalanceWithdrawStatus::NoWithdraw,
2839        );
2840        let execution_params = match early_execution_error {
2841            Some(error) => ExecutionOrEarlyError::Err(error),
2842            None => ExecutionOrEarlyError::Ok(()),
2843        };
2844        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2845            self.get_backing_store().as_ref(),
2846            protocol_config,
2847            self.metrics.limits_metrics.clone(),
2848            /* expensive checks */ false,
2849            execution_params,
2850            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2851            epoch_store
2852                .epoch_start_config()
2853                .epoch_data()
2854                .epoch_start_timestamp(),
2855            checked_input_objects,
2856            gas_data,
2857            gas_status,
2858            transaction_kind,
2859            sender,
2860            transaction_digest,
2861            skip_checks,
2862        );
2863
2864        let raw_effects = if show_raw_txn_data_and_effects {
2865            bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2866                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2867            })?
2868        } else {
2869            vec![]
2870        };
2871
2872        let mut layout_resolver =
2873            epoch_store
2874                .executor()
2875                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2876                    &inner_temp_store,
2877                    self.get_backing_package_store(),
2878                )));
2879
2880        DevInspectResults::new(
2881            effects,
2882            inner_temp_store.events.clone(),
2883            execution_result,
2884            raw_txn_data,
2885            raw_effects,
2886            layout_resolver.as_mut(),
2887        )
2888    }
2889
2890    // Only used for testing because of how epoch store is loaded.
2891    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2892        let epoch_store = self.epoch_store_for_testing();
2893        Ok(epoch_store.reference_gas_price())
2894    }
2895
2896    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2897        self.get_transaction_cache_reader()
2898            .is_tx_already_executed(digest)
2899    }
2900
2901    #[instrument(level = "debug", skip_all, err(level = "debug"))]
2902    fn index_tx(
2903        &self,
2904        indexes: &IndexStore,
2905        digest: &TransactionDigest,
2906        // TODO: index_tx really just need the transaction data here.
2907        cert: &VerifiedExecutableTransaction,
2908        effects: &TransactionEffects,
2909        events: &TransactionEvents,
2910        timestamp_ms: u64,
2911        tx_coins: Option<TxCoins>,
2912        written: &WrittenObjects,
2913        inner_temporary_store: &InnerTemporaryStore,
2914        epoch_store: &Arc<AuthorityPerEpochStore>,
2915    ) -> SuiResult<u64> {
2916        let changes = self
2917            .process_object_index(effects, written, inner_temporary_store, epoch_store)
2918            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2919
2920        indexes.index_tx(
2921            cert.data().intent_message().value.sender(),
2922            cert.data()
2923                .intent_message()
2924                .value
2925                .input_objects()?
2926                .iter()
2927                .map(|o| o.object_id()),
2928            effects
2929                .all_changed_objects()
2930                .into_iter()
2931                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2932            cert.data()
2933                .intent_message()
2934                .value
2935                .move_calls()
2936                .into_iter()
2937                .map(|(package, module, function)| {
2938                    (*package, module.to_owned(), function.to_owned())
2939                }),
2940            events,
2941            changes,
2942            digest,
2943            timestamp_ms,
2944            tx_coins,
2945        )
2946    }
2947
2948    #[cfg(msim)]
2949    fn simulate_fork_during_execution(
2950        &self,
2951        certificate: &VerifiedExecutableTransaction,
2952        epoch_store: &Arc<AuthorityPerEpochStore>,
2953        effects: &mut TransactionEffects,
2954        forked_validators: std::sync::Arc<
2955            std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2956        >,
2957        full_halt: bool,
2958        effects_overrides: std::sync::Arc<
2959            std::sync::Mutex<std::collections::BTreeMap<String, String>>,
2960        >,
2961        fork_probability: f32,
2962    ) {
2963        use std::cell::RefCell;
2964        thread_local! {
2965            static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
2966        }
2967        if !certificate.data().intent_message().value.is_system_tx() {
2968            let committee = epoch_store.committee();
2969            let cur_stake = (**committee).weight(&self.name);
2970            if cur_stake > 0 {
2971                TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
2972                    let already_forked = forked_validators
2973                        .lock()
2974                        .ok()
2975                        .map(|set| set.contains(&self.name))
2976                        .unwrap_or(false);
2977
2978                    if !already_forked {
2979                        let should_fork = if full_halt {
2980                            // For full halt, fork enough nodes to reach validity threshold
2981                            *total_stake <= committee.validity_threshold()
2982                        } else {
2983                            // For partial fork, stay strictly below validity threshold
2984                            *total_stake + cur_stake < committee.validity_threshold()
2985                        };
2986
2987                        if should_fork {
2988                            *total_stake += cur_stake;
2989
2990                            if let Ok(mut external_set) = forked_validators.lock() {
2991                                external_set.insert(self.name);
2992                                info!("forked_validators: {:?}", external_set);
2993                            }
2994                        }
2995                    }
2996
2997                    if let Ok(external_set) = forked_validators.lock() {
2998                        if external_set.contains(&self.name) {
2999                            // If effects_overrides is empty, deterministically select a tx_digest to fork with 1/100 probability.
3000                            // Fork this transaction and record the digest and the original effects to original_effects.
3001                            // If original_effects is nonempty and contains a key matching this transaction digest (i.e.
3002                            // the transaction was forked on a different validator), fork this txn as well.
3003
3004                            let tx_digest = certificate.digest().to_string();
3005                            if let Ok(mut overrides) = effects_overrides.lock() {
3006                                if overrides.contains_key(&tx_digest)
3007                                    || overrides.is_empty()
3008                                        && sui_simulator::random::deterministic_probability(
3009                                            &tx_digest,
3010                                            fork_probability,
3011                                        )
3012                                {
3013                                    let original_effects_digest = effects.digest().to_string();
3014                                    overrides
3015                                        .insert(tx_digest.clone(), original_effects_digest.clone());
3016                                    info!(
3017                                        ?tx_digest,
3018                                        ?original_effects_digest,
3019                                        "Captured forked effects digest for transaction"
3020                                    );
3021                                    effects.gas_cost_summary_mut_for_testing().computation_cost +=
3022                                        1;
3023                                }
3024                            }
3025                        }
3026                    }
3027                });
3028            }
3029        }
3030    }
3031
3032    fn process_object_index(
3033        &self,
3034        effects: &TransactionEffects,
3035        written: &WrittenObjects,
3036        inner_temporary_store: &InnerTemporaryStore,
3037        epoch_store: &Arc<AuthorityPerEpochStore>,
3038    ) -> SuiResult<ObjectIndexChanges> {
3039        let mut layout_resolver =
3040            epoch_store
3041                .executor()
3042                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3043                    inner_temporary_store,
3044                    self.get_backing_package_store(),
3045                )));
3046
3047        let modified_at_version = effects
3048            .modified_at_versions()
3049            .into_iter()
3050            .collect::<HashMap<_, _>>();
3051
3052        let tx_digest = effects.transaction_digest();
3053        let mut deleted_owners = vec![];
3054        let mut deleted_dynamic_fields = vec![];
3055        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
3056            let old_version = modified_at_version.get(&id).unwrap();
3057            // When we process the index, the latest object hasn't been written yet so
3058            // the old object must be present.
3059            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
3060                |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
3061            ) {
3062                Owner::AddressOwner(addr)
3063                | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
3064                Owner::ObjectOwner(object_id) => {
3065                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
3066                }
3067                _ => {}
3068            }
3069        }
3070
3071        let mut new_owners = vec![];
3072        let mut new_dynamic_fields = vec![];
3073
3074        for (oref, owner, kind) in effects.all_changed_objects() {
3075            let id = &oref.0;
3076            // For mutated objects, retrieve old owner and delete old index if there is a owner change.
3077            if let WriteKind::Mutate = kind {
3078                let Some(old_version) = modified_at_version.get(id) else {
3079                    panic!(
3080                        "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
3081                        tx_digest
3082                    );
3083                };
3084                // When we process the index, the latest object hasn't been written yet so
3085                // the old object must be present.
3086                let Some(old_object) = self.get_object_store().get_object_by_key(id, *old_version)
3087                else {
3088                    panic!(
3089                        "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
3090                        tx_digest, id, old_version
3091                    );
3092                };
3093                if old_object.owner != owner {
3094                    match old_object.owner {
3095                        Owner::AddressOwner(addr)
3096                        | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3097                            deleted_owners.push((addr, *id));
3098                        }
3099                        Owner::ObjectOwner(object_id) => {
3100                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
3101                        }
3102                        _ => {}
3103                    }
3104                }
3105            }
3106
3107            match owner {
3108                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3109                    // TODO: We can remove the object fetching after we added ObjectType to TransactionEffects
3110                    let new_object = written.get(id).unwrap_or_else(
3111                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3112                    );
3113                    assert_eq!(
3114                        new_object.version(),
3115                        oref.1,
3116                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3117                        tx_digest,
3118                        id,
3119                        new_object.version(),
3120                        oref.1
3121                    );
3122
3123                    let type_ = new_object
3124                        .type_()
3125                        .map(|type_| ObjectType::Struct(type_.clone()))
3126                        .unwrap_or(ObjectType::Package);
3127
3128                    new_owners.push((
3129                        (addr, *id),
3130                        ObjectInfo {
3131                            object_id: *id,
3132                            version: oref.1,
3133                            digest: oref.2,
3134                            type_,
3135                            owner,
3136                            previous_transaction: *effects.transaction_digest(),
3137                        },
3138                    ));
3139                }
3140                Owner::ObjectOwner(owner) => {
3141                    let new_object = written.get(id).unwrap_or_else(
3142                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3143                    );
3144                    assert_eq!(
3145                        new_object.version(),
3146                        oref.1,
3147                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3148                        tx_digest,
3149                        id,
3150                        new_object.version(),
3151                        oref.1
3152                    );
3153
3154                    let Some(df_info) = self
3155                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
3156                        .unwrap_or_else(|e| {
3157                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
3158                            None
3159                        }
3160                    )
3161                        else {
3162                            // Skip indexing for non dynamic field objects.
3163                            continue;
3164                        };
3165                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3166                }
3167                _ => {}
3168            }
3169        }
3170
3171        Ok(ObjectIndexChanges {
3172            deleted_owners,
3173            deleted_dynamic_fields,
3174            new_owners,
3175            new_dynamic_fields,
3176        })
3177    }
3178
3179    fn try_create_dynamic_field_info(
3180        &self,
3181        o: &Object,
3182        written: &WrittenObjects,
3183        resolver: &mut dyn LayoutResolver,
3184    ) -> SuiResult<Option<DynamicFieldInfo>> {
3185        // Skip if not a move object
3186        let Some(move_object) = o.data.try_as_move().cloned() else {
3187            return Ok(None);
3188        };
3189
3190        // We only index dynamic field objects
3191        if !move_object.type_().is_dynamic_field() {
3192            return Ok(None);
3193        }
3194
3195        let layout = resolver
3196            .get_annotated_layout(&move_object.type_().clone().into())?
3197            .into_layout();
3198
3199        let field =
3200            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3201                SuiErrorKind::ObjectDeserializationError {
3202                    error: e.to_string(),
3203                }
3204            })?;
3205
3206        let type_ = field.kind;
3207        let name_type: TypeTag = field.name_layout.into();
3208        let bcs_name = field.name_bytes.to_owned();
3209
3210        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3211            .map_err(|e| {
3212                warn!("{e}");
3213                SuiErrorKind::ObjectDeserializationError {
3214                    error: e.to_string(),
3215                }
3216            })?;
3217
3218        let name = DynamicFieldName {
3219            type_: name_type,
3220            value: SuiMoveValue::from(name_value).to_json_value(),
3221        };
3222
3223        let value_metadata = field.value_metadata().map_err(|e| {
3224            warn!("{e}");
3225            SuiErrorKind::ObjectDeserializationError {
3226                error: e.to_string(),
3227            }
3228        })?;
3229
3230        Ok(Some(match value_metadata {
3231            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3232                name,
3233                bcs_name,
3234                type_,
3235                object_type: object_type.to_canonical_string(/* with_prefix */ true),
3236                object_id: o.id(),
3237                version: o.version(),
3238                digest: o.digest(),
3239            },
3240
3241            DFV::ValueMetadata::DynamicObjectField(object_id) => {
3242                // Find the actual object from storage using the object id obtained from the wrapper.
3243
3244                // Try to find the object in the written objects first.
3245                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3246                    let version = object.version();
3247                    let digest = object.digest();
3248                    let object_type = object.data.type_().unwrap().clone();
3249                    (version, digest, object_type)
3250                } else {
3251                    // If not found, try to find it in the database.
3252                    let object = self
3253                        .get_object_store()
3254                        .get_object_by_key(&object_id, o.version())
3255                        .ok_or_else(|| UserInputError::ObjectNotFound {
3256                            object_id,
3257                            version: Some(o.version()),
3258                        })?;
3259                    let version = object.version();
3260                    let digest = object.digest();
3261                    let object_type = object.data.type_().unwrap().clone();
3262                    (version, digest, object_type)
3263                };
3264
3265                DynamicFieldInfo {
3266                    name,
3267                    bcs_name,
3268                    type_,
3269                    object_type: object_type.to_string(),
3270                    object_id,
3271                    version,
3272                    digest,
3273                }
3274            }
3275        }))
3276    }
3277
3278    #[instrument(level = "trace", skip_all, err(level = "debug"))]
3279    fn post_process_one_tx(
3280        &self,
3281        certificate: &VerifiedExecutableTransaction,
3282        effects: &TransactionEffects,
3283        inner_temporary_store: &InnerTemporaryStore,
3284        epoch_store: &Arc<AuthorityPerEpochStore>,
3285    ) -> SuiResult {
3286        if self.indexes.is_none() {
3287            return Ok(());
3288        }
3289
3290        let _scope = monitored_scope("Execution::post_process_one_tx");
3291
3292        let tx_digest = certificate.digest();
3293        let timestamp_ms = Self::unixtime_now_ms();
3294        let events = &inner_temporary_store.events;
3295        let written = &inner_temporary_store.written;
3296        let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
3297            effects,
3298            inner_temporary_store,
3299            epoch_store,
3300        );
3301
3302        // Index tx
3303        if let Some(indexes) = &self.indexes {
3304            let _ = self
3305                .index_tx(
3306                    indexes.as_ref(),
3307                    tx_digest,
3308                    certificate,
3309                    effects,
3310                    events,
3311                    timestamp_ms,
3312                    tx_coins,
3313                    written,
3314                    inner_temporary_store,
3315                    epoch_store,
3316                )
3317                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
3318                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3319                .expect("Indexing tx should not fail");
3320
3321            let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3322            let events = self.make_transaction_block_events(
3323                events.clone(),
3324                *tx_digest,
3325                timestamp_ms,
3326                epoch_store,
3327                inner_temporary_store,
3328            )?;
3329            // Emit events
3330            self.subscription_handler
3331                .process_tx(certificate.data().transaction_data(), &effects, &events)
3332                .tap_ok(|_| {
3333                    self.metrics
3334                        .post_processing_total_tx_had_event_processed
3335                        .inc()
3336                })
3337                .tap_err(|e| {
3338                    warn!(
3339                        ?tx_digest,
3340                        "Post processing - Couldn't process events for tx: {}", e
3341                    )
3342                })?;
3343
3344            self.metrics
3345                .post_processing_total_events_emitted
3346                .inc_by(events.data.len() as u64);
3347        };
3348        Ok(())
3349    }
3350
3351    fn make_transaction_block_events(
3352        &self,
3353        transaction_events: TransactionEvents,
3354        digest: TransactionDigest,
3355        timestamp_ms: u64,
3356        epoch_store: &Arc<AuthorityPerEpochStore>,
3357        inner_temporary_store: &InnerTemporaryStore,
3358    ) -> SuiResult<SuiTransactionBlockEvents> {
3359        let mut layout_resolver =
3360            epoch_store
3361                .executor()
3362                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3363                    inner_temporary_store,
3364                    self.get_backing_package_store(),
3365                )));
3366        SuiTransactionBlockEvents::try_from(
3367            transaction_events,
3368            digest,
3369            Some(timestamp_ms),
3370            layout_resolver.as_mut(),
3371        )
3372    }
3373
3374    pub fn unixtime_now_ms() -> u64 {
3375        let now = SystemTime::now()
3376            .duration_since(UNIX_EPOCH)
3377            .expect("Time went backwards")
3378            .as_millis();
3379        u64::try_from(now).expect("Travelling in time machine")
3380    }
3381
3382    // TODO(fastpath): update this handler for Mysticeti fastpath.
3383    // There will no longer be validator quorum signed transactions or effects.
3384    // The proof of finality needs to come from checkpoints.
3385    #[instrument(level = "trace", skip_all)]
3386    pub async fn handle_transaction_info_request(
3387        &self,
3388        request: TransactionInfoRequest,
3389    ) -> SuiResult<TransactionInfoResponse> {
3390        let epoch_store = self.load_epoch_store_one_call_per_task();
3391        let (transaction, status) = self
3392            .get_transaction_status(&request.transaction_digest, &epoch_store)?
3393            .ok_or(SuiErrorKind::TransactionNotFound {
3394                digest: request.transaction_digest,
3395            })?;
3396        Ok(TransactionInfoResponse {
3397            transaction,
3398            status,
3399        })
3400    }
3401
3402    #[instrument(level = "trace", skip_all)]
3403    pub async fn handle_object_info_request(
3404        &self,
3405        request: ObjectInfoRequest,
3406    ) -> SuiResult<ObjectInfoResponse> {
3407        let epoch_store = self.load_epoch_store_one_call_per_task();
3408
3409        let requested_object_seq = match request.request_kind {
3410            ObjectInfoRequestKind::LatestObjectInfo => {
3411                let (_, seq, _) = self
3412                    .get_object_or_tombstone(request.object_id)
3413                    .await
3414                    .ok_or_else(|| {
3415                        SuiError::from(UserInputError::ObjectNotFound {
3416                            object_id: request.object_id,
3417                            version: None,
3418                        })
3419                    })?;
3420                seq
3421            }
3422            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3423        };
3424
3425        let object = self
3426            .get_object_store()
3427            .get_object_by_key(&request.object_id, requested_object_seq)
3428            .ok_or_else(|| {
3429                SuiError::from(UserInputError::ObjectNotFound {
3430                    object_id: request.object_id,
3431                    version: Some(requested_object_seq),
3432                })
3433            })?;
3434
3435        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3436            (request.generate_layout, object.data.try_as_move())
3437        {
3438            Some(into_struct_layout(
3439                self.load_epoch_store_one_call_per_task()
3440                    .executor()
3441                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3442                    .get_annotated_layout(&move_obj.type_().clone().into())?,
3443            )?)
3444        } else {
3445            None
3446        };
3447
3448        let lock = if !object.is_address_owned() {
3449            // Only address owned objects have locks.
3450            None
3451        } else {
3452            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3453                .await?
3454                .map(|s| s.into_inner())
3455        };
3456
3457        Ok(ObjectInfoResponse {
3458            object,
3459            layout,
3460            lock_for_debugging: lock,
3461        })
3462    }
3463
3464    #[instrument(level = "trace", skip_all)]
3465    pub fn handle_checkpoint_request(
3466        &self,
3467        request: &CheckpointRequest,
3468    ) -> SuiResult<CheckpointResponse> {
3469        let summary = match request.sequence_number {
3470            Some(seq) => self
3471                .checkpoint_store
3472                .get_checkpoint_by_sequence_number(seq)?,
3473            None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3474        }
3475        .map(|v| v.into_inner());
3476        let contents = match &summary {
3477            Some(s) => self
3478                .checkpoint_store
3479                .get_checkpoint_contents(&s.content_digest)?,
3480            None => None,
3481        };
3482        Ok(CheckpointResponse {
3483            checkpoint: summary,
3484            contents,
3485        })
3486    }
3487
3488    #[instrument(level = "trace", skip_all)]
3489    pub fn handle_checkpoint_request_v2(
3490        &self,
3491        request: &CheckpointRequestV2,
3492    ) -> SuiResult<CheckpointResponseV2> {
3493        let summary = if request.certified {
3494            let summary = match request.sequence_number {
3495                Some(seq) => self
3496                    .checkpoint_store
3497                    .get_checkpoint_by_sequence_number(seq)?,
3498                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3499            }
3500            .map(|v| v.into_inner());
3501            summary.map(CheckpointSummaryResponse::Certified)
3502        } else {
3503            let summary = match request.sequence_number {
3504                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3505                None => self
3506                    .checkpoint_store
3507                    .get_latest_locally_computed_checkpoint()?,
3508            };
3509            summary.map(CheckpointSummaryResponse::Pending)
3510        };
3511        let contents = match &summary {
3512            Some(s) => self
3513                .checkpoint_store
3514                .get_checkpoint_contents(&s.content_digest())?,
3515            None => None,
3516        };
3517        Ok(CheckpointResponseV2 {
3518            checkpoint: summary,
3519            contents,
3520        })
3521    }
3522
3523    fn check_protocol_version(
3524        supported_protocol_versions: SupportedProtocolVersions,
3525        current_version: ProtocolVersion,
3526    ) {
3527        info!("current protocol version is now {:?}", current_version);
3528        info!("supported versions are: {:?}", supported_protocol_versions);
3529        if !supported_protocol_versions.is_version_supported(current_version) {
3530            let msg = format!(
3531                "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3532                current_version, supported_protocol_versions,
3533            );
3534
3535            error!("{}", msg);
3536            eprintln!("{}", msg);
3537
3538            #[cfg(not(msim))]
3539            std::process::exit(1);
3540
3541            #[cfg(msim)]
3542            sui_simulator::task::shutdown_current_node();
3543        }
3544    }
3545
3546    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
3547    #[allow(clippy::too_many_arguments)]
3548    pub async fn new(
3549        name: AuthorityName,
3550        secret: StableSyncAuthoritySigner,
3551        supported_protocol_versions: SupportedProtocolVersions,
3552        store: Arc<AuthorityStore>,
3553        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3554        epoch_store: Arc<AuthorityPerEpochStore>,
3555        committee_store: Arc<CommitteeStore>,
3556        indexes: Option<Arc<IndexStore>>,
3557        rpc_index: Option<Arc<RpcIndexStore>>,
3558        checkpoint_store: Arc<CheckpointStore>,
3559        prometheus_registry: &Registry,
3560        genesis_objects: &[Object],
3561        db_checkpoint_config: &DBCheckpointConfig,
3562        config: NodeConfig,
3563        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3564        chain_identifier: ChainIdentifier,
3565        pruner_db: Option<Arc<AuthorityPrunerTables>>,
3566        policy_config: Option<PolicyConfig>,
3567        firewall_config: Option<RemoteFirewallConfig>,
3568        pruner_watermarks: Arc<PrunerWatermarks>,
3569    ) -> Arc<Self> {
3570        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3571
3572        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3573        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3574        let execution_scheduler = Arc::new(ExecutionScheduler::new(
3575            execution_cache_trait_pointers.object_cache_reader.clone(),
3576            execution_cache_trait_pointers.child_object_resolver.clone(),
3577            execution_cache_trait_pointers
3578                .transaction_cache_reader
3579                .clone(),
3580            tx_ready_certificates,
3581            &epoch_store,
3582            metrics.clone(),
3583        ));
3584        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3585
3586        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3587            epoch_store.get_parent_path(),
3588            &config.authority_store_pruning_config,
3589        );
3590        let _pruner = AuthorityStorePruner::new(
3591            store.perpetual_tables.clone(),
3592            checkpoint_store.clone(),
3593            rpc_index.clone(),
3594            indexes.clone(),
3595            config.authority_store_pruning_config.clone(),
3596            epoch_store.committee().authority_exists(&name),
3597            epoch_store.epoch_start_state().epoch_duration_ms(),
3598            prometheus_registry,
3599            pruner_db,
3600            pruner_watermarks,
3601        );
3602        let input_loader =
3603            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3604        let epoch = epoch_store.epoch();
3605        let traffic_controller_metrics =
3606            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3607        let traffic_controller = if let Some(policy_config) = policy_config {
3608            Some(Arc::new(
3609                TrafficController::init(
3610                    policy_config,
3611                    traffic_controller_metrics,
3612                    firewall_config.clone(),
3613                )
3614                .await,
3615            ))
3616        } else {
3617            None
3618        };
3619
3620        let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3621            ForkRecoveryState::new(Some(fork_config))
3622                .expect("Failed to initialize fork recovery state")
3623        });
3624
3625        let state = Arc::new(AuthorityState {
3626            name,
3627            secret,
3628            execution_lock: RwLock::new(epoch),
3629            epoch_store: ArcSwap::new(epoch_store.clone()),
3630            input_loader,
3631            execution_cache_trait_pointers,
3632            indexes,
3633            rpc_index,
3634            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3635            checkpoint_store,
3636            committee_store,
3637            execution_scheduler,
3638            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3639            metrics,
3640            _pruner,
3641            _authority_per_epoch_pruner,
3642            db_checkpoint_config: db_checkpoint_config.clone(),
3643            config,
3644            overload_info: AuthorityOverloadInfo::default(),
3645            validator_tx_finalizer,
3646            chain_identifier,
3647            congestion_tracker: Arc::new(CongestionTracker::new()),
3648            traffic_controller,
3649            fork_recovery_state,
3650        });
3651
3652        let state_clone = Arc::downgrade(&state);
3653        spawn_monitored_task!(fix_indexes(state_clone));
3654        // Start a task to execute ready certificates.
3655        let authority_state = Arc::downgrade(&state);
3656        spawn_monitored_task!(execution_process(
3657            authority_state,
3658            rx_ready_certificates,
3659            rx_execution_shutdown,
3660        ));
3661        // TODO: This doesn't belong to the constructor of AuthorityState.
3662        state
3663            .create_owner_index_if_empty(genesis_objects, &epoch_store)
3664            .expect("Error indexing genesis objects.");
3665
3666        state
3667    }
3668
3669    // TODO: Consolidate our traits to reduce the number of methods here.
3670    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3671        &self.execution_cache_trait_pointers.object_cache_reader
3672    }
3673
3674    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3675        &self.execution_cache_trait_pointers.transaction_cache_reader
3676    }
3677
3678    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3679        &self.execution_cache_trait_pointers.cache_writer
3680    }
3681
3682    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3683        &self.execution_cache_trait_pointers.backing_store
3684    }
3685
3686    pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3687        &self.execution_cache_trait_pointers.child_object_resolver
3688    }
3689
3690    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3691        &self.execution_cache_trait_pointers.backing_package_store
3692    }
3693
3694    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3695        &self.execution_cache_trait_pointers.object_store
3696    }
3697
3698    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3699        &self.execution_cache_trait_pointers.reconfig_api
3700    }
3701
3702    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3703        &self.execution_cache_trait_pointers.global_state_hash_store
3704    }
3705
3706    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3707        &self.execution_cache_trait_pointers.checkpoint_cache
3708    }
3709
3710    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3711        &self.execution_cache_trait_pointers.state_sync_store
3712    }
3713
3714    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3715        &self.execution_cache_trait_pointers.cache_commit
3716    }
3717
3718    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3719        self.execution_cache_trait_pointers
3720            .testing_api
3721            .database_for_testing()
3722    }
3723
3724    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3725        &self,
3726        config: NodeConfig,
3727        metrics: Arc<AuthorityStorePruningMetrics>,
3728    ) -> anyhow::Result<()> {
3729        use crate::authority::authority_store_pruner::PrunerWatermarks;
3730        let watermarks = Arc::new(PrunerWatermarks::default());
3731        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3732            &self.database_for_testing().perpetual_tables,
3733            &self.checkpoint_store,
3734            self.rpc_index.as_deref(),
3735            None,
3736            config.authority_store_pruning_config,
3737            metrics,
3738            EPOCH_DURATION_MS_FOR_TESTING,
3739            &watermarks,
3740        )
3741        .await
3742    }
3743
3744    pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
3745        &self.execution_scheduler
3746    }
3747
3748    fn create_owner_index_if_empty(
3749        &self,
3750        genesis_objects: &[Object],
3751        epoch_store: &Arc<AuthorityPerEpochStore>,
3752    ) -> SuiResult {
3753        let Some(index_store) = &self.indexes else {
3754            return Ok(());
3755        };
3756        if !index_store.is_empty() {
3757            return Ok(());
3758        }
3759
3760        let mut new_owners = vec![];
3761        let mut new_dynamic_fields = vec![];
3762        let mut layout_resolver = epoch_store
3763            .executor()
3764            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3765        for o in genesis_objects.iter() {
3766            match o.owner {
3767                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3768                    new_owners.push((
3769                        (addr, o.id()),
3770                        ObjectInfo::new(&o.compute_object_reference(), o),
3771                    ))
3772                }
3773                Owner::ObjectOwner(object_id) => {
3774                    let id = o.id();
3775                    let Some(info) = self.try_create_dynamic_field_info(
3776                        o,
3777                        &BTreeMap::new(),
3778                        layout_resolver.as_mut(),
3779                    )?
3780                    else {
3781                        continue;
3782                    };
3783                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3784                }
3785                _ => {}
3786            }
3787        }
3788
3789        index_store.insert_genesis_objects(ObjectIndexChanges {
3790            deleted_owners: vec![],
3791            deleted_dynamic_fields: vec![],
3792            new_owners,
3793            new_dynamic_fields,
3794        })
3795    }
3796
3797    /// Attempts to acquire execution lock for an executable transaction.
3798    /// Returns Some(lock) if the transaction is matching current executed epoch.
3799    /// Returns None if validator is halted at epoch end or epoch mismatch.
3800    pub fn execution_lock_for_executable_transaction(
3801        &self,
3802        transaction: &VerifiedExecutableTransaction,
3803    ) -> Option<ExecutionLockReadGuard<'_>> {
3804        let lock = self.execution_lock.try_read().ok()?;
3805        if *lock == transaction.auth_sig().epoch() {
3806            Some(lock)
3807        } else {
3808            // TODO: Can this still happen?
3809            None
3810        }
3811    }
3812
3813    /// Acquires the execution lock for the duration of a transaction signing request.
3814    /// This prevents reconfiguration from starting until we are finished handling the signing request.
3815    /// Otherwise, in-memory lock state could be cleared (by `ObjectLocks::clear_cached_locks`)
3816    /// while we are attempting to acquire locks for the transaction.
3817    pub fn execution_lock_for_signing(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
3818        self.execution_lock
3819            .try_read()
3820            .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
3821    }
3822
3823    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3824        self.execution_lock.write().await
3825    }
3826
3827    #[instrument(level = "error", skip_all)]
3828    pub async fn reconfigure(
3829        &self,
3830        cur_epoch_store: &AuthorityPerEpochStore,
3831        supported_protocol_versions: SupportedProtocolVersions,
3832        new_committee: Committee,
3833        epoch_start_configuration: EpochStartConfiguration,
3834        state_hasher: Arc<GlobalStateHasher>,
3835        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3836        epoch_last_checkpoint: CheckpointSequenceNumber,
3837    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
3838        Self::check_protocol_version(
3839            supported_protocol_versions,
3840            epoch_start_configuration
3841                .epoch_start_state()
3842                .protocol_version(),
3843        );
3844
3845        self.committee_store.insert_new_committee(&new_committee)?;
3846
3847        // Wait until no transactions are being executed.
3848        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3849
3850        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
3851        cur_epoch_store.epoch_terminated().await;
3852
3853        // Safe to being reconfiguration now. No transactions are being executed,
3854        // and no epoch-specific tasks are running.
3855
3856        {
3857            let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
3858            if state.should_accept_user_certs() {
3859                // Need to change this so that consensus adapter do not accept certificates from user.
3860                // This can happen if our local validator did not initiate epoch change locally,
3861                // but 2f+1 nodes already concluded the epoch.
3862                //
3863                // This lock is essentially a barrier for
3864                // `epoch_store.pending_consensus_certificates` table we are reading on the line after this block
3865                cur_epoch_store.close_user_certs(state);
3866            }
3867            // lock is dropped here
3868        }
3869
3870        self.get_reconfig_api()
3871            .clear_state_end_of_epoch(&execution_lock);
3872        self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
3873        self.maybe_reaccumulate_state_hash(
3874            cur_epoch_store,
3875            epoch_start_configuration
3876                .epoch_start_state()
3877                .protocol_version(),
3878        );
3879        self.get_reconfig_api()
3880            .set_epoch_start_configuration(&epoch_start_configuration);
3881        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
3882            && self
3883                .db_checkpoint_config
3884                .perform_db_checkpoints_at_epoch_end
3885        {
3886            let checkpoint_indexes = self
3887                .db_checkpoint_config
3888                .perform_index_db_checkpoints_at_epoch_end
3889                .unwrap_or(false);
3890            let current_epoch = cur_epoch_store.epoch();
3891            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
3892            self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
3893        }
3894
3895        self.get_reconfig_api()
3896            .reconfigure_cache(&epoch_start_configuration)
3897            .await;
3898
3899        let new_epoch = new_committee.epoch;
3900        let new_epoch_store = self
3901            .reopen_epoch_db(
3902                cur_epoch_store,
3903                new_committee,
3904                epoch_start_configuration,
3905                expensive_safety_check_config,
3906                epoch_last_checkpoint,
3907            )
3908            .await?;
3909        assert_eq!(new_epoch_store.epoch(), new_epoch);
3910        self.execution_scheduler
3911            .reconfigure(&new_epoch_store, self.get_child_object_resolver());
3912        *execution_lock = new_epoch;
3913        // drop execution_lock after epoch store was updated
3914        // see also assert in AuthorityState::process_certificate
3915        // on the epoch store and execution lock epoch match
3916        Ok(new_epoch_store)
3917    }
3918
3919    pub async fn settle_transactions_for_testing(
3920        &self,
3921        ckpt_seq: CheckpointSequenceNumber,
3922        effects: &[TransactionEffects],
3923    ) {
3924        let builder = AccumulatorSettlementTxBuilder::new(
3925            Some(self.get_transaction_cache_reader().as_ref()),
3926            effects,
3927            ckpt_seq,
3928            0,
3929        );
3930        let epoch_store = self.epoch_store_for_testing();
3931        let epoch = epoch_store.epoch();
3932        let (settlements, barrier) = builder.build_tx(
3933            epoch_store.protocol_config(),
3934            epoch,
3935            epoch_store
3936                .epoch_start_config()
3937                .accumulator_root_obj_initial_shared_version()
3938                .unwrap(),
3939            ckpt_seq,
3940            ckpt_seq,
3941        );
3942
3943        let mut settlements: Vec<_> = settlements
3944            .into_iter()
3945            .map(|tx| {
3946                VerifiedExecutableTransaction::new_system(
3947                    VerifiedTransaction::new_system_transaction(tx),
3948                    epoch,
3949                )
3950            })
3951            .collect();
3952        let barrier = VerifiedExecutableTransaction::new_system(
3953            VerifiedTransaction::new_system_transaction(barrier),
3954            epoch,
3955        );
3956
3957        settlements.push(barrier);
3958
3959        let assigned_versions = epoch_store
3960            .assign_shared_object_versions_for_tests(
3961                self.get_object_cache_reader().as_ref(),
3962                &settlements,
3963            )
3964            .unwrap();
3965
3966        let version_map = assigned_versions.into_map();
3967
3968        for tx in settlements {
3969            let env = ExecutionEnv::new()
3970                .with_assigned_versions(version_map.get(&tx.key()).unwrap().clone());
3971            let (effects, _) = self
3972                .try_execute_immediately(&tx, env, &epoch_store)
3973                .await
3974                .unwrap();
3975            assert!(effects.status().is_ok());
3976        }
3977    }
3978
3979    /// Advance the epoch store to the next epoch for testing only.
3980    /// This only manually sets all the places where we have the epoch number.
3981    /// It doesn't properly reconfigure the node, hence should be only used for testing.
3982    pub async fn reconfigure_for_testing(&self) {
3983        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3984        let epoch_store = self.epoch_store_for_testing().clone();
3985        let protocol_config = epoch_store.protocol_config().clone();
3986        // The current protocol config used in the epoch store may have been overridden and diverged from
3987        // the protocol config definitions. That override may have now been dropped when the initial guard was dropped.
3988        // We reapply the override before creating the new epoch store, to make sure that
3989        // the new epoch store has the same protocol config as the current one.
3990        // Since this is for testing only, we mostly like to keep the protocol config the same
3991        // across epochs.
3992        let _guard =
3993            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3994        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3995            self.get_backing_package_store().clone(),
3996            self.get_object_store().clone(),
3997            &self.config.expensive_safety_check_config,
3998            self.checkpoint_store
3999                .get_epoch_last_checkpoint(epoch_store.epoch())
4000                .unwrap()
4001                .map(|c| *c.sequence_number())
4002                .unwrap_or_default(),
4003        );
4004        self.execution_scheduler
4005            .reconfigure(&new_epoch_store, self.get_child_object_resolver());
4006        let new_epoch = new_epoch_store.epoch();
4007        self.epoch_store.store(new_epoch_store);
4008        epoch_store.epoch_terminated().await;
4009
4010        *execution_lock = new_epoch;
4011    }
4012
4013    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
4014    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
4015    #[instrument(level = "error", skip_all)]
4016    fn maybe_reaccumulate_state_hash(
4017        &self,
4018        cur_epoch_store: &AuthorityPerEpochStore,
4019        new_protocol_version: ProtocolVersion,
4020    ) {
4021        self.get_reconfig_api()
4022            .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4023    }
4024
4025    #[instrument(level = "error", skip_all)]
4026    fn check_system_consistency(
4027        &self,
4028        cur_epoch_store: &AuthorityPerEpochStore,
4029        state_hasher: Arc<GlobalStateHasher>,
4030        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4031    ) {
4032        info!(
4033            "Performing sui conservation consistency check for epoch {}",
4034            cur_epoch_store.epoch()
4035        );
4036
4037        if let Err(err) = self
4038            .get_reconfig_api()
4039            .expensive_check_sui_conservation(cur_epoch_store)
4040        {
4041            if cfg!(debug_assertions) {
4042                panic!("{}", err);
4043            } else {
4044                // We cannot panic in production yet because it is known that there are some
4045                // inconsistencies in testnet. We will enable this once we make it balanced again in testnet.
4046                warn!("Sui conservation consistency check failed: {}", err);
4047            }
4048        } else {
4049            info!("Sui conservation consistency check passed");
4050        }
4051
4052        // check for root state hash consistency with live object set
4053        if expensive_safety_check_config.enable_state_consistency_check() {
4054            info!(
4055                "Performing state consistency check for epoch {}",
4056                cur_epoch_store.epoch()
4057            );
4058            self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4059        }
4060
4061        if expensive_safety_check_config.enable_secondary_index_checks()
4062            && let Some(indexes) = self.indexes.clone()
4063        {
4064            verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
4065                .expect("secondary indexes are inconsistent");
4066        }
4067    }
4068
4069    fn expensive_check_is_consistent_state(
4070        &self,
4071        state_hasher: Arc<GlobalStateHasher>,
4072        cur_epoch_store: &AuthorityPerEpochStore,
4073    ) {
4074        let live_object_set_hash = state_hasher.digest_live_object_set(
4075            !cur_epoch_store
4076                .protocol_config()
4077                .simplified_unwrap_then_delete(),
4078        );
4079
4080        let root_state_hash: ECMHLiveObjectSetDigest = self
4081            .get_global_state_hash_store()
4082            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4083            .expect("Retrieving root state hash cannot fail")
4084            .expect("Root state hash for epoch must exist")
4085            .1
4086            .digest()
4087            .into();
4088
4089        let is_inconsistent = root_state_hash != live_object_set_hash;
4090        if is_inconsistent {
4091            debug_fatal!(
4092                "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4093                root_state_hash,
4094                live_object_set_hash
4095            );
4096        } else {
4097            info!("State consistency check passed");
4098        }
4099
4100        state_hasher.set_inconsistent_state(is_inconsistent);
4101    }
4102
4103    pub fn current_epoch_for_testing(&self) -> EpochId {
4104        self.epoch_store_for_testing().epoch()
4105    }
4106
4107    #[instrument(level = "error", skip_all)]
4108    pub fn checkpoint_all_dbs(
4109        &self,
4110        checkpoint_path: &Path,
4111        cur_epoch_store: &AuthorityPerEpochStore,
4112        checkpoint_indexes: bool,
4113    ) -> SuiResult {
4114        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4115        let current_epoch = cur_epoch_store.epoch();
4116
4117        if checkpoint_path.exists() {
4118            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4119            return Ok(());
4120        }
4121
4122        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4123        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4124
4125        if checkpoint_path_tmp.exists() {
4126            fs::remove_dir_all(&checkpoint_path_tmp)
4127                .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4128        }
4129
4130        fs::create_dir_all(&checkpoint_path_tmp)
4131            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4132        fs::create_dir(&store_checkpoint_path_tmp)
4133            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4134
4135        // NOTE: Do not change the order of invoking these checkpoint calls
4136        // We want to snapshot checkpoint db first to not race with state sync
4137        self.checkpoint_store
4138            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4139
4140        self.get_reconfig_api()
4141            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4142
4143        self.committee_store
4144            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4145
4146        if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4147            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4148        }
4149
4150        fs::rename(checkpoint_path_tmp, checkpoint_path)
4151            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4152        Ok(())
4153    }
4154
4155    /// Load the current epoch store. This can change during reconfiguration. To ensure that
4156    /// we never end up accessing different epoch stores in a single task, we need to make sure
4157    /// that this is called once per task. Each call needs to be carefully audited to ensure it is
4158    /// the case. This also means we should minimize the number of call-sites. Only call it when
4159    /// there is no way to obtain it from somewhere else.
4160    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4161        self.epoch_store.load()
4162    }
4163
4164    // Load the epoch store, should be used in tests only.
4165    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4166        self.load_epoch_store_one_call_per_task()
4167    }
4168
4169    pub fn clone_committee_for_testing(&self) -> Committee {
4170        Committee::clone(self.epoch_store_for_testing().committee())
4171    }
4172
4173    #[instrument(level = "trace", skip_all)]
4174    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4175        self.get_object_store().get_object(object_id)
4176    }
4177
4178    pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4179        Ok(self
4180            .get_object(&SUI_SYSTEM_ADDRESS.into())
4181            .await
4182            .expect("framework object should always exist")
4183            .compute_object_reference())
4184    }
4185
4186    // This function is only used for testing.
4187    pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4188        self.get_object_cache_reader()
4189            .get_sui_system_state_object_unsafe()
4190    }
4191
4192    #[instrument(level = "trace", skip_all)]
4193    fn get_transaction_checkpoint_sequence(
4194        &self,
4195        digest: &TransactionDigest,
4196        epoch_store: &AuthorityPerEpochStore,
4197    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
4198        epoch_store.get_transaction_checkpoint(digest)
4199    }
4200
4201    #[instrument(level = "trace", skip_all)]
4202    pub fn get_checkpoint_by_sequence_number(
4203        &self,
4204        sequence_number: CheckpointSequenceNumber,
4205    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4206        Ok(self
4207            .checkpoint_store
4208            .get_checkpoint_by_sequence_number(sequence_number)?)
4209    }
4210
4211    #[instrument(level = "trace", skip_all)]
4212    pub fn get_transaction_checkpoint_for_tests(
4213        &self,
4214        digest: &TransactionDigest,
4215        epoch_store: &AuthorityPerEpochStore,
4216    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4217        let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4218        let Some(checkpoint) = checkpoint else {
4219            return Ok(None);
4220        };
4221        let checkpoint = self
4222            .checkpoint_store
4223            .get_checkpoint_by_sequence_number(checkpoint)?;
4224        Ok(checkpoint)
4225    }
4226
4227    #[instrument(level = "trace", skip_all)]
4228    pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4229        Ok(
4230            match self
4231                .get_object_cache_reader()
4232                .get_latest_object_or_tombstone(*object_id)
4233            {
4234                Some((_, ObjectOrTombstone::Object(object))) => {
4235                    let layout = self.get_object_layout(&object)?;
4236                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
4237                }
4238                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4239                None => ObjectRead::NotExists(*object_id),
4240            },
4241        )
4242    }
4243
4244    /// Chain Identifier is the digest of the genesis checkpoint.
4245    pub fn get_chain_identifier(&self) -> ChainIdentifier {
4246        self.chain_identifier
4247    }
4248
4249    pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
4250        self.fork_recovery_state.as_ref()
4251    }
4252
4253    #[instrument(level = "trace", skip_all)]
4254    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4255    where
4256        T: DeserializeOwned,
4257    {
4258        let o = self.get_object_read(object_id)?.into_object()?;
4259        if let Some(move_object) = o.data.try_as_move() {
4260            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4261                SuiErrorKind::ObjectDeserializationError {
4262                    error: format!("{e}"),
4263                }
4264            })?)
4265        } else {
4266            Err(SuiErrorKind::ObjectDeserializationError {
4267                error: format!("Provided object : [{object_id}] is not a Move object."),
4268            }
4269            .into())
4270        }
4271    }
4272
4273    /// This function aims to serve rpc reads on past objects and
4274    /// we don't expect it to be called for other purposes.
4275    /// Depending on the object pruning policies that will be enforced in the
4276    /// future there is no software-level guarantee/SLA to retrieve an object
4277    /// with an old version even if it exists/existed.
4278    #[instrument(level = "trace", skip_all)]
4279    pub fn get_past_object_read(
4280        &self,
4281        object_id: &ObjectID,
4282        version: SequenceNumber,
4283    ) -> SuiResult<PastObjectRead> {
4284        // Firstly we see if the object ever existed by getting its latest data
4285        let Some(obj_ref) = self
4286            .get_object_cache_reader()
4287            .get_latest_object_ref_or_tombstone(*object_id)
4288        else {
4289            return Ok(PastObjectRead::ObjectNotExists(*object_id));
4290        };
4291
4292        if version > obj_ref.1 {
4293            return Ok(PastObjectRead::VersionTooHigh {
4294                object_id: *object_id,
4295                asked_version: version,
4296                latest_version: obj_ref.1,
4297            });
4298        }
4299
4300        if version < obj_ref.1 {
4301            // Read past objects
4302            return Ok(match self.read_object_at_version(object_id, version)? {
4303                Some((object, layout)) => {
4304                    let obj_ref = object.compute_object_reference();
4305                    PastObjectRead::VersionFound(obj_ref, object, layout)
4306                }
4307
4308                None => PastObjectRead::VersionNotFound(*object_id, version),
4309            });
4310        }
4311
4312        if !obj_ref.2.is_alive() {
4313            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4314        }
4315
4316        match self.read_object_at_version(object_id, obj_ref.1)? {
4317            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4318            None => {
4319                debug_fatal!(
4320                    "Object with in parent_entry is missing from object store, datastore is \
4321                     inconsistent"
4322                );
4323                Err(UserInputError::ObjectNotFound {
4324                    object_id: *object_id,
4325                    version: Some(obj_ref.1),
4326                }
4327                .into())
4328            }
4329        }
4330    }
4331
4332    #[instrument(level = "trace", skip_all)]
4333    fn read_object_at_version(
4334        &self,
4335        object_id: &ObjectID,
4336        version: SequenceNumber,
4337    ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4338        let Some(object) = self
4339            .get_object_cache_reader()
4340            .get_object_by_key(object_id, version)
4341        else {
4342            return Ok(None);
4343        };
4344
4345        let layout = self.get_object_layout(&object)?;
4346        Ok(Some((object, layout)))
4347    }
4348
4349    fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4350        let layout = object
4351            .data
4352            .try_as_move()
4353            .map(|object| {
4354                into_struct_layout(
4355                    self.load_epoch_store_one_call_per_task()
4356                        .executor()
4357                        // TODO(cache) - must read through cache
4358                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
4359                        .get_annotated_layout(&object.type_().clone().into())?,
4360                )
4361            })
4362            .transpose()?;
4363        Ok(layout)
4364    }
4365
4366    fn get_owner_at_version(
4367        &self,
4368        object_id: &ObjectID,
4369        version: SequenceNumber,
4370    ) -> SuiResult<Owner> {
4371        self.get_object_store()
4372            .get_object_by_key(object_id, version)
4373            .ok_or_else(|| {
4374                SuiError::from(UserInputError::ObjectNotFound {
4375                    object_id: *object_id,
4376                    version: Some(version),
4377                })
4378            })
4379            .map(|o| o.owner.clone())
4380    }
4381
4382    #[instrument(level = "trace", skip_all)]
4383    pub fn get_owner_objects(
4384        &self,
4385        owner: SuiAddress,
4386        // If `Some`, the query will start from the next item after the specified cursor
4387        cursor: Option<ObjectID>,
4388        limit: usize,
4389        filter: Option<SuiObjectDataFilter>,
4390    ) -> SuiResult<Vec<ObjectInfo>> {
4391        if let Some(indexes) = &self.indexes {
4392            indexes.get_owner_objects(owner, cursor, limit, filter)
4393        } else {
4394            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4395        }
4396    }
4397
4398    #[instrument(level = "trace", skip_all)]
4399    pub fn get_owned_coins_iterator_with_cursor(
4400        &self,
4401        owner: SuiAddress,
4402        // If `Some`, the query will start from the next item after the specified cursor
4403        cursor: (String, u64, ObjectID),
4404        limit: usize,
4405        one_coin_type_only: bool,
4406    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4407        if let Some(indexes) = &self.indexes {
4408            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4409        } else {
4410            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4411        }
4412    }
4413
4414    #[instrument(level = "trace", skip_all)]
4415    pub fn get_owner_objects_iterator(
4416        &self,
4417        owner: SuiAddress,
4418        // If `Some`, the query will start from the next item after the specified cursor
4419        cursor: Option<ObjectID>,
4420        filter: Option<SuiObjectDataFilter>,
4421    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4422        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4423        if let Some(indexes) = &self.indexes {
4424            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4425        } else {
4426            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4427        }
4428    }
4429
4430    #[instrument(level = "trace", skip_all)]
4431    pub async fn get_move_objects<T>(
4432        &self,
4433        owner: SuiAddress,
4434        type_: MoveObjectType,
4435    ) -> SuiResult<Vec<T>>
4436    where
4437        T: DeserializeOwned,
4438    {
4439        let object_ids = self
4440            .get_owner_objects_iterator(owner, None, None)?
4441            .filter(|o| match &o.type_ {
4442                ObjectType::Struct(s) => &type_ == s,
4443                ObjectType::Package => false,
4444            })
4445            .map(|info| ObjectKey(info.object_id, info.version))
4446            .collect::<Vec<_>>();
4447        let mut move_objects = vec![];
4448
4449        let objects = self
4450            .get_object_store()
4451            .multi_get_objects_by_key(&object_ids);
4452
4453        for (o, id) in objects.into_iter().zip(object_ids) {
4454            let object = o.ok_or_else(|| {
4455                SuiError::from(UserInputError::ObjectNotFound {
4456                    object_id: id.0,
4457                    version: Some(id.1),
4458                })
4459            })?;
4460            let move_object = object.data.try_as_move().ok_or_else(|| {
4461                SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4462            })?;
4463            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4464                SuiErrorKind::ObjectDeserializationError {
4465                    error: format!("{e}"),
4466                }
4467            })?);
4468        }
4469        Ok(move_objects)
4470    }
4471
4472    #[instrument(level = "trace", skip_all)]
4473    pub fn get_dynamic_fields(
4474        &self,
4475        owner: ObjectID,
4476        // If `Some`, the query will start from the next item after the specified cursor
4477        cursor: Option<ObjectID>,
4478        limit: usize,
4479    ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4480        Ok(self
4481            .get_dynamic_fields_iterator(owner, cursor)?
4482            .take(limit)
4483            .collect::<Result<Vec<_>, _>>()?)
4484    }
4485
4486    fn get_dynamic_fields_iterator(
4487        &self,
4488        owner: ObjectID,
4489        // If `Some`, the query will start from the next item after the specified cursor
4490        cursor: Option<ObjectID>,
4491    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4492    {
4493        if let Some(indexes) = &self.indexes {
4494            indexes.get_dynamic_fields_iterator(owner, cursor)
4495        } else {
4496            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4497        }
4498    }
4499
4500    #[instrument(level = "trace", skip_all)]
4501    pub fn get_dynamic_field_object_id(
4502        &self,
4503        owner: ObjectID,
4504        name_type: TypeTag,
4505        name_bcs_bytes: &[u8],
4506    ) -> SuiResult<Option<ObjectID>> {
4507        if let Some(indexes) = &self.indexes {
4508            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4509        } else {
4510            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4511        }
4512    }
4513
4514    #[instrument(level = "trace", skip_all)]
4515    pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
4516        Ok(self.get_indexes()?.next_sequence_number())
4517    }
4518
4519    #[instrument(level = "trace", skip_all)]
4520    pub async fn get_executed_transaction_and_effects(
4521        &self,
4522        digest: TransactionDigest,
4523        kv_store: Arc<TransactionKeyValueStore>,
4524    ) -> SuiResult<(Transaction, TransactionEffects)> {
4525        let transaction = kv_store.get_tx(digest).await?;
4526        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4527        Ok((transaction, effects))
4528    }
4529
4530    #[instrument(level = "trace", skip_all)]
4531    pub fn multi_get_checkpoint_by_sequence_number(
4532        &self,
4533        sequence_numbers: &[CheckpointSequenceNumber],
4534    ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
4535        Ok(self
4536            .checkpoint_store
4537            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4538    }
4539
4540    #[instrument(level = "trace", skip_all)]
4541    pub fn get_transaction_events(
4542        &self,
4543        digest: &TransactionDigest,
4544    ) -> SuiResult<TransactionEvents> {
4545        self.get_transaction_cache_reader()
4546            .get_events(digest)
4547            .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4548    }
4549
4550    pub fn get_transaction_input_objects(
4551        &self,
4552        effects: &TransactionEffects,
4553    ) -> SuiResult<Vec<Object>> {
4554        sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4555            .map_err(Into::into)
4556    }
4557
4558    pub fn get_transaction_output_objects(
4559        &self,
4560        effects: &TransactionEffects,
4561    ) -> SuiResult<Vec<Object>> {
4562        sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4563            .map_err(Into::into)
4564    }
4565
4566    fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4567        match &self.indexes {
4568            Some(i) => Ok(i.clone()),
4569            None => Err(SuiErrorKind::UnsupportedFeatureError {
4570                error: "extended object indexing is not enabled on this server".into(),
4571            }
4572            .into()),
4573        }
4574    }
4575
4576    pub async fn get_transactions_for_tests(
4577        self: &Arc<Self>,
4578        filter: Option<TransactionFilter>,
4579        cursor: Option<TransactionDigest>,
4580        limit: Option<usize>,
4581        reverse: bool,
4582    ) -> SuiResult<Vec<TransactionDigest>> {
4583        let metrics = KeyValueStoreMetrics::new_for_tests();
4584        let kv_store = Arc::new(TransactionKeyValueStore::new(
4585            "rocksdb",
4586            metrics,
4587            self.clone(),
4588        ));
4589        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4590            .await
4591    }
4592
4593    #[instrument(level = "trace", skip_all)]
4594    pub async fn get_transactions(
4595        &self,
4596        kv_store: &Arc<TransactionKeyValueStore>,
4597        filter: Option<TransactionFilter>,
4598        // If `Some`, the query will start from the next item after the specified cursor
4599        cursor: Option<TransactionDigest>,
4600        limit: Option<usize>,
4601        reverse: bool,
4602    ) -> SuiResult<Vec<TransactionDigest>> {
4603        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4604            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4605            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4606            if reverse {
4607                let iter = iter
4608                    .rev()
4609                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4610                    .skip(usize::from(cursor.is_some()));
4611                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4612            } else {
4613                let iter = iter
4614                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4615                    .skip(usize::from(cursor.is_some()));
4616                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4617            }
4618        }
4619        self.get_indexes()?
4620            .get_transactions(filter, cursor, limit, reverse)
4621    }
4622
4623    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4624        &self.checkpoint_store
4625    }
4626
4627    pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4628        self.get_checkpoint_store()
4629            .get_highest_executed_checkpoint_seq_number()?
4630            .ok_or(
4631                SuiErrorKind::UserInputError {
4632                    error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4633                }
4634                .into(),
4635            )
4636    }
4637
4638    #[cfg(msim)]
4639    pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
4640        self.database_for_testing()
4641            .perpetual_tables
4642            .get_highest_pruned_checkpoint()
4643            .map(|c| c.unwrap_or(0))
4644            .map_err(Into::into)
4645    }
4646
4647    #[instrument(level = "trace", skip_all)]
4648    pub fn get_checkpoint_summary_by_sequence_number(
4649        &self,
4650        sequence_number: CheckpointSequenceNumber,
4651    ) -> SuiResult<CheckpointSummary> {
4652        let verified_checkpoint = self
4653            .get_checkpoint_store()
4654            .get_checkpoint_by_sequence_number(sequence_number)?;
4655        match verified_checkpoint {
4656            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4657            None => Err(SuiErrorKind::UserInputError {
4658                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4659            }
4660            .into()),
4661        }
4662    }
4663
4664    #[instrument(level = "trace", skip_all)]
4665    pub fn get_checkpoint_summary_by_digest(
4666        &self,
4667        digest: CheckpointDigest,
4668    ) -> SuiResult<CheckpointSummary> {
4669        let verified_checkpoint = self
4670            .get_checkpoint_store()
4671            .get_checkpoint_by_digest(&digest)?;
4672        match verified_checkpoint {
4673            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4674            None => Err(SuiErrorKind::UserInputError {
4675                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4676            }
4677            .into()),
4678        }
4679    }
4680
4681    #[instrument(level = "trace", skip_all)]
4682    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
4683        if is_system_package(package_id) {
4684            return self.find_genesis_txn_digest();
4685        }
4686        Ok(self
4687            .get_object_read(&package_id)?
4688            .into_object()?
4689            .previous_transaction)
4690    }
4691
4692    #[instrument(level = "trace", skip_all)]
4693    pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
4694        let summary = self
4695            .get_verified_checkpoint_by_sequence_number(0)?
4696            .into_message();
4697        let content = self.get_checkpoint_contents(summary.content_digest)?;
4698        let genesis_transaction = content.enumerate_transactions(&summary).next();
4699        Ok(genesis_transaction
4700            .ok_or(SuiErrorKind::UserInputError {
4701                error: UserInputError::GenesisTransactionNotFound,
4702            })?
4703            .1
4704            .transaction)
4705    }
4706
4707    #[instrument(level = "trace", skip_all)]
4708    pub fn get_verified_checkpoint_by_sequence_number(
4709        &self,
4710        sequence_number: CheckpointSequenceNumber,
4711    ) -> SuiResult<VerifiedCheckpoint> {
4712        let verified_checkpoint = self
4713            .get_checkpoint_store()
4714            .get_checkpoint_by_sequence_number(sequence_number)?;
4715        match verified_checkpoint {
4716            Some(verified_checkpoint) => Ok(verified_checkpoint),
4717            None => Err(SuiErrorKind::UserInputError {
4718                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4719            }
4720            .into()),
4721        }
4722    }
4723
4724    #[instrument(level = "trace", skip_all)]
4725    pub fn get_verified_checkpoint_summary_by_digest(
4726        &self,
4727        digest: CheckpointDigest,
4728    ) -> SuiResult<VerifiedCheckpoint> {
4729        let verified_checkpoint = self
4730            .get_checkpoint_store()
4731            .get_checkpoint_by_digest(&digest)?;
4732        match verified_checkpoint {
4733            Some(verified_checkpoint) => Ok(verified_checkpoint),
4734            None => Err(SuiErrorKind::UserInputError {
4735                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4736            }
4737            .into()),
4738        }
4739    }
4740
4741    #[instrument(level = "trace", skip_all)]
4742    pub fn get_checkpoint_contents(
4743        &self,
4744        digest: CheckpointContentsDigest,
4745    ) -> SuiResult<CheckpointContents> {
4746        self.get_checkpoint_store()
4747            .get_checkpoint_contents(&digest)?
4748            .ok_or(
4749                SuiErrorKind::UserInputError {
4750                    error: UserInputError::CheckpointContentsNotFound(digest),
4751                }
4752                .into(),
4753            )
4754    }
4755
4756    #[instrument(level = "trace", skip_all)]
4757    pub fn get_checkpoint_contents_by_sequence_number(
4758        &self,
4759        sequence_number: CheckpointSequenceNumber,
4760    ) -> SuiResult<CheckpointContents> {
4761        let verified_checkpoint = self
4762            .get_checkpoint_store()
4763            .get_checkpoint_by_sequence_number(sequence_number)?;
4764        match verified_checkpoint {
4765            Some(verified_checkpoint) => {
4766                let content_digest = verified_checkpoint.into_inner().content_digest;
4767                self.get_checkpoint_contents(content_digest)
4768            }
4769            None => Err(SuiErrorKind::UserInputError {
4770                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4771            }
4772            .into()),
4773        }
4774    }
4775
4776    #[instrument(level = "trace", skip_all)]
4777    pub async fn query_events(
4778        &self,
4779        kv_store: &Arc<TransactionKeyValueStore>,
4780        query: EventFilter,
4781        // If `Some`, the query will start from the next item after the specified cursor
4782        cursor: Option<EventID>,
4783        limit: usize,
4784        descending: bool,
4785    ) -> SuiResult<Vec<SuiEvent>> {
4786        let index_store = self.get_indexes()?;
4787
4788        //Get the tx_num from tx_digest
4789        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4790            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4791                SuiErrorKind::TransactionNotFound {
4792                    digest: cursor.tx_digest,
4793                },
4794            )?;
4795            (tx_seq, cursor.event_seq as usize)
4796        } else if descending {
4797            (u64::MAX, usize::MAX)
4798        } else {
4799            (0, 0)
4800        };
4801
4802        let limit = limit + 1;
4803        let mut event_keys = match query {
4804            EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
4805            EventFilter::Transaction(digest) => {
4806                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4807            }
4808            EventFilter::MoveModule { package, module } => {
4809                let module_id = ModuleId::new(package.into(), module);
4810                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4811            }
4812            EventFilter::MoveEventType(struct_name) => index_store
4813                .events_by_move_event_struct_name(
4814                    &struct_name,
4815                    tx_num,
4816                    event_num,
4817                    limit,
4818                    descending,
4819                )?,
4820            EventFilter::Sender(sender) => {
4821                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4822            }
4823            EventFilter::TimeRange {
4824                start_time,
4825                end_time,
4826            } => index_store
4827                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4828            EventFilter::MoveEventModule { package, module } => index_store
4829                .events_by_move_event_module(
4830                    &ModuleId::new(package.into(), module),
4831                    tx_num,
4832                    event_num,
4833                    limit,
4834                    descending,
4835                )?,
4836            // not using "_ =>" because we want to make sure we remember to add new variants here
4837            EventFilter::Any(_) => {
4838                return Err(SuiErrorKind::UserInputError {
4839                    error: UserInputError::Unsupported(
4840                        "'Any' queries are not supported by the fullnode.".to_string(),
4841                    ),
4842                }
4843                .into());
4844            }
4845        };
4846
4847        // skip one event if exclusive cursor is provided,
4848        // otherwise truncate to the original limit.
4849        if cursor.is_some() {
4850            if !event_keys.is_empty() {
4851                event_keys.remove(0);
4852            }
4853        } else {
4854            event_keys.truncate(limit - 1);
4855        }
4856
4857        // get the unique set of digests from the event_keys
4858        let transaction_digests = event_keys
4859            .iter()
4860            .map(|(_, digest, _, _)| *digest)
4861            .collect::<HashSet<_>>()
4862            .into_iter()
4863            .collect::<Vec<_>>();
4864
4865        let events = kv_store
4866            .multi_get_events_by_tx_digests(&transaction_digests)
4867            .await?;
4868
4869        let events_map: HashMap<_, _> =
4870            transaction_digests.iter().zip(events.into_iter()).collect();
4871
4872        let stored_events = event_keys
4873            .into_iter()
4874            .map(|k| {
4875                (
4876                    k,
4877                    events_map
4878                        .get(&k.1)
4879                        .expect("fetched digest is missing")
4880                        .clone()
4881                        .and_then(|e| e.data.get(k.2).cloned()),
4882                )
4883            })
4884            .map(
4885                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4886                    event
4887                        .map(|e| (e, tx_digest, event_seq, timestamp))
4888                        .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
4889                },
4890            )
4891            .collect::<Result<Vec<_>, _>>()?;
4892
4893        let epoch_store = self.load_epoch_store_one_call_per_task();
4894        let backing_store = self.get_backing_package_store().as_ref();
4895        let mut layout_resolver = epoch_store
4896            .executor()
4897            .type_layout_resolver(Box::new(backing_store));
4898        let mut events = vec![];
4899        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4900            events.push(SuiEvent::try_from(
4901                e.clone(),
4902                tx_digest,
4903                event_seq as u64,
4904                Some(timestamp),
4905                layout_resolver.get_annotated_layout(&e.type_)?,
4906            )?)
4907        }
4908        Ok(events)
4909    }
4910
4911    pub async fn insert_genesis_object(&self, object: Object) {
4912        self.get_reconfig_api().insert_genesis_object(object);
4913    }
4914
4915    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4916        futures::future::join_all(
4917            objects
4918                .iter()
4919                .map(|o| self.insert_genesis_object(o.clone())),
4920        )
4921        .await;
4922    }
4923
4924    /// Make a status response for a transaction
4925    #[instrument(level = "trace", skip_all)]
4926    pub fn get_transaction_status(
4927        &self,
4928        transaction_digest: &TransactionDigest,
4929        epoch_store: &Arc<AuthorityPerEpochStore>,
4930    ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
4931        // TODO: In the case of read path, we should not have to re-sign the effects.
4932        if let Some(effects) =
4933            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4934        {
4935            if let Some(transaction) = self
4936                .get_transaction_cache_reader()
4937                .get_transaction_block(transaction_digest)
4938            {
4939                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4940                let events = if effects.events_digest().is_some() {
4941                    self.get_transaction_events(effects.transaction_digest())?
4942                } else {
4943                    TransactionEvents::default()
4944                };
4945                return Ok(Some((
4946                    (*transaction).clone().into_message(),
4947                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4948                )));
4949            } else {
4950                // The read of effects and read of transaction are not atomic. It's possible that we reverted
4951                // the transaction (during epoch change) in between the above two reads, and we end up
4952                // having effects but not transaction. In this case, we just fall through.
4953                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4954            }
4955        }
4956        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4957            self.metrics.tx_already_processed.inc();
4958            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4959            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4960        } else {
4961            Ok(None)
4962        }
4963    }
4964
4965    /// Get the signed effects of the given transaction. If the effects was signed in a previous
4966    /// epoch, re-sign it so that the caller is able to form a cert of the effects in the current
4967    /// epoch.
4968    #[instrument(level = "trace", skip_all)]
4969    pub fn get_signed_effects_and_maybe_resign(
4970        &self,
4971        transaction_digest: &TransactionDigest,
4972        epoch_store: &Arc<AuthorityPerEpochStore>,
4973    ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
4974        let effects = self
4975            .get_transaction_cache_reader()
4976            .get_executed_effects(transaction_digest);
4977        match effects {
4978            Some(effects) => {
4979                // If the transaction was executed in previous epochs, the validator will
4980                // re-sign the effects with new current epoch so that a client is always able to
4981                // obtain an effects certificate at the current epoch.
4982                //
4983                // Why is this necessary? Consider the following case:
4984                // - assume there are 4 validators
4985                // - Quorum driver gets 2 signed effects before reconfig halt
4986                // - The tx makes it into final checkpoint.
4987                // - 2 validators go away and are replaced in the new epoch.
4988                // - The new epoch begins.
4989                // - The quorum driver cannot complete the partial effects cert from the previous epoch,
4990                //   because it may not be able to reach either of the 2 former validators.
4991                // - But, if the 2 validators that stayed are willing to re-sign the effects in the new
4992                //   epoch, the QD can make a new effects cert and return it to the client.
4993                //
4994                // This is a considered a short-term workaround. Eventually, Quorum Driver should be able
4995                // to return either an effects certificate, -or- a proof of inclusion in a checkpoint. In
4996                // the case above, the Quorum Driver would return a proof of inclusion in the final
4997                // checkpoint, and this code would no longer be necessary.
4998                if effects.executed_epoch() != epoch_store.epoch() {
4999                    debug!(
5000                        tx_digest=?transaction_digest,
5001                        effects_epoch=?effects.executed_epoch(),
5002                        epoch=?epoch_store.epoch(),
5003                        "Re-signing the effects with the current epoch"
5004                    );
5005                }
5006                Ok(Some(self.sign_effects(effects, epoch_store)?))
5007            }
5008            None => Ok(None),
5009        }
5010    }
5011
5012    #[instrument(level = "trace", skip_all)]
5013    pub(crate) fn sign_effects(
5014        &self,
5015        effects: TransactionEffects,
5016        epoch_store: &Arc<AuthorityPerEpochStore>,
5017    ) -> SuiResult<VerifiedSignedTransactionEffects> {
5018        let tx_digest = *effects.transaction_digest();
5019        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5020            Some(sig) => {
5021                debug_assert!(sig.epoch == epoch_store.epoch());
5022                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5023            }
5024            _ => {
5025                let sig = AuthoritySignInfo::new(
5026                    epoch_store.epoch(),
5027                    &effects,
5028                    Intent::sui_app(IntentScope::TransactionEffects),
5029                    self.name,
5030                    &*self.secret,
5031                );
5032
5033                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5034
5035                epoch_store.insert_effects_digest_and_signature(
5036                    &tx_digest,
5037                    effects.digest(),
5038                    &sig,
5039                )?;
5040
5041                effects
5042            }
5043        };
5044
5045        Ok(VerifiedSignedTransactionEffects::new_unchecked(
5046            signed_effects,
5047        ))
5048    }
5049
5050    // Returns coin objects for indexing for fullnode if indexing is enabled.
5051    #[instrument(level = "trace", skip_all)]
5052    fn fullnode_only_get_tx_coins_for_indexing(
5053        &self,
5054        effects: &TransactionEffects,
5055        inner_temporary_store: &InnerTemporaryStore,
5056        epoch_store: &Arc<AuthorityPerEpochStore>,
5057    ) -> Option<TxCoins> {
5058        if self.indexes.is_none() || self.is_validator(epoch_store) {
5059            return None;
5060        }
5061        let written_coin_objects = inner_temporary_store
5062            .written
5063            .iter()
5064            .filter_map(|(k, v)| {
5065                if v.is_coin() {
5066                    Some((*k, v.clone()))
5067                } else {
5068                    None
5069                }
5070            })
5071            .collect();
5072        let mut input_coin_objects = inner_temporary_store
5073            .input_objects
5074            .iter()
5075            .filter_map(|(k, v)| {
5076                if v.is_coin() {
5077                    Some((*k, v.clone()))
5078                } else {
5079                    None
5080                }
5081            })
5082            .collect::<ObjectMap>();
5083
5084        // Check for receiving objects that were actually used and modified during execution. Their
5085        // updated version will already showup in "written_coins" but their input isn't included in
5086        // the set of input objects in a inner_temporary_store.
5087        for (object_id, version) in effects.modified_at_versions() {
5088            if inner_temporary_store
5089                .loaded_runtime_objects
5090                .contains_key(&object_id)
5091                && let Some(object) = self
5092                    .get_object_store()
5093                    .get_object_by_key(&object_id, version)
5094                && object.is_coin()
5095            {
5096                input_coin_objects.insert(object_id, object);
5097            }
5098        }
5099
5100        Some((input_coin_objects, written_coin_objects))
5101    }
5102
5103    /// Get the TransactionEnvelope that currently locks the given object, if any.
5104    /// Since object locks are only valid for one epoch, we also need the epoch_id in the query.
5105    /// Returns UserInputError::ObjectNotFound if no lock records for the given object can be found.
5106    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the object record is at a different version.
5107    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a certain transaction.
5108    /// Returns None if a lock record is initialized for the given ObjectRef but not yet locked by any transaction,
5109    ///     or cannot find the transaction in transaction table, because of data race etc.
5110    #[instrument(level = "trace", skip_all)]
5111    pub async fn get_transaction_lock(
5112        &self,
5113        object_ref: &ObjectRef,
5114        epoch_store: &AuthorityPerEpochStore,
5115    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5116        let lock_info = self
5117            .get_object_cache_reader()
5118            .get_lock(*object_ref, epoch_store)?;
5119        let lock_info = match lock_info {
5120            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5121                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5122                    provided_obj_ref: *object_ref,
5123                    current_version: locked_ref.1,
5124                }
5125                .into());
5126            }
5127            ObjectLockStatus::Initialized => {
5128                return Ok(None);
5129            }
5130            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5131        };
5132
5133        epoch_store.get_signed_transaction(&lock_info.tx_digest)
5134    }
5135
5136    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5137        self.get_object_cache_reader().get_objects(objects)
5138    }
5139
5140    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5141        self.get_object_cache_reader()
5142            .get_latest_object_ref_or_tombstone(object_id)
5143    }
5144
5145    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
5146    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the upgrade.
5147    ///
5148    /// This method can be used to dynamic adjust the amount of buffer. If set to 0, the upgrade
5149    /// will go through with only 2f+1 votes.
5150    ///
5151    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all should have the same
5152    /// value), or you risk halting the chain.
5153    pub fn set_override_protocol_upgrade_buffer_stake(
5154        &self,
5155        expected_epoch: EpochId,
5156        buffer_stake_bps: u64,
5157    ) -> SuiResult {
5158        let epoch_store = self.load_epoch_store_one_call_per_task();
5159        let actual_epoch = epoch_store.epoch();
5160        if actual_epoch != expected_epoch {
5161            return Err(SuiErrorKind::WrongEpoch {
5162                expected_epoch,
5163                actual_epoch,
5164            }
5165            .into());
5166        }
5167
5168        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5169    }
5170
5171    pub fn clear_override_protocol_upgrade_buffer_stake(
5172        &self,
5173        expected_epoch: EpochId,
5174    ) -> SuiResult {
5175        let epoch_store = self.load_epoch_store_one_call_per_task();
5176        let actual_epoch = epoch_store.epoch();
5177        if actual_epoch != expected_epoch {
5178            return Err(SuiErrorKind::WrongEpoch {
5179                expected_epoch,
5180                actual_epoch,
5181            }
5182            .into());
5183        }
5184
5185        epoch_store.clear_override_protocol_upgrade_buffer_stake()
5186    }
5187
5188    /// Get the set of system packages that are compiled in to this build, if those packages are
5189    /// compatible with the current versions of those packages on-chain.
5190    pub async fn get_available_system_packages(
5191        &self,
5192        binary_config: &BinaryConfig,
5193    ) -> Vec<ObjectRef> {
5194        let mut results = vec![];
5195
5196        let system_packages = BuiltInFramework::iter_system_packages();
5197
5198        // Add extra framework packages during simtest
5199        #[cfg(msim)]
5200        let extra_packages = framework_injection::get_extra_packages(self.name);
5201        #[cfg(msim)]
5202        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
5203
5204        for system_package in system_packages {
5205            let modules = system_package.modules().to_vec();
5206            // In simtests, we could override the current built-in framework packages.
5207            #[cfg(msim)]
5208            let modules =
5209                match framework_injection::get_override_modules(&system_package.id, self.name) {
5210                    Some(overrides) if overrides.is_empty() => continue,
5211                    Some(overrides) => overrides,
5212                    None => modules,
5213                };
5214
5215            let Some(obj_ref) = sui_framework::compare_system_package(
5216                &self.get_object_store(),
5217                &system_package.id,
5218                &modules,
5219                system_package.dependencies.to_vec(),
5220                binary_config,
5221            )
5222            .await
5223            else {
5224                return vec![];
5225            };
5226            results.push(obj_ref);
5227        }
5228
5229        results
5230    }
5231
5232    /// Return the new versions, module bytes, and dependencies for the packages that have been
5233    /// committed to for a framework upgrade, in `system_packages`.  Loads the module contents from
5234    /// the binary, and performs the following checks:
5235    ///
5236    /// - Whether its contents matches what is on-chain already, in which case no upgrade is
5237    ///   required, and its contents are omitted from the output.
5238    /// - Whether the contents in the binary can form a package whose digest matches the input,
5239    ///   meaning the framework will be upgraded, and this authority can satisfy that upgrade, in
5240    ///   which case the contents are included in the output.
5241    ///
5242    /// If the current version of the framework can't be loaded, the binary does not contain the
5243    /// bytes for that framework ID, or the resulting package fails the digest check, `None` is
5244    /// returned indicating that this authority cannot run the upgrade that the network voted on.
5245    async fn get_system_package_bytes(
5246        &self,
5247        system_packages: Vec<ObjectRef>,
5248        binary_config: &BinaryConfig,
5249    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5250        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5251        let objects = self.get_objects(&ids).await;
5252
5253        let mut res = Vec::with_capacity(system_packages.len());
5254        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
5255            let prev_transaction = match object {
5256                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5257                    // Skip this one because it doesn't need to be upgraded.
5258                    info!("Framework {} does not need updating", system_package_ref.0);
5259                    continue;
5260                }
5261
5262                Some(cur_object) => cur_object.previous_transaction,
5263                None => TransactionDigest::genesis_marker(),
5264            };
5265
5266            #[cfg(msim)]
5267            let SystemPackage {
5268                id: _,
5269                bytes,
5270                dependencies,
5271            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5272                .unwrap_or_else(|| {
5273                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5274                });
5275
5276            #[cfg(not(msim))]
5277            let SystemPackage {
5278                id: _,
5279                bytes,
5280                dependencies,
5281            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5282
5283            let modules: Vec<_> = bytes
5284                .iter()
5285                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5286                .collect();
5287
5288            let new_object = Object::new_system_package(
5289                &modules,
5290                system_package_ref.1,
5291                dependencies.clone(),
5292                prev_transaction,
5293            );
5294
5295            let new_ref = new_object.compute_object_reference();
5296            if new_ref != system_package_ref {
5297                if cfg!(msim) {
5298                    // debug_fatal required here for test_framework_upgrade_conflicting_versions to pass
5299                    debug_fatal!(
5300                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5301                    );
5302                } else {
5303                    error!(
5304                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5305                    );
5306                }
5307                return None;
5308            }
5309
5310            res.push((system_package_ref.1, bytes, dependencies));
5311        }
5312
5313        Some(res)
5314    }
5315
5316    // TODO: delete once authority_capabilities_v2 is deployed everywhere
5317    fn is_protocol_version_supported_v1(
5318        current_protocol_version: ProtocolVersion,
5319        proposed_protocol_version: ProtocolVersion,
5320        protocol_config: &ProtocolConfig,
5321        committee: &Committee,
5322        capabilities: Vec<AuthorityCapabilitiesV1>,
5323        mut buffer_stake_bps: u64,
5324    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5325        if proposed_protocol_version > current_protocol_version + 1
5326            && !protocol_config.advance_to_highest_supported_protocol_version()
5327        {
5328            return None;
5329        }
5330
5331        if buffer_stake_bps > 10000 {
5332            warn!("clamping buffer_stake_bps to 10000");
5333            buffer_stake_bps = 10000;
5334        }
5335
5336        // For each validator, gather the protocol version and system packages that it would like
5337        // to upgrade to in the next epoch.
5338        let mut desired_upgrades: Vec<_> = capabilities
5339            .into_iter()
5340            .filter_map(|mut cap| {
5341                // A validator that lists no packages is voting against any change at all.
5342                if cap.available_system_packages.is_empty() {
5343                    return None;
5344                }
5345
5346                cap.available_system_packages.sort();
5347
5348                info!(
5349                    "validator {:?} supports {:?} with system packages: {:?}",
5350                    cap.authority.concise(),
5351                    cap.supported_protocol_versions,
5352                    cap.available_system_packages,
5353                );
5354
5355                // A validator that only supports the current protocol version is also voting
5356                // against any change, because framework upgrades always require a protocol version
5357                // bump.
5358                cap.supported_protocol_versions
5359                    .is_version_supported(proposed_protocol_version)
5360                    .then_some((cap.available_system_packages, cap.authority))
5361            })
5362            .collect();
5363
5364        // There can only be one set of votes that have a majority, find one if it exists.
5365        desired_upgrades.sort();
5366        desired_upgrades
5367            .into_iter()
5368            .chunk_by(|(packages, _authority)| packages.clone())
5369            .into_iter()
5370            .find_map(|(packages, group)| {
5371                // should have been filtered out earlier.
5372                assert!(!packages.is_empty());
5373
5374                let mut stake_aggregator: StakeAggregator<(), true> =
5375                    StakeAggregator::new(Arc::new(committee.clone()));
5376
5377                for (_, authority) in group {
5378                    stake_aggregator.insert_generic(authority, ());
5379                }
5380
5381                let total_votes = stake_aggregator.total_votes();
5382                let quorum_threshold = committee.quorum_threshold();
5383                let f = committee.total_votes() - committee.quorum_threshold();
5384
5385                // multiple by buffer_stake_bps / 10000, rounded up.
5386                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5387                let effective_threshold = quorum_threshold + buffer_stake;
5388
5389                info!(
5390                    ?total_votes,
5391                    ?quorum_threshold,
5392                    ?buffer_stake_bps,
5393                    ?effective_threshold,
5394                    ?proposed_protocol_version,
5395                    ?packages,
5396                    "support for upgrade"
5397                );
5398
5399                let has_support = total_votes >= effective_threshold;
5400                has_support.then_some((proposed_protocol_version, packages))
5401            })
5402    }
5403
5404    fn is_protocol_version_supported_v2(
5405        current_protocol_version: ProtocolVersion,
5406        proposed_protocol_version: ProtocolVersion,
5407        protocol_config: &ProtocolConfig,
5408        committee: &Committee,
5409        capabilities: Vec<AuthorityCapabilitiesV2>,
5410        mut buffer_stake_bps: u64,
5411    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5412        if proposed_protocol_version > current_protocol_version + 1
5413            && !protocol_config.advance_to_highest_supported_protocol_version()
5414        {
5415            return None;
5416        }
5417
5418        if buffer_stake_bps > 10000 {
5419            warn!("clamping buffer_stake_bps to 10000");
5420            buffer_stake_bps = 10000;
5421        }
5422
5423        // For each validator, gather the protocol version and system packages that it would like
5424        // to upgrade to in the next epoch.
5425        let mut desired_upgrades: Vec<_> = capabilities
5426            .into_iter()
5427            .filter_map(|mut cap| {
5428                // A validator that lists no packages is voting against any change at all.
5429                if cap.available_system_packages.is_empty() {
5430                    return None;
5431                }
5432
5433                cap.available_system_packages.sort();
5434
5435                info!(
5436                    "validator {:?} supports {:?} with system packages: {:?}",
5437                    cap.authority.concise(),
5438                    cap.supported_protocol_versions,
5439                    cap.available_system_packages,
5440                );
5441
5442                // A validator that only supports the current protocol version is also voting
5443                // against any change, because framework upgrades always require a protocol version
5444                // bump.
5445                cap.supported_protocol_versions
5446                    .get_version_digest(proposed_protocol_version)
5447                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
5448            })
5449            .collect();
5450
5451        // There can only be one set of votes that have a majority, find one if it exists.
5452        desired_upgrades.sort();
5453        desired_upgrades
5454            .into_iter()
5455            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5456            .into_iter()
5457            .find_map(|((digest, packages), group)| {
5458                // should have been filtered out earlier.
5459                assert!(!packages.is_empty());
5460
5461                let mut stake_aggregator: StakeAggregator<(), true> =
5462                    StakeAggregator::new(Arc::new(committee.clone()));
5463
5464                for (_, _, authority) in group {
5465                    stake_aggregator.insert_generic(authority, ());
5466                }
5467
5468                let total_votes = stake_aggregator.total_votes();
5469                let quorum_threshold = committee.quorum_threshold();
5470                let f = committee.total_votes() - committee.quorum_threshold();
5471
5472                // multiple by buffer_stake_bps / 10000, rounded up.
5473                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5474                let effective_threshold = quorum_threshold + buffer_stake;
5475
5476                info!(
5477                    protocol_config_digest = ?digest,
5478                    ?total_votes,
5479                    ?quorum_threshold,
5480                    ?buffer_stake_bps,
5481                    ?effective_threshold,
5482                    ?proposed_protocol_version,
5483                    ?packages,
5484                    "support for upgrade"
5485                );
5486
5487                let has_support = total_votes >= effective_threshold;
5488                has_support.then_some((proposed_protocol_version, packages))
5489            })
5490    }
5491
5492    // TODO: delete once authority_capabilities_v2 is deployed everywhere
5493    fn choose_protocol_version_and_system_packages_v1(
5494        current_protocol_version: ProtocolVersion,
5495        protocol_config: &ProtocolConfig,
5496        committee: &Committee,
5497        capabilities: Vec<AuthorityCapabilitiesV1>,
5498        buffer_stake_bps: u64,
5499    ) -> (ProtocolVersion, Vec<ObjectRef>) {
5500        let mut next_protocol_version = current_protocol_version;
5501        let mut system_packages = vec![];
5502
5503        while let Some((version, packages)) = Self::is_protocol_version_supported_v1(
5504            current_protocol_version,
5505            next_protocol_version + 1,
5506            protocol_config,
5507            committee,
5508            capabilities.clone(),
5509            buffer_stake_bps,
5510        ) {
5511            next_protocol_version = version;
5512            system_packages = packages;
5513        }
5514
5515        (next_protocol_version, system_packages)
5516    }
5517
5518    fn choose_protocol_version_and_system_packages_v2(
5519        current_protocol_version: ProtocolVersion,
5520        protocol_config: &ProtocolConfig,
5521        committee: &Committee,
5522        capabilities: Vec<AuthorityCapabilitiesV2>,
5523        buffer_stake_bps: u64,
5524    ) -> (ProtocolVersion, Vec<ObjectRef>) {
5525        let mut next_protocol_version = current_protocol_version;
5526        let mut system_packages = vec![];
5527
5528        while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5529            current_protocol_version,
5530            next_protocol_version + 1,
5531            protocol_config,
5532            committee,
5533            capabilities.clone(),
5534            buffer_stake_bps,
5535        ) {
5536            next_protocol_version = version;
5537            system_packages = packages;
5538        }
5539
5540        (next_protocol_version, system_packages)
5541    }
5542
5543    #[instrument(level = "debug", skip_all)]
5544    fn create_authenticator_state_tx(
5545        &self,
5546        epoch_store: &Arc<AuthorityPerEpochStore>,
5547    ) -> Option<EndOfEpochTransactionKind> {
5548        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5549            info!("authenticator state transactions not enabled");
5550            return None;
5551        }
5552
5553        let authenticator_state_exists = epoch_store.authenticator_state_exists();
5554        let tx = if authenticator_state_exists {
5555            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5556            let min_epoch =
5557                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5558            let authenticator_obj_initial_shared_version = epoch_store
5559                .epoch_start_config()
5560                .authenticator_obj_initial_shared_version()
5561                .expect("initial version must exist");
5562
5563            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5564                min_epoch,
5565                authenticator_obj_initial_shared_version,
5566            );
5567
5568            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5569
5570            tx
5571        } else {
5572            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5573            info!("Creating AuthenticatorStateCreate tx");
5574            tx
5575        };
5576        Some(tx)
5577    }
5578
5579    #[instrument(level = "debug", skip_all)]
5580    fn create_randomness_state_tx(
5581        &self,
5582        epoch_store: &Arc<AuthorityPerEpochStore>,
5583    ) -> Option<EndOfEpochTransactionKind> {
5584        if !epoch_store.protocol_config().random_beacon() {
5585            info!("randomness state transactions not enabled");
5586            return None;
5587        }
5588
5589        if epoch_store.randomness_state_exists() {
5590            return None;
5591        }
5592
5593        let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5594        info!("Creating RandomnessStateCreate tx");
5595        Some(tx)
5596    }
5597
5598    #[instrument(level = "debug", skip_all)]
5599    fn create_accumulator_root_tx(
5600        &self,
5601        epoch_store: &Arc<AuthorityPerEpochStore>,
5602    ) -> Option<EndOfEpochTransactionKind> {
5603        if !epoch_store
5604            .protocol_config()
5605            .create_root_accumulator_object()
5606        {
5607            info!("accumulator root creation not enabled");
5608            return None;
5609        }
5610
5611        if epoch_store.accumulator_root_exists() {
5612            return None;
5613        }
5614
5615        let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5616        info!("Creating AccumulatorRootCreate tx");
5617        Some(tx)
5618    }
5619
5620    #[instrument(level = "debug", skip_all)]
5621    fn create_coin_registry_tx(
5622        &self,
5623        epoch_store: &Arc<AuthorityPerEpochStore>,
5624    ) -> Option<EndOfEpochTransactionKind> {
5625        if !epoch_store.protocol_config().enable_coin_registry() {
5626            info!("coin registry not enabled");
5627            return None;
5628        }
5629
5630        if epoch_store.coin_registry_exists() {
5631            return None;
5632        }
5633
5634        let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5635        info!("Creating CoinRegistryCreate tx");
5636        Some(tx)
5637    }
5638
5639    #[instrument(level = "debug", skip_all)]
5640    fn create_display_registry_tx(
5641        &self,
5642        epoch_store: &Arc<AuthorityPerEpochStore>,
5643    ) -> Option<EndOfEpochTransactionKind> {
5644        if !epoch_store.protocol_config().enable_display_registry() {
5645            info!("display registry not enabled");
5646            return None;
5647        }
5648
5649        if epoch_store.display_registry_exists() {
5650            return None;
5651        }
5652
5653        let tx = EndOfEpochTransactionKind::new_display_registry_create();
5654        info!("Creating DisplayRegistryCreate tx");
5655        Some(tx)
5656    }
5657
5658    #[instrument(level = "debug", skip_all)]
5659    fn create_bridge_tx(
5660        &self,
5661        epoch_store: &Arc<AuthorityPerEpochStore>,
5662    ) -> Option<EndOfEpochTransactionKind> {
5663        if !epoch_store.protocol_config().enable_bridge() {
5664            info!("bridge not enabled");
5665            return None;
5666        }
5667        if epoch_store.bridge_exists() {
5668            return None;
5669        }
5670        let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5671        info!("Creating Bridge Create tx");
5672        Some(tx)
5673    }
5674
5675    #[instrument(level = "debug", skip_all)]
5676    fn init_bridge_committee_tx(
5677        &self,
5678        epoch_store: &Arc<AuthorityPerEpochStore>,
5679    ) -> Option<EndOfEpochTransactionKind> {
5680        if !epoch_store.protocol_config().enable_bridge() {
5681            info!("bridge not enabled");
5682            return None;
5683        }
5684        if !epoch_store
5685            .protocol_config()
5686            .should_try_to_finalize_bridge_committee()
5687        {
5688            info!("should not try to finalize bridge committee yet");
5689            return None;
5690        }
5691        // Only create this transaction if bridge exists
5692        if !epoch_store.bridge_exists() {
5693            return None;
5694        }
5695
5696        if epoch_store.bridge_committee_initiated() {
5697            return None;
5698        }
5699
5700        let bridge_initial_shared_version = epoch_store
5701            .epoch_start_config()
5702            .bridge_obj_initial_shared_version()
5703            .expect("initial version must exist");
5704        let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5705        info!("Init Bridge committee tx");
5706        Some(tx)
5707    }
5708
5709    #[instrument(level = "debug", skip_all)]
5710    fn create_deny_list_state_tx(
5711        &self,
5712        epoch_store: &Arc<AuthorityPerEpochStore>,
5713    ) -> Option<EndOfEpochTransactionKind> {
5714        if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5715            return None;
5716        }
5717
5718        if epoch_store.coin_deny_list_state_exists() {
5719            return None;
5720        }
5721
5722        let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5723        info!("Creating DenyListStateCreate tx");
5724        Some(tx)
5725    }
5726
5727    #[instrument(level = "debug", skip_all)]
5728    fn create_address_alias_state_tx(
5729        &self,
5730        epoch_store: &Arc<AuthorityPerEpochStore>,
5731    ) -> Option<EndOfEpochTransactionKind> {
5732        if !epoch_store.protocol_config().address_aliases() {
5733            info!("address aliases not enabled");
5734            return None;
5735        }
5736
5737        if epoch_store.address_alias_state_exists() {
5738            return None;
5739        }
5740
5741        let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
5742        info!("Creating AddressAliasStateCreate tx");
5743        Some(tx)
5744    }
5745
5746    #[instrument(level = "debug", skip_all)]
5747    fn create_execution_time_observations_tx(
5748        &self,
5749        epoch_store: &Arc<AuthorityPerEpochStore>,
5750        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5751        last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
5752    ) -> Option<EndOfEpochTransactionKind> {
5753        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
5754            .protocol_config()
5755            .per_object_congestion_control_mode()
5756        else {
5757            return None;
5758        };
5759
5760        // Load tx in the last N checkpoints before end-of-epoch, and save only the
5761        // execution time observations for commands in these checkpoints.
5762        let start_checkpoint = std::cmp::max(
5763            last_checkpoint_before_end_of_epoch
5764                .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
5765            // If we have <N checkpoints in the epoch, use all of them.
5766            epoch_store
5767                .epoch()
5768                .checked_sub(1)
5769                .map(|prev_epoch| {
5770                    self.checkpoint_store
5771                        .get_epoch_last_checkpoint_seq_number(prev_epoch)
5772                        .expect("typed store must not fail")
5773                        .expect(
5774                            "sequence number of last checkpoint of preceding epoch must be saved",
5775                        )
5776                        + 1
5777                })
5778                .unwrap_or(1),
5779        );
5780        info!(
5781            "reading checkpoint range {:?}..={:?}",
5782            start_checkpoint, last_checkpoint_before_end_of_epoch
5783        );
5784        let sequence_numbers =
5785            (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
5786        let contents_digests: Vec<_> = self
5787            .checkpoint_store
5788            .multi_get_locally_computed_checkpoints(&sequence_numbers)
5789            .expect("typed store must not fail")
5790            .into_iter()
5791            .zip(sequence_numbers)
5792            .map(|(maybe_checkpoint, sequence_number)| {
5793                if let Some(checkpoint) = maybe_checkpoint {
5794                    checkpoint.content_digest
5795                } else {
5796                    // If locally computed checkpoint summary was already pruned, load the
5797                    // certified checkpoint.
5798                    self.checkpoint_store
5799                        .get_checkpoint_by_sequence_number(sequence_number)
5800                        .expect("typed store must not fail")
5801                        .unwrap_or_else(|| {
5802                            fatal!("preceding checkpoints must exist by end of epoch")
5803                        })
5804                        .data()
5805                        .content_digest
5806                }
5807            })
5808            .collect();
5809        let tx_digests: Vec<_> = self
5810            .checkpoint_store
5811            .multi_get_checkpoint_content(&contents_digests)
5812            .expect("typed store must not fail")
5813            .into_iter()
5814            .flat_map(|maybe_contents| {
5815                maybe_contents
5816                    .expect("preceding checkpoint contents must exist by end of epoch")
5817                    .into_inner()
5818                    .into_iter()
5819                    .map(|digests| digests.transaction)
5820            })
5821            .collect();
5822        let included_execution_time_observations: HashSet<_> = self
5823            .get_transaction_cache_reader()
5824            .multi_get_transaction_blocks(&tx_digests)
5825            .into_iter()
5826            .flat_map(|maybe_tx| {
5827                if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
5828                    .expect("preceding transaction must exist by end of epoch")
5829                    .transaction_data()
5830                    .kind()
5831                {
5832                    #[allow(clippy::unnecessary_to_owned)]
5833                    itertools::Either::Left(
5834                        ptb.commands
5835                            .to_owned()
5836                            .into_iter()
5837                            .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
5838                    )
5839                } else {
5840                    itertools::Either::Right(std::iter::empty())
5841                }
5842            })
5843            .chain(end_of_epoch_observation_keys.into_iter())
5844            .collect();
5845
5846        let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
5847            epoch_store
5848                .get_end_of_epoch_execution_time_observations()
5849                .filter_and_sort_v1(
5850                    |(key, _)| included_execution_time_observations.contains(key),
5851                    params.stored_observations_limit.try_into().unwrap(),
5852                ),
5853        );
5854        info!("Creating StoreExecutionTimeObservations tx");
5855        Some(tx)
5856    }
5857
5858    /// Creates and execute the advance epoch transaction to effects without committing it to the database.
5859    /// The effects of the change epoch tx are only written to the database after a certified checkpoint has been
5860    /// formed and executed by CheckpointExecutor.
5861    ///
5862    /// When a framework upgraded has been decided on, but the validator does not have the new
5863    /// versions of the packages locally, the validator cannot form the ChangeEpochTx. In this case
5864    /// it returns Err, indicating that the checkpoint builder should give up trying to make the
5865    /// final checkpoint. As long as the network is able to create a certified checkpoint (which
5866    /// should be ensured by the capabilities vote), it will arrive via state sync and be executed
5867    /// by CheckpointExecutor.
5868    #[instrument(level = "error", skip_all)]
5869    pub async fn create_and_execute_advance_epoch_tx(
5870        &self,
5871        epoch_store: &Arc<AuthorityPerEpochStore>,
5872        gas_cost_summary: &GasCostSummary,
5873        checkpoint: CheckpointSequenceNumber,
5874        epoch_start_timestamp_ms: CheckpointTimestamp,
5875        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5876        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
5877        // >1 checkpoint.
5878        last_checkpoint: CheckpointSequenceNumber,
5879    ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
5880        let mut txns = Vec::new();
5881
5882        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
5883            txns.push(tx);
5884        }
5885        if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
5886            txns.push(tx);
5887        }
5888        if let Some(tx) = self.create_bridge_tx(epoch_store) {
5889            txns.push(tx);
5890        }
5891        if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
5892            txns.push(tx);
5893        }
5894        if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
5895            txns.push(tx);
5896        }
5897        if let Some(tx) = self.create_execution_time_observations_tx(
5898            epoch_store,
5899            end_of_epoch_observation_keys,
5900            last_checkpoint,
5901        ) {
5902            txns.push(tx);
5903        }
5904        if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
5905            txns.push(tx);
5906        }
5907
5908        if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
5909            txns.push(tx);
5910        }
5911
5912        if let Some(tx) = self.create_display_registry_tx(epoch_store) {
5913            txns.push(tx);
5914        }
5915
5916        if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
5917            txns.push(tx);
5918        }
5919
5920        let next_epoch = epoch_store.epoch() + 1;
5921
5922        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5923
5924        let (next_epoch_protocol_version, next_epoch_system_packages) =
5925            if epoch_store.protocol_config().authority_capabilities_v2() {
5926                Self::choose_protocol_version_and_system_packages_v2(
5927                    epoch_store.protocol_version(),
5928                    epoch_store.protocol_config(),
5929                    epoch_store.committee(),
5930                    epoch_store
5931                        .get_capabilities_v2()
5932                        .expect("read capabilities from db cannot fail"),
5933                    buffer_stake_bps,
5934                )
5935            } else {
5936                Self::choose_protocol_version_and_system_packages_v1(
5937                    epoch_store.protocol_version(),
5938                    epoch_store.protocol_config(),
5939                    epoch_store.committee(),
5940                    epoch_store
5941                        .get_capabilities_v1()
5942                        .expect("read capabilities from db cannot fail"),
5943                    buffer_stake_bps,
5944                )
5945            };
5946
5947        // since system packages are created during the current epoch, they should abide by the
5948        // rules of the current epoch, including the current epoch's max Move binary format version
5949        let config = epoch_store.protocol_config();
5950        let binary_config = config.binary_config(None);
5951        let Some(next_epoch_system_package_bytes) = self
5952            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5953            .await
5954        else {
5955            if next_epoch_protocol_version <= ProtocolVersion::MAX {
5956                // This case should only be hit if the validator supports the new protocol version,
5957                // but carries a different framework. The validator should still be able to
5958                // reconfigure, as the correct framework will be installed by the change epoch txn.
5959                debug_fatal!(
5960                    "upgraded system packages {:?} are not locally available, cannot create \
5961                    ChangeEpochTx. validator binary must be upgraded to the correct version!",
5962                    next_epoch_system_packages
5963                );
5964            } else {
5965                error!(
5966                    "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
5967                    next_epoch_protocol_version
5968                );
5969            }
5970            // the checkpoint builder will keep retrying forever when it hits this error.
5971            // Eventually, one of two things will happen:
5972            // - The operator will upgrade this binary to one that has the new packages locally,
5973            //   and this function will succeed.
5974            // - The final checkpoint will be certified by other validators, we will receive it via
5975            //   state sync, and execute it. This will upgrade the framework packages, reconfigure,
5976            //   and most likely shut down in the new epoch (this validator likely doesn't support
5977            //   the new protocol version, or else it should have had the packages.)
5978            return Err(CheckpointBuilderError::SystemPackagesMissing);
5979        };
5980
5981        let tx = if epoch_store
5982            .protocol_config()
5983            .end_of_epoch_transaction_supported()
5984        {
5985            txns.push(EndOfEpochTransactionKind::new_change_epoch(
5986                next_epoch,
5987                next_epoch_protocol_version,
5988                gas_cost_summary.storage_cost,
5989                gas_cost_summary.computation_cost,
5990                gas_cost_summary.storage_rebate,
5991                gas_cost_summary.non_refundable_storage_fee,
5992                epoch_start_timestamp_ms,
5993                next_epoch_system_package_bytes,
5994            ));
5995
5996            VerifiedTransaction::new_end_of_epoch_transaction(txns)
5997        } else {
5998            VerifiedTransaction::new_change_epoch(
5999                next_epoch,
6000                next_epoch_protocol_version,
6001                gas_cost_summary.storage_cost,
6002                gas_cost_summary.computation_cost,
6003                gas_cost_summary.storage_rebate,
6004                gas_cost_summary.non_refundable_storage_fee,
6005                epoch_start_timestamp_ms,
6006                next_epoch_system_package_bytes,
6007            )
6008        };
6009
6010        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
6011            tx.clone(),
6012            epoch_store.epoch(),
6013            checkpoint,
6014        );
6015
6016        let tx_digest = executable_tx.digest();
6017
6018        info!(
6019            ?next_epoch,
6020            ?next_epoch_protocol_version,
6021            ?next_epoch_system_packages,
6022            computation_cost=?gas_cost_summary.computation_cost,
6023            storage_cost=?gas_cost_summary.storage_cost,
6024            storage_rebate=?gas_cost_summary.storage_rebate,
6025            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
6026            ?tx_digest,
6027            "Creating advance epoch transaction"
6028        );
6029
6030        fail_point_async!("change_epoch_tx_delay");
6031        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
6032
6033        // The tx could have been executed by state sync already - if so simply return an error.
6034        // The checkpoint builder will shortly be terminated by reconfiguration anyway.
6035        if self
6036            .get_transaction_cache_reader()
6037            .is_tx_already_executed(tx_digest)
6038        {
6039            warn!("change epoch tx has already been executed via state sync");
6040            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6041        }
6042
6043        let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
6044        else {
6045            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6046        };
6047
6048        // We must manually assign the shared object versions to the transaction before executing it.
6049        // This is because we do not sequence end-of-epoch transactions through consensus.
6050        let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
6051            self.get_object_cache_reader().as_ref(),
6052            std::iter::once(&Schedulable::Transaction(&executable_tx)),
6053        )?;
6054
6055        assert_eq!(assigned_versions.0.len(), 1);
6056        let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;
6057
6058        let input_objects = self.read_objects_for_execution(
6059            &tx_lock,
6060            &executable_tx,
6061            assigned_versions,
6062            epoch_store,
6063        )?;
6064
6065        let (transaction_outputs, _timings, _execution_error_opt) = self
6066            .execute_certificate(
6067                &execution_guard,
6068                &executable_tx,
6069                input_objects,
6070                None,
6071                BalanceWithdrawStatus::NoWithdraw,
6072                epoch_store,
6073            )
6074            .unwrap();
6075        let system_obj = get_sui_system_state(&transaction_outputs.written)
6076            .expect("change epoch tx must write to system object");
6077
6078        let effects = transaction_outputs.effects;
6079        // We must write tx and effects to the state sync tables so that state sync is able to
6080        // deliver to the transaction to CheckpointExecutor after it is included in a certified
6081        // checkpoint.
6082        self.get_state_sync_store()
6083            .insert_transaction_and_effects(&tx, &effects);
6084
6085        info!(
6086            "Effects summary of the change epoch transaction: {:?}",
6087            effects.summary_for_debug()
6088        );
6089        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
6090        // The change epoch transaction cannot fail to execute.
6091        assert!(effects.status().is_ok());
6092        Ok((system_obj, effects))
6093    }
6094
6095    #[instrument(level = "error", skip_all)]
6096    async fn reopen_epoch_db(
6097        &self,
6098        cur_epoch_store: &AuthorityPerEpochStore,
6099        new_committee: Committee,
6100        epoch_start_configuration: EpochStartConfiguration,
6101        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6102        epoch_last_checkpoint: CheckpointSequenceNumber,
6103    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6104        let new_epoch = new_committee.epoch;
6105        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6106        assert_eq!(
6107            epoch_start_configuration.epoch_start_state().epoch(),
6108            new_committee.epoch
6109        );
6110        fail_point!("before-open-new-epoch-store");
6111        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6112            self.name,
6113            new_committee,
6114            epoch_start_configuration,
6115            self.get_backing_package_store().clone(),
6116            self.get_object_store().clone(),
6117            expensive_safety_check_config,
6118            epoch_last_checkpoint,
6119        )?;
6120        self.epoch_store.store(new_epoch_store.clone());
6121        Ok(new_epoch_store)
6122    }
6123
6124    #[cfg(test)]
6125    pub(crate) fn iter_live_object_set_for_testing(
6126        &self,
6127    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6128        let include_wrapped_object = !self
6129            .epoch_store_for_testing()
6130            .protocol_config()
6131            .simplified_unwrap_then_delete();
6132        self.get_global_state_hash_store()
6133            .iter_cached_live_object_set_for_testing(include_wrapped_object)
6134    }
6135
6136    #[cfg(test)]
6137    pub(crate) fn shutdown_execution_for_test(&self) {
6138        self.tx_execution_shutdown
6139            .lock()
6140            .take()
6141            .unwrap()
6142            .send(())
6143            .unwrap();
6144    }
6145
6146    /// NOTE: this function is only to be used for fuzzing and testing. Never use in prod
6147    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6148        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6149        self.get_object_cache_reader()
6150            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6151        self.get_reconfig_api()
6152            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6153        Ok(())
6154    }
6155}
6156
6157pub struct RandomnessRoundReceiver {
6158    authority_state: Arc<AuthorityState>,
6159    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6160}
6161
6162impl RandomnessRoundReceiver {
6163    pub fn spawn(
6164        authority_state: Arc<AuthorityState>,
6165        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6166    ) -> JoinHandle<()> {
6167        let rrr = RandomnessRoundReceiver {
6168            authority_state,
6169            randomness_rx,
6170        };
6171        spawn_monitored_task!(rrr.run())
6172    }
6173
6174    async fn run(mut self) {
6175        info!("RandomnessRoundReceiver event loop started");
6176
6177        loop {
6178            tokio::select! {
6179                maybe_recv = self.randomness_rx.recv() => {
6180                    if let Some((epoch, round, bytes)) = maybe_recv {
6181                        self.handle_new_randomness(epoch, round, bytes).await;
6182                    } else {
6183                        break;
6184                    }
6185                },
6186            }
6187        }
6188
6189        info!("RandomnessRoundReceiver event loop ended");
6190    }
6191
6192    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
6193    async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
6194        fail_point_async!("randomness-delay");
6195
6196        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
6197        if epoch_store.epoch() != epoch {
6198            warn!(
6199                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
6200                epoch_store.epoch()
6201            );
6202            return;
6203        }
6204        let key = TransactionKey::RandomnessRound(epoch, round);
6205        let transaction = VerifiedTransaction::new_randomness_state_update(
6206            epoch,
6207            round,
6208            bytes,
6209            epoch_store
6210                .epoch_start_config()
6211                .randomness_obj_initial_shared_version()
6212                .expect("randomness state obj must exist"),
6213        );
6214        debug!(
6215            "created randomness state update transaction with digest: {:?}",
6216            transaction.digest()
6217        );
6218        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
6219        let digest = *transaction.digest();
6220
6221        // Randomness state updates contain the full bls signature for the random round,
6222        // which cannot necessarily be reconstructed again later. Therefore we must immediately
6223        // persist this transaction. If we crash before its outputs are committed, this
6224        // ensures we will be able to re-execute it.
6225        self.authority_state
6226            .get_cache_commit()
6227            .persist_transaction(&transaction);
6228
6229        // Notify the scheduler that the transaction key now has a known digest
6230        if epoch_store.insert_tx_key(key, digest).is_err() {
6231            warn!("epoch ended while handling new randomness");
6232        }
6233
6234        let authority_state = self.authority_state.clone();
6235        spawn_monitored_task!(async move {
6236            // Wait for transaction execution in a separate task, to avoid deadlock in case of
6237            // out-of-order randomness generation. (Each RandomnessStateUpdate depends on the
6238            // output of the RandomnessStateUpdate from the previous round.)
6239            //
6240            // We set a very long timeout so that in case this gets stuck for some reason, the
6241            // validator will eventually crash rather than continuing in a zombie mode.
6242            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
6243            let result = tokio::time::timeout(
6244                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
6245                authority_state
6246                    .get_transaction_cache_reader()
6247                    .notify_read_executed_effects(
6248                        "RandomnessRoundReceiver::notify_read_executed_effects_first",
6249                        &[digest],
6250                    ),
6251            )
6252            .await;
6253            let mut effects = match result {
6254                Ok(result) => result,
6255                Err(_) => {
6256                    if cfg!(debug_assertions) {
6257                        // Crash on randomness update execution timeout in debug builds.
6258                        panic!(
6259                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6260                        );
6261                    }
6262                    warn!(
6263                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6264                    );
6265                    // Continue waiting as long as necessary in non-debug builds.
6266                    authority_state
6267                        .get_transaction_cache_reader()
6268                        .notify_read_executed_effects(
6269                            "RandomnessRoundReceiver::notify_read_executed_effects_second",
6270                            &[digest],
6271                        )
6272                        .await
6273                }
6274            };
6275
6276            let effects = effects.pop().expect("should return effects");
6277            if *effects.status() != ExecutionStatus::Success {
6278                fatal!(
6279                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
6280                );
6281            }
6282            debug!(
6283                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
6284            );
6285        });
6286    }
6287}
6288
6289#[async_trait]
6290impl TransactionKeyValueStoreTrait for AuthorityState {
6291    #[instrument(skip(self))]
6292    async fn multi_get(
6293        &self,
6294        transactions: &[TransactionDigest],
6295        effects: &[TransactionDigest],
6296    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6297        let txns = if !transactions.is_empty() {
6298            self.get_transaction_cache_reader()
6299                .multi_get_transaction_blocks(transactions)
6300                .into_iter()
6301                .map(|t| t.map(|t| (*t).clone().into_inner()))
6302                .collect()
6303        } else {
6304            vec![]
6305        };
6306
6307        let fx = if !effects.is_empty() {
6308            self.get_transaction_cache_reader()
6309                .multi_get_executed_effects(effects)
6310        } else {
6311            vec![]
6312        };
6313
6314        Ok((txns, fx))
6315    }
6316
6317    #[instrument(skip(self))]
6318    async fn multi_get_checkpoints(
6319        &self,
6320        checkpoint_summaries: &[CheckpointSequenceNumber],
6321        checkpoint_contents: &[CheckpointSequenceNumber],
6322        checkpoint_summaries_by_digest: &[CheckpointDigest],
6323    ) -> SuiResult<(
6324        Vec<Option<CertifiedCheckpointSummary>>,
6325        Vec<Option<CheckpointContents>>,
6326        Vec<Option<CertifiedCheckpointSummary>>,
6327    )> {
6328        // TODO: use multi-get methods if it ever becomes important (unlikely)
6329        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6330        let store = self.get_checkpoint_store();
6331        for seq in checkpoint_summaries {
6332            let checkpoint = store
6333                .get_checkpoint_by_sequence_number(*seq)?
6334                .map(|c| c.into_inner());
6335
6336            summaries.push(checkpoint);
6337        }
6338
6339        let mut contents = Vec::with_capacity(checkpoint_contents.len());
6340        for seq in checkpoint_contents {
6341            let checkpoint = store
6342                .get_checkpoint_by_sequence_number(*seq)?
6343                .and_then(|summary| {
6344                    store
6345                        .get_checkpoint_contents(&summary.content_digest)
6346                        .expect("db read cannot fail")
6347                });
6348            contents.push(checkpoint);
6349        }
6350
6351        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6352        for digest in checkpoint_summaries_by_digest {
6353            let checkpoint = store
6354                .get_checkpoint_by_digest(digest)?
6355                .map(|c| c.into_inner());
6356            summaries_by_digest.push(checkpoint);
6357        }
6358        Ok((summaries, contents, summaries_by_digest))
6359    }
6360
6361    #[instrument(skip(self))]
6362    async fn deprecated_get_transaction_checkpoint(
6363        &self,
6364        digest: TransactionDigest,
6365    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6366        Ok(self
6367            .get_checkpoint_cache()
6368            .deprecated_get_transaction_checkpoint(&digest)
6369            .map(|(_epoch, checkpoint)| checkpoint))
6370    }
6371
6372    #[instrument(skip(self))]
6373    async fn get_object(
6374        &self,
6375        object_id: ObjectID,
6376        version: VersionNumber,
6377    ) -> SuiResult<Option<Object>> {
6378        Ok(self
6379            .get_object_cache_reader()
6380            .get_object_by_key(&object_id, version))
6381    }
6382
6383    #[instrument(skip_all)]
6384    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6385        Ok(self
6386            .get_object_cache_reader()
6387            .multi_get_objects_by_key(object_keys))
6388    }
6389
6390    #[instrument(skip(self))]
6391    async fn multi_get_transaction_checkpoint(
6392        &self,
6393        digests: &[TransactionDigest],
6394    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6395        let res = self
6396            .get_checkpoint_cache()
6397            .deprecated_multi_get_transaction_checkpoint(digests);
6398
6399        Ok(res
6400            .into_iter()
6401            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6402            .collect())
6403    }
6404
6405    #[instrument(skip(self))]
6406    async fn multi_get_events_by_tx_digests(
6407        &self,
6408        digests: &[TransactionDigest],
6409    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6410        if digests.is_empty() {
6411            return Ok(vec![]);
6412        }
6413
6414        Ok(self
6415            .get_transaction_cache_reader()
6416            .multi_get_events(digests))
6417    }
6418}
6419
6420#[cfg(msim)]
6421pub mod framework_injection {
6422    use move_binary_format::CompiledModule;
6423    use std::collections::BTreeMap;
6424    use std::{cell::RefCell, collections::BTreeSet};
6425    use sui_framework::{BuiltInFramework, SystemPackage};
6426    use sui_types::base_types::{AuthorityName, ObjectID};
6427    use sui_types::is_system_package;
6428
6429    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
6430
6431    // Thread local cache because all simtests run in a single unique thread.
6432    thread_local! {
6433        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
6434    }
6435
6436    type Framework = Vec<CompiledModule>;
6437
6438    pub type PackageUpgradeCallback =
6439        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
6440
6441    enum PackageOverrideConfig {
6442        Global(Framework),
6443        PerValidator(PackageUpgradeCallback),
6444    }
6445
6446    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
6447        modules
6448            .iter()
6449            .map(|m| {
6450                let mut buf = Vec::new();
6451                m.serialize_with_version(m.version, &mut buf).unwrap();
6452                buf
6453            })
6454            .collect()
6455    }
6456
6457    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
6458        OVERRIDE.with(|bs| {
6459            bs.borrow_mut()
6460                .insert(package_id, PackageOverrideConfig::Global(modules))
6461        });
6462    }
6463
6464    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
6465        OVERRIDE.with(|bs| {
6466            bs.borrow_mut()
6467                .insert(package_id, PackageOverrideConfig::PerValidator(func))
6468        });
6469    }
6470
6471    pub fn set_system_packages(packages: Vec<SystemPackage>) {
6472        OVERRIDE.with(|bs| {
6473            let mut new_packages_not_to_include: BTreeSet<_> =
6474                BuiltInFramework::all_package_ids().into_iter().collect();
6475            for pkg in &packages {
6476                new_packages_not_to_include.remove(&pkg.id);
6477            }
6478            for pkg in packages {
6479                bs.borrow_mut()
6480                    .insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
6481            }
6482            for empty_pkg in new_packages_not_to_include {
6483                bs.borrow_mut()
6484                    .insert(empty_pkg, PackageOverrideConfig::Global(vec![]));
6485            }
6486        });
6487    }
6488
6489    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
6490        OVERRIDE.with(|cfg| {
6491            cfg.borrow().get(package_id).and_then(|entry| match entry {
6492                PackageOverrideConfig::Global(framework) => {
6493                    Some(compiled_modules_to_bytes(framework))
6494                }
6495                PackageOverrideConfig::PerValidator(func) => {
6496                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
6497                }
6498            })
6499        })
6500    }
6501
6502    pub fn get_override_modules(
6503        package_id: &ObjectID,
6504        name: AuthorityName,
6505    ) -> Option<Vec<CompiledModule>> {
6506        OVERRIDE.with(|cfg| {
6507            cfg.borrow().get(package_id).and_then(|entry| match entry {
6508                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
6509                PackageOverrideConfig::PerValidator(func) => func(name),
6510            })
6511        })
6512    }
6513
6514    pub fn get_override_system_package(
6515        package_id: &ObjectID,
6516        name: AuthorityName,
6517    ) -> Option<SystemPackage> {
6518        let bytes = get_override_bytes(package_id, name)?;
6519        let dependencies = if is_system_package(*package_id) {
6520            BuiltInFramework::get_package_by_id(package_id)
6521                .dependencies
6522                .to_vec()
6523        } else {
6524            // Assume that entirely new injected packages depend on all existing system packages.
6525            BuiltInFramework::all_package_ids()
6526        };
6527        Some(SystemPackage {
6528            id: *package_id,
6529            bytes,
6530            dependencies,
6531        })
6532    }
6533
6534    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6535        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids().into_iter());
6536        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
6537            cfg.borrow()
6538                .keys()
6539                .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6540                .collect()
6541        });
6542
6543        extra
6544            .into_iter()
6545            .map(|package| SystemPackage {
6546                id: package,
6547                bytes: get_override_bytes(&package, name).unwrap(),
6548                dependencies: BuiltInFramework::all_package_ids(),
6549            })
6550            .collect()
6551    }
6552}
6553
6554#[derive(Debug, Serialize, Deserialize, Clone)]
6555pub struct ObjDumpFormat {
6556    pub id: ObjectID,
6557    pub version: VersionNumber,
6558    pub digest: ObjectDigest,
6559    pub object: Object,
6560}
6561
6562impl ObjDumpFormat {
6563    fn new(object: Object) -> Self {
6564        let oref = object.compute_object_reference();
6565        Self {
6566            id: oref.0,
6567            version: oref.1,
6568            digest: oref.2,
6569            object,
6570        }
6571    }
6572}
6573
6574#[derive(Debug, Serialize, Deserialize, Clone)]
6575pub struct NodeStateDump {
6576    pub tx_digest: TransactionDigest,
6577    pub sender_signed_data: SenderSignedData,
6578    pub executed_epoch: u64,
6579    pub reference_gas_price: u64,
6580    pub protocol_version: u64,
6581    pub epoch_start_timestamp_ms: u64,
6582    pub computed_effects: TransactionEffects,
6583    pub expected_effects_digest: TransactionEffectsDigest,
6584    pub relevant_system_packages: Vec<ObjDumpFormat>,
6585    pub shared_objects: Vec<ObjDumpFormat>,
6586    pub loaded_child_objects: Vec<ObjDumpFormat>,
6587    pub modified_at_versions: Vec<ObjDumpFormat>,
6588    pub runtime_reads: Vec<ObjDumpFormat>,
6589    pub input_objects: Vec<ObjDumpFormat>,
6590}
6591
6592impl NodeStateDump {
6593    pub fn new(
6594        tx_digest: &TransactionDigest,
6595        effects: &TransactionEffects,
6596        expected_effects_digest: TransactionEffectsDigest,
6597        object_store: &dyn ObjectStore,
6598        epoch_store: &Arc<AuthorityPerEpochStore>,
6599        inner_temporary_store: &InnerTemporaryStore,
6600        certificate: &VerifiedExecutableTransaction,
6601    ) -> SuiResult<Self> {
6602        // Epoch info
6603        let executed_epoch = epoch_store.epoch();
6604        let reference_gas_price = epoch_store.reference_gas_price();
6605        let epoch_start_config = epoch_store.epoch_start_config();
6606        let protocol_version = epoch_store.protocol_version().as_u64();
6607        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6608
6609        // Record all system packages at this version
6610        let mut relevant_system_packages = Vec::new();
6611        for sys_package_id in BuiltInFramework::all_package_ids() {
6612            if let Some(w) = object_store.get_object(&sys_package_id) {
6613                relevant_system_packages.push(ObjDumpFormat::new(w))
6614            }
6615        }
6616
6617        // Record all the shared objects
6618        let mut shared_objects = Vec::new();
6619        for kind in effects.input_consensus_objects() {
6620            match kind {
6621                InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6622                    if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6623                        shared_objects.push(ObjDumpFormat::new(w))
6624                    }
6625                }
6626                InputConsensusObject::ReadConsensusStreamEnded(..)
6627                | InputConsensusObject::MutateConsensusStreamEnded(..)
6628                | InputConsensusObject::Cancelled(..) => (), // TODO: consider record congested objects.
6629            }
6630        }
6631
6632        // Record all loaded child objects
6633        // Child objects which are read but not mutated are not tracked anywhere else
6634        let mut loaded_child_objects = Vec::new();
6635        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6636            if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6637                loaded_child_objects.push(ObjDumpFormat::new(w))
6638            }
6639        }
6640
6641        // Record all modified objects
6642        let mut modified_at_versions = Vec::new();
6643        for (id, ver) in effects.modified_at_versions() {
6644            if let Some(w) = object_store.get_object_by_key(&id, ver) {
6645                modified_at_versions.push(ObjDumpFormat::new(w))
6646            }
6647        }
6648
6649        // Packages read at runtime, which were not previously loaded into the temoorary store
6650        // Some packages may be fetched at runtime and wont show up in input objects
6651        let mut runtime_reads = Vec::new();
6652        for obj in inner_temporary_store
6653            .runtime_packages_loaded_from_db
6654            .values()
6655        {
6656            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6657        }
6658
6659        // All other input objects should already be in `inner_temporary_store.objects`
6660
6661        Ok(Self {
6662            tx_digest: *tx_digest,
6663            executed_epoch,
6664            reference_gas_price,
6665            epoch_start_timestamp_ms,
6666            protocol_version,
6667            relevant_system_packages,
6668            shared_objects,
6669            loaded_child_objects,
6670            modified_at_versions,
6671            runtime_reads,
6672            sender_signed_data: certificate.clone().into_message(),
6673            input_objects: inner_temporary_store
6674                .input_objects
6675                .values()
6676                .map(|o| ObjDumpFormat::new(o.clone()))
6677                .collect(),
6678            computed_effects: effects.clone(),
6679            expected_effects_digest,
6680        })
6681    }
6682
6683    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6684        let mut objects = Vec::new();
6685        objects.extend(self.relevant_system_packages.clone());
6686        objects.extend(self.shared_objects.clone());
6687        objects.extend(self.loaded_child_objects.clone());
6688        objects.extend(self.modified_at_versions.clone());
6689        objects.extend(self.runtime_reads.clone());
6690        objects.extend(self.input_objects.clone());
6691        objects
6692    }
6693
6694    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6695        let file_name = format!(
6696            "{}_{}_NODE_DUMP.json",
6697            self.tx_digest,
6698            AuthorityState::unixtime_now_ms()
6699        );
6700        let mut path = path.to_path_buf();
6701        path.push(&file_name);
6702        let mut file = File::create(path.clone())?;
6703        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6704        Ok(path)
6705    }
6706
6707    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6708        let file = File::open(path)?;
6709        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6710    }
6711}