sui_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::checkpoints::CheckpointBuilderError;
6use crate::checkpoints::CheckpointBuilderResult;
7use crate::congestion_tracker::CongestionTracker;
8use crate::consensus_adapter::ConsensusOverloadChecker;
9use crate::execution_cache::ExecutionCacheTraitPointers;
10use crate::execution_cache::TransactionCacheRead;
11use crate::execution_scheduler::ExecutionScheduler;
12use crate::execution_scheduler::SchedulingSource;
13use crate::jsonrpc_index::CoinIndexKey2;
14use crate::rpc_index::RpcIndexStore;
15use crate::traffic_controller::TrafficController;
16use crate::traffic_controller::metrics::TrafficControllerMetrics;
17use crate::transaction_outputs::TransactionOutputs;
18use crate::verify_indexes::{fix_indexes, verify_indexes};
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22use fastcrypto::encoding::Base58;
23use fastcrypto::encoding::Encoding;
24use fastcrypto::hash::MultisetHash;
25use itertools::Itertools;
26use move_binary_format::CompiledModule;
27use move_binary_format::binary_config::BinaryConfig;
28use move_core_types::annotated_value::MoveStructLayout;
29use move_core_types::language_storage::ModuleId;
30use mysten_common::fatal;
31use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
32use parking_lot::Mutex;
33use prometheus::{
34    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
35    register_histogram_vec_with_registry, register_histogram_with_registry,
36    register_int_counter_vec_with_registry, register_int_counter_with_registry,
37    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
38};
39use serde::de::DeserializeOwned;
40use serde::{Deserialize, Serialize};
41use shared_object_version_manager::AssignedVersions;
42use shared_object_version_manager::Schedulable;
43use std::collections::BTreeMap;
44use std::collections::BTreeSet;
45use std::fs::File;
46use std::io::Write;
47use std::path::{Path, PathBuf};
48use std::sync::atomic::Ordering;
49use std::time::Duration;
50use std::time::Instant;
51use std::time::SystemTime;
52use std::time::UNIX_EPOCH;
53use std::{
54    collections::{HashMap, HashSet},
55    fs,
56    pin::Pin,
57    str::FromStr,
58    sync::Arc,
59    vec,
60};
61use sui_config::NodeConfig;
62use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
63use sui_protocol_config::PerObjectCongestionControlMode;
64use sui_types::crypto::RandomnessRound;
65use sui_types::dynamic_field::visitor as DFV;
66use sui_types::execution::ExecutionOutput;
67use sui_types::execution::ExecutionTimeObservationKey;
68use sui_types::execution::ExecutionTiming;
69use sui_types::execution_params::BalanceWithdrawStatus;
70use sui_types::execution_params::ExecutionOrEarlyError;
71use sui_types::execution_params::get_early_execution_error;
72use sui_types::execution_status::ExecutionStatus;
73use sui_types::inner_temporary_store::PackageStoreWithFallback;
74use sui_types::layout_resolver::LayoutResolver;
75use sui_types::layout_resolver::into_struct_layout;
76use sui_types::messages_consensus::{AuthorityCapabilitiesV1, AuthorityCapabilitiesV2};
77use sui_types::object::bounded_visitor::BoundedVisitor;
78use sui_types::storage::ChildObjectResolver;
79use sui_types::storage::InputKey;
80use sui_types::storage::TrackingBackingStore;
81use sui_types::traffic_control::{
82    PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
83};
84use sui_types::transaction_executor::SimulateTransactionResult;
85use sui_types::transaction_executor::TransactionChecks;
86use tap::TapFallible;
87use tokio::sync::RwLock;
88use tokio::sync::mpsc::unbounded_channel;
89use tokio::sync::{mpsc, oneshot};
90use tokio::task::JoinHandle;
91use tokio::time::timeout;
92use tracing::trace;
93use tracing::{debug, error, info, instrument, warn};
94
95use self::authority_store::ExecutionLockWriteGuard;
96use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
97pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
98use mysten_metrics::{monitored_scope, spawn_monitored_task};
99
100use crate::jsonrpc_index::IndexStore;
101use crate::jsonrpc_index::{CoinInfo, ObjectIndexChanges};
102use mysten_common::debug_fatal;
103use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
104use sui_config::genesis::Genesis;
105use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
106use sui_framework::{BuiltInFramework, SystemPackage};
107use sui_json_rpc_types::{
108    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
109    SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
110    SuiTransactionBlockEvents, TransactionFilter,
111};
112use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
113use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
114use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
115use sui_types::authenticator_state::get_authenticator_state;
116use sui_types::committee::{EpochId, ProtocolVersion};
117use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
118use sui_types::deny_list_v1::check_coin_deny_list_v1;
119use sui_types::digests::ChainIdentifier;
120use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
121use sui_types::effects::{
122    InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
123    TransactionEvents, VerifiedSignedTransactionEffects,
124};
125use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
126use sui_types::event::{Event, EventID};
127use sui_types::executable_transaction::VerifiedExecutableTransaction;
128use sui_types::gas::{GasCostSummary, SuiGasStatus};
129use sui_types::inner_temporary_store::{
130    InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
131};
132use sui_types::message_envelope::Message;
133use sui_types::messages_checkpoint::{
134    CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
135    CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
136    CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
137    CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
138};
139use sui_types::messages_grpc::{
140    HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind,
141    ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
142};
143use sui_types::metrics::{BytecodeVerifierMetrics, LimitsMetrics};
144use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
145use sui_types::storage::{
146    BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
147};
148use sui_types::sui_system_state::SuiSystemStateTrait;
149use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
150use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
151use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
152use sui_types::{
153    SUI_SYSTEM_ADDRESS,
154    base_types::*,
155    committee::Committee,
156    crypto::AuthoritySignature,
157    error::{SuiError, SuiResult},
158    object::{Object, ObjectRead},
159    transaction::*,
160};
161use sui_types::{TypeTag, is_system_package};
162use typed_store::TypedStoreError;
163
164use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
165use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
166use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
167use crate::authority::authority_store_pruner::{
168    AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
169};
170use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
171use crate::authority::epoch_start_configuration::EpochStartConfiguration;
172use crate::checkpoints::CheckpointStore;
173use crate::epoch::committee_store::CommitteeStore;
174use crate::execution_cache::{
175    CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
176    ObjectCacheRead, StateSyncAPI,
177};
178use crate::execution_driver::execution_process;
179use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
180use crate::metrics::LatencyObserver;
181use crate::metrics::RateTracker;
182use crate::module_cache_metrics::ResolverMetrics;
183use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
184use crate::stake_aggregator::StakeAggregator;
185use crate::subscription_handler::SubscriptionHandler;
186use crate::transaction_input_loader::TransactionInputLoader;
187
188#[cfg(msim)]
189pub use crate::checkpoints::checkpoint_executor::utils::{
190    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
191};
192
193use crate::authority::authority_store_tables::AuthorityPrunerTables;
194use crate::authority_client::NetworkAuthorityClient;
195use crate::validator_tx_finalizer::ValidatorTxFinalizer;
196#[cfg(msim)]
197use sui_types::committee::CommitteeTrait;
198use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
199
200#[cfg(test)]
201#[path = "unit_tests/authority_tests.rs"]
202pub mod authority_tests;
203
204#[cfg(test)]
205#[path = "unit_tests/transaction_tests.rs"]
206pub mod transaction_tests;
207
208#[cfg(test)]
209#[path = "unit_tests/batch_transaction_tests.rs"]
210mod batch_transaction_tests;
211
212#[cfg(test)]
213#[path = "unit_tests/move_integration_tests.rs"]
214pub mod move_integration_tests;
215
216#[cfg(test)]
217#[path = "unit_tests/gas_tests.rs"]
218mod gas_tests;
219
220#[cfg(test)]
221#[path = "unit_tests/batch_verification_tests.rs"]
222mod batch_verification_tests;
223
224#[cfg(test)]
225#[path = "unit_tests/coin_deny_list_tests.rs"]
226mod coin_deny_list_tests;
227
228#[cfg(test)]
229#[path = "unit_tests/mysticeti_fastpath_execution_tests.rs"]
230mod mysticeti_fastpath_execution_tests;
231
232#[cfg(test)]
233#[path = "unit_tests/auth_unit_test_utils.rs"]
234pub mod auth_unit_test_utils;
235
236pub mod authority_test_utils;
237
238pub mod authority_per_epoch_store;
239pub mod authority_per_epoch_store_pruner;
240
241pub mod authority_store_pruner;
242pub mod authority_store_tables;
243pub mod authority_store_types;
244pub mod consensus_tx_status_cache;
245pub mod epoch_start_configuration;
246pub mod execution_time_estimator;
247pub mod shared_object_congestion_tracker;
248pub mod shared_object_version_manager;
249pub mod submitted_transaction_cache;
250pub mod test_authority_builder;
251pub mod transaction_deferral;
252pub mod transaction_reject_reason_cache;
253mod weighted_moving_average;
254
255pub(crate) mod authority_store;
256pub mod backpressure;
257
258/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
259pub struct AuthorityMetrics {
260    tx_orders: IntCounter,
261    total_certs: IntCounter,
262    total_cert_attempts: IntCounter,
263    total_effects: IntCounter,
264    // TODO: this tracks consensus object tx, not just shared. Consider renaming.
265    pub shared_obj_tx: IntCounter,
266    sponsored_tx: IntCounter,
267    tx_already_processed: IntCounter,
268    num_input_objs: Histogram,
269    // TODO: this tracks consensus object count, not just shared. Consider renaming.
270    num_shared_objects: Histogram,
271    batch_size: Histogram,
272
273    authority_state_handle_transaction_latency: Histogram,
274    authority_state_handle_vote_transaction_latency: Histogram,
275
276    execute_certificate_latency_single_writer: Histogram,
277    execute_certificate_latency_shared_object: Histogram,
278    await_transaction_latency: Histogram,
279
280    internal_execution_latency: Histogram,
281    execution_load_input_objects_latency: Histogram,
282    prepare_certificate_latency: Histogram,
283    commit_certificate_latency: Histogram,
284    db_checkpoint_latency: Histogram,
285
286    // TODO: Rename these metrics.
287    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
288    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
289    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
290    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
291
292    pub(crate) execution_driver_executed_transactions: IntCounter,
293    pub(crate) execution_driver_paused_transactions: IntCounter,
294    pub(crate) execution_driver_dispatch_queue: IntGauge,
295    pub(crate) execution_queueing_delay_s: Histogram,
296    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
297    pub(crate) execution_gas_latency_ratio: Histogram,
298
299    pub(crate) skipped_consensus_txns: IntCounter,
300    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
301
302    pub(crate) authority_overload_status: IntGauge,
303    pub(crate) authority_load_shedding_percentage: IntGauge,
304
305    pub(crate) transaction_overload_sources: IntCounterVec,
306
307    /// Post processing metrics
308    post_processing_total_events_emitted: IntCounter,
309    post_processing_total_tx_indexed: IntCounter,
310    post_processing_total_tx_had_event_processed: IntCounter,
311    post_processing_total_failures: IntCounter,
312
313    /// Consensus commit and transaction handler metrics
314    pub consensus_handler_processed: IntCounterVec,
315    pub consensus_handler_transaction_sizes: HistogramVec,
316    pub consensus_handler_num_low_scoring_authorities: IntGauge,
317    pub consensus_handler_scores: IntGaugeVec,
318    pub consensus_handler_deferred_transactions: IntCounter,
319    pub consensus_handler_congested_transactions: IntCounter,
320    pub consensus_handler_cancelled_transactions: IntCounter,
321    pub consensus_handler_max_object_costs: IntGaugeVec,
322    pub consensus_committed_subdags: IntCounterVec,
323    pub consensus_committed_messages: IntGaugeVec,
324    pub consensus_committed_user_transactions: IntGaugeVec,
325    pub consensus_finalized_user_transactions: IntGaugeVec,
326    pub consensus_rejected_user_transactions: IntGaugeVec,
327    pub consensus_calculated_throughput: IntGauge,
328    pub consensus_calculated_throughput_profile: IntGauge,
329    pub consensus_block_handler_block_processed: IntCounter,
330    pub consensus_block_handler_txn_processed: IntCounterVec,
331    pub consensus_block_handler_fastpath_executions: IntCounter,
332    pub consensus_timestamp_bias: Histogram,
333
334    pub limits_metrics: Arc<LimitsMetrics>,
335
336    /// bytecode verifier metrics for tracking timeouts
337    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
338
339    /// Count of zklogin signatures
340    pub zklogin_sig_count: IntCounter,
341    /// Count of multisig signatures
342    pub multisig_sig_count: IntCounter,
343
344    // Tracks recent average txn queueing delay between when it is ready for execution
345    // until it starts executing.
346    pub execution_queueing_latency: LatencyObserver,
347
348    // Tracks the rate of transactions become ready for execution in transaction manager.
349    // The need for the Mutex is that the tracker is updated in transaction manager and read
350    // in the overload_monitor. There should be low mutex contention because
351    // transaction manager is single threaded and the read rate in overload_monitor is
352    // low. In the case where transaction manager becomes multi-threaded, we can
353    // create one rate tracker per thread.
354    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
355
356    // Tracks the rate of transactions starts execution in execution driver.
357    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
358    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
359}
360
/// Histogram buckets for positive integer-valued samples in the 0-10M range,
/// used in place of the default Prometheus buckets.
///
/// Each decade contributes the steps 1, 2, 5 and 7; the final bucket is 10M.
#[rustfmt::skip]
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 7.0,
    10.0, 20.0, 50.0, 70.0,
    100.0, 200.0, 500.0, 700.0,
    1_000.0, 2_000.0, 5_000.0, 7_000.0,
    10_000.0, 20_000.0, 50_000.0, 70_000.0,
    100_000.0, 200_000.0, 500_000.0, 700_000.0,
    1_000_000.0, 2_000_000.0, 5_000_000.0, 7_000_000.0,
    10_000_000.0,
];
367
/// General-purpose latency histogram buckets, in seconds.
///
/// Ranges from 0.5 ms up to 90 s: fine-grained below one second, one bucket
/// per second up to 9 s, then progressively coarser.
#[rustfmt::skip]
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second.
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
    // One bucket per second.
    1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
    // Long tail.
    10.0, 20.0, 30.0, 60.0, 90.0,
];
372
/// Latency histogram buckets, in seconds, for low latency samples.
///
/// Starts at 10us and follows a 1-2-5 progression per decade up to 100 s.
#[rustfmt::skip]
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005,
    0.0001, 0.0002, 0.0005,
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0,
];
378
/// Histogram buckets for consensus timestamp bias, in seconds.
///
/// The expected bias is 200-300ms, so the buckets are packed most densely
/// around that range. A negative bucket is included as well.
#[rustfmt::skip]
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    // Below the expected range.
    -2.0, 0.0,
    // Dense region around the expected bias (20ms steps).
    0.20, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40,
    // Above the expected range.
    0.45, 0.50, 0.80,
    2.0, 10.0, 30.0,
];
385
/// Histogram buckets for the gas-to-latency ratio metrics
/// (`prepare_cert_gas_latency_ratio` and `execution_gas_latency_ratio`).
///
/// Linear steps of 100 up to 1 000 and of 1 000 up to 10 000, with a few
/// coarse buckets at either end.
#[rustfmt::skip]
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0,
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0,
    1_000.0, 2_000.0, 3_000.0, 4_000.0, 5_000.0, 6_000.0, 7_000.0, 8_000.0, 9_000.0,
    10_000.0, 50_000.0, 100_000.0, 1_000_000.0,
];
390
/// Gas coin value used for dev-inspect: 10^15.
// NOTE(review): presumably the balance given to the placeholder gas coin when
// dev-inspect runs without a real one; confirm against the call sites.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 10_u64.pow(15);
392
/// Upper bound on how long to wait for fastpath input objects (2 seconds).
///
/// The wait can usually be short: the transaction author should already have
/// observed the input objects as finalized output. In addition, when the
/// transaction is submitted through TransactionDriver, the driver retries
/// quickly if this validator does not respond.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_millis(2_000);
397
398impl AuthorityMetrics {
399    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
400        let execute_certificate_latency = register_histogram_vec_with_registry!(
401            "authority_state_execute_certificate_latency",
402            "Latency of executing certificates, including waiting for inputs",
403            &["tx_type"],
404            LATENCY_SEC_BUCKETS.to_vec(),
405            registry,
406        )
407        .unwrap();
408
409        let execute_certificate_latency_single_writer =
410            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
411        let execute_certificate_latency_shared_object =
412            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
413
414        Self {
415            tx_orders: register_int_counter_with_registry!(
416                "total_transaction_orders",
417                "Total number of transaction orders",
418                registry,
419            )
420            .unwrap(),
421            total_certs: register_int_counter_with_registry!(
422                "total_transaction_certificates",
423                "Total number of transaction certificates handled",
424                registry,
425            )
426            .unwrap(),
427            total_cert_attempts: register_int_counter_with_registry!(
428                "total_handle_certificate_attempts",
429                "Number of calls to handle_certificate",
430                registry,
431            )
432            .unwrap(),
433            // total_effects == total transactions finished
434            total_effects: register_int_counter_with_registry!(
435                "total_transaction_effects",
436                "Total number of transaction effects produced",
437                registry,
438            )
439            .unwrap(),
440
441            shared_obj_tx: register_int_counter_with_registry!(
442                "num_shared_obj_tx",
443                "Number of transactions involving shared objects",
444                registry,
445            )
446            .unwrap(),
447
448            sponsored_tx: register_int_counter_with_registry!(
449                "num_sponsored_tx",
450                "Number of sponsored transactions",
451                registry,
452            )
453            .unwrap(),
454
455            tx_already_processed: register_int_counter_with_registry!(
456                "num_tx_already_processed",
457                "Number of transaction orders already processed previously",
458                registry,
459            )
460            .unwrap(),
461            num_input_objs: register_histogram_with_registry!(
462                "num_input_objects",
463                "Distribution of number of input TX objects per TX",
464                POSITIVE_INT_BUCKETS.to_vec(),
465                registry,
466            )
467            .unwrap(),
468            num_shared_objects: register_histogram_with_registry!(
469                "num_shared_objects",
470                "Number of shared input objects per TX",
471                POSITIVE_INT_BUCKETS.to_vec(),
472                registry,
473            )
474            .unwrap(),
475            batch_size: register_histogram_with_registry!(
476                "batch_size",
477                "Distribution of size of transaction batch",
478                POSITIVE_INT_BUCKETS.to_vec(),
479                registry,
480            )
481            .unwrap(),
482            authority_state_handle_transaction_latency: register_histogram_with_registry!(
483                "authority_state_handle_transaction_latency",
484                "Latency of handling transactions",
485                LATENCY_SEC_BUCKETS.to_vec(),
486                registry,
487            )
488            .unwrap(),
489            authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
490                "authority_state_handle_vote_transaction_latency",
491                "Latency of voting on transactions without signing",
492                LATENCY_SEC_BUCKETS.to_vec(),
493                registry,
494            )
495            .unwrap(),
496            execute_certificate_latency_single_writer,
497            execute_certificate_latency_shared_object,
498            await_transaction_latency: register_histogram_with_registry!(
499                "await_transaction_latency",
500                "Latency of awaiting user transaction execution, including waiting for inputs",
501                LATENCY_SEC_BUCKETS.to_vec(),
502                registry,
503            )
504            .unwrap(),
505            internal_execution_latency: register_histogram_with_registry!(
506                "authority_state_internal_execution_latency",
507                "Latency of actual certificate executions",
508                LATENCY_SEC_BUCKETS.to_vec(),
509                registry,
510            )
511            .unwrap(),
512            execution_load_input_objects_latency: register_histogram_with_registry!(
513                "authority_state_execution_load_input_objects_latency",
514                "Latency of loading input objects for execution",
515                LOW_LATENCY_SEC_BUCKETS.to_vec(),
516                registry,
517            )
518            .unwrap(),
519            prepare_certificate_latency: register_histogram_with_registry!(
520                "authority_state_prepare_certificate_latency",
521                "Latency of executing certificates, before committing the results",
522                LATENCY_SEC_BUCKETS.to_vec(),
523                registry,
524            )
525            .unwrap(),
526            commit_certificate_latency: register_histogram_with_registry!(
527                "authority_state_commit_certificate_latency",
528                "Latency of committing certificate execution results",
529                LATENCY_SEC_BUCKETS.to_vec(),
530                registry,
531            )
532            .unwrap(),
533            db_checkpoint_latency: register_histogram_with_registry!(
534                "db_checkpoint_latency",
535                "Latency of checkpointing dbs",
536                LATENCY_SEC_BUCKETS.to_vec(),
537                registry,
538            ).unwrap(),
539            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
540                "transaction_manager_num_enqueued_certificates",
541                "Current number of certificates enqueued to ExecutionScheduler",
542                &["result"],
543                registry,
544            )
545            .unwrap(),
546            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
547                "transaction_manager_num_pending_certificates",
548                "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
549                registry,
550            )
551            .unwrap(),
552            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
553                "transaction_manager_num_executing_certificates",
554                "Number of executing certificates, including queued and actually running certificates",
555                registry,
556            )
557            .unwrap(),
558            authority_overload_status: register_int_gauge_with_registry!(
559                "authority_overload_status",
560                "Whether authority is current experiencing overload and enters load shedding mode.",
561                registry)
562            .unwrap(),
563            authority_load_shedding_percentage: register_int_gauge_with_registry!(
564                "authority_load_shedding_percentage",
565                "The percentage of transactions is shed when the authority is in load shedding mode.",
566                registry)
567            .unwrap(),
568            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
569                "transaction_manager_transaction_queue_age_s",
570                "Time spent in waiting for transaction in the queue",
571                LATENCY_SEC_BUCKETS.to_vec(),
572                registry,
573            )
574            .unwrap(),
575            transaction_overload_sources: register_int_counter_vec_with_registry!(
576                "transaction_overload_sources",
577                "Number of times each source indicates transaction overload.",
578                &["source"],
579                registry)
580            .unwrap(),
581            execution_driver_executed_transactions: register_int_counter_with_registry!(
582                "execution_driver_executed_transactions",
583                "Cumulative number of transaction executed by execution driver",
584                registry,
585            )
586            .unwrap(),
587            execution_driver_paused_transactions: register_int_counter_with_registry!(
588                "execution_driver_paused_transactions",
589                "Cumulative number of transactions paused by execution driver",
590                registry,
591            )
592            .unwrap(),
593            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
594                "execution_driver_dispatch_queue",
595                "Number of transaction pending in execution driver dispatch queue",
596                registry,
597            )
598            .unwrap(),
599            execution_queueing_delay_s: register_histogram_with_registry!(
600                "execution_queueing_delay_s",
601                "Queueing delay between a transaction is ready for execution until it starts executing.",
602                LATENCY_SEC_BUCKETS.to_vec(),
603                registry
604            )
605            .unwrap(),
606            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
607                "prepare_cert_gas_latency_ratio",
608                "The ratio of computation gas divided by VM execution latency.",
609                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
610                registry
611            )
612            .unwrap(),
613            execution_gas_latency_ratio: register_histogram_with_registry!(
614                "execution_gas_latency_ratio",
615                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
616                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
617                registry
618            )
619            .unwrap(),
620            skipped_consensus_txns: register_int_counter_with_registry!(
621                "skipped_consensus_txns",
622                "Total number of consensus transactions skipped",
623                registry,
624            )
625            .unwrap(),
626            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
627                "skipped_consensus_txns_cache_hit",
628                "Total number of consensus transactions skipped because of local cache hit",
629                registry,
630            )
631            .unwrap(),
632            post_processing_total_events_emitted: register_int_counter_with_registry!(
633                "post_processing_total_events_emitted",
634                "Total number of events emitted in post processing",
635                registry,
636            )
637            .unwrap(),
638            post_processing_total_tx_indexed: register_int_counter_with_registry!(
639                "post_processing_total_tx_indexed",
640                "Total number of txes indexed in post processing",
641                registry,
642            )
643            .unwrap(),
644            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
645                "post_processing_total_tx_had_event_processed",
646                "Total number of txes finished event processing in post processing",
647                registry,
648            )
649            .unwrap(),
650            post_processing_total_failures: register_int_counter_with_registry!(
651                "post_processing_total_failures",
652                "Total number of failure in post processing",
653                registry,
654            )
655            .unwrap(),
656            consensus_handler_processed: register_int_counter_vec_with_registry!(
657                "consensus_handler_processed",
658                "Number of transactions processed by consensus handler",
659                &["class"],
660                registry
661            ).unwrap(),
662            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
663                "consensus_handler_transaction_sizes",
664                "Sizes of each type of transactions processed by consensus handler",
665                &["class"],
666                POSITIVE_INT_BUCKETS.to_vec(),
667                registry
668            ).unwrap(),
669            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
670                "consensus_handler_num_low_scoring_authorities",
671                "Number of low scoring authorities based on reputation scores from consensus",
672                registry
673            ).unwrap(),
674            consensus_handler_scores: register_int_gauge_vec_with_registry!(
675                "consensus_handler_scores",
676                "scores from consensus for each authority",
677                &["authority"],
678                registry,
679            ).unwrap(),
680            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
681                "consensus_handler_deferred_transactions",
682                "Number of transactions deferred by consensus handler",
683                registry,
684            ).unwrap(),
685            consensus_handler_congested_transactions: register_int_counter_with_registry!(
686                "consensus_handler_congested_transactions",
687                "Number of transactions deferred by consensus handler due to congestion",
688                registry,
689            ).unwrap(),
690            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
691                "consensus_handler_cancelled_transactions",
692                "Number of transactions cancelled by consensus handler",
693                registry,
694            ).unwrap(),
695            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
696                "consensus_handler_max_congestion_control_object_costs",
697                "Max object costs for congestion control in the current consensus commit",
698                &["commit_type"],
699                registry,
700            ).unwrap(),
701            consensus_committed_subdags: register_int_counter_vec_with_registry!(
702                "consensus_committed_subdags",
703                "Number of committed subdags, sliced by author",
704                &["authority"],
705                registry,
706            ).unwrap(),
707            consensus_committed_messages: register_int_gauge_vec_with_registry!(
708                "consensus_committed_messages",
709                "Total number of committed consensus messages, sliced by author",
710                &["authority"],
711                registry,
712            ).unwrap(),
713            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
714                "consensus_committed_user_transactions",
715                "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
716                &["authority"],
717                registry,
718            ).unwrap(),
719            consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
720                "consensus_finalized_user_transactions",
721                "Number of user transactions finalized, sliced by submitter",
722                &["authority"],
723                registry,
724            ).unwrap(),
725            consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
726                "consensus_rejected_user_transactions",
727                "Number of user transactions rejected, sliced by submitter",
728                &["authority"],
729                registry,
730            ).unwrap(),
731            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
732            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
733            zklogin_sig_count: register_int_counter_with_registry!(
734                "zklogin_sig_count",
735                "Count of zkLogin signatures",
736                registry,
737            )
738            .unwrap(),
739            multisig_sig_count: register_int_counter_with_registry!(
740                "multisig_sig_count",
741                "Count of zkLogin signatures",
742                registry,
743            )
744            .unwrap(),
745            consensus_calculated_throughput: register_int_gauge_with_registry!(
746                "consensus_calculated_throughput",
747                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
748                registry,
749            ).unwrap(),
750            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
751                "consensus_calculated_throughput_profile",
752                "The current active calculated throughput profile",
753                registry
754            ).unwrap(),
755            consensus_block_handler_block_processed: register_int_counter_with_registry!(
756                "consensus_block_handler_block_processed",
757                "Number of blocks processed by consensus block handler.",
758                registry
759            ).unwrap(),
760            consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
761                "consensus_block_handler_txn_processed",
762                "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
763                &["outcome"],
764                registry
765            ).unwrap(),
766            consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
767                "consensus_block_handler_fastpath_executions",
768                "Number of fastpath transactions sent for execution by consensus transaction handler",
769                registry,
770            ).unwrap(),
771            consensus_timestamp_bias: register_histogram_with_registry!(
772                "consensus_timestamp_bias",
773                "Bias/delay from consensus timestamp to system time",
774                TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
775                registry
776            ).unwrap(),
777            execution_queueing_latency: LatencyObserver::new(),
778            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
779            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
780        }
781    }
782}
783
/// A trait object for `Signer` that is:
/// - Pin, i.e. confined to one place in memory (we don't want to copy private keys).
/// - Send + Sync, i.e. can be safely moved to and shared between threads.
///
/// The alias is a `Pin<Arc<..>>`, so it is typically instantiated with
/// `Arc::pin(keypair)` where `keypair` is a `KeyPair`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
791
/// Execution env contains the "environment" for the transaction to be executed in, that is,
/// all the information necessary for execution that is not specified by the transaction itself.
///
/// Start from [`ExecutionEnv::new`] (equivalent to `Default::default()`) and refine the
/// value with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct ExecutionEnv {
    /// The assigned version of each shared object for the transaction.
    pub assigned_versions: AssignedVersions,
    /// The expected digest of the effects of the transaction, if executing from checkpoint or
    /// other sources where the effects are known in advance. `None` when not known.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    /// The source of the scheduling of the transaction.
    pub scheduling_source: SchedulingSource,
    /// Status of the balance withdraw scheduling of the transaction.
    pub withdraw_status: BalanceWithdrawStatus,
    /// Transactions that must finish before this transaction can be executed.
    /// Used to schedule barrier transactions after non-exclusive writes.
    pub barrier_dependencies: Vec<TransactionDigest>,
}
809
810impl Default for ExecutionEnv {
811    fn default() -> Self {
812        Self {
813            assigned_versions: Default::default(),
814            expected_effects_digest: None,
815            scheduling_source: SchedulingSource::NonFastPath,
816            withdraw_status: BalanceWithdrawStatus::NoWithdraw,
817            barrier_dependencies: Default::default(),
818        }
819    }
820}
821
822impl ExecutionEnv {
823    pub fn new() -> Self {
824        Default::default()
825    }
826
827    pub fn with_scheduling_source(mut self, scheduling_source: SchedulingSource) -> Self {
828        self.scheduling_source = scheduling_source;
829        self
830    }
831
832    pub fn with_expected_effects_digest(
833        mut self,
834        expected_effects_digest: TransactionEffectsDigest,
835    ) -> Self {
836        self.expected_effects_digest = Some(expected_effects_digest);
837        self
838    }
839
840    pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
841        self.assigned_versions = assigned_versions;
842        self
843    }
844
845    pub fn with_sufficient_balance(mut self) -> Self {
846        self.withdraw_status = BalanceWithdrawStatus::SufficientBalance;
847        self
848    }
849
850    pub fn with_insufficient_balance(mut self) -> Self {
851        self.withdraw_status = BalanceWithdrawStatus::InsufficientBalance;
852        self
853    }
854
855    pub fn with_barrier_dependencies(
856        mut self,
857        barrier_dependencies: BTreeSet<TransactionDigest>,
858    ) -> Self {
859        self.barrier_dependencies = barrier_dependencies.into_iter().collect();
860        self
861    }
862}
863
864#[derive(Debug)]
865pub struct ForkRecoveryState {
866    /// Transaction digest to effects digest overrides
867    transaction_overrides:
868        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
869}
870
871impl Default for ForkRecoveryState {
872    fn default() -> Self {
873        Self {
874            transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
875        }
876    }
877}
878
879impl ForkRecoveryState {
880    pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
881        let Some(config) = config else {
882            return Ok(Self::default());
883        };
884
885        let mut transaction_overrides = HashMap::new();
886        for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
887            let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
888                SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
889            })?;
890            let effects_digest =
891                TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
892                    SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
893                })?;
894            transaction_overrides.insert(tx_digest, effects_digest);
895        }
896
897        Ok(Self {
898            transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
899        })
900    }
901
902    pub fn get_transaction_override(
903        &self,
904        tx_digest: &TransactionDigest,
905    ) -> Option<TransactionEffectsDigest> {
906        self.transaction_overrides.read().get(tx_digest).copied()
907    }
908}
909
/// State owned by a single authority.
///
/// Groups the authority's identity (`name`, `secret`), its stores and caches, the
/// execution scheduler, metrics, pruners, node configuration, and overload /
/// traffic-control state. Whether the authority acts as a validator or a fullnode in a
/// given epoch is decided by committee membership (see `is_validator` / `is_fullnode`).
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loader used to read the input and receiving objects of a transaction
    /// (e.g. when checking a transaction for signing).
    input_loader: TransactionInputLoader,
    // NOTE(review): presumably the handles behind the `get_*` cache accessors used by the
    // methods of this type — confirm against `ExecutionCacheTraitPointers`.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// The per-epoch store, held in an `ArcSwap`.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it until all writes are complete.
    /// Reconfiguration acquires write lock, changes the epoch and reverts all transactions
    /// from the previous epoch that were executed but did not make it into a checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional RPC index store.
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; also queried for per-epoch state commitments.
    pub checkpoint_store: Arc<CheckpointStore>,

    committee_store: Arc<CommitteeStore>,

    /// Schedules transaction execution.
    execution_scheduler: Arc<ExecutionScheduler>,

    /// Shuts down the execution task. Used only in testing.
    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    // NOTE(review): the underscore-prefixed pruners are never read in the visible code;
    // presumably they are stored only to keep the pruners alive — confirm.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// The node configuration (deny config, verifier signing config, overload config, ...).
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// Optional finalizer that tracks transactions signed by this validator.
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Traffic controller for Sui core servers (json-rpc, validator service)
    pub traffic_controller: Option<Arc<TrafficController>>,

    /// Fork recovery state for handling equivocation after forks
    fork_recovery_state: Option<ForkRecoveryState>,
}
969
970/// The authority state encapsulates all state, drives execution, and ensures safety.
971///
972/// Note the authority operations can be accessed through a read ref (&) and do not
973/// require &mut. Internally a database is synchronized through a mutex lock.
974///
975/// Repeating valid commands should produce no changes and return no error.
976impl AuthorityState {
977    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
978        epoch_store.committee().authority_exists(&self.name)
979    }
980
981    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
982        !self.is_validator(epoch_store)
983    }
984
985    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
986        &self.committee_store
987    }
988
989    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
990        self.committee_store.clone()
991    }
992
993    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
994        &self.config.authority_overload_config
995    }
996
997    pub fn get_epoch_state_commitments(
998        &self,
999        epoch: EpochId,
1000    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1001        self.checkpoint_store.get_epoch_state_commitments(epoch)
1002    }
1003
1004    fn handle_transaction_deny_checks(
1005        &self,
1006        transaction: &VerifiedTransaction,
1007        epoch_store: &Arc<AuthorityPerEpochStore>,
1008    ) -> SuiResult<CheckedInputObjects> {
1009        let tx_digest = transaction.digest();
1010        let tx_data = transaction.data().transaction_data();
1011
1012        let input_object_kinds = tx_data.input_objects()?;
1013        let receiving_objects_refs = tx_data.receiving_objects();
1014
1015        // Note: the deny checks may do redundant package loads but:
1016        // - they only load packages when there is an active package deny map
1017        // - the loads are cached anyway
1018        sui_transaction_checks::deny::check_transaction_for_signing(
1019            tx_data,
1020            transaction.tx_signatures(),
1021            &input_object_kinds,
1022            &receiving_objects_refs,
1023            &self.config.transaction_deny_config,
1024            self.get_backing_package_store().as_ref(),
1025        )?;
1026
1027        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1028            Some(tx_digest),
1029            &input_object_kinds,
1030            &receiving_objects_refs,
1031            epoch_store.epoch(),
1032        )?;
1033
1034        let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1035            epoch_store.protocol_config(),
1036            epoch_store.reference_gas_price(),
1037            tx_data,
1038            input_objects,
1039            &receiving_objects,
1040            &self.metrics.bytecode_verifier_metrics,
1041            &self.config.verifier_signing_config,
1042        )?;
1043
1044        self.handle_coin_deny_list_checks(
1045            tx_data,
1046            &checked_input_objects,
1047            &receiving_objects,
1048            epoch_store,
1049        )?;
1050
1051        Ok(checked_input_objects)
1052    }
1053
1054    fn handle_coin_deny_list_checks(
1055        &self,
1056        tx_data: &TransactionData,
1057        checked_input_objects: &CheckedInputObjects,
1058        receiving_objects: &ReceivingObjects,
1059        epoch_store: &Arc<AuthorityPerEpochStore>,
1060    ) -> SuiResult<()> {
1061        let funds_withdraw_types = tx_data
1062            .get_funds_withdrawals()
1063            .into_iter()
1064            .filter_map(|withdraw| {
1065                withdraw
1066                    .type_arg
1067                    .get_balance_type_param()
1068                    // unwrap safe because we already verified the transaction.
1069                    .unwrap()
1070                    .map(|ty| ty.to_canonical_string(false))
1071            })
1072            .collect::<BTreeSet<_>>();
1073
1074        if epoch_store.coin_deny_list_v1_enabled() {
1075            check_coin_deny_list_v1(
1076                tx_data.sender(),
1077                checked_input_objects,
1078                receiving_objects,
1079                funds_withdraw_types.clone(),
1080                &self.get_object_store(),
1081            )?;
1082        }
1083
1084        if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1085            check_coin_deny_list_v2_during_signing(
1086                tx_data.sender(),
1087                checked_input_objects,
1088                receiving_objects,
1089                funds_withdraw_types.clone(),
1090                &self.get_object_store(),
1091            )?;
1092        }
1093
1094        Ok(())
1095    }
1096
1097    /// This is a private method and should be kept that way. It doesn't check whether
1098    /// the provided transaction is a system transaction, and hence can only be called internally.
1099    #[instrument(level = "trace", skip_all)]
1100    fn handle_transaction_impl(
1101        &self,
1102        transaction: VerifiedTransaction,
1103        sign: bool,
1104        epoch_store: &Arc<AuthorityPerEpochStore>,
1105    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
1106        // Ensure that validator cannot reconfigure while we are signing the tx
1107        let _execution_lock = self.execution_lock_for_signing()?;
1108
1109        let checked_input_objects =
1110            self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1111
1112        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1113
1114        let tx_digest = *transaction.digest();
1115        let signed_transaction = if sign {
1116            Some(VerifiedSignedTransaction::new(
1117                epoch_store.epoch(),
1118                transaction,
1119                self.name,
1120                &*self.secret,
1121            ))
1122        } else {
1123            None
1124        };
1125
1126        // Check and write locks, to signed transaction, into the database
1127        // The call to self.set_transaction_lock checks the lock is not conflicting,
1128        // and returns ConflictingTransaction error in case there is a lock on a different
1129        // existing transaction.
1130        self.get_cache_writer().acquire_transaction_locks(
1131            epoch_store,
1132            &owned_objects,
1133            tx_digest,
1134            signed_transaction.clone(),
1135        )?;
1136
1137        Ok(signed_transaction)
1138    }
1139
1140    /// Initiate a new transaction.
1141    #[instrument(level = "trace", skip_all)]
1142    pub async fn handle_transaction(
1143        &self,
1144        epoch_store: &Arc<AuthorityPerEpochStore>,
1145        transaction: VerifiedTransaction,
1146    ) -> SuiResult<HandleTransactionResponse> {
1147        let tx_digest = *transaction.digest();
1148        debug!("handle_transaction");
1149
1150        // Ensure an idempotent answer.
1151        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1152            return Ok(HandleTransactionResponse { status });
1153        }
1154
1155        let _metrics_guard = self
1156            .metrics
1157            .authority_state_handle_transaction_latency
1158            .start_timer();
1159        self.metrics.tx_orders.inc();
1160
1161        if epoch_store.protocol_config().mysticeti_fastpath()
1162            && !self
1163                .wait_for_fastpath_dependency_objects(&transaction, epoch_store.epoch())
1164                .await?
1165        {
1166            debug!("fastpath input objects are still unavailable after waiting");
1167            // Proceed with input checks to generate a proper error.
1168        }
1169
1170        self.handle_sign_transaction(epoch_store, transaction).await
1171    }
1172
1173    /// Signs a transaction. Exposed for testing.
1174    pub async fn handle_sign_transaction(
1175        &self,
1176        epoch_store: &Arc<AuthorityPerEpochStore>,
1177        transaction: VerifiedTransaction,
1178    ) -> SuiResult<HandleTransactionResponse> {
1179        let tx_digest = *transaction.digest();
1180        let signed = self.handle_transaction_impl(transaction, /* sign */ true, epoch_store);
1181        match signed {
1182            Ok(Some(s)) => {
1183                if self.is_validator(epoch_store)
1184                    && let Some(validator_tx_finalizer) = &self.validator_tx_finalizer
1185                {
1186                    let tx = s.clone();
1187                    let validator_tx_finalizer = validator_tx_finalizer.clone();
1188                    let cache_reader = self.get_transaction_cache_reader().clone();
1189                    let epoch_store = epoch_store.clone();
1190                    spawn_monitored_task!(epoch_store.within_alive_epoch(
1191                        validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1192                    ));
1193                }
1194                Ok(HandleTransactionResponse {
1195                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
1196                })
1197            }
1198            Ok(None) => panic!("handle_transaction_impl should return a signed transaction"),
1199            // It happens frequently that while we are checking the validity of the transaction, it
1200            // has just been executed.
1201            // In that case, we could still return Ok to avoid showing confusing errors.
1202            Err(err) => Ok(HandleTransactionResponse {
1203                status: self
1204                    .get_transaction_status(&tx_digest, epoch_store)?
1205                    .ok_or(err)?
1206                    .1,
1207            }),
1208        }
1209    }
1210
1211    /// Vote for a transaction, either when validator receives a submit_transaction request,
1212    /// or sees a transaction from consensus. Performs the same types of checks as
1213    /// transaction signing, but does not explicitly sign the transaction.
1214    /// Note that if the transaction has been executed, we still go through the
1215    /// same checks. If the transaction has only been executed through mysticeti fastpath,
1216    /// but not yet finalized, the signing will still work since the objects are still available.
1217    /// But if the transaction has been finalized, the signing will fail.
1218    /// TODO(mysticeti-fastpath): Assess whether we want to optimize the case when the transaction
1219    /// has already been finalized executed.
1220    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1221    pub(crate) fn handle_vote_transaction(
1222        &self,
1223        epoch_store: &Arc<AuthorityPerEpochStore>,
1224        transaction: VerifiedTransaction,
1225    ) -> SuiResult<()> {
1226        debug!("handle_vote_transaction");
1227
1228        let _metrics_guard = self
1229            .metrics
1230            .authority_state_handle_vote_transaction_latency
1231            .start_timer();
1232        self.metrics.tx_orders.inc();
1233
1234        // The should_accept_user_certs check here is best effort, because
1235        // between a validator signs a tx and a cert is formed, the validator
1236        // could close the window.
1237        if !epoch_store
1238            .get_reconfig_state_read_lock_guard()
1239            .should_accept_user_certs()
1240        {
1241            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1242        }
1243
1244        let result =
1245            self.handle_transaction_impl(transaction, false /* sign */, epoch_store)?;
1246        assert!(
1247            result.is_none(),
1248            "handle_transaction_impl should not return a signed transaction when sign is false"
1249        );
1250        Ok(())
1251    }
1252
1253    /// Used for early client validation check for transactions before submission to server.
1254    /// Performs the same validation checks as handle_vote_transaction without acquiring locks.
1255    /// This allows for fast failure feedback to clients for non-retriable errors.
1256    ///
1257    /// The key addition is checking that owned object versions match live object versions.
1258    /// This is necessary because handle_transaction_deny_checks fetches objects at their
1259    /// requested version (which may exist in storage as historical versions), whereas
1260    /// validators check against the live/current version during locking (verify_live_object).
1261    pub fn check_transaction_validity(
1262        &self,
1263        epoch_store: &Arc<AuthorityPerEpochStore>,
1264        transaction: &VerifiedTransaction,
1265    ) -> SuiResult<()> {
1266        if !epoch_store
1267            .get_reconfig_state_read_lock_guard()
1268            .should_accept_user_certs()
1269        {
1270            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1271        }
1272
1273        let checked_input_objects =
1274            self.handle_transaction_deny_checks(transaction, epoch_store)?;
1275
1276        // Check that owned object versions match live objects
1277        // This closely mimics verify_live_object logic from acquire_transaction_locks
1278        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1279        let cache_reader = self.get_object_cache_reader();
1280
1281        for obj_ref in &owned_objects {
1282            if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1283                // Only reject if transaction references an old version. Allow newer
1284                // versions that may exist on validators but not yet synced to fullnode.
1285                if obj_ref.1 < live_object.version() {
1286                    return Err(SuiErrorKind::UserInputError {
1287                        error: UserInputError::ObjectVersionUnavailableForConsumption {
1288                            provided_obj_ref: *obj_ref,
1289                            current_version: live_object.version(),
1290                        },
1291                    }
1292                    .into());
1293                }
1294
1295                // If version matches, verify digest also matches
1296                if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1297                    return Err(SuiErrorKind::UserInputError {
1298                        error: UserInputError::InvalidObjectDigest {
1299                            object_id: obj_ref.0,
1300                            expected_digest: live_object.digest(),
1301                        },
1302                    }
1303                    .into());
1304                }
1305            }
1306        }
1307
1308        Ok(())
1309    }
1310
1311    pub fn check_system_overload_at_signing(&self) -> bool {
1312        self.config
1313            .authority_overload_config
1314            .check_system_overload_at_signing
1315    }
1316
1317    pub fn check_system_overload_at_execution(&self) -> bool {
1318        self.config
1319            .authority_overload_config
1320            .check_system_overload_at_execution
1321    }
1322
1323    pub(crate) fn check_system_overload(
1324        &self,
1325        consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
1326        tx_data: &SenderSignedData,
1327        do_authority_overload_check: bool,
1328    ) -> SuiResult {
1329        if do_authority_overload_check {
1330            self.check_authority_overload(tx_data).tap_err(|_| {
1331                self.update_overload_metrics("execution_queue");
1332            })?;
1333        }
1334        self.execution_scheduler
1335            .check_execution_overload(self.overload_config(), tx_data)
1336            .tap_err(|_| {
1337                self.update_overload_metrics("execution_pending");
1338            })?;
1339        consensus_overload_checker
1340            .check_consensus_overload()
1341            .tap_err(|_| {
1342                self.update_overload_metrics("consensus");
1343            })?;
1344
1345        let pending_tx_count = self
1346            .get_cache_commit()
1347            .approximate_pending_transaction_count();
1348        if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1349            return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1350                retry_after_secs: 10,
1351            }
1352            .into());
1353        }
1354
1355        Ok(())
1356    }
1357
1358    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1359        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1360            return Ok(());
1361        }
1362
1363        let load_shedding_percentage = self
1364            .overload_info
1365            .load_shedding_percentage
1366            .load(Ordering::Relaxed);
1367        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1368    }
1369
1370    fn update_overload_metrics(&self, source: &str) {
1371        self.metrics
1372            .transaction_overload_sources
1373            .with_label_values(&[source])
1374            .inc();
1375    }
1376
1377    /// Waits for fastpath (owned, package) dependency objects to become available.
1378    /// Returns true if a chosen set of fastpath dependency objects are available,
1379    /// returns false otherwise after an internal timeout.
1380    pub(crate) async fn wait_for_fastpath_dependency_objects(
1381        &self,
1382        transaction: &VerifiedTransaction,
1383        epoch: EpochId,
1384    ) -> SuiResult<bool> {
1385        let txn_data = transaction.data().transaction_data();
1386        let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1387
1388        // Gather and filter input objects to wait for.
1389        let fastpath_dependency_objects: Vec<_> = move_objects
1390            .into_iter()
1391            .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1392            .chain(
1393                packages
1394                    .into_iter()
1395                    .map(|package_id| InputKey::Package { id: package_id }),
1396            )
1397            .collect();
1398        let receiving_keys: HashSet<_> = receiving_objects
1399            .into_iter()
1400            .filter_map(|receiving_obj_ref| {
1401                self.should_wait_for_dependency_object(receiving_obj_ref)
1402            })
1403            .collect();
1404        if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1405            return Ok(true);
1406        }
1407
1408        // Use shorter wait timeout in simtests to exercise server-side error paths and
1409        // client-side retry logic.
1410        let max_wait = if cfg!(msim) {
1411            Duration::from_millis(200)
1412        } else {
1413            WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1414        };
1415
1416        match timeout(
1417            max_wait,
1418            self.get_object_cache_reader().notify_read_input_objects(
1419                &fastpath_dependency_objects,
1420                &receiving_keys,
1421                epoch,
1422            ),
1423        )
1424        .await
1425        {
1426            Ok(()) => Ok(true),
1427            // Maybe return an error for unavailable input objects,
1428            // and allow the caller to skip the rest of input checks?
1429            Err(_) => Ok(false),
1430        }
1431    }
1432
1433    /// Returns Some(inputKey) if the object reference should be waited on until it is
1434    /// finalized, before proceeding to input checks.
1435    ///
1436    /// Incorrect decisions here should only affect user experience, not safety:
1437    /// - Waiting unnecessarily adds latency to transaction signing and submission.
1438    /// - Not waiting when needed may cause the transaction to be rejected because input object is unavailable.
1439    fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1440        let (obj_id, cur_version, _digest) = obj_ref;
1441        let Some(latest_obj_ref) = self
1442            .get_object_cache_reader()
1443            .get_latest_object_ref_or_tombstone(obj_id)
1444        else {
1445            // Object might not have been created.
1446            return Some(InputKey::VersionedObject {
1447                id: FullObjectID::new(obj_id, None),
1448                version: cur_version,
1449            });
1450        };
1451        let latest_digest = latest_obj_ref.2;
1452        if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1453            // Do not wait for deleted object and rely on input check instead.
1454            return None;
1455        }
1456        let latest_version = latest_obj_ref.1;
1457        if cur_version <= latest_version {
1458            // Do not wait for version that already exists or has been consumed.
1459            // Let the input check to handle them and return the proper error.
1460            return None;
1461        }
1462        // Wait for the object version to become available.
1463        Some(InputKey::VersionedObject {
1464            id: FullObjectID::new(obj_id, None),
1465            version: cur_version,
1466        })
1467    }
1468
1469    /// Wait for a certificate to be executed.
1470    /// For consensus transactions, it needs to be sequenced by the consensus.
1471    /// For owned object transactions, this function will enqueue the transaction for execution.
1472    // TODO: The next 3 functions are very similar. We should refactor them.
1473    #[instrument(level = "trace", skip_all)]
1474    pub async fn wait_for_certificate_execution(
1475        &self,
1476        certificate: &VerifiedCertificate,
1477        epoch_store: &Arc<AuthorityPerEpochStore>,
1478    ) -> SuiResult<TransactionEffects> {
1479        self.wait_for_transaction_execution(
1480            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1481            epoch_store,
1482        )
1483        .await
1484    }
1485
1486    /// Wait for a transaction to be executed.
1487    /// For consensus transactions, it needs to be sequenced by the consensus.
1488    /// For owned object transactions, this function will enqueue the transaction for execution.
1489    #[instrument(level = "trace", skip_all)]
1490    pub async fn wait_for_transaction_execution(
1491        &self,
1492        transaction: &VerifiedExecutableTransaction,
1493        epoch_store: &Arc<AuthorityPerEpochStore>,
1494    ) -> SuiResult<TransactionEffects> {
1495        let _metrics_guard = if transaction.is_consensus_tx() {
1496            self.metrics
1497                .execute_certificate_latency_shared_object
1498                .start_timer()
1499        } else {
1500            self.metrics
1501                .execute_certificate_latency_single_writer
1502                .start_timer()
1503        };
1504        trace!("execute_transaction");
1505
1506        self.metrics.total_cert_attempts.inc();
1507
1508        if !transaction.is_consensus_tx() {
1509            // Shared object transactions need to be sequenced by the consensus before enqueueing
1510            // for execution, done in AuthorityPerEpochStore::handle_consensus_transaction().
1511            // For owned object transactions, they can be enqueued for execution immediately.
1512            self.execution_scheduler.enqueue(
1513                vec![(
1514                    Schedulable::Transaction(transaction.clone()),
1515                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1516                )],
1517                epoch_store,
1518            );
1519        }
1520
1521        // tx could be reverted when epoch ends, so we must be careful not to return a result
1522        // here after the epoch ends.
1523        epoch_store
1524            .within_alive_epoch(self.notify_read_effects(
1525                "AuthorityState::wait_for_transaction_execution",
1526                *transaction.digest(),
1527            ))
1528            .await
1529            .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1530            .and_then(|r| r)
1531    }
1532
1533    /// Awaits the effects of executing a user transaction.
1534    ///
1535    /// Relies on consensus to enqueue the transaction for execution.
1536    pub async fn await_transaction_effects(
1537        &self,
1538        digest: TransactionDigest,
1539        epoch_store: &Arc<AuthorityPerEpochStore>,
1540    ) -> SuiResult<TransactionEffects> {
1541        let _metrics_guard = self.metrics.await_transaction_latency.start_timer();
1542        debug!("await_transaction");
1543
1544        epoch_store
1545            .within_alive_epoch(
1546                self.notify_read_effects("AuthorityState::await_transaction_effects", digest),
1547            )
1548            .await
1549            .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1550            .and_then(|r| r)
1551    }
1552
    /// Internal logic to execute a certificate.
    ///
    /// Guarantees that
    /// - If input objects are available, return no permanent failure.
    /// - Execution and output commit are atomic. i.e. outputs are only written to storage,
    /// on successful execution; crashed execution has no observable effect and can be retried.
    ///
    /// It is caller's responsibility to ensure input objects are available and locks are set.
    /// If this cannot be satisfied by the caller, execute_certificate() should be called instead.
    ///
    /// Should only be called within sui-core.
    ///
    /// # Returns
    /// - `ExecutionOutput::Success((effects, execution_error))` once the transaction has been
    ///   executed (or was found already executed, in which case the error is `None`).
    /// - `ExecutionOutput::EpochEnded` when no execution lock can be taken for the transaction
    ///   or the lock's epoch differs from `epoch_store`'s epoch.
    /// - `ExecutionOutput::Fatal` when committing the outputs fails.
    /// - Any non-success output of `process_certificate()` is passed through.
    ///
    /// # Panics
    /// - If the transaction was already executed and its effects digest differs from
    ///   `execution_env.expected_effects_digest`.
    /// - If cached Mysticeti fastpath outputs exist for a consensus transaction.
    /// - With debug assertions enabled, if the JWKs in the authenticator state object differ
    ///   from those held by the signature verifier after an authenticator state update.
    #[instrument(level = "trace", skip_all)]
    pub async fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        mut execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // When fork recovery is configured with an override for this transaction, the
        // override replaces whatever effects digest the caller expected.
        if let Some(fork_recovery) = &self.fork_recovery_state
            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
        {
            warn!(
                ?tx_digest,
                original_digest = ?execution_env.expected_effects_digest,
                override_digest = ?override_digest,
                "Applying fork recovery override for transaction effects digest"
            );
            execution_env.expected_effects_digest = Some(override_digest);
        }

        // prevent concurrent executions of the same tx.
        let tx_guard = epoch_store.acquire_tx_guard(certificate);

        // Short-circuit if the transaction has already been executed: return the stored
        // effects after checking them against the expected digest, if one was given.
        let tx_cache_reader = self.get_transaction_cache_reader();
        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest,
                    "Unexpected effects digest for transaction {:?}",
                    tx_digest
                );
            }
            tx_guard.release();
            return ExecutionOutput::Success((effects, None));
        }

        let execution_start_time = Instant::now();

        // Any caller that verifies the signatures on the certificate will have already checked the
        // epoch. But paths that don't verify sigs (e.g. execution from checkpoint, reading from db)
        // present the possibility of an epoch mismatch. If this cert is not finalized in previous
        // epoch, then it's invalid.
        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
        else {
            tx_guard.release();
            return ExecutionOutput::EpochEnded;
        };
        // Since we obtain a reference to the epoch store before taking the execution lock, it's
        // possible that reconfiguration has happened and they no longer match.
        // TODO: We may not need the following check anymore since the scheduler
        // should have checked that the certificate is from the same epoch as epoch_store.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return ExecutionOutput::EpochEnded;
        }

        // Remember the scheduling source now: `execution_env` is moved into
        // `process_certificate()` below.
        let scheduling_source = execution_env.scheduling_source;
        // Cached fastpath outputs are only consulted when the protocol config enables
        // Mysticeti fastpath.
        let mysticeti_fp_outputs = if epoch_store.protocol_config().mysticeti_fastpath() {
            tx_cache_reader.get_mysticeti_fastpath_outputs(tx_digest)
        } else {
            None
        };

        // Either reuse the cached fastpath outputs (no timings, no execution error) or
        // execute the transaction now via `process_certificate()`.
        let (transaction_outputs, timings, execution_error_opt) = if let Some(outputs) =
            mysticeti_fp_outputs
        {
            assert!(
                !certificate.is_consensus_tx(),
                "Mysticeti fastpath executed transactions cannot be consensus transactions"
            );
            // If this transaction is not scheduled from fastpath, it must be either
            // from consensus or from checkpoint, i.e. it must be finalized.
            // To avoid re-executing fastpath transactions, we check if the outputs are already
            // in the mysticeti fastpath outputs cache. If so, we skip the execution and commit the transaction.
            debug!(
                ?tx_digest,
                "Mysticeti fastpath certified transaction outputs found in cache, skipping execution and committing"
            );
            (outputs, None, None)
        } else {
            let (transaction_outputs, timings, execution_error_opt) = match self
                .process_certificate(
                    &tx_guard,
                    &execution_guard,
                    certificate,
                    execution_env,
                    epoch_store,
                ) {
                ExecutionOutput::Success(result) => result,
                // Any non-success output is returned to the caller as-is.
                output => return output.unwrap_err(),
            };
            (
                Arc::new(transaction_outputs),
                Some(timings),
                execution_error_opt,
            )
        };

        fail_point!("crash");

        let effects = transaction_outputs.effects.clone();
        debug!(
            ?tx_digest,
            fx_digest=?effects.digest(),
            "process_certificate succeeded in {:.3}ms",
            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
        );

        // Outputs of transactions scheduled from Mysticeti fastpath are written through
        // `write_fastpath_transaction_outputs()`; every other source goes through
        // `commit_certificate()`.
        if scheduling_source == SchedulingSource::MysticetiFastPath {
            self.get_cache_writer()
                .write_fastpath_transaction_outputs(transaction_outputs);
        } else {
            let commit_result =
                self.commit_certificate(certificate, transaction_outputs, epoch_store);
            if let Err(err) = commit_result {
                error!(?tx_digest, "Error committing transaction: {err}");
                tx_guard.release();
                return ExecutionOutput::Fatal(err);
            }
        }

        // Authenticator state updates are additionally applied to the epoch store.
        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // double check that the signature verifier always matches the authenticator state
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                // Sort both sides so the comparison does not depend on iteration order.
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        tx_guard.commit_tx();

        let elapsed = execution_start_time.elapsed();
        // Timings only exist when the transaction was executed in this call (they are
        // `None` when cached fastpath outputs were reused).
        if let Some(timings) = timings {
            epoch_store.record_local_execution_time(
                certificate.data().transaction_data(),
                &effects,
                timings,
                elapsed,
            );
        }

        // Record computation cost per microsecond of wall time; skipped when the elapsed
        // time is zero to avoid dividing by zero.
        let elapsed_us = elapsed.as_micros() as f64;
        if elapsed_us > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
        };
        ExecutionOutput::Success((effects, execution_error_opt))
    }
1742
1743    pub fn read_objects_for_execution(
1744        &self,
1745        tx_lock: &CertLockGuard,
1746        certificate: &VerifiedExecutableTransaction,
1747        assigned_shared_object_versions: AssignedVersions,
1748        epoch_store: &Arc<AuthorityPerEpochStore>,
1749    ) -> SuiResult<InputObjects> {
1750        let _scope = monitored_scope("Execution::load_input_objects");
1751        let _metrics_guard = self
1752            .metrics
1753            .execution_load_input_objects_latency
1754            .start_timer();
1755        let input_objects = &certificate.data().transaction_data().input_objects()?;
1756        self.input_loader.read_objects_for_execution(
1757            &certificate.key(),
1758            tx_lock,
1759            input_objects,
1760            &assigned_shared_object_versions,
1761            epoch_store.epoch(),
1762        )
1763    }
1764
1765    /// Test only wrapper for `try_execute_immediately()` above, useful for executing change epoch
1766    /// transactions. Assumes execution will always succeed.
1767    pub async fn try_execute_for_test(
1768        &self,
1769        certificate: &VerifiedCertificate,
1770        execution_env: ExecutionEnv,
1771    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1772        let epoch_store = self.epoch_store_for_testing();
1773        let (effects, execution_error_opt) = self
1774            .try_execute_immediately(
1775                &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1776                execution_env,
1777                &epoch_store,
1778            )
1779            .await
1780            .unwrap();
1781        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1782        (signed_effects, execution_error_opt)
1783    }
1784
1785    pub async fn notify_read_effects(
1786        &self,
1787        task_name: &'static str,
1788        digest: TransactionDigest,
1789    ) -> SuiResult<TransactionEffects> {
1790        Ok(self
1791            .get_transaction_cache_reader()
1792            .notify_read_executed_effects(task_name, &[digest])
1793            .await
1794            .pop()
1795            .expect("must return correct number of effects"))
1796    }
1797
1798    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1799        self.get_object_cache_reader()
1800            .check_owned_objects_are_live(owned_object_refs)
1801    }
1802
1803    /// This function captures the required state to debug a forked transaction.
1804    /// The dump is written to a file in dir `path`, with name prefixed by the transaction digest.
1805    /// NOTE: Since this info escapes the validator context,
1806    /// make sure not to leak any private info here
1807    pub(crate) fn debug_dump_transaction_state(
1808        &self,
1809        tx_digest: &TransactionDigest,
1810        effects: &TransactionEffects,
1811        expected_effects_digest: TransactionEffectsDigest,
1812        inner_temporary_store: &InnerTemporaryStore,
1813        certificate: &VerifiedExecutableTransaction,
1814        debug_dump_config: &StateDebugDumpConfig,
1815    ) -> SuiResult<PathBuf> {
1816        let dump_dir = debug_dump_config
1817            .dump_file_directory
1818            .as_ref()
1819            .cloned()
1820            .unwrap_or(std::env::temp_dir());
1821        let epoch_store = self.load_epoch_store_one_call_per_task();
1822
1823        NodeStateDump::new(
1824            tx_digest,
1825            effects,
1826            expected_effects_digest,
1827            self.get_object_store().as_ref(),
1828            &epoch_store,
1829            inner_temporary_store,
1830            certificate,
1831        )?
1832        .write_to_file(&dump_dir)
1833        .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1834    }
1835
1836    #[instrument(level = "trace", skip_all)]
1837    pub(crate) fn process_certificate(
1838        &self,
1839        tx_guard: &CertTxGuard,
1840        execution_guard: &ExecutionLockReadGuard<'_>,
1841        certificate: &VerifiedExecutableTransaction,
1842        execution_env: ExecutionEnv,
1843        epoch_store: &Arc<AuthorityPerEpochStore>,
1844    ) -> ExecutionOutput<(
1845        TransactionOutputs,
1846        Vec<ExecutionTiming>,
1847        Option<ExecutionError>,
1848    )> {
1849        let _scope = monitored_scope("Execution::process_certificate");
1850        let tx_digest = *certificate.digest();
1851
1852        let input_objects = match self.read_objects_for_execution(
1853            tx_guard.as_lock_guard(),
1854            certificate,
1855            execution_env.assigned_versions,
1856            epoch_store,
1857        ) {
1858            Ok(objects) => objects,
1859            Err(e) => return ExecutionOutput::Fatal(e),
1860        };
1861
1862        let expected_effects_digest = match execution_env.expected_effects_digest {
1863            Some(expected_effects_digest) => Some(expected_effects_digest),
1864            None => {
1865                // We could be re-executing a previously executed but uncommitted transaction, perhaps after
1866                // restarting with a new binary. In this situation, if we have published an effects signature,
1867                // we must be sure not to equivocate.
1868                // TODO: read from cache instead of DB
1869                match epoch_store.get_signed_effects_digest(&tx_digest) {
1870                    Ok(digest) => digest,
1871                    Err(e) => return ExecutionOutput::Fatal(e),
1872                }
1873            }
1874        };
1875
1876        fail_point_if!("correlated-crash-process-certificate", || {
1877            if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1878                sui_simulator::task::kill_current_node(None);
1879            }
1880        });
1881
1882        // Errors originating from prepare_certificate may be transient (failure to read locks) or
1883        // non-transient (transaction input is invalid, move vm errors). However, all errors from
1884        // this function occur before we have written anything to the db, so we commit the tx
1885        // guard and rely on the client to retry the tx (if it was transient).
1886        self.execute_certificate(
1887            execution_guard,
1888            certificate,
1889            input_objects,
1890            expected_effects_digest,
1891            execution_env.withdraw_status,
1892            epoch_store,
1893        )
1894    }
1895
1896    pub async fn reconfigure_traffic_control(
1897        &self,
1898        params: TrafficControlReconfigParams,
1899    ) -> Result<TrafficControlReconfigParams, SuiError> {
1900        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1901            traffic_controller.admin_reconfigure(params).await
1902        } else {
1903            Err(SuiErrorKind::InvalidAdminRequest(
1904                "Traffic controller is not configured on this node".to_string(),
1905            )
1906            .into())
1907        }
1908    }
1909
1910    #[instrument(level = "trace", skip_all)]
1911    fn commit_certificate(
1912        &self,
1913        certificate: &VerifiedExecutableTransaction,
1914        transaction_outputs: Arc<TransactionOutputs>,
1915        epoch_store: &Arc<AuthorityPerEpochStore>,
1916    ) -> SuiResult {
1917        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
1918            monitored_scope("Execution::commit_certificate");
1919        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1920
1921        let tx_digest = certificate.digest();
1922
1923        // The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
1924        // we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
1925        epoch_store.insert_executed_in_epoch(tx_digest);
1926        let key = certificate.key();
1927        if !matches!(key, TransactionKey::Digest(_)) {
1928            epoch_store.insert_tx_key(key, *tx_digest)?;
1929        }
1930
1931        // Allow testing what happens if we crash here.
1932        fail_point!("crash");
1933
1934        self.get_cache_writer()
1935            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);
1936
1937        if certificate.transaction_data().is_end_of_epoch_tx() {
1938            // At the end of epoch, since system packages may have been upgraded, force
1939            // reload them in the cache.
1940            self.get_object_cache_reader()
1941                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1942        }
1943
1944        Ok(())
1945    }
1946
1947    fn update_metrics(
1948        &self,
1949        certificate: &VerifiedExecutableTransaction,
1950        inner_temporary_store: &InnerTemporaryStore,
1951        effects: &TransactionEffects,
1952    ) {
1953        // count signature by scheme, for zklogin and multisig
1954        if certificate.has_zklogin_sig() {
1955            self.metrics.zklogin_sig_count.inc();
1956        } else if certificate.has_upgraded_multisig() {
1957            self.metrics.multisig_sig_count.inc();
1958        }
1959
1960        self.metrics.total_effects.inc();
1961        self.metrics.total_certs.inc();
1962
1963        let consensus_object_count = effects.input_consensus_objects().len();
1964        if consensus_object_count > 0 {
1965            self.metrics.shared_obj_tx.inc();
1966        }
1967
1968        if certificate.is_sponsored_tx() {
1969            self.metrics.sponsored_tx.inc();
1970        }
1971
1972        let input_object_count = inner_temporary_store.input_objects.len();
1973        self.metrics
1974            .num_input_objs
1975            .observe(input_object_count as f64);
1976        self.metrics
1977            .num_shared_objects
1978            .observe(consensus_object_count as f64);
1979        self.metrics.batch_size.observe(
1980            certificate
1981                .data()
1982                .intent_message()
1983                .value
1984                .kind()
1985                .num_commands() as f64,
1986        );
1987    }
1988
1989    /// execute_certificate validates the transaction input, and executes the certificate,
1990    /// returning transaction outputs.
1991    ///
1992    /// It reads state from the db (both owned and shared locks), but it has no side effects.
1993    ///
1994    /// Executes a certificate and returns an ExecutionOutput.
1995    /// The function can fail with Fatal errors (e.g., the transaction input is invalid,
1996    /// locks are not held correctly, etc.) or transient errors (e.g., db read errors).
1997    #[instrument(level = "trace", skip_all)]
1998    fn execute_certificate(
1999        &self,
2000        _execution_guard: &ExecutionLockReadGuard<'_>,
2001        certificate: &VerifiedExecutableTransaction,
2002        input_objects: InputObjects,
2003        expected_effects_digest: Option<TransactionEffectsDigest>,
2004        withdraw_status: BalanceWithdrawStatus,
2005        epoch_store: &Arc<AuthorityPerEpochStore>,
2006    ) -> ExecutionOutput<(
2007        TransactionOutputs,
2008        Vec<ExecutionTiming>,
2009        Option<ExecutionError>,
2010    )> {
2011        let _scope = monitored_scope("Execution::prepare_certificate");
2012        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
2013        let prepare_certificate_start_time = tokio::time::Instant::now();
2014
2015        // TODO: We need to move this to a more appropriate place to avoid redundant checks.
2016        let tx_data = certificate.data().transaction_data();
2017        if let Err(e) = tx_data.validity_check(epoch_store.protocol_config()) {
2018            return ExecutionOutput::Fatal(e.into());
2019        }
2020
2021        // The cost of partially re-auditing a transaction before execution is tolerated.
2022        // This step is required for correctness because, for example, ConsensusAddressOwner
2023        // object owner may have changed between signing and execution.
2024        let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
2025            certificate,
2026            input_objects,
2027            epoch_store.protocol_config(),
2028            epoch_store.reference_gas_price(),
2029        ) {
2030            Ok(result) => result,
2031            Err(e) => return ExecutionOutput::Fatal(e),
2032        };
2033
2034        let owned_object_refs = input_objects.inner().filter_owned_objects();
2035        if let Err(e) = self.check_owned_locks(&owned_object_refs) {
2036            return ExecutionOutput::Fatal(e);
2037        }
2038        let tx_digest = *certificate.digest();
2039        let protocol_config = epoch_store.protocol_config();
2040        let transaction_data = &certificate.data().intent_message().value;
2041        let (kind, signer, gas_data) = transaction_data.execution_parts();
2042        let early_execution_error = get_early_execution_error(
2043            &tx_digest,
2044            &input_objects,
2045            self.config.certificate_deny_config.certificate_deny_set(),
2046            &withdraw_status,
2047        );
2048        let execution_params = match early_execution_error {
2049            Some(error) => ExecutionOrEarlyError::Err(error),
2050            None => ExecutionOrEarlyError::Ok(()),
2051        };
2052
2053        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2054
2055        #[allow(unused_mut)]
2056        let (inner_temp_store, _, mut effects, timings, execution_error_opt) =
2057            epoch_store.executor().execute_transaction_to_effects(
2058                &tracking_store,
2059                protocol_config,
2060                self.metrics.limits_metrics.clone(),
2061                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
2062                // cyclic dependency w/ sui-adapter
2063                self.config
2064                    .expensive_safety_check_config
2065                    .enable_deep_per_tx_sui_conservation_check(),
2066                execution_params,
2067                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2068                epoch_store
2069                    .epoch_start_config()
2070                    .epoch_data()
2071                    .epoch_start_timestamp(),
2072                input_objects,
2073                gas_data,
2074                gas_status,
2075                kind,
2076                signer,
2077                tx_digest,
2078                &mut None,
2079            );
2080
2081        if let Some(expected_effects_digest) = expected_effects_digest
2082            && effects.digest() != expected_effects_digest
2083        {
2084            // We dont want to mask the original error, so we log it and continue.
2085            match self.debug_dump_transaction_state(
2086                &tx_digest,
2087                &effects,
2088                expected_effects_digest,
2089                &inner_temp_store,
2090                certificate,
2091                &self.config.state_debug_dump_config,
2092            ) {
2093                Ok(out_path) => {
2094                    info!(
2095                        "Dumped node state for transaction {} to {}",
2096                        tx_digest,
2097                        out_path.as_path().display().to_string()
2098                    );
2099                }
2100                Err(e) => {
2101                    error!("Error dumping state for transaction {}: {e}", tx_digest);
2102                }
2103            }
2104            error!(
2105                ?tx_digest,
2106                ?expected_effects_digest,
2107                actual_effects = ?effects,
2108                "fork detected!"
2109            );
2110            if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2111                tx_digest,
2112                expected_effects_digest,
2113                effects.digest(),
2114            ) {
2115                error!("Failed to record transaction fork: {e}");
2116            }
2117
2118            fail_point_if!("kill_transaction_fork_node", || {
2119                #[cfg(msim)]
2120                {
2121                    tracing::error!(
2122                        fatal = true,
2123                        "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2124                        tx_digest
2125                    );
2126                    sui_simulator::task::shutdown_current_node();
2127                }
2128            });
2129
2130            fatal!(
2131                "Transaction {} is expected to have effects digest {}, but got {}!",
2132                tx_digest,
2133                expected_effects_digest,
2134                effects.digest()
2135            );
2136        }
2137
2138        fail_point_arg!("simulate_fork_during_execution", |(
2139            forked_validators,
2140            full_halt,
2141            effects_overrides,
2142            fork_probability,
2143        ): (
2144            std::sync::Arc<
2145                std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2146            >,
2147            bool,
2148            std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2149            f32,
2150        )| {
2151            #[cfg(msim)]
2152            self.simulate_fork_during_execution(
2153                certificate,
2154                epoch_store,
2155                &mut effects,
2156                forked_validators,
2157                full_halt,
2158                effects_overrides,
2159                fork_probability,
2160            );
2161        });
2162
2163        let unchanged_loaded_runtime_objects =
2164            crate::transaction_outputs::unchanged_loaded_runtime_objects(
2165                certificate.transaction_data(),
2166                &effects,
2167                &tracking_store.into_read_objects(),
2168            );
2169
2170        // index certificate
2171        let _ = self
2172            .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2173            .tap_err(|e| {
2174                self.metrics.post_processing_total_failures.inc();
2175                error!(?tx_digest, "tx post processing failed: {e}");
2176            });
2177
2178        self.update_metrics(certificate, &inner_temp_store, &effects);
2179
2180        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2181            certificate.clone().into_unsigned(),
2182            effects,
2183            inner_temp_store,
2184            unchanged_loaded_runtime_objects,
2185        );
2186
2187        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2188        if elapsed > 0.0 {
2189            self.metrics.prepare_cert_gas_latency_ratio.observe(
2190                transaction_outputs
2191                    .effects
2192                    .gas_cost_summary()
2193                    .computation_cost as f64
2194                    / elapsed,
2195            );
2196        }
2197
2198        ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2199    }
2200
2201    pub fn prepare_certificate_for_benchmark(
2202        &self,
2203        certificate: &VerifiedExecutableTransaction,
2204        input_objects: InputObjects,
2205        epoch_store: &Arc<AuthorityPerEpochStore>,
2206    ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2207        let lock = RwLock::new(epoch_store.epoch());
2208        let execution_guard = lock.try_read().unwrap();
2209
2210        let (transaction_outputs, _timings, execution_error_opt) = self
2211            .execute_certificate(
2212                &execution_guard,
2213                certificate,
2214                input_objects,
2215                None,
2216                BalanceWithdrawStatus::NoWithdraw,
2217                epoch_store,
2218            )
2219            .unwrap();
2220        Ok((transaction_outputs, execution_error_opt))
2221    }
2222
2223    #[instrument(skip_all)]
2224    #[allow(clippy::type_complexity)]
2225    pub async fn dry_exec_transaction(
2226        &self,
2227        transaction: TransactionData,
2228        transaction_digest: TransactionDigest,
2229    ) -> SuiResult<(
2230        DryRunTransactionBlockResponse,
2231        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2232        TransactionEffects,
2233        Option<ObjectID>,
2234    )> {
2235        let epoch_store = self.load_epoch_store_one_call_per_task();
2236        if !self.is_fullnode(&epoch_store) {
2237            return Err(SuiErrorKind::UnsupportedFeatureError {
2238                error: "dry-exec is only supported on fullnodes".to_string(),
2239            }
2240            .into());
2241        }
2242
2243        if transaction.kind().is_system_tx() {
2244            return Err(SuiErrorKind::UnsupportedFeatureError {
2245                error: "dry-exec does not support system transactions".to_string(),
2246            }
2247            .into());
2248        }
2249
2250        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2251    }
2252
2253    #[allow(clippy::type_complexity)]
2254    pub fn dry_exec_transaction_for_benchmark(
2255        &self,
2256        transaction: TransactionData,
2257        transaction_digest: TransactionDigest,
2258    ) -> SuiResult<(
2259        DryRunTransactionBlockResponse,
2260        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2261        TransactionEffects,
2262        Option<ObjectID>,
2263    )> {
2264        let epoch_store = self.load_epoch_store_one_call_per_task();
2265        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2266    }
2267
2268    #[allow(clippy::type_complexity)]
2269    fn dry_exec_transaction_impl(
2270        &self,
2271        epoch_store: &AuthorityPerEpochStore,
2272        transaction: TransactionData,
2273        transaction_digest: TransactionDigest,
2274    ) -> SuiResult<(
2275        DryRunTransactionBlockResponse,
2276        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2277        TransactionEffects,
2278        Option<ObjectID>,
2279    )> {
2280        // Cheap validity checks for a transaction, including input size limits.
2281        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2282
2283        let input_object_kinds = transaction.input_objects()?;
2284        let receiving_object_refs = transaction.receiving_objects();
2285
2286        sui_transaction_checks::deny::check_transaction_for_signing(
2287            &transaction,
2288            &[],
2289            &input_object_kinds,
2290            &receiving_object_refs,
2291            &self.config.transaction_deny_config,
2292            self.get_backing_package_store().as_ref(),
2293        )?;
2294
2295        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2296            // We don't want to cache this transaction since it's a dry run.
2297            None,
2298            &input_object_kinds,
2299            &receiving_object_refs,
2300            epoch_store.epoch(),
2301        )?;
2302
2303        // make a gas object if one was not provided
2304        let mut gas_data = transaction.gas_data().clone();
2305        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2306            let sender = transaction.sender();
2307            // use a 1B sui coin
2308            const MIST_TO_SUI: u64 = 1_000_000_000;
2309            const DRY_RUN_SUI: u64 = 1_000_000_000;
2310            let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
2311            let gas_object_id = ObjectID::random();
2312            let gas_object = Object::new_move(
2313                MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
2314                Owner::AddressOwner(sender),
2315                TransactionDigest::genesis_marker(),
2316            );
2317            let gas_object_ref = gas_object.compute_object_reference();
2318            gas_data.payment = vec![gas_object_ref];
2319            (
2320                sui_transaction_checks::check_transaction_input_with_given_gas(
2321                    epoch_store.protocol_config(),
2322                    epoch_store.reference_gas_price(),
2323                    &transaction,
2324                    input_objects,
2325                    receiving_objects,
2326                    gas_object,
2327                    &self.metrics.bytecode_verifier_metrics,
2328                    &self.config.verifier_signing_config,
2329                )?,
2330                Some(gas_object_id),
2331            )
2332        } else {
2333            (
2334                sui_transaction_checks::check_transaction_input(
2335                    epoch_store.protocol_config(),
2336                    epoch_store.reference_gas_price(),
2337                    &transaction,
2338                    input_objects,
2339                    &receiving_objects,
2340                    &self.metrics.bytecode_verifier_metrics,
2341                    &self.config.verifier_signing_config,
2342                )?,
2343                None,
2344            )
2345        };
2346
2347        let protocol_config = epoch_store.protocol_config();
2348        let (kind, signer, _) = transaction.execution_parts();
2349
2350        let silent = true;
2351        let executor = sui_execution::executor(protocol_config, silent)
2352            .expect("Creating an executor should not fail here");
2353
2354        let expensive_checks = false;
2355        let early_execution_error = get_early_execution_error(
2356            &transaction_digest,
2357            &checked_input_objects,
2358            self.config.certificate_deny_config.certificate_deny_set(),
2359            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2360            &BalanceWithdrawStatus::NoWithdraw,
2361        );
2362        let execution_params = match early_execution_error {
2363            Some(error) => ExecutionOrEarlyError::Err(error),
2364            None => ExecutionOrEarlyError::Ok(()),
2365        };
2366        let (inner_temp_store, _, effects, _timings, execution_error) = executor
2367            .execute_transaction_to_effects(
2368                self.get_backing_store().as_ref(),
2369                protocol_config,
2370                self.metrics.limits_metrics.clone(),
2371                expensive_checks,
2372                execution_params,
2373                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2374                epoch_store
2375                    .epoch_start_config()
2376                    .epoch_data()
2377                    .epoch_start_timestamp(),
2378                checked_input_objects,
2379                gas_data,
2380                gas_status,
2381                kind,
2382                signer,
2383                transaction_digest,
2384                &mut None,
2385            );
2386        let tx_digest = *effects.transaction_digest();
2387
2388        let module_cache =
2389            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2390
2391        let mut layout_resolver =
2392            epoch_store
2393                .executor()
2394                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2395                    &inner_temp_store,
2396                    self.get_backing_package_store(),
2397                )));
2398        // Returning empty vector here because we recalculate changes in the rpc layer.
2399        let object_changes = Vec::new();
2400
2401        // Returning empty vector here because we recalculate changes in the rpc layer.
2402        let balance_changes = Vec::new();
2403
2404        let written_with_kind = effects
2405            .created()
2406            .into_iter()
2407            .map(|(oref, _)| (oref, WriteKind::Create))
2408            .chain(
2409                effects
2410                    .unwrapped()
2411                    .into_iter()
2412                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2413            )
2414            .chain(
2415                effects
2416                    .mutated()
2417                    .into_iter()
2418                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
2419            )
2420            .map(|(oref, kind)| {
2421                let obj = inner_temp_store.written.get(&oref.0).unwrap();
2422                // TODO: Avoid clones.
2423                (oref.0, (oref, obj.clone(), kind))
2424            })
2425            .collect();
2426
2427        let execution_error_source = execution_error
2428            .as_ref()
2429            .err()
2430            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2431
2432        Ok((
2433            DryRunTransactionBlockResponse {
2434                suggested_gas_price: self
2435                    .congestion_tracker
2436                    .get_suggested_gas_prices(&transaction),
2437                input: SuiTransactionBlockData::try_from_with_module_cache(
2438                    transaction,
2439                    &module_cache,
2440                )
2441                .map_err(|e| SuiErrorKind::TransactionSerializationError {
2442                    error: format!(
2443                        "Failed to convert transaction to SuiTransactionBlockData: {}",
2444                        e
2445                    ),
2446                })?, // TODO: replace the underlying try_from to SuiError. This one goes deep
2447                effects: effects.clone().try_into()?,
2448                events: SuiTransactionBlockEvents::try_from(
2449                    inner_temp_store.events.clone(),
2450                    tx_digest,
2451                    None,
2452                    layout_resolver.as_mut(),
2453                )?,
2454                object_changes,
2455                balance_changes,
2456                execution_error_source,
2457            },
2458            written_with_kind,
2459            effects,
2460            mock_gas,
2461        ))
2462    }
2463
2464    pub fn simulate_transaction(
2465        &self,
2466        mut transaction: TransactionData,
2467        checks: TransactionChecks,
2468    ) -> SuiResult<SimulateTransactionResult> {
2469        if transaction.kind().is_system_tx() {
2470            return Err(SuiErrorKind::UnsupportedFeatureError {
2471                error: "simulate does not support system transactions".to_string(),
2472            }
2473            .into());
2474        }
2475
2476        let epoch_store = self.load_epoch_store_one_call_per_task();
2477        if !self.is_fullnode(&epoch_store) {
2478            return Err(SuiErrorKind::UnsupportedFeatureError {
2479                error: "simulate is only supported on fullnodes".to_string(),
2480            }
2481            .into());
2482        }
2483
2484        // Cheap validity checks for a transaction, including input size limits.
2485        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2486
2487        let input_object_kinds = transaction.input_objects()?;
2488        let receiving_object_refs = transaction.receiving_objects();
2489
2490        sui_transaction_checks::deny::check_transaction_for_signing(
2491            &transaction,
2492            &[],
2493            &input_object_kinds,
2494            &receiving_object_refs,
2495            &self.config.transaction_deny_config,
2496            self.get_backing_package_store().as_ref(),
2497        )?;
2498
2499        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2500            // We don't want to cache this transaction since it's a simulation.
2501            None,
2502            &input_object_kinds,
2503            &receiving_object_refs,
2504            epoch_store.epoch(),
2505        )?;
2506
2507        // mock a gas object if one was not provided
2508        let mock_gas_id = if transaction.gas().is_empty() {
2509            let mock_gas_object = Object::new_move(
2510                MoveObject::new_gas_coin(
2511                    OBJECT_START_VERSION,
2512                    ObjectID::MAX,
2513                    DEV_INSPECT_GAS_COIN_VALUE,
2514                ),
2515                Owner::AddressOwner(transaction.gas_data().owner),
2516                TransactionDigest::genesis_marker(),
2517            );
2518            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2519            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2520            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2521            Some(mock_gas_object.id())
2522        } else {
2523            None
2524        };
2525
2526        let protocol_config = epoch_store.protocol_config();
2527
2528        let (gas_status, checked_input_objects) = if checks.enabled() {
2529            sui_transaction_checks::check_transaction_input(
2530                epoch_store.protocol_config(),
2531                epoch_store.reference_gas_price(),
2532                &transaction,
2533                input_objects,
2534                &receiving_objects,
2535                &self.metrics.bytecode_verifier_metrics,
2536                &self.config.verifier_signing_config,
2537            )?
2538        } else {
2539            let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2540                protocol_config,
2541                transaction.kind(),
2542                input_objects,
2543                receiving_objects,
2544            )?;
2545            let gas_status = SuiGasStatus::new(
2546                transaction.gas_budget(),
2547                transaction.gas_price(),
2548                epoch_store.reference_gas_price(),
2549                protocol_config,
2550            )?;
2551
2552            (gas_status, checked_input_objects)
2553        };
2554
2555        // TODO see if we can spin up a VM once and reuse it
2556        let executor = sui_execution::executor(
2557            protocol_config,
2558            true, // silent
2559        )
2560        .expect("Creating an executor should not fail here");
2561
2562        let (kind, signer, gas_data) = transaction.execution_parts();
2563        let early_execution_error = get_early_execution_error(
2564            &transaction.digest(),
2565            &checked_input_objects,
2566            self.config.certificate_deny_config.certificate_deny_set(),
2567            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2568            &BalanceWithdrawStatus::NoWithdraw,
2569        );
2570        let execution_params = match early_execution_error {
2571            Some(error) => ExecutionOrEarlyError::Err(error),
2572            None => ExecutionOrEarlyError::Ok(()),
2573        };
2574
2575        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2576
2577        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2578            &tracking_store,
2579            protocol_config,
2580            self.metrics.limits_metrics.clone(),
2581            false, // expensive_checks
2582            execution_params,
2583            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2584            epoch_store
2585                .epoch_start_config()
2586                .epoch_data()
2587                .epoch_start_timestamp(),
2588            checked_input_objects,
2589            gas_data,
2590            gas_status,
2591            kind,
2592            signer,
2593            transaction.digest(),
2594            checks.disabled(),
2595        );
2596
2597        let loaded_runtime_objects = tracking_store.into_read_objects();
2598        let unchanged_loaded_runtime_objects =
2599            crate::transaction_outputs::unchanged_loaded_runtime_objects(
2600                &transaction,
2601                &effects,
2602                &loaded_runtime_objects,
2603            );
2604
2605        let object_set = {
2606            let objects = {
2607                let mut objects = loaded_runtime_objects;
2608
2609                for o in inner_temp_store
2610                    .input_objects
2611                    .into_values()
2612                    .chain(inner_temp_store.written.into_values())
2613                {
2614                    objects.insert(o);
2615                }
2616
2617                objects
2618            };
2619
2620            let object_keys = sui_types::storage::get_transaction_object_set(
2621                &transaction,
2622                &effects,
2623                &unchanged_loaded_runtime_objects,
2624            );
2625
2626            let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2627            for k in object_keys {
2628                if let Some(o) = objects.get(&k) {
2629                    set.insert(o.clone());
2630                }
2631            }
2632
2633            set
2634        };
2635
2636        Ok(SimulateTransactionResult {
2637            objects: object_set,
2638            events: effects.events_digest().map(|_| inner_temp_store.events),
2639            effects,
2640            execution_result,
2641            mock_gas_id,
2642            unchanged_loaded_runtime_objects,
2643        })
2644    }
2645
2646    /// The object ID for gas can be any object ID, even for an uncreated object
2647    #[allow(clippy::collapsible_else_if)]
2648    #[instrument(skip_all)]
2649    pub async fn dev_inspect_transaction_block(
2650        &self,
2651        sender: SuiAddress,
2652        transaction_kind: TransactionKind,
2653        gas_price: Option<u64>,
2654        gas_budget: Option<u64>,
2655        gas_sponsor: Option<SuiAddress>,
2656        gas_objects: Option<Vec<ObjectRef>>,
2657        show_raw_txn_data_and_effects: Option<bool>,
2658        skip_checks: Option<bool>,
2659    ) -> SuiResult<DevInspectResults> {
2660        let epoch_store = self.load_epoch_store_one_call_per_task();
2661
2662        if !self.is_fullnode(&epoch_store) {
2663            return Err(SuiErrorKind::UnsupportedFeatureError {
2664                error: "dev-inspect is only supported on fullnodes".to_string(),
2665            }
2666            .into());
2667        }
2668
2669        if transaction_kind.is_system_tx() {
2670            return Err(SuiErrorKind::UnsupportedFeatureError {
2671                error: "system transactions are not supported".to_string(),
2672            }
2673            .into());
2674        }
2675
2676        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2677        let skip_checks = skip_checks.unwrap_or(true);
2678        let reference_gas_price = epoch_store.reference_gas_price();
2679        let protocol_config = epoch_store.protocol_config();
2680        let max_tx_gas = protocol_config.max_tx_gas();
2681
2682        let price = gas_price.unwrap_or(reference_gas_price);
2683        let budget = gas_budget.unwrap_or(max_tx_gas);
2684        let owner = gas_sponsor.unwrap_or(sender);
2685        // Payment might be empty here, but it's fine we'll have to deal with it later after reading all the input objects.
2686        let payment = gas_objects.unwrap_or_default();
2687        let mut transaction = TransactionData::V1(TransactionDataV1 {
2688            kind: transaction_kind.clone(),
2689            sender,
2690            gas_data: GasData {
2691                payment,
2692                owner,
2693                price,
2694                budget,
2695            },
2696            expiration: TransactionExpiration::None,
2697        });
2698
2699        let raw_txn_data = if show_raw_txn_data_and_effects {
2700            bcs::to_bytes(&transaction).map_err(|_| {
2701                SuiErrorKind::TransactionSerializationError {
2702                    error: "Failed to serialize transaction during dev inspect".to_string(),
2703                }
2704            })?
2705        } else {
2706            vec![]
2707        };
2708
2709        transaction.validity_check_no_gas_check(protocol_config)?;
2710
2711        let input_object_kinds = transaction.input_objects()?;
2712        let receiving_object_refs = transaction.receiving_objects();
2713
2714        sui_transaction_checks::deny::check_transaction_for_signing(
2715            &transaction,
2716            &[],
2717            &input_object_kinds,
2718            &receiving_object_refs,
2719            &self.config.transaction_deny_config,
2720            self.get_backing_package_store().as_ref(),
2721        )?;
2722
2723        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2724            // We don't want to cache this transaction since it's a dev inspect.
2725            None,
2726            &input_object_kinds,
2727            &receiving_object_refs,
2728            epoch_store.epoch(),
2729        )?;
2730
2731        let (gas_status, checked_input_objects) = if skip_checks {
2732            // If we are skipping checks, then we call the check_dev_inspect_input function which will perform
2733            // only lightweight checks on the transaction input. And if the gas field is empty, that means we will
2734            // use the dummy gas object so we need to add it to the input objects vector.
2735            if transaction.gas().is_empty() {
2736                // Create and use a dummy gas object if there is no gas object provided.
2737                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2738                    DEV_INSPECT_GAS_COIN_VALUE,
2739                    transaction.gas_owner(),
2740                );
2741                let gas_object_ref = dummy_gas_object.compute_object_reference();
2742                transaction.gas_data_mut().payment = vec![gas_object_ref];
2743                input_objects.push(ObjectReadResult::new(
2744                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2745                    dummy_gas_object.into(),
2746                ));
2747            }
2748            let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2749                protocol_config,
2750                &transaction_kind,
2751                input_objects,
2752                receiving_objects,
2753            )?;
2754            let gas_status = SuiGasStatus::new(
2755                max_tx_gas,
2756                transaction.gas_price(),
2757                reference_gas_price,
2758                protocol_config,
2759            )?;
2760
2761            (gas_status, checked_input_objects)
2762        } else {
2763            // If we are not skipping checks, then we call the check_transaction_input function and its dummy gas
2764            // variant which will perform full fledged checks just like a real transaction execution.
2765            if transaction.gas().is_empty() {
2766                // Create and use a dummy gas object if there is no gas object provided.
2767                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2768                    DEV_INSPECT_GAS_COIN_VALUE,
2769                    transaction.gas_owner(),
2770                );
2771                let gas_object_ref = dummy_gas_object.compute_object_reference();
2772                transaction.gas_data_mut().payment = vec![gas_object_ref];
2773                sui_transaction_checks::check_transaction_input_with_given_gas(
2774                    epoch_store.protocol_config(),
2775                    epoch_store.reference_gas_price(),
2776                    &transaction,
2777                    input_objects,
2778                    receiving_objects,
2779                    dummy_gas_object,
2780                    &self.metrics.bytecode_verifier_metrics,
2781                    &self.config.verifier_signing_config,
2782                )?
2783            } else {
2784                sui_transaction_checks::check_transaction_input(
2785                    epoch_store.protocol_config(),
2786                    epoch_store.reference_gas_price(),
2787                    &transaction,
2788                    input_objects,
2789                    &receiving_objects,
2790                    &self.metrics.bytecode_verifier_metrics,
2791                    &self.config.verifier_signing_config,
2792                )?
2793            }
2794        };
2795
2796        let executor = sui_execution::executor(protocol_config, /* silent */ true)
2797            .expect("Creating an executor should not fail here");
2798        let gas_data = transaction.gas_data().clone();
2799        let intent_msg = IntentMessage::new(
2800            Intent {
2801                version: IntentVersion::V0,
2802                scope: IntentScope::TransactionData,
2803                app_id: AppId::Sui,
2804            },
2805            transaction,
2806        );
2807        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2808        let early_execution_error = get_early_execution_error(
2809            &transaction_digest,
2810            &checked_input_objects,
2811            self.config.certificate_deny_config.certificate_deny_set(),
2812            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2813            &BalanceWithdrawStatus::NoWithdraw,
2814        );
2815        let execution_params = match early_execution_error {
2816            Some(error) => ExecutionOrEarlyError::Err(error),
2817            None => ExecutionOrEarlyError::Ok(()),
2818        };
2819        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2820            self.get_backing_store().as_ref(),
2821            protocol_config,
2822            self.metrics.limits_metrics.clone(),
2823            /* expensive checks */ false,
2824            execution_params,
2825            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2826            epoch_store
2827                .epoch_start_config()
2828                .epoch_data()
2829                .epoch_start_timestamp(),
2830            checked_input_objects,
2831            gas_data,
2832            gas_status,
2833            transaction_kind,
2834            sender,
2835            transaction_digest,
2836            skip_checks,
2837        );
2838
2839        let raw_effects = if show_raw_txn_data_and_effects {
2840            bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2841                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2842            })?
2843        } else {
2844            vec![]
2845        };
2846
2847        let mut layout_resolver =
2848            epoch_store
2849                .executor()
2850                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2851                    &inner_temp_store,
2852                    self.get_backing_package_store(),
2853                )));
2854
2855        DevInspectResults::new(
2856            effects,
2857            inner_temp_store.events.clone(),
2858            execution_result,
2859            raw_txn_data,
2860            raw_effects,
2861            layout_resolver.as_mut(),
2862        )
2863    }
2864
2865    // Only used for testing because of how epoch store is loaded.
2866    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2867        let epoch_store = self.epoch_store_for_testing();
2868        Ok(epoch_store.reference_gas_price())
2869    }
2870
2871    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2872        self.get_transaction_cache_reader()
2873            .is_tx_already_executed(digest)
2874    }
2875
2876    #[instrument(level = "debug", skip_all, err(level = "debug"))]
2877    fn index_tx(
2878        &self,
2879        indexes: &IndexStore,
2880        digest: &TransactionDigest,
2881        // TODO: index_tx really just need the transaction data here.
2882        cert: &VerifiedExecutableTransaction,
2883        effects: &TransactionEffects,
2884        events: &TransactionEvents,
2885        timestamp_ms: u64,
2886        tx_coins: Option<TxCoins>,
2887        written: &WrittenObjects,
2888        inner_temporary_store: &InnerTemporaryStore,
2889        epoch_store: &Arc<AuthorityPerEpochStore>,
2890    ) -> SuiResult<u64> {
2891        let changes = self
2892            .process_object_index(effects, written, inner_temporary_store, epoch_store)
2893            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2894
2895        indexes.index_tx(
2896            cert.data().intent_message().value.sender(),
2897            cert.data()
2898                .intent_message()
2899                .value
2900                .input_objects()?
2901                .iter()
2902                .map(|o| o.object_id()),
2903            effects
2904                .all_changed_objects()
2905                .into_iter()
2906                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2907            cert.data()
2908                .intent_message()
2909                .value
2910                .move_calls()
2911                .into_iter()
2912                .map(|(package, module, function)| {
2913                    (*package, module.to_owned(), function.to_owned())
2914                }),
2915            events,
2916            changes,
2917            digest,
2918            timestamp_ms,
2919            tx_coins,
2920        )
2921    }
2922
2923    #[cfg(msim)]
2924    fn simulate_fork_during_execution(
2925        &self,
2926        certificate: &VerifiedExecutableTransaction,
2927        epoch_store: &Arc<AuthorityPerEpochStore>,
2928        effects: &mut TransactionEffects,
2929        forked_validators: std::sync::Arc<
2930            std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2931        >,
2932        full_halt: bool,
2933        effects_overrides: std::sync::Arc<
2934            std::sync::Mutex<std::collections::BTreeMap<String, String>>,
2935        >,
2936        fork_probability: f32,
2937    ) {
2938        use std::cell::RefCell;
2939        thread_local! {
2940            static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
2941        }
2942        if !certificate.data().intent_message().value.is_system_tx() {
2943            let committee = epoch_store.committee();
2944            let cur_stake = (**committee).weight(&self.name);
2945            if cur_stake > 0 {
2946                TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
2947                    let already_forked = forked_validators
2948                        .lock()
2949                        .ok()
2950                        .map(|set| set.contains(&self.name))
2951                        .unwrap_or(false);
2952
2953                    if !already_forked {
2954                        let should_fork = if full_halt {
2955                            // For full halt, fork enough nodes to reach validity threshold
2956                            *total_stake <= committee.validity_threshold()
2957                        } else {
2958                            // For partial fork, stay strictly below validity threshold
2959                            *total_stake + cur_stake < committee.validity_threshold()
2960                        };
2961
2962                        if should_fork {
2963                            *total_stake += cur_stake;
2964
2965                            if let Ok(mut external_set) = forked_validators.lock() {
2966                                external_set.insert(self.name);
2967                                info!("forked_validators: {:?}", external_set);
2968                            }
2969                        }
2970                    }
2971
2972                    if let Ok(external_set) = forked_validators.lock() {
2973                        if external_set.contains(&self.name) {
2974                            // If effects_overrides is empty, deterministically select a tx_digest to fork with 1/100 probability.
2975                            // Fork this transaction and record the digest and the original effects to original_effects.
2976                            // If original_effects is nonempty and contains a key matching this transaction digest (i.e.
2977                            // the transaction was forked on a different validator), fork this txn as well.
2978
2979                            let tx_digest = certificate.digest().to_string();
2980                            if let Ok(mut overrides) = effects_overrides.lock() {
2981                                if overrides.contains_key(&tx_digest)
2982                                    || overrides.is_empty()
2983                                        && sui_simulator::random::deterministic_probability(
2984                                            &tx_digest,
2985                                            fork_probability,
2986                                        )
2987                                {
2988                                    let original_effects_digest = effects.digest().to_string();
2989                                    overrides
2990                                        .insert(tx_digest.clone(), original_effects_digest.clone());
2991                                    info!(
2992                                        ?tx_digest,
2993                                        ?original_effects_digest,
2994                                        "Captured forked effects digest for transaction"
2995                                    );
2996                                    effects.gas_cost_summary_mut_for_testing().computation_cost +=
2997                                        1;
2998                                }
2999                            }
3000                        }
3001                    }
3002                });
3003            }
3004        }
3005    }
3006
3007    fn process_object_index(
3008        &self,
3009        effects: &TransactionEffects,
3010        written: &WrittenObjects,
3011        inner_temporary_store: &InnerTemporaryStore,
3012        epoch_store: &Arc<AuthorityPerEpochStore>,
3013    ) -> SuiResult<ObjectIndexChanges> {
3014        let mut layout_resolver =
3015            epoch_store
3016                .executor()
3017                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3018                    inner_temporary_store,
3019                    self.get_backing_package_store(),
3020                )));
3021
3022        let modified_at_version = effects
3023            .modified_at_versions()
3024            .into_iter()
3025            .collect::<HashMap<_, _>>();
3026
3027        let tx_digest = effects.transaction_digest();
3028        let mut deleted_owners = vec![];
3029        let mut deleted_dynamic_fields = vec![];
3030        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
3031            let old_version = modified_at_version.get(&id).unwrap();
3032            // When we process the index, the latest object hasn't been written yet so
3033            // the old object must be present.
3034            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
3035                |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
3036            ) {
3037                Owner::AddressOwner(addr)
3038                | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
3039                Owner::ObjectOwner(object_id) => {
3040                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
3041                }
3042                _ => {}
3043            }
3044        }
3045
3046        let mut new_owners = vec![];
3047        let mut new_dynamic_fields = vec![];
3048
3049        for (oref, owner, kind) in effects.all_changed_objects() {
3050            let id = &oref.0;
3051            // For mutated objects, retrieve old owner and delete old index if there is a owner change.
3052            if let WriteKind::Mutate = kind {
3053                let Some(old_version) = modified_at_version.get(id) else {
3054                    panic!(
3055                        "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
3056                        tx_digest
3057                    );
3058                };
3059                // When we process the index, the latest object hasn't been written yet so
3060                // the old object must be present.
3061                let Some(old_object) = self.get_object_store().get_object_by_key(id, *old_version)
3062                else {
3063                    panic!(
3064                        "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
3065                        tx_digest, id, old_version
3066                    );
3067                };
3068                if old_object.owner != owner {
3069                    match old_object.owner {
3070                        Owner::AddressOwner(addr)
3071                        | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3072                            deleted_owners.push((addr, *id));
3073                        }
3074                        Owner::ObjectOwner(object_id) => {
3075                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
3076                        }
3077                        _ => {}
3078                    }
3079                }
3080            }
3081
3082            match owner {
3083                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3084                    // TODO: We can remove the object fetching after we added ObjectType to TransactionEffects
3085                    let new_object = written.get(id).unwrap_or_else(
3086                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3087                    );
3088                    assert_eq!(
3089                        new_object.version(),
3090                        oref.1,
3091                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3092                        tx_digest,
3093                        id,
3094                        new_object.version(),
3095                        oref.1
3096                    );
3097
3098                    let type_ = new_object
3099                        .type_()
3100                        .map(|type_| ObjectType::Struct(type_.clone()))
3101                        .unwrap_or(ObjectType::Package);
3102
3103                    new_owners.push((
3104                        (addr, *id),
3105                        ObjectInfo {
3106                            object_id: *id,
3107                            version: oref.1,
3108                            digest: oref.2,
3109                            type_,
3110                            owner,
3111                            previous_transaction: *effects.transaction_digest(),
3112                        },
3113                    ));
3114                }
3115                Owner::ObjectOwner(owner) => {
3116                    let new_object = written.get(id).unwrap_or_else(
3117                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3118                    );
3119                    assert_eq!(
3120                        new_object.version(),
3121                        oref.1,
3122                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3123                        tx_digest,
3124                        id,
3125                        new_object.version(),
3126                        oref.1
3127                    );
3128
3129                    let Some(df_info) = self
3130                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
3131                        .unwrap_or_else(|e| {
3132                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
3133                            None
3134                        }
3135                    )
3136                        else {
3137                            // Skip indexing for non dynamic field objects.
3138                            continue;
3139                        };
3140                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3141                }
3142                _ => {}
3143            }
3144        }
3145
3146        Ok(ObjectIndexChanges {
3147            deleted_owners,
3148            deleted_dynamic_fields,
3149            new_owners,
3150            new_dynamic_fields,
3151        })
3152    }
3153
3154    fn try_create_dynamic_field_info(
3155        &self,
3156        o: &Object,
3157        written: &WrittenObjects,
3158        resolver: &mut dyn LayoutResolver,
3159    ) -> SuiResult<Option<DynamicFieldInfo>> {
3160        // Skip if not a move object
3161        let Some(move_object) = o.data.try_as_move().cloned() else {
3162            return Ok(None);
3163        };
3164
3165        // We only index dynamic field objects
3166        if !move_object.type_().is_dynamic_field() {
3167            return Ok(None);
3168        }
3169
3170        let layout = resolver
3171            .get_annotated_layout(&move_object.type_().clone().into())?
3172            .into_layout();
3173
3174        let field =
3175            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3176                SuiErrorKind::ObjectDeserializationError {
3177                    error: e.to_string(),
3178                }
3179            })?;
3180
3181        let type_ = field.kind;
3182        let name_type: TypeTag = field.name_layout.into();
3183        let bcs_name = field.name_bytes.to_owned();
3184
3185        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3186            .map_err(|e| {
3187                warn!("{e}");
3188                SuiErrorKind::ObjectDeserializationError {
3189                    error: e.to_string(),
3190                }
3191            })?;
3192
3193        let name = DynamicFieldName {
3194            type_: name_type,
3195            value: SuiMoveValue::from(name_value).to_json_value(),
3196        };
3197
3198        let value_metadata = field.value_metadata().map_err(|e| {
3199            warn!("{e}");
3200            SuiErrorKind::ObjectDeserializationError {
3201                error: e.to_string(),
3202            }
3203        })?;
3204
3205        Ok(Some(match value_metadata {
3206            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3207                name,
3208                bcs_name,
3209                type_,
3210                object_type: object_type.to_canonical_string(/* with_prefix */ true),
3211                object_id: o.id(),
3212                version: o.version(),
3213                digest: o.digest(),
3214            },
3215
3216            DFV::ValueMetadata::DynamicObjectField(object_id) => {
3217                // Find the actual object from storage using the object id obtained from the wrapper.
3218
3219                // Try to find the object in the written objects first.
3220                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3221                    let version = object.version();
3222                    let digest = object.digest();
3223                    let object_type = object.data.type_().unwrap().clone();
3224                    (version, digest, object_type)
3225                } else {
3226                    // If not found, try to find it in the database.
3227                    let object = self
3228                        .get_object_store()
3229                        .get_object_by_key(&object_id, o.version())
3230                        .ok_or_else(|| UserInputError::ObjectNotFound {
3231                            object_id,
3232                            version: Some(o.version()),
3233                        })?;
3234                    let version = object.version();
3235                    let digest = object.digest();
3236                    let object_type = object.data.type_().unwrap().clone();
3237                    (version, digest, object_type)
3238                };
3239
3240                DynamicFieldInfo {
3241                    name,
3242                    bcs_name,
3243                    type_,
3244                    object_type: object_type.to_string(),
3245                    object_id,
3246                    version,
3247                    digest,
3248                }
3249            }
3250        }))
3251    }
3252
3253    #[instrument(level = "trace", skip_all, err(level = "debug"))]
3254    fn post_process_one_tx(
3255        &self,
3256        certificate: &VerifiedExecutableTransaction,
3257        effects: &TransactionEffects,
3258        inner_temporary_store: &InnerTemporaryStore,
3259        epoch_store: &Arc<AuthorityPerEpochStore>,
3260    ) -> SuiResult {
3261        if self.indexes.is_none() {
3262            return Ok(());
3263        }
3264
3265        let _scope = monitored_scope("Execution::post_process_one_tx");
3266
3267        let tx_digest = certificate.digest();
3268        let timestamp_ms = Self::unixtime_now_ms();
3269        let events = &inner_temporary_store.events;
3270        let written = &inner_temporary_store.written;
3271        let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
3272            effects,
3273            inner_temporary_store,
3274            epoch_store,
3275        );
3276
3277        // Index tx
3278        if let Some(indexes) = &self.indexes {
3279            let _ = self
3280                .index_tx(
3281                    indexes.as_ref(),
3282                    tx_digest,
3283                    certificate,
3284                    effects,
3285                    events,
3286                    timestamp_ms,
3287                    tx_coins,
3288                    written,
3289                    inner_temporary_store,
3290                    epoch_store,
3291                )
3292                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
3293                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3294                .expect("Indexing tx should not fail");
3295
3296            let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3297            let events = self.make_transaction_block_events(
3298                events.clone(),
3299                *tx_digest,
3300                timestamp_ms,
3301                epoch_store,
3302                inner_temporary_store,
3303            )?;
3304            // Emit events
3305            self.subscription_handler
3306                .process_tx(certificate.data().transaction_data(), &effects, &events)
3307                .tap_ok(|_| {
3308                    self.metrics
3309                        .post_processing_total_tx_had_event_processed
3310                        .inc()
3311                })
3312                .tap_err(|e| {
3313                    warn!(
3314                        ?tx_digest,
3315                        "Post processing - Couldn't process events for tx: {}", e
3316                    )
3317                })?;
3318
3319            self.metrics
3320                .post_processing_total_events_emitted
3321                .inc_by(events.data.len() as u64);
3322        };
3323        Ok(())
3324    }
3325
3326    fn make_transaction_block_events(
3327        &self,
3328        transaction_events: TransactionEvents,
3329        digest: TransactionDigest,
3330        timestamp_ms: u64,
3331        epoch_store: &Arc<AuthorityPerEpochStore>,
3332        inner_temporary_store: &InnerTemporaryStore,
3333    ) -> SuiResult<SuiTransactionBlockEvents> {
3334        let mut layout_resolver =
3335            epoch_store
3336                .executor()
3337                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3338                    inner_temporary_store,
3339                    self.get_backing_package_store(),
3340                )));
3341        SuiTransactionBlockEvents::try_from(
3342            transaction_events,
3343            digest,
3344            Some(timestamp_ms),
3345            layout_resolver.as_mut(),
3346        )
3347    }
3348
3349    pub fn unixtime_now_ms() -> u64 {
3350        let now = SystemTime::now()
3351            .duration_since(UNIX_EPOCH)
3352            .expect("Time went backwards")
3353            .as_millis();
3354        u64::try_from(now).expect("Travelling in time machine")
3355    }
3356
3357    // TODO(fastpath): update this handler for Mysticeti fastpath.
3358    // There will no longer be validator quorum signed transactions or effects.
3359    // The proof of finality needs to come from checkpoints.
3360    #[instrument(level = "trace", skip_all)]
3361    pub async fn handle_transaction_info_request(
3362        &self,
3363        request: TransactionInfoRequest,
3364    ) -> SuiResult<TransactionInfoResponse> {
3365        let epoch_store = self.load_epoch_store_one_call_per_task();
3366        let (transaction, status) = self
3367            .get_transaction_status(&request.transaction_digest, &epoch_store)?
3368            .ok_or(SuiErrorKind::TransactionNotFound {
3369                digest: request.transaction_digest,
3370            })?;
3371        Ok(TransactionInfoResponse {
3372            transaction,
3373            status,
3374        })
3375    }
3376
3377    #[instrument(level = "trace", skip_all)]
3378    pub async fn handle_object_info_request(
3379        &self,
3380        request: ObjectInfoRequest,
3381    ) -> SuiResult<ObjectInfoResponse> {
3382        let epoch_store = self.load_epoch_store_one_call_per_task();
3383
3384        let requested_object_seq = match request.request_kind {
3385            ObjectInfoRequestKind::LatestObjectInfo => {
3386                let (_, seq, _) = self
3387                    .get_object_or_tombstone(request.object_id)
3388                    .await
3389                    .ok_or_else(|| {
3390                        SuiError::from(UserInputError::ObjectNotFound {
3391                            object_id: request.object_id,
3392                            version: None,
3393                        })
3394                    })?;
3395                seq
3396            }
3397            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3398        };
3399
3400        let object = self
3401            .get_object_store()
3402            .get_object_by_key(&request.object_id, requested_object_seq)
3403            .ok_or_else(|| {
3404                SuiError::from(UserInputError::ObjectNotFound {
3405                    object_id: request.object_id,
3406                    version: Some(requested_object_seq),
3407                })
3408            })?;
3409
3410        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3411            (request.generate_layout, object.data.try_as_move())
3412        {
3413            Some(into_struct_layout(
3414                self.load_epoch_store_one_call_per_task()
3415                    .executor()
3416                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3417                    .get_annotated_layout(&move_obj.type_().clone().into())?,
3418            )?)
3419        } else {
3420            None
3421        };
3422
3423        let lock = if !object.is_address_owned() {
3424            // Only address owned objects have locks.
3425            None
3426        } else {
3427            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3428                .await?
3429                .map(|s| s.into_inner())
3430        };
3431
3432        Ok(ObjectInfoResponse {
3433            object,
3434            layout,
3435            lock_for_debugging: lock,
3436        })
3437    }
3438
3439    #[instrument(level = "trace", skip_all)]
3440    pub fn handle_checkpoint_request(
3441        &self,
3442        request: &CheckpointRequest,
3443    ) -> SuiResult<CheckpointResponse> {
3444        let summary = match request.sequence_number {
3445            Some(seq) => self
3446                .checkpoint_store
3447                .get_checkpoint_by_sequence_number(seq)?,
3448            None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3449        }
3450        .map(|v| v.into_inner());
3451        let contents = match &summary {
3452            Some(s) => self
3453                .checkpoint_store
3454                .get_checkpoint_contents(&s.content_digest)?,
3455            None => None,
3456        };
3457        Ok(CheckpointResponse {
3458            checkpoint: summary,
3459            contents,
3460        })
3461    }
3462
3463    #[instrument(level = "trace", skip_all)]
3464    pub fn handle_checkpoint_request_v2(
3465        &self,
3466        request: &CheckpointRequestV2,
3467    ) -> SuiResult<CheckpointResponseV2> {
3468        let summary = if request.certified {
3469            let summary = match request.sequence_number {
3470                Some(seq) => self
3471                    .checkpoint_store
3472                    .get_checkpoint_by_sequence_number(seq)?,
3473                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3474            }
3475            .map(|v| v.into_inner());
3476            summary.map(CheckpointSummaryResponse::Certified)
3477        } else {
3478            let summary = match request.sequence_number {
3479                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3480                None => self
3481                    .checkpoint_store
3482                    .get_latest_locally_computed_checkpoint()?,
3483            };
3484            summary.map(CheckpointSummaryResponse::Pending)
3485        };
3486        let contents = match &summary {
3487            Some(s) => self
3488                .checkpoint_store
3489                .get_checkpoint_contents(&s.content_digest())?,
3490            None => None,
3491        };
3492        Ok(CheckpointResponseV2 {
3493            checkpoint: summary,
3494            contents,
3495        })
3496    }
3497
3498    fn check_protocol_version(
3499        supported_protocol_versions: SupportedProtocolVersions,
3500        current_version: ProtocolVersion,
3501    ) {
3502        info!("current protocol version is now {:?}", current_version);
3503        info!("supported versions are: {:?}", supported_protocol_versions);
3504        if !supported_protocol_versions.is_version_supported(current_version) {
3505            let msg = format!(
3506                "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3507                current_version, supported_protocol_versions,
3508            );
3509
3510            error!("{}", msg);
3511            eprintln!("{}", msg);
3512
3513            #[cfg(not(msim))]
3514            std::process::exit(1);
3515
3516            #[cfg(msim)]
3517            sui_simulator::task::shutdown_current_node();
3518        }
3519    }
3520
3521    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
3522    #[allow(clippy::too_many_arguments)]
3523    pub async fn new(
3524        name: AuthorityName,
3525        secret: StableSyncAuthoritySigner,
3526        supported_protocol_versions: SupportedProtocolVersions,
3527        store: Arc<AuthorityStore>,
3528        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3529        epoch_store: Arc<AuthorityPerEpochStore>,
3530        committee_store: Arc<CommitteeStore>,
3531        indexes: Option<Arc<IndexStore>>,
3532        rpc_index: Option<Arc<RpcIndexStore>>,
3533        checkpoint_store: Arc<CheckpointStore>,
3534        prometheus_registry: &Registry,
3535        genesis_objects: &[Object],
3536        db_checkpoint_config: &DBCheckpointConfig,
3537        config: NodeConfig,
3538        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3539        chain_identifier: ChainIdentifier,
3540        pruner_db: Option<Arc<AuthorityPrunerTables>>,
3541        policy_config: Option<PolicyConfig>,
3542        firewall_config: Option<RemoteFirewallConfig>,
3543        pruner_watermarks: Arc<PrunerWatermarks>,
3544    ) -> Arc<Self> {
3545        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3546
3547        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3548        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3549        let execution_scheduler = Arc::new(ExecutionScheduler::new(
3550            execution_cache_trait_pointers.object_cache_reader.clone(),
3551            execution_cache_trait_pointers.object_store.clone(),
3552            execution_cache_trait_pointers
3553                .transaction_cache_reader
3554                .clone(),
3555            tx_ready_certificates,
3556            &epoch_store,
3557            metrics.clone(),
3558        ));
3559        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3560
3561        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3562            epoch_store.get_parent_path(),
3563            &config.authority_store_pruning_config,
3564        );
3565        let _pruner = AuthorityStorePruner::new(
3566            store.perpetual_tables.clone(),
3567            checkpoint_store.clone(),
3568            rpc_index.clone(),
3569            indexes.clone(),
3570            config.authority_store_pruning_config.clone(),
3571            epoch_store.committee().authority_exists(&name),
3572            epoch_store.epoch_start_state().epoch_duration_ms(),
3573            prometheus_registry,
3574            pruner_db,
3575            pruner_watermarks,
3576        );
3577        let input_loader =
3578            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3579        let epoch = epoch_store.epoch();
3580        let traffic_controller_metrics =
3581            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3582        let traffic_controller = if let Some(policy_config) = policy_config {
3583            Some(Arc::new(
3584                TrafficController::init(
3585                    policy_config,
3586                    traffic_controller_metrics,
3587                    firewall_config.clone(),
3588                )
3589                .await,
3590            ))
3591        } else {
3592            None
3593        };
3594
3595        let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3596            ForkRecoveryState::new(Some(fork_config))
3597                .expect("Failed to initialize fork recovery state")
3598        });
3599
3600        let state = Arc::new(AuthorityState {
3601            name,
3602            secret,
3603            execution_lock: RwLock::new(epoch),
3604            epoch_store: ArcSwap::new(epoch_store.clone()),
3605            input_loader,
3606            execution_cache_trait_pointers,
3607            indexes,
3608            rpc_index,
3609            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3610            checkpoint_store,
3611            committee_store,
3612            execution_scheduler,
3613            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3614            metrics,
3615            _pruner,
3616            _authority_per_epoch_pruner,
3617            db_checkpoint_config: db_checkpoint_config.clone(),
3618            config,
3619            overload_info: AuthorityOverloadInfo::default(),
3620            validator_tx_finalizer,
3621            chain_identifier,
3622            congestion_tracker: Arc::new(CongestionTracker::new()),
3623            traffic_controller,
3624            fork_recovery_state,
3625        });
3626
3627        let state_clone = Arc::downgrade(&state);
3628        spawn_monitored_task!(fix_indexes(state_clone));
3629        // Start a task to execute ready certificates.
3630        let authority_state = Arc::downgrade(&state);
3631        spawn_monitored_task!(execution_process(
3632            authority_state,
3633            rx_ready_certificates,
3634            rx_execution_shutdown,
3635        ));
3636        // TODO: This doesn't belong to the constructor of AuthorityState.
3637        state
3638            .create_owner_index_if_empty(genesis_objects, &epoch_store)
3639            .expect("Error indexing genesis objects.");
3640
3641        state
3642    }
3643
3644    // TODO: Consolidate our traits to reduce the number of methods here.
    /// Returns the `ObjectCacheRead` handle held in `execution_cache_trait_pointers`.
    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
        &self.execution_cache_trait_pointers.object_cache_reader
    }
3648
    /// Returns the `TransactionCacheRead` handle held in `execution_cache_trait_pointers`.
    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
        &self.execution_cache_trait_pointers.transaction_cache_reader
    }
3652
    /// Returns the `ExecutionCacheWrite` handle held in `execution_cache_trait_pointers`.
    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
        &self.execution_cache_trait_pointers.cache_writer
    }
3656
    /// Returns the `BackingStore` handle held in `execution_cache_trait_pointers`.
    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_store
    }
3660
    /// Returns the `ChildObjectResolver` handle held in `execution_cache_trait_pointers`.
    pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
        &self.execution_cache_trait_pointers.child_object_resolver
    }
3664
    /// Returns the `BackingPackageStore` handle held in `execution_cache_trait_pointers`.
    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_package_store
    }
3668
    /// Returns the `ObjectStore` handle held in `execution_cache_trait_pointers`.
    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
        &self.execution_cache_trait_pointers.object_store
    }
3672
    /// Returns the `ExecutionCacheReconfigAPI` handle held in `execution_cache_trait_pointers`.
    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
        &self.execution_cache_trait_pointers.reconfig_api
    }
3676
    /// Returns the `GlobalStateHashStore` handle held in `execution_cache_trait_pointers`.
    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
        &self.execution_cache_trait_pointers.global_state_hash_store
    }
3680
    /// Returns the `CheckpointCache` handle held in `execution_cache_trait_pointers`.
    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
        &self.execution_cache_trait_pointers.checkpoint_cache
    }
3684
    /// Returns the `StateSyncAPI` handle held in `execution_cache_trait_pointers`.
    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
        &self.execution_cache_trait_pointers.state_sync_store
    }
3688
    /// Returns the `ExecutionCacheCommit` handle held in `execution_cache_trait_pointers`.
    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
        &self.execution_cache_trait_pointers.cache_commit
    }
3692
    /// Returns the underlying `AuthorityStore` by delegating to the `testing_api`
    /// handle in `execution_cache_trait_pointers`. Intended for tests, as the name says.
    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
        self.execution_cache_trait_pointers
            .testing_api
            .database_for_testing()
    }
3698
    /// Test-only helper that runs checkpoint pruning for eligible epochs against this
    /// authority's stores.
    ///
    /// Calls `AuthorityStorePruner::prune_checkpoints_for_eligible_epochs` with the
    /// perpetual tables of `database_for_testing()`, this state's checkpoint store and
    /// optional RPC index, the pruning section of `config`, the supplied `metrics`,
    /// `EPOCH_DURATION_MS_FOR_TESTING`, and a freshly defaulted `PrunerWatermarks`.
    /// The fourth argument of the pruner call is passed as `None`.
    ///
    /// Returns whatever the pruner call returns.
    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
        &self,
        config: NodeConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        use crate::authority::authority_store_pruner::PrunerWatermarks;
        // Start from default watermarks; nothing is carried over between calls.
        let watermarks = Arc::new(PrunerWatermarks::default());
        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
            &self.database_for_testing().perpetual_tables,
            &self.checkpoint_store,
            self.rpc_index.as_deref(),
            None,
            config.authority_store_pruning_config,
            metrics,
            EPOCH_DURATION_MS_FOR_TESTING,
            &watermarks,
        )
        .await
    }
3718
    /// Returns the `ExecutionScheduler` owned by this authority state.
    pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
        &self.execution_scheduler
    }
3722
3723    fn create_owner_index_if_empty(
3724        &self,
3725        genesis_objects: &[Object],
3726        epoch_store: &Arc<AuthorityPerEpochStore>,
3727    ) -> SuiResult {
3728        let Some(index_store) = &self.indexes else {
3729            return Ok(());
3730        };
3731        if !index_store.is_empty() {
3732            return Ok(());
3733        }
3734
3735        let mut new_owners = vec![];
3736        let mut new_dynamic_fields = vec![];
3737        let mut layout_resolver = epoch_store
3738            .executor()
3739            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3740        for o in genesis_objects.iter() {
3741            match o.owner {
3742                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3743                    new_owners.push((
3744                        (addr, o.id()),
3745                        ObjectInfo::new(&o.compute_object_reference(), o),
3746                    ))
3747                }
3748                Owner::ObjectOwner(object_id) => {
3749                    let id = o.id();
3750                    let Some(info) = self.try_create_dynamic_field_info(
3751                        o,
3752                        &BTreeMap::new(),
3753                        layout_resolver.as_mut(),
3754                    )?
3755                    else {
3756                        continue;
3757                    };
3758                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3759                }
3760                _ => {}
3761            }
3762        }
3763
3764        index_store.insert_genesis_objects(ObjectIndexChanges {
3765            deleted_owners: vec![],
3766            deleted_dynamic_fields: vec![],
3767            new_owners,
3768            new_dynamic_fields,
3769        })
3770    }
3771
3772    /// Attempts to acquire execution lock for an executable transaction.
3773    /// Returns Some(lock) if the transaction is matching current executed epoch.
3774    /// Returns None if validator is halted at epoch end or epoch mismatch.
3775    pub fn execution_lock_for_executable_transaction(
3776        &self,
3777        transaction: &VerifiedExecutableTransaction,
3778    ) -> Option<ExecutionLockReadGuard<'_>> {
3779        let lock = self.execution_lock.try_read().ok()?;
3780        if *lock == transaction.auth_sig().epoch() {
3781            Some(lock)
3782        } else {
3783            // TODO: Can this still happen?
3784            None
3785        }
3786    }
3787
    /// Acquires the execution lock for the duration of a transaction signing request.
    /// This prevents reconfiguration from starting until we are finished handling the signing request.
    /// Otherwise, in-memory lock state could be cleared (by `ObjectLocks::clear_cached_locks`)
    /// while we are attempting to acquire locks for the transaction.
    ///
    /// # Errors
    /// Returns `SuiErrorKind::ValidatorHaltedAtEpochEnd` when the read lock cannot be
    /// taken immediately (`try_read` fails); this method never waits for the lock.
    pub fn execution_lock_for_signing(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
        self.execution_lock
            .try_read()
            .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
    }
3797
    /// Waits for and returns the write side of the execution lock.
    ///
    /// Used by the reconfiguration paths in this file, which overwrite the guarded
    /// epoch value through the returned guard.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
3801
    /// Moves this authority from the current epoch to the epoch of `new_committee`.
    ///
    /// The steps run in this order:
    /// 1. Check the new protocol version against `supported_protocol_versions`.
    /// 2. Persist `new_committee` in the committee store.
    /// 3. Take the execution write lock and wait for `cur_epoch_store.epoch_terminated()`.
    /// 4. Close user certificates on the current epoch store if they are still accepted.
    /// 5. Clear end-of-epoch cache state, run the consistency checks, optionally
    ///    re-accumulate the state hash, and record the new epoch start configuration.
    /// 6. If DB checkpoints at epoch end are configured, snapshot all databases into
    ///    `<checkpoint_path>/epoch_<current epoch>`.
    /// 7. Reconfigure the cache, open the next epoch's store via `reopen_epoch_db`,
    ///    reconfigure the execution scheduler, and write the new epoch into the
    ///    execution lock.
    ///
    /// Returns the epoch store for the new epoch.
    ///
    /// # Errors
    /// Propagates errors from `insert_new_committee`, `checkpoint_all_dbs` and
    /// `reopen_epoch_db`.
    ///
    /// # Panics
    /// Panics if the reopened epoch store does not report `new_committee.epoch`.
    /// NOTE(review): `check_protocol_version` and `check_system_consistency` may also
    /// panic; confirm against their definitions.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );

        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // Safe to begin reconfiguration now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        {
            let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                // Need to change this so that the consensus adapter does not accept certificates from users.
                // This can happen if our local validator did not initiate epoch change locally,
                // but 2f+1 nodes already concluded the epoch.
                //
                // This lock is essentially a barrier for the
                // `epoch_store.pending_consensus_certificates` table we are reading on the line after this block
                cur_epoch_store.close_user_certs(state);
            }
            // lock is dropped here
        }

        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
        self.maybe_reaccumulate_state_hash(
            cur_epoch_store,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.get_reconfig_api()
            .set_epoch_start_configuration(&epoch_start_configuration);
        // Snapshot the databases only when both a checkpoint path and the
        // epoch-end checkpoint flag are configured.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
            && self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
        {
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
            self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        // Capture the epoch number before `new_committee` is moved into `reopen_epoch_db`.
        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.execution_scheduler.reconfigure(&new_epoch_store);
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3892
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for testing.
    ///
    /// # Panics
    /// Panics if reading the current epoch's last checkpoint from the checkpoint store
    /// fails (the result is unwrapped). A missing last checkpoint falls back to the
    /// default sequence number.
    pub async fn reconfigure_for_testing(&self) {
        // Hold the write lock for the whole switch-over.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden and diverged from
        // the protocol config definitions. That override may have now been dropped when the initial guard was dropped.
        // We reapply the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly like to keep the protocol config the same
        // across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        self.execution_scheduler.reconfigure(&new_epoch_store);
        let new_epoch = new_epoch_store.epoch();
        // Publish the new epoch store before terminating the old one.
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;

        *execution_lock = new_epoch;
    }
3925
    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
    ///
    /// The decision and the work are delegated entirely to the reconfig API's
    /// `maybe_reaccumulate_state_hash`, which receives the current epoch store and
    /// the protocol version of the upcoming epoch.
    #[instrument(level = "error", skip_all)]
    fn maybe_reaccumulate_state_hash(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_protocol_version: ProtocolVersion,
    ) {
        self.get_reconfig_api()
            .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
    }
3937
3938    #[instrument(level = "error", skip_all)]
3939    fn check_system_consistency(
3940        &self,
3941        cur_epoch_store: &AuthorityPerEpochStore,
3942        state_hasher: Arc<GlobalStateHasher>,
3943        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3944    ) {
3945        info!(
3946            "Performing sui conservation consistency check for epoch {}",
3947            cur_epoch_store.epoch()
3948        );
3949
3950        if let Err(err) = self
3951            .get_reconfig_api()
3952            .expensive_check_sui_conservation(cur_epoch_store)
3953        {
3954            if cfg!(debug_assertions) {
3955                panic!("{}", err);
3956            } else {
3957                // We cannot panic in production yet because it is known that there are some
3958                // inconsistencies in testnet. We will enable this once we make it balanced again in testnet.
3959                warn!("Sui conservation consistency check failed: {}", err);
3960            }
3961        } else {
3962            info!("Sui conservation consistency check passed");
3963        }
3964
3965        // check for root state hash consistency with live object set
3966        if expensive_safety_check_config.enable_state_consistency_check() {
3967            info!(
3968                "Performing state consistency check for epoch {}",
3969                cur_epoch_store.epoch()
3970            );
3971            self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
3972        }
3973
3974        if expensive_safety_check_config.enable_secondary_index_checks()
3975            && let Some(indexes) = self.indexes.clone()
3976        {
3977            verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3978                .expect("secondary indexes are inconsistent");
3979        }
3980    }
3981
3982    fn expensive_check_is_consistent_state(
3983        &self,
3984        state_hasher: Arc<GlobalStateHasher>,
3985        cur_epoch_store: &AuthorityPerEpochStore,
3986    ) {
3987        let live_object_set_hash = state_hasher.digest_live_object_set(
3988            !cur_epoch_store
3989                .protocol_config()
3990                .simplified_unwrap_then_delete(),
3991        );
3992
3993        let root_state_hash: ECMHLiveObjectSetDigest = self
3994            .get_global_state_hash_store()
3995            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3996            .expect("Retrieving root state hash cannot fail")
3997            .expect("Root state hash for epoch must exist")
3998            .1
3999            .digest()
4000            .into();
4001
4002        let is_inconsistent = root_state_hash != live_object_set_hash;
4003        if is_inconsistent {
4004            debug_fatal!(
4005                "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4006                root_state_hash,
4007                live_object_set_hash
4008            );
4009        } else {
4010            info!("State consistency check passed");
4011        }
4012
4013        state_hasher.set_inconsistent_state(is_inconsistent);
4014    }
4015
    /// Returns the epoch of the currently loaded epoch store. Intended for tests.
    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.epoch_store_for_testing().epoch()
    }
4019
4020    #[instrument(level = "error", skip_all)]
4021    pub fn checkpoint_all_dbs(
4022        &self,
4023        checkpoint_path: &Path,
4024        cur_epoch_store: &AuthorityPerEpochStore,
4025        checkpoint_indexes: bool,
4026    ) -> SuiResult {
4027        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4028        let current_epoch = cur_epoch_store.epoch();
4029
4030        if checkpoint_path.exists() {
4031            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4032            return Ok(());
4033        }
4034
4035        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4036        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4037
4038        if checkpoint_path_tmp.exists() {
4039            fs::remove_dir_all(&checkpoint_path_tmp)
4040                .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4041        }
4042
4043        fs::create_dir_all(&checkpoint_path_tmp)
4044            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4045        fs::create_dir(&store_checkpoint_path_tmp)
4046            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4047
4048        // NOTE: Do not change the order of invoking these checkpoint calls
4049        // We want to snapshot checkpoint db first to not race with state sync
4050        self.checkpoint_store
4051            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4052
4053        self.get_reconfig_api()
4054            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4055
4056        self.committee_store
4057            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4058
4059        if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4060            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4061        }
4062
4063        fs::rename(checkpoint_path_tmp, checkpoint_path)
4064            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4065        Ok(())
4066    }
4067
    /// Load the current epoch store. This can change during reconfiguration. To ensure that
    /// we never end up accessing different epoch stores in a single task, we need to make sure
    /// that this is called once per task. Each call needs to be carefully audited to ensure it is
    /// the case. This also means we should minimize the number of call-sites. Only call it when
    /// there is no way to obtain it from somewhere else.
    ///
    /// Returns the guard produced by loading the `epoch_store` field.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
4076
    /// Loads the current epoch store; should be used in tests only.
    ///
    /// Delegates to `load_epoch_store_one_call_per_task`.
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
4081
    /// Returns an owned copy of the committee of the currently loaded epoch store.
    /// Intended for tests.
    pub fn clone_committee_for_testing(&self) -> Committee {
        Committee::clone(self.epoch_store_for_testing().committee())
    }
4085
    /// Looks up `object_id` in the object store and returns the object if one is found.
    ///
    /// The function is `async` but performs no `.await` itself.
    #[instrument(level = "trace", skip_all)]
    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.get_object_store().get_object(object_id)
    }
4090
    /// Returns the object reference of the object stored at `SUI_SYSTEM_ADDRESS`.
    ///
    /// # Panics
    /// Panics if no object exists at that address. As written, this function never
    /// returns `Err`.
    pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
        Ok(self
            .get_object(&SUI_SYSTEM_ADDRESS.into())
            .await
            .expect("framework object should always exist")
            .compute_object_reference())
    }
4098
    /// Reads the Sui system state object through the object cache reader's
    /// `get_sui_system_state_object_unsafe`.
    ///
    /// This function is only used for testing.
    pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
        self.get_object_cache_reader()
            .get_sui_system_state_object_unsafe()
    }
4104
    /// Asks `epoch_store` for the checkpoint sequence number recorded for the
    /// transaction `digest`, returning its answer unchanged (`None` when it has none).
    #[instrument(level = "trace", skip_all)]
    fn get_transaction_checkpoint_sequence(
        &self,
        digest: &TransactionDigest,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
        epoch_store.get_transaction_checkpoint(digest)
    }
4113
    /// Fetches the verified checkpoint with the given sequence number from the
    /// checkpoint store.
    ///
    /// Returns `Ok(None)` when the store has no such checkpoint; store errors are
    /// converted into this function's error type by `?`.
    #[instrument(level = "trace", skip_all)]
    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> SuiResult<Option<VerifiedCheckpoint>> {
        Ok(self
            .checkpoint_store
            .get_checkpoint_by_sequence_number(sequence_number)?)
    }
4123
4124    #[instrument(level = "trace", skip_all)]
4125    pub fn get_transaction_checkpoint_for_tests(
4126        &self,
4127        digest: &TransactionDigest,
4128        epoch_store: &AuthorityPerEpochStore,
4129    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4130        let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4131        let Some(checkpoint) = checkpoint else {
4132            return Ok(None);
4133        };
4134        let checkpoint = self
4135            .checkpoint_store
4136            .get_checkpoint_by_sequence_number(checkpoint)?;
4137        Ok(checkpoint)
4138    }
4139
4140    #[instrument(level = "trace", skip_all)]
4141    pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4142        Ok(
4143            match self
4144                .get_object_cache_reader()
4145                .get_latest_object_or_tombstone(*object_id)
4146            {
4147                Some((_, ObjectOrTombstone::Object(object))) => {
4148                    let layout = self.get_object_layout(&object)?;
4149                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
4150                }
4151                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4152                None => ObjectRead::NotExists(*object_id),
4153            },
4154        )
4155    }
4156
    /// Chain Identifier is the digest of the genesis checkpoint.
    ///
    /// Returns the value stored in the `chain_identifier` field.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
4161
    /// Returns the fork recovery state, or `None` when this authority state holds none.
    pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
        self.fork_recovery_state.as_ref()
    }
4165
4166    #[instrument(level = "trace", skip_all)]
4167    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4168    where
4169        T: DeserializeOwned,
4170    {
4171        let o = self.get_object_read(object_id)?.into_object()?;
4172        if let Some(move_object) = o.data.try_as_move() {
4173            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4174                SuiErrorKind::ObjectDeserializationError {
4175                    error: format!("{e}"),
4176                }
4177            })?)
4178        } else {
4179            Err(SuiErrorKind::ObjectDeserializationError {
4180                error: format!("Provided object : [{object_id}] is not a Move object."),
4181            }
4182            .into())
4183        }
4184    }
4185
4186    /// This function aims to serve rpc reads on past objects and
4187    /// we don't expect it to be called for other purposes.
4188    /// Depending on the object pruning policies that will be enforced in the
4189    /// future there is no software-level guarantee/SLA to retrieve an object
4190    /// with an old version even if it exists/existed.
4191    #[instrument(level = "trace", skip_all)]
4192    pub fn get_past_object_read(
4193        &self,
4194        object_id: &ObjectID,
4195        version: SequenceNumber,
4196    ) -> SuiResult<PastObjectRead> {
4197        // Firstly we see if the object ever existed by getting its latest data
4198        let Some(obj_ref) = self
4199            .get_object_cache_reader()
4200            .get_latest_object_ref_or_tombstone(*object_id)
4201        else {
4202            return Ok(PastObjectRead::ObjectNotExists(*object_id));
4203        };
4204
4205        if version > obj_ref.1 {
4206            return Ok(PastObjectRead::VersionTooHigh {
4207                object_id: *object_id,
4208                asked_version: version,
4209                latest_version: obj_ref.1,
4210            });
4211        }
4212
4213        if version < obj_ref.1 {
4214            // Read past objects
4215            return Ok(match self.read_object_at_version(object_id, version)? {
4216                Some((object, layout)) => {
4217                    let obj_ref = object.compute_object_reference();
4218                    PastObjectRead::VersionFound(obj_ref, object, layout)
4219                }
4220
4221                None => PastObjectRead::VersionNotFound(*object_id, version),
4222            });
4223        }
4224
4225        if !obj_ref.2.is_alive() {
4226            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4227        }
4228
4229        match self.read_object_at_version(object_id, obj_ref.1)? {
4230            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4231            None => {
4232                debug_fatal!(
4233                    "Object with in parent_entry is missing from object store, datastore is \
4234                     inconsistent"
4235                );
4236                Err(UserInputError::ObjectNotFound {
4237                    object_id: *object_id,
4238                    version: Some(obj_ref.1),
4239                }
4240                .into())
4241            }
4242        }
4243    }
4244
4245    #[instrument(level = "trace", skip_all)]
4246    fn read_object_at_version(
4247        &self,
4248        object_id: &ObjectID,
4249        version: SequenceNumber,
4250    ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4251        let Some(object) = self
4252            .get_object_cache_reader()
4253            .get_object_by_key(object_id, version)
4254        else {
4255            return Ok(None);
4256        };
4257
4258        let layout = self.get_object_layout(&object)?;
4259        Ok(Some((object, layout)))
4260    }
4261
    /// Resolves the annotated struct layout of `object`.
    ///
    /// Returns `Ok(None)` when `object.data.try_as_move()` yields nothing. Otherwise
    /// the layout is obtained from a type layout resolver created by the current
    /// epoch store's executor over the backing package store, and converted with
    /// `into_struct_layout`.
    ///
    /// # Errors
    /// Propagates failures from `get_annotated_layout` and `into_struct_layout`.
    fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
        let layout = object
            .data
            .try_as_move()
            .map(|object| {
                into_struct_layout(
                    self.load_epoch_store_one_call_per_task()
                        .executor()
                        // TODO(cache) - must read through cache
                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                        .get_annotated_layout(&object.type_().clone().into())?,
                )
            })
            // Turn Option<Result<..>> into Result<Option<..>> so `?` can surface the error.
            .transpose()?;
        Ok(layout)
    }
4278
4279    fn get_owner_at_version(
4280        &self,
4281        object_id: &ObjectID,
4282        version: SequenceNumber,
4283    ) -> SuiResult<Owner> {
4284        self.get_object_store()
4285            .get_object_by_key(object_id, version)
4286            .ok_or_else(|| {
4287                SuiError::from(UserInputError::ObjectNotFound {
4288                    object_id: *object_id,
4289                    version: Some(version),
4290                })
4291            })
4292            .map(|o| o.owner.clone())
4293    }
4294
4295    #[instrument(level = "trace", skip_all)]
4296    pub fn get_owner_objects(
4297        &self,
4298        owner: SuiAddress,
4299        // If `Some`, the query will start from the next item after the specified cursor
4300        cursor: Option<ObjectID>,
4301        limit: usize,
4302        filter: Option<SuiObjectDataFilter>,
4303    ) -> SuiResult<Vec<ObjectInfo>> {
4304        if let Some(indexes) = &self.indexes {
4305            indexes.get_owner_objects(owner, cursor, limit, filter)
4306        } else {
4307            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4308        }
4309    }
4310
4311    #[instrument(level = "trace", skip_all)]
4312    pub fn get_owned_coins_iterator_with_cursor(
4313        &self,
4314        owner: SuiAddress,
4315        // If `Some`, the query will start from the next item after the specified cursor
4316        cursor: (String, u64, ObjectID),
4317        limit: usize,
4318        one_coin_type_only: bool,
4319    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4320        if let Some(indexes) = &self.indexes {
4321            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4322        } else {
4323            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4324        }
4325    }
4326
4327    #[instrument(level = "trace", skip_all)]
4328    pub fn get_owner_objects_iterator(
4329        &self,
4330        owner: SuiAddress,
4331        // If `Some`, the query will start from the next item after the specified cursor
4332        cursor: Option<ObjectID>,
4333        filter: Option<SuiObjectDataFilter>,
4334    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4335        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4336        if let Some(indexes) = &self.indexes {
4337            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4338        } else {
4339            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4340        }
4341    }
4342
4343    #[instrument(level = "trace", skip_all)]
4344    pub async fn get_move_objects<T>(
4345        &self,
4346        owner: SuiAddress,
4347        type_: MoveObjectType,
4348    ) -> SuiResult<Vec<T>>
4349    where
4350        T: DeserializeOwned,
4351    {
4352        let object_ids = self
4353            .get_owner_objects_iterator(owner, None, None)?
4354            .filter(|o| match &o.type_ {
4355                ObjectType::Struct(s) => &type_ == s,
4356                ObjectType::Package => false,
4357            })
4358            .map(|info| ObjectKey(info.object_id, info.version))
4359            .collect::<Vec<_>>();
4360        let mut move_objects = vec![];
4361
4362        let objects = self
4363            .get_object_store()
4364            .multi_get_objects_by_key(&object_ids);
4365
4366        for (o, id) in objects.into_iter().zip(object_ids) {
4367            let object = o.ok_or_else(|| {
4368                SuiError::from(UserInputError::ObjectNotFound {
4369                    object_id: id.0,
4370                    version: Some(id.1),
4371                })
4372            })?;
4373            let move_object = object.data.try_as_move().ok_or_else(|| {
4374                SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4375            })?;
4376            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4377                SuiErrorKind::ObjectDeserializationError {
4378                    error: format!("{e}"),
4379                }
4380            })?);
4381        }
4382        Ok(move_objects)
4383    }
4384
4385    #[instrument(level = "trace", skip_all)]
4386    pub fn get_dynamic_fields(
4387        &self,
4388        owner: ObjectID,
4389        // If `Some`, the query will start from the next item after the specified cursor
4390        cursor: Option<ObjectID>,
4391        limit: usize,
4392    ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4393        Ok(self
4394            .get_dynamic_fields_iterator(owner, cursor)?
4395            .take(limit)
4396            .collect::<Result<Vec<_>, _>>()?)
4397    }
4398
4399    fn get_dynamic_fields_iterator(
4400        &self,
4401        owner: ObjectID,
4402        // If `Some`, the query will start from the next item after the specified cursor
4403        cursor: Option<ObjectID>,
4404    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4405    {
4406        if let Some(indexes) = &self.indexes {
4407            indexes.get_dynamic_fields_iterator(owner, cursor)
4408        } else {
4409            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4410        }
4411    }
4412
4413    #[instrument(level = "trace", skip_all)]
4414    pub fn get_dynamic_field_object_id(
4415        &self,
4416        owner: ObjectID,
4417        name_type: TypeTag,
4418        name_bcs_bytes: &[u8],
4419    ) -> SuiResult<Option<ObjectID>> {
4420        if let Some(indexes) = &self.indexes {
4421            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4422        } else {
4423            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4424        }
4425    }
4426
4427    #[instrument(level = "trace", skip_all)]
4428    pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
4429        Ok(self.get_indexes()?.next_sequence_number())
4430    }
4431
4432    #[instrument(level = "trace", skip_all)]
4433    pub async fn get_executed_transaction_and_effects(
4434        &self,
4435        digest: TransactionDigest,
4436        kv_store: Arc<TransactionKeyValueStore>,
4437    ) -> SuiResult<(Transaction, TransactionEffects)> {
4438        let transaction = kv_store.get_tx(digest).await?;
4439        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4440        Ok((transaction, effects))
4441    }
4442
4443    #[instrument(level = "trace", skip_all)]
4444    pub fn multi_get_checkpoint_by_sequence_number(
4445        &self,
4446        sequence_numbers: &[CheckpointSequenceNumber],
4447    ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
4448        Ok(self
4449            .checkpoint_store
4450            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4451    }
4452
4453    #[instrument(level = "trace", skip_all)]
4454    pub fn get_transaction_events(
4455        &self,
4456        digest: &TransactionDigest,
4457    ) -> SuiResult<TransactionEvents> {
4458        self.get_transaction_cache_reader()
4459            .get_events(digest)
4460            .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4461    }
4462
4463    pub fn get_transaction_input_objects(
4464        &self,
4465        effects: &TransactionEffects,
4466    ) -> SuiResult<Vec<Object>> {
4467        sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4468            .map_err(Into::into)
4469    }
4470
4471    pub fn get_transaction_output_objects(
4472        &self,
4473        effects: &TransactionEffects,
4474    ) -> SuiResult<Vec<Object>> {
4475        sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4476            .map_err(Into::into)
4477    }
4478
4479    fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4480        match &self.indexes {
4481            Some(i) => Ok(i.clone()),
4482            None => Err(SuiErrorKind::UnsupportedFeatureError {
4483                error: "extended object indexing is not enabled on this server".into(),
4484            }
4485            .into()),
4486        }
4487    }
4488
4489    pub async fn get_transactions_for_tests(
4490        self: &Arc<Self>,
4491        filter: Option<TransactionFilter>,
4492        cursor: Option<TransactionDigest>,
4493        limit: Option<usize>,
4494        reverse: bool,
4495    ) -> SuiResult<Vec<TransactionDigest>> {
4496        let metrics = KeyValueStoreMetrics::new_for_tests();
4497        let kv_store = Arc::new(TransactionKeyValueStore::new(
4498            "rocksdb",
4499            metrics,
4500            self.clone(),
4501        ));
4502        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4503            .await
4504    }
4505
4506    #[instrument(level = "trace", skip_all)]
4507    pub async fn get_transactions(
4508        &self,
4509        kv_store: &Arc<TransactionKeyValueStore>,
4510        filter: Option<TransactionFilter>,
4511        // If `Some`, the query will start from the next item after the specified cursor
4512        cursor: Option<TransactionDigest>,
4513        limit: Option<usize>,
4514        reverse: bool,
4515    ) -> SuiResult<Vec<TransactionDigest>> {
4516        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4517            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4518            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4519            if reverse {
4520                let iter = iter
4521                    .rev()
4522                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4523                    .skip(usize::from(cursor.is_some()));
4524                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4525            } else {
4526                let iter = iter
4527                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4528                    .skip(usize::from(cursor.is_some()));
4529                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4530            }
4531        }
4532        self.get_indexes()?
4533            .get_transactions(filter, cursor, limit, reverse)
4534    }
4535
    /// Returns a reference to the shared checkpoint store held by this authority state.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
4539
4540    pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4541        self.get_checkpoint_store()
4542            .get_highest_executed_checkpoint_seq_number()?
4543            .ok_or(
4544                SuiErrorKind::UserInputError {
4545                    error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4546                }
4547                .into(),
4548            )
4549    }
4550
4551    #[cfg(msim)]
4552    pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
4553        self.database_for_testing()
4554            .perpetual_tables
4555            .get_highest_pruned_checkpoint()
4556            .map(|c| c.unwrap_or(0))
4557            .map_err(Into::into)
4558    }
4559
4560    #[instrument(level = "trace", skip_all)]
4561    pub fn get_checkpoint_summary_by_sequence_number(
4562        &self,
4563        sequence_number: CheckpointSequenceNumber,
4564    ) -> SuiResult<CheckpointSummary> {
4565        let verified_checkpoint = self
4566            .get_checkpoint_store()
4567            .get_checkpoint_by_sequence_number(sequence_number)?;
4568        match verified_checkpoint {
4569            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4570            None => Err(SuiErrorKind::UserInputError {
4571                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4572            }
4573            .into()),
4574        }
4575    }
4576
4577    #[instrument(level = "trace", skip_all)]
4578    pub fn get_checkpoint_summary_by_digest(
4579        &self,
4580        digest: CheckpointDigest,
4581    ) -> SuiResult<CheckpointSummary> {
4582        let verified_checkpoint = self
4583            .get_checkpoint_store()
4584            .get_checkpoint_by_digest(&digest)?;
4585        match verified_checkpoint {
4586            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4587            None => Err(SuiErrorKind::UserInputError {
4588                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4589            }
4590            .into()),
4591        }
4592    }
4593
4594    #[instrument(level = "trace", skip_all)]
4595    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
4596        if is_system_package(package_id) {
4597            return self.find_genesis_txn_digest();
4598        }
4599        Ok(self
4600            .get_object_read(&package_id)?
4601            .into_object()?
4602            .previous_transaction)
4603    }
4604
4605    #[instrument(level = "trace", skip_all)]
4606    pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
4607        let summary = self
4608            .get_verified_checkpoint_by_sequence_number(0)?
4609            .into_message();
4610        let content = self.get_checkpoint_contents(summary.content_digest)?;
4611        let genesis_transaction = content.enumerate_transactions(&summary).next();
4612        Ok(genesis_transaction
4613            .ok_or(SuiErrorKind::UserInputError {
4614                error: UserInputError::GenesisTransactionNotFound,
4615            })?
4616            .1
4617            .transaction)
4618    }
4619
4620    #[instrument(level = "trace", skip_all)]
4621    pub fn get_verified_checkpoint_by_sequence_number(
4622        &self,
4623        sequence_number: CheckpointSequenceNumber,
4624    ) -> SuiResult<VerifiedCheckpoint> {
4625        let verified_checkpoint = self
4626            .get_checkpoint_store()
4627            .get_checkpoint_by_sequence_number(sequence_number)?;
4628        match verified_checkpoint {
4629            Some(verified_checkpoint) => Ok(verified_checkpoint),
4630            None => Err(SuiErrorKind::UserInputError {
4631                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4632            }
4633            .into()),
4634        }
4635    }
4636
4637    #[instrument(level = "trace", skip_all)]
4638    pub fn get_verified_checkpoint_summary_by_digest(
4639        &self,
4640        digest: CheckpointDigest,
4641    ) -> SuiResult<VerifiedCheckpoint> {
4642        let verified_checkpoint = self
4643            .get_checkpoint_store()
4644            .get_checkpoint_by_digest(&digest)?;
4645        match verified_checkpoint {
4646            Some(verified_checkpoint) => Ok(verified_checkpoint),
4647            None => Err(SuiErrorKind::UserInputError {
4648                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4649            }
4650            .into()),
4651        }
4652    }
4653
4654    #[instrument(level = "trace", skip_all)]
4655    pub fn get_checkpoint_contents(
4656        &self,
4657        digest: CheckpointContentsDigest,
4658    ) -> SuiResult<CheckpointContents> {
4659        self.get_checkpoint_store()
4660            .get_checkpoint_contents(&digest)?
4661            .ok_or(
4662                SuiErrorKind::UserInputError {
4663                    error: UserInputError::CheckpointContentsNotFound(digest),
4664                }
4665                .into(),
4666            )
4667    }
4668
4669    #[instrument(level = "trace", skip_all)]
4670    pub fn get_checkpoint_contents_by_sequence_number(
4671        &self,
4672        sequence_number: CheckpointSequenceNumber,
4673    ) -> SuiResult<CheckpointContents> {
4674        let verified_checkpoint = self
4675            .get_checkpoint_store()
4676            .get_checkpoint_by_sequence_number(sequence_number)?;
4677        match verified_checkpoint {
4678            Some(verified_checkpoint) => {
4679                let content_digest = verified_checkpoint.into_inner().content_digest;
4680                self.get_checkpoint_contents(content_digest)
4681            }
4682            None => Err(SuiErrorKind::UserInputError {
4683                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4684            }
4685            .into()),
4686        }
4687    }
4688
4689    #[instrument(level = "trace", skip_all)]
4690    pub async fn query_events(
4691        &self,
4692        kv_store: &Arc<TransactionKeyValueStore>,
4693        query: EventFilter,
4694        // If `Some`, the query will start from the next item after the specified cursor
4695        cursor: Option<EventID>,
4696        limit: usize,
4697        descending: bool,
4698    ) -> SuiResult<Vec<SuiEvent>> {
4699        let index_store = self.get_indexes()?;
4700
4701        //Get the tx_num from tx_digest
4702        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4703            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4704                SuiErrorKind::TransactionNotFound {
4705                    digest: cursor.tx_digest,
4706                },
4707            )?;
4708            (tx_seq, cursor.event_seq as usize)
4709        } else if descending {
4710            (u64::MAX, usize::MAX)
4711        } else {
4712            (0, 0)
4713        };
4714
4715        let limit = limit + 1;
4716        let mut event_keys = match query {
4717            EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
4718            EventFilter::Transaction(digest) => {
4719                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4720            }
4721            EventFilter::MoveModule { package, module } => {
4722                let module_id = ModuleId::new(package.into(), module);
4723                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4724            }
4725            EventFilter::MoveEventType(struct_name) => index_store
4726                .events_by_move_event_struct_name(
4727                    &struct_name,
4728                    tx_num,
4729                    event_num,
4730                    limit,
4731                    descending,
4732                )?,
4733            EventFilter::Sender(sender) => {
4734                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4735            }
4736            EventFilter::TimeRange {
4737                start_time,
4738                end_time,
4739            } => index_store
4740                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4741            EventFilter::MoveEventModule { package, module } => index_store
4742                .events_by_move_event_module(
4743                    &ModuleId::new(package.into(), module),
4744                    tx_num,
4745                    event_num,
4746                    limit,
4747                    descending,
4748                )?,
4749            // not using "_ =>" because we want to make sure we remember to add new variants here
4750            EventFilter::Any(_) => {
4751                return Err(SuiErrorKind::UserInputError {
4752                    error: UserInputError::Unsupported(
4753                        "'Any' queries are not supported by the fullnode.".to_string(),
4754                    ),
4755                }
4756                .into());
4757            }
4758        };
4759
4760        // skip one event if exclusive cursor is provided,
4761        // otherwise truncate to the original limit.
4762        if cursor.is_some() {
4763            if !event_keys.is_empty() {
4764                event_keys.remove(0);
4765            }
4766        } else {
4767            event_keys.truncate(limit - 1);
4768        }
4769
4770        // get the unique set of digests from the event_keys
4771        let transaction_digests = event_keys
4772            .iter()
4773            .map(|(_, digest, _, _)| *digest)
4774            .collect::<HashSet<_>>()
4775            .into_iter()
4776            .collect::<Vec<_>>();
4777
4778        let events = kv_store
4779            .multi_get_events_by_tx_digests(&transaction_digests)
4780            .await?;
4781
4782        let events_map: HashMap<_, _> =
4783            transaction_digests.iter().zip(events.into_iter()).collect();
4784
4785        let stored_events = event_keys
4786            .into_iter()
4787            .map(|k| {
4788                (
4789                    k,
4790                    events_map
4791                        .get(&k.1)
4792                        .expect("fetched digest is missing")
4793                        .clone()
4794                        .and_then(|e| e.data.get(k.2).cloned()),
4795                )
4796            })
4797            .map(
4798                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4799                    event
4800                        .map(|e| (e, tx_digest, event_seq, timestamp))
4801                        .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
4802                },
4803            )
4804            .collect::<Result<Vec<_>, _>>()?;
4805
4806        let epoch_store = self.load_epoch_store_one_call_per_task();
4807        let backing_store = self.get_backing_package_store().as_ref();
4808        let mut layout_resolver = epoch_store
4809            .executor()
4810            .type_layout_resolver(Box::new(backing_store));
4811        let mut events = vec![];
4812        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4813            events.push(SuiEvent::try_from(
4814                e.clone(),
4815                tx_digest,
4816                event_seq as u64,
4817                Some(timestamp),
4818                layout_resolver.get_annotated_layout(&e.type_)?,
4819            )?)
4820        }
4821        Ok(events)
4822    }
4823
4824    pub async fn insert_genesis_object(&self, object: Object) {
4825        self.get_reconfig_api().insert_genesis_object(object);
4826    }
4827
4828    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4829        futures::future::join_all(
4830            objects
4831                .iter()
4832                .map(|o| self.insert_genesis_object(o.clone())),
4833        )
4834        .await;
4835    }
4836
4837    /// Make a status response for a transaction
4838    #[instrument(level = "trace", skip_all)]
4839    pub fn get_transaction_status(
4840        &self,
4841        transaction_digest: &TransactionDigest,
4842        epoch_store: &Arc<AuthorityPerEpochStore>,
4843    ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
4844        // TODO: In the case of read path, we should not have to re-sign the effects.
4845        if let Some(effects) =
4846            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4847        {
4848            if let Some(transaction) = self
4849                .get_transaction_cache_reader()
4850                .get_transaction_block(transaction_digest)
4851            {
4852                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4853                let events = if effects.events_digest().is_some() {
4854                    self.get_transaction_events(effects.transaction_digest())?
4855                } else {
4856                    TransactionEvents::default()
4857                };
4858                return Ok(Some((
4859                    (*transaction).clone().into_message(),
4860                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4861                )));
4862            } else {
4863                // The read of effects and read of transaction are not atomic. It's possible that we reverted
4864                // the transaction (during epoch change) in between the above two reads, and we end up
4865                // having effects but not transaction. In this case, we just fall through.
4866                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4867            }
4868        }
4869        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4870            self.metrics.tx_already_processed.inc();
4871            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4872            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4873        } else {
4874            Ok(None)
4875        }
4876    }
4877
4878    /// Get the signed effects of the given transaction. If the effects was signed in a previous
4879    /// epoch, re-sign it so that the caller is able to form a cert of the effects in the current
4880    /// epoch.
4881    #[instrument(level = "trace", skip_all)]
4882    pub fn get_signed_effects_and_maybe_resign(
4883        &self,
4884        transaction_digest: &TransactionDigest,
4885        epoch_store: &Arc<AuthorityPerEpochStore>,
4886    ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
4887        let effects = self
4888            .get_transaction_cache_reader()
4889            .get_executed_effects(transaction_digest);
4890        match effects {
4891            Some(effects) => {
4892                // If the transaction was executed in previous epochs, the validator will
4893                // re-sign the effects with new current epoch so that a client is always able to
4894                // obtain an effects certificate at the current epoch.
4895                //
4896                // Why is this necessary? Consider the following case:
4897                // - assume there are 4 validators
4898                // - Quorum driver gets 2 signed effects before reconfig halt
4899                // - The tx makes it into final checkpoint.
4900                // - 2 validators go away and are replaced in the new epoch.
4901                // - The new epoch begins.
4902                // - The quorum driver cannot complete the partial effects cert from the previous epoch,
4903                //   because it may not be able to reach either of the 2 former validators.
4904                // - But, if the 2 validators that stayed are willing to re-sign the effects in the new
4905                //   epoch, the QD can make a new effects cert and return it to the client.
4906                //
4907                // This is a considered a short-term workaround. Eventually, Quorum Driver should be able
4908                // to return either an effects certificate, -or- a proof of inclusion in a checkpoint. In
4909                // the case above, the Quorum Driver would return a proof of inclusion in the final
4910                // checkpoint, and this code would no longer be necessary.
4911                if effects.executed_epoch() != epoch_store.epoch() {
4912                    debug!(
4913                        tx_digest=?transaction_digest,
4914                        effects_epoch=?effects.executed_epoch(),
4915                        epoch=?epoch_store.epoch(),
4916                        "Re-signing the effects with the current epoch"
4917                    );
4918                }
4919                Ok(Some(self.sign_effects(effects, epoch_store)?))
4920            }
4921            None => Ok(None),
4922        }
4923    }
4924
4925    #[instrument(level = "trace", skip_all)]
4926    pub(crate) fn sign_effects(
4927        &self,
4928        effects: TransactionEffects,
4929        epoch_store: &Arc<AuthorityPerEpochStore>,
4930    ) -> SuiResult<VerifiedSignedTransactionEffects> {
4931        let tx_digest = *effects.transaction_digest();
4932        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4933            Some(sig) => {
4934                debug_assert!(sig.epoch == epoch_store.epoch());
4935                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4936            }
4937            _ => {
4938                let sig = AuthoritySignInfo::new(
4939                    epoch_store.epoch(),
4940                    &effects,
4941                    Intent::sui_app(IntentScope::TransactionEffects),
4942                    self.name,
4943                    &*self.secret,
4944                );
4945
4946                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4947
4948                epoch_store.insert_effects_digest_and_signature(
4949                    &tx_digest,
4950                    effects.digest(),
4951                    &sig,
4952                )?;
4953
4954                effects
4955            }
4956        };
4957
4958        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4959            signed_effects,
4960        ))
4961    }
4962
4963    // Returns coin objects for indexing for fullnode if indexing is enabled.
4964    #[instrument(level = "trace", skip_all)]
4965    fn fullnode_only_get_tx_coins_for_indexing(
4966        &self,
4967        effects: &TransactionEffects,
4968        inner_temporary_store: &InnerTemporaryStore,
4969        epoch_store: &Arc<AuthorityPerEpochStore>,
4970    ) -> Option<TxCoins> {
4971        if self.indexes.is_none() || self.is_validator(epoch_store) {
4972            return None;
4973        }
4974        let written_coin_objects = inner_temporary_store
4975            .written
4976            .iter()
4977            .filter_map(|(k, v)| {
4978                if v.is_coin() {
4979                    Some((*k, v.clone()))
4980                } else {
4981                    None
4982                }
4983            })
4984            .collect();
4985        let mut input_coin_objects = inner_temporary_store
4986            .input_objects
4987            .iter()
4988            .filter_map(|(k, v)| {
4989                if v.is_coin() {
4990                    Some((*k, v.clone()))
4991                } else {
4992                    None
4993                }
4994            })
4995            .collect::<ObjectMap>();
4996
4997        // Check for receiving objects that were actually used and modified during execution. Their
4998        // updated version will already showup in "written_coins" but their input isn't included in
4999        // the set of input objects in a inner_temporary_store.
5000        for (object_id, version) in effects.modified_at_versions() {
5001            if inner_temporary_store
5002                .loaded_runtime_objects
5003                .contains_key(&object_id)
5004                && let Some(object) = self
5005                    .get_object_store()
5006                    .get_object_by_key(&object_id, version)
5007                && object.is_coin()
5008            {
5009                input_coin_objects.insert(object_id, object);
5010            }
5011        }
5012
5013        Some((input_coin_objects, written_coin_objects))
5014    }
5015
5016    /// Get the TransactionEnvelope that currently locks the given object, if any.
5017    /// Since object locks are only valid for one epoch, we also need the epoch_id in the query.
5018    /// Returns UserInputError::ObjectNotFound if no lock records for the given object can be found.
5019    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the object record is at a different version.
5020    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a certain transaction.
5021    /// Returns None if a lock record is initialized for the given ObjectRef but not yet locked by any transaction,
5022    ///     or cannot find the transaction in transaction table, because of data race etc.
5023    #[instrument(level = "trace", skip_all)]
5024    pub async fn get_transaction_lock(
5025        &self,
5026        object_ref: &ObjectRef,
5027        epoch_store: &AuthorityPerEpochStore,
5028    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5029        let lock_info = self
5030            .get_object_cache_reader()
5031            .get_lock(*object_ref, epoch_store)?;
5032        let lock_info = match lock_info {
5033            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5034                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5035                    provided_obj_ref: *object_ref,
5036                    current_version: locked_ref.1,
5037                }
5038                .into());
5039            }
5040            ObjectLockStatus::Initialized => {
5041                return Ok(None);
5042            }
5043            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5044        };
5045
5046        epoch_store.get_signed_transaction(&lock_info.tx_digest)
5047    }
5048
5049    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5050        self.get_object_cache_reader().get_objects(objects)
5051    }
5052
5053    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5054        self.get_object_cache_reader()
5055            .get_latest_object_ref_or_tombstone(object_id)
5056    }
5057
5058    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
5059    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the upgrade.
5060    ///
5061    /// This method can be used to dynamic adjust the amount of buffer. If set to 0, the upgrade
5062    /// will go through with only 2f+1 votes.
5063    ///
5064    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all should have the same
5065    /// value), or you risk halting the chain.
5066    pub fn set_override_protocol_upgrade_buffer_stake(
5067        &self,
5068        expected_epoch: EpochId,
5069        buffer_stake_bps: u64,
5070    ) -> SuiResult {
5071        let epoch_store = self.load_epoch_store_one_call_per_task();
5072        let actual_epoch = epoch_store.epoch();
5073        if actual_epoch != expected_epoch {
5074            return Err(SuiErrorKind::WrongEpoch {
5075                expected_epoch,
5076                actual_epoch,
5077            }
5078            .into());
5079        }
5080
5081        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5082    }
5083
5084    pub fn clear_override_protocol_upgrade_buffer_stake(
5085        &self,
5086        expected_epoch: EpochId,
5087    ) -> SuiResult {
5088        let epoch_store = self.load_epoch_store_one_call_per_task();
5089        let actual_epoch = epoch_store.epoch();
5090        if actual_epoch != expected_epoch {
5091            return Err(SuiErrorKind::WrongEpoch {
5092                expected_epoch,
5093                actual_epoch,
5094            }
5095            .into());
5096        }
5097
5098        epoch_store.clear_override_protocol_upgrade_buffer_stake()
5099    }
5100
    /// Get the set of system packages that are compiled in to this build, if those packages are
    /// compatible with the current versions of those packages on-chain.
    ///
    /// The result is all-or-nothing: as soon as `compare_system_package` yields `None` for any
    /// package, an empty vector is returned instead of a partial list.
    pub async fn get_available_system_packages(
        &self,
        binary_config: &BinaryConfig,
    ) -> Vec<ObjectRef> {
        let mut results = vec![];

        let system_packages = BuiltInFramework::iter_system_packages();

        // Add extra framework packages during simtest
        #[cfg(msim)]
        let extra_packages = framework_injection::get_extra_packages(self.name);
        // Under simtest, shadow the built-in iterator with one that also yields the extras.
        #[cfg(msim)]
        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

        for system_package in system_packages {
            let modules = system_package.modules().to_vec();
            // In simtests, we could override the current built-in framework packages.
            // An empty override skips the package entirely.
            #[cfg(msim)]
            let modules =
                match framework_injection::get_override_modules(&system_package.id, self.name) {
                    Some(overrides) if overrides.is_empty() => continue,
                    Some(overrides) => overrides,
                    None => modules,
                };

            // Compare the compiled-in package against the object store; `None` aborts the
            // whole query with an empty result.
            let Some(obj_ref) = sui_framework::compare_system_package(
                &self.get_object_store(),
                &system_package.id,
                &modules,
                system_package.dependencies.to_vec(),
                binary_config,
            )
            .await
            else {
                return vec![];
            };
            results.push(obj_ref);
        }

        results
    }
5144
5145    /// Return the new versions, module bytes, and dependencies for the packages that have been
5146    /// committed to for a framework upgrade, in `system_packages`.  Loads the module contents from
5147    /// the binary, and performs the following checks:
5148    ///
5149    /// - Whether its contents matches what is on-chain already, in which case no upgrade is
5150    ///   required, and its contents are omitted from the output.
5151    /// - Whether the contents in the binary can form a package whose digest matches the input,
5152    ///   meaning the framework will be upgraded, and this authority can satisfy that upgrade, in
5153    ///   which case the contents are included in the output.
5154    ///
5155    /// If the current version of the framework can't be loaded, the binary does not contain the
5156    /// bytes for that framework ID, or the resulting package fails the digest check, `None` is
5157    /// returned indicating that this authority cannot run the upgrade that the network voted on.
5158    async fn get_system_package_bytes(
5159        &self,
5160        system_packages: Vec<ObjectRef>,
5161        binary_config: &BinaryConfig,
5162    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5163        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5164        let objects = self.get_objects(&ids).await;
5165
5166        let mut res = Vec::with_capacity(system_packages.len());
5167        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
5168            let prev_transaction = match object {
5169                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5170                    // Skip this one because it doesn't need to be upgraded.
5171                    info!("Framework {} does not need updating", system_package_ref.0);
5172                    continue;
5173                }
5174
5175                Some(cur_object) => cur_object.previous_transaction,
5176                None => TransactionDigest::genesis_marker(),
5177            };
5178
5179            #[cfg(msim)]
5180            let SystemPackage {
5181                id: _,
5182                bytes,
5183                dependencies,
5184            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5185                .unwrap_or_else(|| {
5186                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5187                });
5188
5189            #[cfg(not(msim))]
5190            let SystemPackage {
5191                id: _,
5192                bytes,
5193                dependencies,
5194            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5195
5196            let modules: Vec<_> = bytes
5197                .iter()
5198                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5199                .collect();
5200
5201            let new_object = Object::new_system_package(
5202                &modules,
5203                system_package_ref.1,
5204                dependencies.clone(),
5205                prev_transaction,
5206            );
5207
5208            let new_ref = new_object.compute_object_reference();
5209            if new_ref != system_package_ref {
5210                if cfg!(msim) {
5211                    // debug_fatal required here for test_framework_upgrade_conflicting_versions to pass
5212                    debug_fatal!(
5213                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5214                    );
5215                } else {
5216                    error!(
5217                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5218                    );
5219                }
5220                return None;
5221            }
5222
5223            res.push((system_package_ref.1, bytes, dependencies));
5224        }
5225
5226        Some(res)
5227    }
5228
5229    // TODO: delete once authority_capabilities_v2 is deployed everywhere
5230    fn is_protocol_version_supported_v1(
5231        current_protocol_version: ProtocolVersion,
5232        proposed_protocol_version: ProtocolVersion,
5233        protocol_config: &ProtocolConfig,
5234        committee: &Committee,
5235        capabilities: Vec<AuthorityCapabilitiesV1>,
5236        mut buffer_stake_bps: u64,
5237    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5238        if proposed_protocol_version > current_protocol_version + 1
5239            && !protocol_config.advance_to_highest_supported_protocol_version()
5240        {
5241            return None;
5242        }
5243
5244        if buffer_stake_bps > 10000 {
5245            warn!("clamping buffer_stake_bps to 10000");
5246            buffer_stake_bps = 10000;
5247        }
5248
5249        // For each validator, gather the protocol version and system packages that it would like
5250        // to upgrade to in the next epoch.
5251        let mut desired_upgrades: Vec<_> = capabilities
5252            .into_iter()
5253            .filter_map(|mut cap| {
5254                // A validator that lists no packages is voting against any change at all.
5255                if cap.available_system_packages.is_empty() {
5256                    return None;
5257                }
5258
5259                cap.available_system_packages.sort();
5260
5261                info!(
5262                    "validator {:?} supports {:?} with system packages: {:?}",
5263                    cap.authority.concise(),
5264                    cap.supported_protocol_versions,
5265                    cap.available_system_packages,
5266                );
5267
5268                // A validator that only supports the current protocol version is also voting
5269                // against any change, because framework upgrades always require a protocol version
5270                // bump.
5271                cap.supported_protocol_versions
5272                    .is_version_supported(proposed_protocol_version)
5273                    .then_some((cap.available_system_packages, cap.authority))
5274            })
5275            .collect();
5276
5277        // There can only be one set of votes that have a majority, find one if it exists.
5278        desired_upgrades.sort();
5279        desired_upgrades
5280            .into_iter()
5281            .chunk_by(|(packages, _authority)| packages.clone())
5282            .into_iter()
5283            .find_map(|(packages, group)| {
5284                // should have been filtered out earlier.
5285                assert!(!packages.is_empty());
5286
5287                let mut stake_aggregator: StakeAggregator<(), true> =
5288                    StakeAggregator::new(Arc::new(committee.clone()));
5289
5290                for (_, authority) in group {
5291                    stake_aggregator.insert_generic(authority, ());
5292                }
5293
5294                let total_votes = stake_aggregator.total_votes();
5295                let quorum_threshold = committee.quorum_threshold();
5296                let f = committee.total_votes() - committee.quorum_threshold();
5297
5298                // multiple by buffer_stake_bps / 10000, rounded up.
5299                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5300                let effective_threshold = quorum_threshold + buffer_stake;
5301
5302                info!(
5303                    ?total_votes,
5304                    ?quorum_threshold,
5305                    ?buffer_stake_bps,
5306                    ?effective_threshold,
5307                    ?proposed_protocol_version,
5308                    ?packages,
5309                    "support for upgrade"
5310                );
5311
5312                let has_support = total_votes >= effective_threshold;
5313                has_support.then_some((proposed_protocol_version, packages))
5314            })
5315    }
5316
5317    fn is_protocol_version_supported_v2(
5318        current_protocol_version: ProtocolVersion,
5319        proposed_protocol_version: ProtocolVersion,
5320        protocol_config: &ProtocolConfig,
5321        committee: &Committee,
5322        capabilities: Vec<AuthorityCapabilitiesV2>,
5323        mut buffer_stake_bps: u64,
5324    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5325        if proposed_protocol_version > current_protocol_version + 1
5326            && !protocol_config.advance_to_highest_supported_protocol_version()
5327        {
5328            return None;
5329        }
5330
5331        if buffer_stake_bps > 10000 {
5332            warn!("clamping buffer_stake_bps to 10000");
5333            buffer_stake_bps = 10000;
5334        }
5335
5336        // For each validator, gather the protocol version and system packages that it would like
5337        // to upgrade to in the next epoch.
5338        let mut desired_upgrades: Vec<_> = capabilities
5339            .into_iter()
5340            .filter_map(|mut cap| {
5341                // A validator that lists no packages is voting against any change at all.
5342                if cap.available_system_packages.is_empty() {
5343                    return None;
5344                }
5345
5346                cap.available_system_packages.sort();
5347
5348                info!(
5349                    "validator {:?} supports {:?} with system packages: {:?}",
5350                    cap.authority.concise(),
5351                    cap.supported_protocol_versions,
5352                    cap.available_system_packages,
5353                );
5354
5355                // A validator that only supports the current protocol version is also voting
5356                // against any change, because framework upgrades always require a protocol version
5357                // bump.
5358                cap.supported_protocol_versions
5359                    .get_version_digest(proposed_protocol_version)
5360                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
5361            })
5362            .collect();
5363
5364        // There can only be one set of votes that have a majority, find one if it exists.
5365        desired_upgrades.sort();
5366        desired_upgrades
5367            .into_iter()
5368            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5369            .into_iter()
5370            .find_map(|((digest, packages), group)| {
5371                // should have been filtered out earlier.
5372                assert!(!packages.is_empty());
5373
5374                let mut stake_aggregator: StakeAggregator<(), true> =
5375                    StakeAggregator::new(Arc::new(committee.clone()));
5376
5377                for (_, _, authority) in group {
5378                    stake_aggregator.insert_generic(authority, ());
5379                }
5380
5381                let total_votes = stake_aggregator.total_votes();
5382                let quorum_threshold = committee.quorum_threshold();
5383                let f = committee.total_votes() - committee.quorum_threshold();
5384
5385                // multiple by buffer_stake_bps / 10000, rounded up.
5386                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5387                let effective_threshold = quorum_threshold + buffer_stake;
5388
5389                info!(
5390                    protocol_config_digest = ?digest,
5391                    ?total_votes,
5392                    ?quorum_threshold,
5393                    ?buffer_stake_bps,
5394                    ?effective_threshold,
5395                    ?proposed_protocol_version,
5396                    ?packages,
5397                    "support for upgrade"
5398                );
5399
5400                let has_support = total_votes >= effective_threshold;
5401                has_support.then_some((proposed_protocol_version, packages))
5402            })
5403    }
5404
5405    // TODO: delete once authority_capabilities_v2 is deployed everywhere
5406    fn choose_protocol_version_and_system_packages_v1(
5407        current_protocol_version: ProtocolVersion,
5408        protocol_config: &ProtocolConfig,
5409        committee: &Committee,
5410        capabilities: Vec<AuthorityCapabilitiesV1>,
5411        buffer_stake_bps: u64,
5412    ) -> (ProtocolVersion, Vec<ObjectRef>) {
5413        let mut next_protocol_version = current_protocol_version;
5414        let mut system_packages = vec![];
5415
5416        while let Some((version, packages)) = Self::is_protocol_version_supported_v1(
5417            current_protocol_version,
5418            next_protocol_version + 1,
5419            protocol_config,
5420            committee,
5421            capabilities.clone(),
5422            buffer_stake_bps,
5423        ) {
5424            next_protocol_version = version;
5425            system_packages = packages;
5426        }
5427
5428        (next_protocol_version, system_packages)
5429    }
5430
5431    fn choose_protocol_version_and_system_packages_v2(
5432        current_protocol_version: ProtocolVersion,
5433        protocol_config: &ProtocolConfig,
5434        committee: &Committee,
5435        capabilities: Vec<AuthorityCapabilitiesV2>,
5436        buffer_stake_bps: u64,
5437    ) -> (ProtocolVersion, Vec<ObjectRef>) {
5438        let mut next_protocol_version = current_protocol_version;
5439        let mut system_packages = vec![];
5440
5441        while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5442            current_protocol_version,
5443            next_protocol_version + 1,
5444            protocol_config,
5445            committee,
5446            capabilities.clone(),
5447            buffer_stake_bps,
5448        ) {
5449            next_protocol_version = version;
5450            system_packages = packages;
5451        }
5452
5453        (next_protocol_version, system_packages)
5454    }
5455
5456    #[instrument(level = "debug", skip_all)]
5457    fn create_authenticator_state_tx(
5458        &self,
5459        epoch_store: &Arc<AuthorityPerEpochStore>,
5460    ) -> Option<EndOfEpochTransactionKind> {
5461        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5462            info!("authenticator state transactions not enabled");
5463            return None;
5464        }
5465
5466        let authenticator_state_exists = epoch_store.authenticator_state_exists();
5467        let tx = if authenticator_state_exists {
5468            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5469            let min_epoch =
5470                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5471            let authenticator_obj_initial_shared_version = epoch_store
5472                .epoch_start_config()
5473                .authenticator_obj_initial_shared_version()
5474                .expect("initial version must exist");
5475
5476            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5477                min_epoch,
5478                authenticator_obj_initial_shared_version,
5479            );
5480
5481            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5482
5483            tx
5484        } else {
5485            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5486            info!("Creating AuthenticatorStateCreate tx");
5487            tx
5488        };
5489        Some(tx)
5490    }
5491
5492    #[instrument(level = "debug", skip_all)]
5493    fn create_randomness_state_tx(
5494        &self,
5495        epoch_store: &Arc<AuthorityPerEpochStore>,
5496    ) -> Option<EndOfEpochTransactionKind> {
5497        if !epoch_store.protocol_config().random_beacon() {
5498            info!("randomness state transactions not enabled");
5499            return None;
5500        }
5501
5502        if epoch_store.randomness_state_exists() {
5503            return None;
5504        }
5505
5506        let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5507        info!("Creating RandomnessStateCreate tx");
5508        Some(tx)
5509    }
5510
5511    #[instrument(level = "debug", skip_all)]
5512    fn create_accumulator_root_tx(
5513        &self,
5514        epoch_store: &Arc<AuthorityPerEpochStore>,
5515    ) -> Option<EndOfEpochTransactionKind> {
5516        if !epoch_store
5517            .protocol_config()
5518            .create_root_accumulator_object()
5519        {
5520            info!("accumulator root creation not enabled");
5521            return None;
5522        }
5523
5524        if epoch_store.accumulator_root_exists() {
5525            return None;
5526        }
5527
5528        let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5529        info!("Creating AccumulatorRootCreate tx");
5530        Some(tx)
5531    }
5532
5533    #[instrument(level = "debug", skip_all)]
5534    fn create_coin_registry_tx(
5535        &self,
5536        epoch_store: &Arc<AuthorityPerEpochStore>,
5537    ) -> Option<EndOfEpochTransactionKind> {
5538        if !epoch_store.protocol_config().enable_coin_registry() {
5539            info!("coin registry not enabled");
5540            return None;
5541        }
5542
5543        if epoch_store.coin_registry_exists() {
5544            return None;
5545        }
5546
5547        let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5548        info!("Creating CoinRegistryCreate tx");
5549        Some(tx)
5550    }
5551
5552    #[instrument(level = "debug", skip_all)]
5553    fn create_display_registry_tx(
5554        &self,
5555        epoch_store: &Arc<AuthorityPerEpochStore>,
5556    ) -> Option<EndOfEpochTransactionKind> {
5557        if !epoch_store.protocol_config().enable_display_registry() {
5558            info!("display registry not enabled");
5559            return None;
5560        }
5561
5562        if epoch_store.display_registry_exists() {
5563            return None;
5564        }
5565
5566        let tx = EndOfEpochTransactionKind::new_display_registry_create();
5567        info!("Creating DisplayRegistryCreate tx");
5568        Some(tx)
5569    }
5570
5571    #[instrument(level = "debug", skip_all)]
5572    fn create_bridge_tx(
5573        &self,
5574        epoch_store: &Arc<AuthorityPerEpochStore>,
5575    ) -> Option<EndOfEpochTransactionKind> {
5576        if !epoch_store.protocol_config().enable_bridge() {
5577            info!("bridge not enabled");
5578            return None;
5579        }
5580        if epoch_store.bridge_exists() {
5581            return None;
5582        }
5583        let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5584        info!("Creating Bridge Create tx");
5585        Some(tx)
5586    }
5587
5588    #[instrument(level = "debug", skip_all)]
5589    fn init_bridge_committee_tx(
5590        &self,
5591        epoch_store: &Arc<AuthorityPerEpochStore>,
5592    ) -> Option<EndOfEpochTransactionKind> {
5593        if !epoch_store.protocol_config().enable_bridge() {
5594            info!("bridge not enabled");
5595            return None;
5596        }
5597        if !epoch_store
5598            .protocol_config()
5599            .should_try_to_finalize_bridge_committee()
5600        {
5601            info!("should not try to finalize bridge committee yet");
5602            return None;
5603        }
5604        // Only create this transaction if bridge exists
5605        if !epoch_store.bridge_exists() {
5606            return None;
5607        }
5608
5609        if epoch_store.bridge_committee_initiated() {
5610            return None;
5611        }
5612
5613        let bridge_initial_shared_version = epoch_store
5614            .epoch_start_config()
5615            .bridge_obj_initial_shared_version()
5616            .expect("initial version must exist");
5617        let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5618        info!("Init Bridge committee tx");
5619        Some(tx)
5620    }
5621
5622    #[instrument(level = "debug", skip_all)]
5623    fn create_deny_list_state_tx(
5624        &self,
5625        epoch_store: &Arc<AuthorityPerEpochStore>,
5626    ) -> Option<EndOfEpochTransactionKind> {
5627        if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5628            return None;
5629        }
5630
5631        if epoch_store.coin_deny_list_state_exists() {
5632            return None;
5633        }
5634
5635        let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5636        info!("Creating DenyListStateCreate tx");
5637        Some(tx)
5638    }
5639
    /// Builds the `StoreExecutionTimeObservations` end-of-epoch transaction.
    ///
    /// Returns `None` unless the per-object congestion control mode is `ExecutionTimeEstimate`.
    /// Otherwise, collects the `ExecutionTimeObservationKey`s of every command in the
    /// programmable transactions contained in the trailing checkpoints of the epoch (ending at
    /// `last_checkpoint_before_end_of_epoch`), adds `end_of_epoch_observation_keys`, and stores
    /// only those of the epoch's end-of-epoch observations whose key is in that set, passed
    /// through `filter_and_sort_v1` with `stored_observations_limit` as the limit.
    ///
    /// # Panics
    ///
    /// Panics if a checkpoint store read fails, if the last checkpoint of the preceding epoch,
    /// the contents of a checkpoint in range, or a transaction in range cannot be found, or if
    /// `stored_observations_limit` does not fit the type expected by `filter_and_sort_v1`. A
    /// checkpoint in range that is available neither as a locally computed nor as a certified
    /// checkpoint is reported through `fatal!`.
    #[instrument(level = "debug", skip_all)]
    fn create_execution_time_observations_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
    ) -> Option<EndOfEpochTransactionKind> {
        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
            .protocol_config()
            .per_object_congestion_control_mode()
        else {
            return None;
        };

        // Load tx in the last N checkpoints before end-of-epoch, and save only the
        // execution time observations for commands in these checkpoints.
        // NOTE(review): the `- 1` below is unchecked; presumably
        // `stored_observations_num_included_checkpoints` is always >= 1 -- confirm.
        let start_checkpoint = std::cmp::max(
            last_checkpoint_before_end_of_epoch
                .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
            // If we have <N checkpoints in the epoch, use all of them.
            // The lower bound is one past the last checkpoint of the preceding epoch, or 1 when
            // there is no preceding epoch (epoch 0).
            epoch_store
                .epoch()
                .checked_sub(1)
                .map(|prev_epoch| {
                    self.checkpoint_store
                        .get_epoch_last_checkpoint_seq_number(prev_epoch)
                        .expect("typed store must not fail")
                        .expect(
                            "sequence number of last checkpoint of preceding epoch must be saved",
                        )
                        + 1
                })
                .unwrap_or(1),
        );
        info!(
            "reading checkpoint range {:?}..={:?}",
            start_checkpoint, last_checkpoint_before_end_of_epoch
        );
        let sequence_numbers =
            (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
        // Resolve each checkpoint in range to the digest of its contents.
        let contents_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_locally_computed_checkpoints(&sequence_numbers)
            .expect("typed store must not fail")
            .into_iter()
            .zip(sequence_numbers)
            .map(|(maybe_checkpoint, sequence_number)| {
                if let Some(checkpoint) = maybe_checkpoint {
                    checkpoint.content_digest
                } else {
                    // If locally computed checkpoint summary was already pruned, load the
                    // certified checkpoint.
                    self.checkpoint_store
                        .get_checkpoint_by_sequence_number(sequence_number)
                        .expect("typed store must not fail")
                        .unwrap_or_else(|| {
                            fatal!("preceding checkpoints must exist by end of epoch")
                        })
                        .data()
                        .content_digest
                }
            })
            .collect();
        // Flatten the checkpoint contents into the digests of the transactions they include.
        let tx_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_checkpoint_content(&contents_digests)
            .expect("typed store must not fail")
            .into_iter()
            .flat_map(|maybe_contents| {
                maybe_contents
                    .expect("preceding checkpoint contents must exist by end of epoch")
                    .into_inner()
                    .into_iter()
                    .map(|digests| digests.transaction)
            })
            .collect();
        // Keys of every command in the programmable transactions above, plus the keys handed in
        // by the caller. Transactions of any other kind contribute no keys.
        let included_execution_time_observations: HashSet<_> = self
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(&tx_digests)
            .into_iter()
            .flat_map(|maybe_tx| {
                if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
                    .expect("preceding transaction must exist by end of epoch")
                    .transaction_data()
                    .kind()
                {
                    #[allow(clippy::unnecessary_to_owned)]
                    itertools::Either::Left(
                        ptb.commands
                            .to_owned()
                            .into_iter()
                            .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
                    )
                } else {
                    itertools::Either::Right(std::iter::empty())
                }
            })
            .chain(end_of_epoch_observation_keys.into_iter())
            .collect();

        // Keep only the observations whose key was collected above, up to the configured limit.
        let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
            epoch_store
                .get_end_of_epoch_execution_time_observations()
                .filter_and_sort_v1(
                    |(key, _)| included_execution_time_observations.contains(key),
                    params.stored_observations_limit.try_into().unwrap(),
                ),
        );
        info!("Creating StoreExecutionTimeObservations tx");
        Some(tx)
    }
5751
    /// Creates and executes the advance epoch transaction to effects without committing it to the database.
5753    /// The effects of the change epoch tx are only written to the database after a certified checkpoint has been
5754    /// formed and executed by CheckpointExecutor.
5755    ///
    /// When a framework upgrade has been decided on, but the validator does not have the new
5757    /// versions of the packages locally, the validator cannot form the ChangeEpochTx. In this case
5758    /// it returns Err, indicating that the checkpoint builder should give up trying to make the
5759    /// final checkpoint. As long as the network is able to create a certified checkpoint (which
5760    /// should be ensured by the capabilities vote), it will arrive via state sync and be executed
5761    /// by CheckpointExecutor.
5762    #[instrument(level = "error", skip_all)]
5763    pub async fn create_and_execute_advance_epoch_tx(
5764        &self,
5765        epoch_store: &Arc<AuthorityPerEpochStore>,
5766        gas_cost_summary: &GasCostSummary,
5767        checkpoint: CheckpointSequenceNumber,
5768        epoch_start_timestamp_ms: CheckpointTimestamp,
5769        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5770        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
5771        // >1 checkpoint.
5772        last_checkpoint: CheckpointSequenceNumber,
5773    ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
5774        let mut txns = Vec::new();
5775
5776        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
5777            txns.push(tx);
5778        }
5779        if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
5780            txns.push(tx);
5781        }
5782        if let Some(tx) = self.create_bridge_tx(epoch_store) {
5783            txns.push(tx);
5784        }
5785        if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
5786            txns.push(tx);
5787        }
5788        if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
5789            txns.push(tx);
5790        }
5791        if let Some(tx) = self.create_execution_time_observations_tx(
5792            epoch_store,
5793            end_of_epoch_observation_keys,
5794            last_checkpoint,
5795        ) {
5796            txns.push(tx);
5797        }
5798        if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
5799            txns.push(tx);
5800        }
5801
5802        if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
5803            txns.push(tx);
5804        }
5805
5806        if let Some(tx) = self.create_display_registry_tx(epoch_store) {
5807            txns.push(tx);
5808        }
5809
5810        let next_epoch = epoch_store.epoch() + 1;
5811
5812        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5813
5814        let (next_epoch_protocol_version, next_epoch_system_packages) =
5815            if epoch_store.protocol_config().authority_capabilities_v2() {
5816                Self::choose_protocol_version_and_system_packages_v2(
5817                    epoch_store.protocol_version(),
5818                    epoch_store.protocol_config(),
5819                    epoch_store.committee(),
5820                    epoch_store
5821                        .get_capabilities_v2()
5822                        .expect("read capabilities from db cannot fail"),
5823                    buffer_stake_bps,
5824                )
5825            } else {
5826                Self::choose_protocol_version_and_system_packages_v1(
5827                    epoch_store.protocol_version(),
5828                    epoch_store.protocol_config(),
5829                    epoch_store.committee(),
5830                    epoch_store
5831                        .get_capabilities_v1()
5832                        .expect("read capabilities from db cannot fail"),
5833                    buffer_stake_bps,
5834                )
5835            };
5836
5837        // since system packages are created during the current epoch, they should abide by the
5838        // rules of the current epoch, including the current epoch's max Move binary format version
5839        let config = epoch_store.protocol_config();
5840        let binary_config = config.binary_config(None);
5841        let Some(next_epoch_system_package_bytes) = self
5842            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5843            .await
5844        else {
5845            if next_epoch_protocol_version <= ProtocolVersion::MAX {
5846                // This case should only be hit if the validator supports the new protocol version,
5847                // but carries a different framework. The validator should still be able to
5848                // reconfigure, as the correct framework will be installed by the change epoch txn.
5849                debug_fatal!(
5850                    "upgraded system packages {:?} are not locally available, cannot create \
5851                    ChangeEpochTx. validator binary must be upgraded to the correct version!",
5852                    next_epoch_system_packages
5853                );
5854            } else {
5855                error!(
5856                    "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
5857                    next_epoch_protocol_version
5858                );
5859            }
5860            // the checkpoint builder will keep retrying forever when it hits this error.
5861            // Eventually, one of two things will happen:
5862            // - The operator will upgrade this binary to one that has the new packages locally,
5863            //   and this function will succeed.
5864            // - The final checkpoint will be certified by other validators, we will receive it via
5865            //   state sync, and execute it. This will upgrade the framework packages, reconfigure,
5866            //   and most likely shut down in the new epoch (this validator likely doesn't support
5867            //   the new protocol version, or else it should have had the packages.)
5868            return Err(CheckpointBuilderError::SystemPackagesMissing);
5869        };
5870
5871        let tx = if epoch_store
5872            .protocol_config()
5873            .end_of_epoch_transaction_supported()
5874        {
5875            txns.push(EndOfEpochTransactionKind::new_change_epoch(
5876                next_epoch,
5877                next_epoch_protocol_version,
5878                gas_cost_summary.storage_cost,
5879                gas_cost_summary.computation_cost,
5880                gas_cost_summary.storage_rebate,
5881                gas_cost_summary.non_refundable_storage_fee,
5882                epoch_start_timestamp_ms,
5883                next_epoch_system_package_bytes,
5884            ));
5885
5886            VerifiedTransaction::new_end_of_epoch_transaction(txns)
5887        } else {
5888            VerifiedTransaction::new_change_epoch(
5889                next_epoch,
5890                next_epoch_protocol_version,
5891                gas_cost_summary.storage_cost,
5892                gas_cost_summary.computation_cost,
5893                gas_cost_summary.storage_rebate,
5894                gas_cost_summary.non_refundable_storage_fee,
5895                epoch_start_timestamp_ms,
5896                next_epoch_system_package_bytes,
5897            )
5898        };
5899
5900        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5901            tx.clone(),
5902            epoch_store.epoch(),
5903            checkpoint,
5904        );
5905
5906        let tx_digest = executable_tx.digest();
5907
5908        info!(
5909            ?next_epoch,
5910            ?next_epoch_protocol_version,
5911            ?next_epoch_system_packages,
5912            computation_cost=?gas_cost_summary.computation_cost,
5913            storage_cost=?gas_cost_summary.storage_cost,
5914            storage_rebate=?gas_cost_summary.storage_rebate,
5915            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5916            ?tx_digest,
5917            "Creating advance epoch transaction"
5918        );
5919
5920        fail_point_async!("change_epoch_tx_delay");
5921        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5922
5923        // The tx could have been executed by state sync already - if so simply return an error.
5924        // The checkpoint builder will shortly be terminated by reconfiguration anyway.
5925        if self
5926            .get_transaction_cache_reader()
5927            .is_tx_already_executed(tx_digest)
5928        {
5929            warn!("change epoch tx has already been executed via state sync");
5930            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
5931        }
5932
5933        let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
5934        else {
5935            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
5936        };
5937
5938        // We must manually assign the shared object versions to the transaction before executing it.
5939        // This is because we do not sequence end-of-epoch transactions through consensus.
5940        let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
5941            self.get_object_cache_reader().as_ref(),
5942            std::iter::once(&Schedulable::Transaction(&executable_tx)),
5943        )?;
5944
5945        assert_eq!(assigned_versions.0.len(), 1);
5946        let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;
5947
5948        let input_objects = self.read_objects_for_execution(
5949            &tx_lock,
5950            &executable_tx,
5951            assigned_versions,
5952            epoch_store,
5953        )?;
5954
5955        let (transaction_outputs, _timings, _execution_error_opt) = self
5956            .execute_certificate(
5957                &execution_guard,
5958                &executable_tx,
5959                input_objects,
5960                None,
5961                BalanceWithdrawStatus::NoWithdraw,
5962                epoch_store,
5963            )
5964            .unwrap();
5965        let system_obj = get_sui_system_state(&transaction_outputs.written)
5966            .expect("change epoch tx must write to system object");
5967
5968        let effects = transaction_outputs.effects;
        // We must write tx and effects to the state sync tables so that state sync is able to
        // deliver the transaction to CheckpointExecutor after it is included in a certified
        // checkpoint.
5972        self.get_state_sync_store()
5973            .insert_transaction_and_effects(&tx, &effects);
5974
5975        info!(
5976            "Effects summary of the change epoch transaction: {:?}",
5977            effects.summary_for_debug()
5978        );
5979        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5980        // The change epoch transaction cannot fail to execute.
5981        assert!(effects.status().is_ok());
5982        Ok((system_obj, effects))
5983    }
5984
5985    #[instrument(level = "error", skip_all)]
5986    async fn reopen_epoch_db(
5987        &self,
5988        cur_epoch_store: &AuthorityPerEpochStore,
5989        new_committee: Committee,
5990        epoch_start_configuration: EpochStartConfiguration,
5991        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5992        epoch_last_checkpoint: CheckpointSequenceNumber,
5993    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
5994        let new_epoch = new_committee.epoch;
5995        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5996        assert_eq!(
5997            epoch_start_configuration.epoch_start_state().epoch(),
5998            new_committee.epoch
5999        );
6000        fail_point!("before-open-new-epoch-store");
6001        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6002            self.name,
6003            new_committee,
6004            epoch_start_configuration,
6005            self.get_backing_package_store().clone(),
6006            self.get_object_store().clone(),
6007            expensive_safety_check_config,
6008            epoch_last_checkpoint,
6009        )?;
6010        self.epoch_store.store(new_epoch_store.clone());
6011        Ok(new_epoch_store)
6012    }
6013
6014    #[cfg(test)]
6015    pub(crate) fn iter_live_object_set_for_testing(
6016        &self,
6017    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6018        let include_wrapped_object = !self
6019            .epoch_store_for_testing()
6020            .protocol_config()
6021            .simplified_unwrap_then_delete();
6022        self.get_global_state_hash_store()
6023            .iter_cached_live_object_set_for_testing(include_wrapped_object)
6024    }
6025
6026    #[cfg(test)]
6027    pub(crate) fn shutdown_execution_for_test(&self) {
6028        self.tx_execution_shutdown
6029            .lock()
6030            .take()
6031            .unwrap()
6032            .send(())
6033            .unwrap();
6034    }
6035
6036    /// NOTE: this function is only to be used for fuzzing and testing. Never use in prod
6037    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6038        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6039        self.get_object_cache_reader()
6040            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6041        self.get_reconfig_api()
6042            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6043        Ok(())
6044    }
6045}
6046
/// Consumes `(epoch, round, bytes)` messages from a channel and, for each one, creates a
/// randomness state update transaction against the held `AuthorityState`.
pub struct RandomnessRoundReceiver {
    /// Authority state used to load the epoch store, persist the transaction and read effects.
    authority_state: Arc<AuthorityState>,
    /// Incoming randomness messages: the epoch, the randomness round, and the raw bytes that
    /// are placed into the randomness state update transaction.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
6051
6052impl RandomnessRoundReceiver {
6053    pub fn spawn(
6054        authority_state: Arc<AuthorityState>,
6055        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6056    ) -> JoinHandle<()> {
6057        let rrr = RandomnessRoundReceiver {
6058            authority_state,
6059            randomness_rx,
6060        };
6061        spawn_monitored_task!(rrr.run())
6062    }
6063
6064    async fn run(mut self) {
6065        info!("RandomnessRoundReceiver event loop started");
6066
6067        loop {
6068            tokio::select! {
6069                maybe_recv = self.randomness_rx.recv() => {
6070                    if let Some((epoch, round, bytes)) = maybe_recv {
6071                        self.handle_new_randomness(epoch, round, bytes).await;
6072                    } else {
6073                        break;
6074                    }
6075                },
6076            }
6077        }
6078
6079        info!("RandomnessRoundReceiver event loop ended");
6080    }
6081
6082    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
6083    async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
6084        fail_point_async!("randomness-delay");
6085
6086        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
6087        if epoch_store.epoch() != epoch {
6088            warn!(
6089                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
6090                epoch_store.epoch()
6091            );
6092            return;
6093        }
6094        let key = TransactionKey::RandomnessRound(epoch, round);
6095        let transaction = VerifiedTransaction::new_randomness_state_update(
6096            epoch,
6097            round,
6098            bytes,
6099            epoch_store
6100                .epoch_start_config()
6101                .randomness_obj_initial_shared_version()
6102                .expect("randomness state obj must exist"),
6103        );
6104        debug!(
6105            "created randomness state update transaction with digest: {:?}",
6106            transaction.digest()
6107        );
6108        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
6109        let digest = *transaction.digest();
6110
6111        // Randomness state updates contain the full bls signature for the random round,
6112        // which cannot necessarily be reconstructed again later. Therefore we must immediately
6113        // persist this transaction. If we crash before its outputs are committed, this
6114        // ensures we will be able to re-execute it.
6115        self.authority_state
6116            .get_cache_commit()
6117            .persist_transaction(&transaction);
6118
6119        // Notify the scheduler that the transaction key now has a known digest
6120        if epoch_store.insert_tx_key(key, digest).is_err() {
6121            warn!("epoch ended while handling new randomness");
6122        }
6123
6124        let authority_state = self.authority_state.clone();
6125        spawn_monitored_task!(async move {
6126            // Wait for transaction execution in a separate task, to avoid deadlock in case of
6127            // out-of-order randomness generation. (Each RandomnessStateUpdate depends on the
6128            // output of the RandomnessStateUpdate from the previous round.)
6129            //
6130            // We set a very long timeout so that in case this gets stuck for some reason, the
6131            // validator will eventually crash rather than continuing in a zombie mode.
6132            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
6133            let result = tokio::time::timeout(
6134                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
6135                authority_state
6136                    .get_transaction_cache_reader()
6137                    .notify_read_executed_effects(
6138                        "RandomnessRoundReceiver::notify_read_executed_effects_first",
6139                        &[digest],
6140                    ),
6141            )
6142            .await;
6143            let mut effects = match result {
6144                Ok(result) => result,
6145                Err(_) => {
6146                    if cfg!(debug_assertions) {
6147                        // Crash on randomness update execution timeout in debug builds.
6148                        panic!(
6149                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6150                        );
6151                    }
6152                    warn!(
6153                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6154                    );
6155                    // Continue waiting as long as necessary in non-debug builds.
6156                    authority_state
6157                        .get_transaction_cache_reader()
6158                        .notify_read_executed_effects(
6159                            "RandomnessRoundReceiver::notify_read_executed_effects_second",
6160                            &[digest],
6161                        )
6162                        .await
6163                }
6164            };
6165
6166            let effects = effects.pop().expect("should return effects");
6167            if *effects.status() != ExecutionStatus::Success {
6168                fatal!(
6169                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
6170                );
6171            }
6172            debug!(
6173                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
6174            );
6175        });
6176    }
6177}
6178
6179#[async_trait]
6180impl TransactionKeyValueStoreTrait for AuthorityState {
6181    #[instrument(skip(self))]
6182    async fn multi_get(
6183        &self,
6184        transactions: &[TransactionDigest],
6185        effects: &[TransactionDigest],
6186    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6187        let txns = if !transactions.is_empty() {
6188            self.get_transaction_cache_reader()
6189                .multi_get_transaction_blocks(transactions)
6190                .into_iter()
6191                .map(|t| t.map(|t| (*t).clone().into_inner()))
6192                .collect()
6193        } else {
6194            vec![]
6195        };
6196
6197        let fx = if !effects.is_empty() {
6198            self.get_transaction_cache_reader()
6199                .multi_get_executed_effects(effects)
6200        } else {
6201            vec![]
6202        };
6203
6204        Ok((txns, fx))
6205    }
6206
6207    #[instrument(skip(self))]
6208    async fn multi_get_checkpoints(
6209        &self,
6210        checkpoint_summaries: &[CheckpointSequenceNumber],
6211        checkpoint_contents: &[CheckpointSequenceNumber],
6212        checkpoint_summaries_by_digest: &[CheckpointDigest],
6213    ) -> SuiResult<(
6214        Vec<Option<CertifiedCheckpointSummary>>,
6215        Vec<Option<CheckpointContents>>,
6216        Vec<Option<CertifiedCheckpointSummary>>,
6217    )> {
6218        // TODO: use multi-get methods if it ever becomes important (unlikely)
6219        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6220        let store = self.get_checkpoint_store();
6221        for seq in checkpoint_summaries {
6222            let checkpoint = store
6223                .get_checkpoint_by_sequence_number(*seq)?
6224                .map(|c| c.into_inner());
6225
6226            summaries.push(checkpoint);
6227        }
6228
6229        let mut contents = Vec::with_capacity(checkpoint_contents.len());
6230        for seq in checkpoint_contents {
6231            let checkpoint = store
6232                .get_checkpoint_by_sequence_number(*seq)?
6233                .and_then(|summary| {
6234                    store
6235                        .get_checkpoint_contents(&summary.content_digest)
6236                        .expect("db read cannot fail")
6237                });
6238            contents.push(checkpoint);
6239        }
6240
6241        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6242        for digest in checkpoint_summaries_by_digest {
6243            let checkpoint = store
6244                .get_checkpoint_by_digest(digest)?
6245                .map(|c| c.into_inner());
6246            summaries_by_digest.push(checkpoint);
6247        }
6248        Ok((summaries, contents, summaries_by_digest))
6249    }
6250
6251    #[instrument(skip(self))]
6252    async fn deprecated_get_transaction_checkpoint(
6253        &self,
6254        digest: TransactionDigest,
6255    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6256        Ok(self
6257            .get_checkpoint_cache()
6258            .deprecated_get_transaction_checkpoint(&digest)
6259            .map(|(_epoch, checkpoint)| checkpoint))
6260    }
6261
6262    #[instrument(skip(self))]
6263    async fn get_object(
6264        &self,
6265        object_id: ObjectID,
6266        version: VersionNumber,
6267    ) -> SuiResult<Option<Object>> {
6268        Ok(self
6269            .get_object_cache_reader()
6270            .get_object_by_key(&object_id, version))
6271    }
6272
6273    #[instrument(skip_all)]
6274    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6275        Ok(self
6276            .get_object_cache_reader()
6277            .multi_get_objects_by_key(object_keys))
6278    }
6279
6280    #[instrument(skip(self))]
6281    async fn multi_get_transaction_checkpoint(
6282        &self,
6283        digests: &[TransactionDigest],
6284    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6285        let res = self
6286            .get_checkpoint_cache()
6287            .deprecated_multi_get_transaction_checkpoint(digests);
6288
6289        Ok(res
6290            .into_iter()
6291            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6292            .collect())
6293    }
6294
6295    #[instrument(skip(self))]
6296    async fn multi_get_events_by_tx_digests(
6297        &self,
6298        digests: &[TransactionDigest],
6299    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6300        if digests.is_empty() {
6301            return Ok(vec![]);
6302        }
6303
6304        Ok(self
6305            .get_transaction_cache_reader()
6306            .multi_get_events(digests))
6307    }
6308}
6309
/// Simulator-only (`cfg(msim)`) registry of system package overrides, keyed by package ID.
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::collections::BTreeMap;
    use std::{cell::RefCell, collections::BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) override a package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package override is resolved for a given validator.
    enum PackageOverrideConfig {
        /// The same modules are used for every validator.
        Global(Framework),
        /// The callback is asked per validator and may return `None`.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module at its own binary format version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        modules
            .iter()
            .map(|m| {
                let mut buf = Vec::new();
                m.serialize_with_version(m.version, &mut buf).unwrap();
                buf
            })
            .collect()
    }

    /// Registers `modules` as the override of `package_id` for all validators, replacing any
    /// previous override of that package.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules))
        });
    }

    /// Registers a per-validator callback as the override of `package_id`, replacing any
    /// previous override of that package.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func))
        });
    }

    /// Registers a global override for every package in `packages`, and an empty global
    /// override for every built-in package ID that is not among them.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        OVERRIDE.with(|bs| {
            let mut new_packages_not_to_include: BTreeSet<_> =
                BuiltInFramework::all_package_ids().into_iter().collect();
            for pkg in &packages {
                new_packages_not_to_include.remove(&pkg.id);
            }

            // Borrow the registry once for all insertions.
            let mut overrides = bs.borrow_mut();
            for pkg in packages {
                overrides.insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
            }
            for empty_pkg in new_packages_not_to_include {
                overrides.insert(empty_pkg, PackageOverrideConfig::Global(vec![]));
            }
        });
    }

    /// Returns the serialized override modules of `package_id` for validator `name`, or
    /// `None` if there is no override or the per-validator callback returns `None`.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            })
        })
    }

    /// Returns the override modules of `package_id` for validator `name`, or `None` if there
    /// is no override or the per-validator callback returns `None`.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            })
        })
    }

    /// Builds a `SystemPackage` from the override of `package_id` for validator `name`, or
    /// returns `None` when no override bytes are available.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if is_system_package(*package_id) {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system packages.
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns a `SystemPackage` for every registered override whose ID is not a built-in
    /// package ID, each depending on all built-in packages.
    ///
    /// An override whose per-validator callback returns `None` for `name` is skipped, the
    /// same way `get_override_system_package` reports it as absent.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        // Collect the IDs first so the lookups below run outside this borrow of the registry.
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        extra
            .into_iter()
            .filter_map(|package| {
                let bytes = get_override_bytes(&package, name)?;
                Some(SystemPackage {
                    id: package,
                    bytes,
                    dependencies: BuiltInFramework::all_package_ids(),
                })
            })
            .collect()
    }
}
6443
/// An object stored together with the components of its object reference, in the
/// serializable form used by the object lists of `NodeStateDump`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// ID component of the object's reference.
    pub id: ObjectID,
    /// Version component of the object's reference.
    pub version: VersionNumber,
    /// Digest component of the object's reference.
    pub digest: ObjectDigest,
    /// The object itself.
    pub object: Object,
}
6451
6452impl ObjDumpFormat {
6453    fn new(object: Object) -> Self {
6454        let oref = object.compute_object_reference();
6455        Self {
6456            id: oref.0,
6457            version: oref.1,
6458            digest: oref.2,
6459            object,
6460        }
6461    }
6462}
6463
/// Serializable snapshot of one transaction's execution context: the transaction, epoch
/// parameters, the computed effects and expected effects digest, and the objects involved.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the transaction this dump describes.
    pub tx_digest: TransactionDigest,
    /// The transaction data, taken from the executable transaction passed to `new`.
    pub sender_signed_data: SenderSignedData,
    /// Epoch of the epoch store the dump was built with.
    pub executed_epoch: u64,
    /// Reference gas price of that epoch store.
    pub reference_gas_price: u64,
    /// Protocol version of that epoch store, as a `u64`.
    pub protocol_version: u64,
    /// Epoch start timestamp from the epoch start configuration.
    pub epoch_start_timestamp_ms: u64,
    /// The effects passed to `new`.
    pub computed_effects: TransactionEffects,
    /// The effects digest passed to `new` as the expected one.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in system packages that were found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Consensus input objects listed in the effects as mutated or read-only, when found in
    /// the object store at the referenced version.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Objects loaded at runtime according to the temporary store, when found in the object
    /// store at the loaded version.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects listed in the effects' `modified_at_versions`, when found in the object store
    /// at that version.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages the temporary store recorded as loaded from the database at runtime.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// Input objects held by the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
6481
6482impl NodeStateDump {
6483    pub fn new(
6484        tx_digest: &TransactionDigest,
6485        effects: &TransactionEffects,
6486        expected_effects_digest: TransactionEffectsDigest,
6487        object_store: &dyn ObjectStore,
6488        epoch_store: &Arc<AuthorityPerEpochStore>,
6489        inner_temporary_store: &InnerTemporaryStore,
6490        certificate: &VerifiedExecutableTransaction,
6491    ) -> SuiResult<Self> {
6492        // Epoch info
6493        let executed_epoch = epoch_store.epoch();
6494        let reference_gas_price = epoch_store.reference_gas_price();
6495        let epoch_start_config = epoch_store.epoch_start_config();
6496        let protocol_version = epoch_store.protocol_version().as_u64();
6497        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6498
6499        // Record all system packages at this version
6500        let mut relevant_system_packages = Vec::new();
6501        for sys_package_id in BuiltInFramework::all_package_ids() {
6502            if let Some(w) = object_store.get_object(&sys_package_id) {
6503                relevant_system_packages.push(ObjDumpFormat::new(w))
6504            }
6505        }
6506
6507        // Record all the shared objects
6508        let mut shared_objects = Vec::new();
6509        for kind in effects.input_consensus_objects() {
6510            match kind {
6511                InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6512                    if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6513                        shared_objects.push(ObjDumpFormat::new(w))
6514                    }
6515                }
6516                InputConsensusObject::ReadConsensusStreamEnded(..)
6517                | InputConsensusObject::MutateConsensusStreamEnded(..)
6518                | InputConsensusObject::Cancelled(..) => (), // TODO: consider record congested objects.
6519            }
6520        }
6521
6522        // Record all loaded child objects
6523        // Child objects which are read but not mutated are not tracked anywhere else
6524        let mut loaded_child_objects = Vec::new();
6525        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6526            if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6527                loaded_child_objects.push(ObjDumpFormat::new(w))
6528            }
6529        }
6530
6531        // Record all modified objects
6532        let mut modified_at_versions = Vec::new();
6533        for (id, ver) in effects.modified_at_versions() {
6534            if let Some(w) = object_store.get_object_by_key(&id, ver) {
6535                modified_at_versions.push(ObjDumpFormat::new(w))
6536            }
6537        }
6538
6539        // Packages read at runtime, which were not previously loaded into the temoorary store
6540        // Some packages may be fetched at runtime and wont show up in input objects
6541        let mut runtime_reads = Vec::new();
6542        for obj in inner_temporary_store
6543            .runtime_packages_loaded_from_db
6544            .values()
6545        {
6546            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6547        }
6548
6549        // All other input objects should already be in `inner_temporary_store.objects`
6550
6551        Ok(Self {
6552            tx_digest: *tx_digest,
6553            executed_epoch,
6554            reference_gas_price,
6555            epoch_start_timestamp_ms,
6556            protocol_version,
6557            relevant_system_packages,
6558            shared_objects,
6559            loaded_child_objects,
6560            modified_at_versions,
6561            runtime_reads,
6562            sender_signed_data: certificate.clone().into_message(),
6563            input_objects: inner_temporary_store
6564                .input_objects
6565                .values()
6566                .map(|o| ObjDumpFormat::new(o.clone()))
6567                .collect(),
6568            computed_effects: effects.clone(),
6569            expected_effects_digest,
6570        })
6571    }
6572
6573    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6574        let mut objects = Vec::new();
6575        objects.extend(self.relevant_system_packages.clone());
6576        objects.extend(self.shared_objects.clone());
6577        objects.extend(self.loaded_child_objects.clone());
6578        objects.extend(self.modified_at_versions.clone());
6579        objects.extend(self.runtime_reads.clone());
6580        objects.extend(self.input_objects.clone());
6581        objects
6582    }
6583
6584    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6585        let file_name = format!(
6586            "{}_{}_NODE_DUMP.json",
6587            self.tx_digest,
6588            AuthorityState::unixtime_now_ms()
6589        );
6590        let mut path = path.to_path_buf();
6591        path.push(&file_name);
6592        let mut file = File::create(path.clone())?;
6593        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6594        Ok(path)
6595    }
6596
6597    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6598        let file = File::open(path)?;
6599        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6600    }
6601}