1use crate::accumulators::coin_reservations::CachingCoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::transaction_rewriting::rewrite_transaction_for_coin_reservations;
10use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
11use crate::checkpoints::CheckpointBuilderError;
12use crate::checkpoints::CheckpointBuilderResult;
13use crate::congestion_tracker::CongestionTracker;
14use crate::execution_cache::ExecutionCacheTraitPointers;
15use crate::execution_cache::TransactionCacheRead;
16use crate::execution_cache::writeback_cache::WritebackCache;
17use crate::execution_scheduler::ExecutionScheduler;
18use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
19use crate::gasless_rate_limiter::ConsensusGaslessCounter;
20use crate::jsonrpc_index::CoinIndexKey2;
21use crate::rpc_index::RpcIndexStore;
22use crate::traffic_controller::TrafficController;
23use crate::traffic_controller::metrics::TrafficControllerMetrics;
24use crate::transaction_outputs::TransactionOutputs;
25use arc_swap::{ArcSwap, ArcSwapOption, Guard};
26use async_trait::async_trait;
27use authority_per_epoch_store::CertLockGuard;
28use dashmap::DashMap;
29use fastcrypto::encoding::Base58;
30use fastcrypto::encoding::Encoding;
31use fastcrypto::hash::MultisetHash;
32use itertools::Itertools;
33use move_binary_format::CompiledModule;
34use move_binary_format::binary_config::BinaryConfig;
35use move_core_types::annotated_value::MoveStructLayout;
36use move_core_types::language_storage::ModuleId;
37use mysten_common::ZipDebugEqIteratorExt;
38use mysten_common::{assert_reachable, fatal};
39use nonempty::NonEmpty;
40use parking_lot::Mutex;
41use prometheus::{
42 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
43 register_histogram_vec_with_registry, register_histogram_with_registry,
44 register_int_counter_vec_with_registry, register_int_counter_with_registry,
45 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
46};
47use serde::de::DeserializeOwned;
48use serde::{Deserialize, Serialize};
49use shared_object_version_manager::AssignedVersions;
50use shared_object_version_manager::Schedulable;
51use std::collections::BTreeMap;
52use std::collections::BTreeSet;
53use std::fs::File;
54use std::io::Write;
55use std::path::{Path, PathBuf};
56use std::sync::atomic::Ordering;
57use std::time::Duration;
58use std::time::Instant;
59use std::time::SystemTime;
60use std::time::UNIX_EPOCH;
61use std::{
62 collections::{HashMap, HashSet},
63 fs,
64 pin::Pin,
65 str::FromStr,
66 sync::Arc,
67 vec,
68};
69use sui_config::NodeConfig;
70use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
71use sui_execution::Executor;
72use sui_protocol_config::PerObjectCongestionControlMode;
73use sui_types::accumulator_root::AccumulatorObjId;
74use sui_types::dynamic_field::visitor as DFV;
75use sui_types::execution::ExecutionOutput;
76use sui_types::execution::ExecutionTimeObservationKey;
77use sui_types::execution::ExecutionTiming;
78use sui_types::execution_params::ExecutionOrEarlyError;
79use sui_types::execution_params::FundsWithdrawStatus;
80use sui_types::execution_params::get_early_execution_error;
81use sui_types::inner_temporary_store::PackageStoreWithFallback;
82use sui_types::layout_resolver::LayoutResolver;
83use sui_types::layout_resolver::into_struct_layout;
84use sui_types::messages_consensus::AuthorityCapabilitiesV2;
85use sui_types::node_role::NodeRole;
86use sui_types::object::bounded_visitor::BoundedVisitor;
87use sui_types::storage::ChildObjectResolver;
88use sui_types::storage::InputKey;
89use sui_types::storage::OverlayBackingPackageStore;
90use sui_types::storage::TrackingBackingStore;
91use sui_types::traffic_control::{
92 PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
93};
94use sui_types::transaction_executor::SimulateTransactionResult;
95use sui_types::transaction_executor::TransactionChecks;
96use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
97use tap::TapFallible;
98use tokio::sync::RwLock;
99use tokio::sync::mpsc::unbounded_channel;
100use tokio::sync::oneshot;
101use tokio::sync::watch::error::RecvError;
102use tokio::time::timeout;
103use tracing::{debug, error, info, instrument, warn};
104
105use self::authority_store::ExecutionLockWriteGuard;
106use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
107pub use authority_store::{AuthorityStore, ResolverWrapper};
108use mysten_metrics::{monitored_scope, spawn_monitored_task};
109
110use crate::jsonrpc_index::IndexStore;
111use crate::jsonrpc_index::{
112 CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
113};
114use mysten_common::debug_fatal;
115use shared_crypto::intent::{Intent, IntentScope};
116use sui_config::genesis::Genesis;
117use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
118use sui_framework::{BuiltInFramework, SystemPackage};
119use sui_json_rpc_types::{
120 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
121 SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
122 SuiTransactionBlockEvents, TransactionFilter,
123};
124use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
125use sui_rpc_store::Store as RpcStore;
126use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
127use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
128use sui_types::accumulator_root::AccumulatorValue;
129use sui_types::authenticator_state::get_authenticator_state;
130use sui_types::balance::Balance;
131use sui_types::coin_reservation;
132use sui_types::committee::{EpochId, ProtocolVersion};
133use sui_types::crypto::{AuthoritySignInfo, Signer};
134use sui_types::deny_list_v1::check_coin_deny_list_v1;
135use sui_types::digests::ChainIdentifier;
136use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
137use sui_types::effects::{
138 InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
139 TransactionEvents, VerifiedSignedTransactionEffects,
140};
141use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
142use sui_types::event::EventID;
143use sui_types::executable_transaction::VerifiedExecutableTransaction;
144use sui_types::execution_status::ExecutionErrorKind;
145use sui_types::gas::{GasCostSummary, SuiGasStatus};
146use sui_types::inner_temporary_store::{InnerTemporaryStore, ObjectMap, TxCoins, WrittenObjects};
147use sui_types::message_envelope::Message;
148use sui_types::messages_checkpoint::{
149 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
150 CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
151 CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
152 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
153};
154use sui_types::messages_grpc::{
155 LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
156 TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
157};
158use sui_types::metrics::{BytecodeVerifierMetrics, ExecutionMetrics};
159use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
160use sui_types::signature::GenericSignature;
161use sui_types::storage::{
162 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
163};
164use sui_types::sui_system_state::SuiSystemStateTrait;
165use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
166use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
167use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
168use sui_types::{
169 SUI_SYSTEM_ADDRESS,
170 base_types::*,
171 committee::Committee,
172 crypto::AuthoritySignature,
173 error::{SuiError, SuiResult},
174 object::{Object, ObjectRead},
175 transaction::*,
176};
177use sui_types::{TypeTag, is_system_package};
178use typed_store::TypedStoreError;
179use typed_store::rocks::StagedBatch;
180
181use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
182use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
183use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
184use crate::authority::authority_store_pruner::{
185 AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
186};
187use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
188use crate::authority::epoch_start_configuration::EpochStartConfiguration;
189use crate::checkpoints::CheckpointStore;
190use crate::epoch::committee_store::CommitteeStore;
191use crate::execution_cache::{
192 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
193 ObjectCacheRead, StateSyncAPI,
194};
195use crate::execution_driver::execution_process;
196use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
197use crate::metrics::LatencyObserver;
198use crate::metrics::RateTracker;
199use crate::module_cache_metrics::ResolverMetrics;
200use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
201use crate::stake_aggregator::StakeAggregator;
202use crate::subscription_handler::SubscriptionHandler;
203use crate::transaction_input_loader::TransactionInputLoader;
204
205#[cfg(msim)]
206pub use crate::checkpoints::checkpoint_executor::utils::{
207 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
208};
209
210#[cfg(msim)]
211use sui_types::committee::CommitteeTrait;
212use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
213
214#[cfg(test)]
215#[path = "unit_tests/authority_tests.rs"]
216pub mod authority_tests;
217
218#[cfg(test)]
219#[path = "unit_tests/transaction_tests.rs"]
220pub mod transaction_tests;
221
222#[cfg(test)]
223#[path = "unit_tests/batch_transaction_tests.rs"]
224mod batch_transaction_tests;
225
226#[cfg(test)]
227#[path = "unit_tests/move_integration_tests.rs"]
228pub mod move_integration_tests;
229
230#[cfg(test)]
231#[path = "unit_tests/gas_tests.rs"]
232mod gas_tests;
233
234#[cfg(test)]
235#[path = "unit_tests/gas_data_tests.rs"]
236mod gas_data_tests;
237
238#[cfg(test)]
239#[path = "unit_tests/batch_verification_tests.rs"]
240mod batch_verification_tests;
241
242#[cfg(test)]
243#[path = "unit_tests/coin_deny_list_tests.rs"]
244mod coin_deny_list_tests;
245
246#[cfg(test)]
247#[path = "unit_tests/auth_unit_test_utils.rs"]
248pub mod auth_unit_test_utils;
249
250pub mod authority_test_utils;
251
252pub mod authority_per_epoch_store;
253pub mod authority_per_epoch_store_pruner;
254
255pub mod authority_store_pruner;
256pub mod authority_store_tables;
257pub mod authority_store_types;
258pub mod congestion_log;
259pub mod consensus_tx_status_cache;
260pub(crate) mod epoch_marker_key;
261pub mod epoch_start_configuration;
262pub mod execution_time_estimator;
263pub mod finalized_transactions_cache;
264pub mod shared_object_congestion_tracker;
265pub mod shared_object_version_manager;
266pub mod submitted_transaction_cache;
267pub mod test_authority_builder;
268pub mod transaction_deferral;
269pub mod transaction_reject_reason_cache;
270mod weighted_moving_average;
271
272pub(crate) mod authority_store;
273pub mod backpressure;
274
275pub struct AuthorityMetrics {
277 tx_orders: IntCounter,
278 total_certs: IntCounter,
279 total_effects: IntCounter,
280 pub shared_obj_tx: IntCounter,
282 sponsored_tx: IntCounter,
283 tx_already_processed: IntCounter,
284 num_input_objs: Histogram,
285 num_shared_objects: Histogram,
287 batch_size: Histogram,
288
289 authority_state_handle_vote_transaction_latency: Histogram,
290
291 internal_execution_latency: Histogram,
292 execution_load_input_objects_latency: Histogram,
293 prepare_certificate_latency: Histogram,
294 commit_certificate_latency: Histogram,
295 db_checkpoint_latency: Histogram,
296
297 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
299 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
300 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
301 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
302
303 pub(crate) execution_driver_executed_transactions: IntCounter,
304 pub(crate) execution_driver_paused_transactions: IntCounter,
305 pub(crate) execution_driver_dispatch_queue: IntGauge,
306 pub(crate) execution_queueing_delay_s: Histogram,
307 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
308 pub(crate) execution_gas_latency_ratio: Histogram,
309
310 pub(crate) skipped_consensus_txns: IntCounter,
311 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
312 pub(crate) consensus_handler_duplicate_tx_count: Histogram,
313
314 pub(crate) authority_overload_status: IntGauge,
315 pub(crate) authority_load_shedding_percentage: IntGauge,
316
317 pub(crate) transaction_overload_sources: IntCounterVec,
318
319 post_processing_total_events_emitted: IntCounter,
321 post_processing_total_tx_indexed: IntCounter,
322 post_processing_total_tx_had_event_processed: IntCounter,
323 post_processing_total_failures: IntCounter,
324
325 pub consensus_handler_processed: IntCounterVec,
327 pub consensus_handler_processed_user_transactions: IntCounterVec,
328 pub consensus_handler_transaction_sizes: HistogramVec,
329 pub consensus_handler_deferred_transactions: IntCounter,
330 pub consensus_handler_congested_transactions: IntCounter,
331 pub consensus_handler_unpaid_amplification_deferrals: IntCounter,
332 pub consensus_handler_cancelled_transactions: IntCounter,
333 pub consensus_handler_dropped_transactions: IntCounter,
334 pub consensus_handler_max_object_costs: IntGaugeVec,
335 pub consensus_committed_subdags: IntCounterVec,
336 pub accumulator_deposits: IntCounter,
337 pub accumulator_withdrawals: IntCounter,
338 pub consensus_committed_messages: IntGaugeVec,
339 pub consensus_committed_user_transactions: IntGaugeVec,
340 pub consensus_finalized_user_transactions: IntGaugeVec,
341 pub consensus_rejected_user_transactions: IntGaugeVec,
342 pub consensus_calculated_throughput: IntGauge,
343 pub consensus_calculated_throughput_profile: IntGauge,
344 pub consensus_block_handler_block_processed: IntCounter,
345 pub consensus_block_handler_txn_processed: IntCounterVec,
346 pub consensus_block_handler_fastpath_executions: IntCounter,
347 pub consensus_timestamp_bias: Histogram,
348
349 pub execution_metrics: Arc<ExecutionMetrics>,
350
351 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
353
354 pub zklogin_sig_count: IntCounter,
356 pub multisig_sig_count: IntCounter,
358
359 pub execution_queueing_latency: LatencyObserver,
362
363 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
370
371 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
374}
375
/// Histogram buckets for positive counts, spanning 1 to 10 million with the
/// steps 1, 2, 5, 7 repeated in every decade.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 7.0, //
    10.0, 20.0, 50.0, 70.0, //
    100.0, 200.0, 500.0, 700.0, //
    1_000.0, 2_000.0, 5_000.0, 7_000.0, //
    10_000.0, 20_000.0, 50_000.0, 70_000.0, //
    100_000.0, 200_000.0, 500_000.0, 700_000.0, //
    1_000_000.0, 2_000_000.0, 5_000_000.0, 7_000_000.0, //
    10_000_000.0,
];

/// Histogram buckets for latencies in seconds, from 0.5 ms up to 90 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, //
    1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, //
    10.0, 20.0, 30.0, 60.0, 90.0,
];

/// Histogram buckets for low-latency samples in seconds, from 10 us up to
/// 100 s with the steps 1, 2, 5 repeated in every decade.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, //
    0.0001, 0.0002, 0.0005, //
    0.001, 0.002, 0.005, //
    0.01, 0.02, 0.05, //
    0.1, 0.2, 0.5, //
    1.0, 2.0, 5.0, //
    10.0, 20.0, 50.0, //
    100.0,
];

/// Histogram buckets for the consensus timestamp bias in seconds. The
/// buckets are densest between 0.2 s and 0.5 s and include a negative bound.
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    -2.0, 0.0, //
    0.2, 0.22, 0.24, 0.26, 0.28, 0.3, 0.32, 0.34, 0.36, 0.38, 0.4, 0.45, 0.5, //
    0.8, 2.0, 10.0, 30.0,
];

/// Histogram buckets for the gas-to-latency ratio metrics, from 10 up to
/// 1 million.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, //
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, //
    1_000.0, 2_000.0, 3_000.0, 4_000.0, 5_000.0, 6_000.0, 7_000.0, 8_000.0, 9_000.0, //
    10_000.0, 50_000.0, 100_000.0, 1_000_000.0,
];

/// 10^18.
/// NOTE(review): judging by the name, the value given to the gas coin used
/// for dev-inspect; the use site is outside this part of the file — confirm.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;

/// Two seconds.
/// NOTE(review): judging by the name, the maximum wait for fastpath inputs;
/// the use site is outside this part of the file — confirm.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_millis(2_000);
412
413impl AuthorityMetrics {
414 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
415 Self {
416 tx_orders: register_int_counter_with_registry!(
417 "total_transaction_orders",
418 "Total number of transaction orders",
419 registry,
420 )
421 .unwrap(),
422 total_certs: register_int_counter_with_registry!(
423 "total_transaction_certificates",
424 "Total number of transaction certificates handled",
425 registry,
426 )
427 .unwrap(),
428 total_effects: register_int_counter_with_registry!(
430 "total_transaction_effects",
431 "Total number of transaction effects produced",
432 registry,
433 )
434 .unwrap(),
435
436 shared_obj_tx: register_int_counter_with_registry!(
437 "num_shared_obj_tx",
438 "Number of transactions involving shared objects",
439 registry,
440 )
441 .unwrap(),
442
443 sponsored_tx: register_int_counter_with_registry!(
444 "num_sponsored_tx",
445 "Number of sponsored transactions",
446 registry,
447 )
448 .unwrap(),
449
450 tx_already_processed: register_int_counter_with_registry!(
451 "num_tx_already_processed",
452 "Number of transaction orders already processed previously",
453 registry,
454 )
455 .unwrap(),
456 num_input_objs: register_histogram_with_registry!(
457 "num_input_objects",
458 "Distribution of number of input TX objects per TX",
459 POSITIVE_INT_BUCKETS.to_vec(),
460 registry,
461 )
462 .unwrap(),
463 num_shared_objects: register_histogram_with_registry!(
464 "num_shared_objects",
465 "Number of shared input objects per TX",
466 POSITIVE_INT_BUCKETS.to_vec(),
467 registry,
468 )
469 .unwrap(),
470 batch_size: register_histogram_with_registry!(
471 "batch_size",
472 "Distribution of size of transaction batch",
473 POSITIVE_INT_BUCKETS.to_vec(),
474 registry,
475 )
476 .unwrap(),
477 authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
478 "authority_state_handle_vote_transaction_latency",
479 "Latency of voting on transactions without signing",
480 LATENCY_SEC_BUCKETS.to_vec(),
481 registry,
482 )
483 .unwrap(),
484 internal_execution_latency: register_histogram_with_registry!(
485 "authority_state_internal_execution_latency",
486 "Latency of actual certificate executions",
487 LATENCY_SEC_BUCKETS.to_vec(),
488 registry,
489 )
490 .unwrap(),
491 execution_load_input_objects_latency: register_histogram_with_registry!(
492 "authority_state_execution_load_input_objects_latency",
493 "Latency of loading input objects for execution",
494 LOW_LATENCY_SEC_BUCKETS.to_vec(),
495 registry,
496 )
497 .unwrap(),
498 prepare_certificate_latency: register_histogram_with_registry!(
499 "authority_state_prepare_certificate_latency",
500 "Latency of executing certificates, before committing the results",
501 LATENCY_SEC_BUCKETS.to_vec(),
502 registry,
503 )
504 .unwrap(),
505 commit_certificate_latency: register_histogram_with_registry!(
506 "authority_state_commit_certificate_latency",
507 "Latency of committing certificate execution results",
508 LATENCY_SEC_BUCKETS.to_vec(),
509 registry,
510 )
511 .unwrap(),
512 db_checkpoint_latency: register_histogram_with_registry!(
513 "db_checkpoint_latency",
514 "Latency of checkpointing dbs",
515 LATENCY_SEC_BUCKETS.to_vec(),
516 registry,
517 ).unwrap(),
518 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
519 "transaction_manager_num_enqueued_certificates",
520 "Current number of certificates enqueued to ExecutionScheduler",
521 &["result"],
522 registry,
523 )
524 .unwrap(),
525 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
526 "transaction_manager_num_pending_certificates",
527 "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
528 registry,
529 )
530 .unwrap(),
531 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
532 "transaction_manager_num_executing_certificates",
533 "Number of executing certificates, including queued and actually running certificates",
534 registry,
535 )
536 .unwrap(),
537 authority_overload_status: register_int_gauge_with_registry!(
538 "authority_overload_status",
539 "Whether authority is current experiencing overload and enters load shedding mode.",
540 registry)
541 .unwrap(),
542 authority_load_shedding_percentage: register_int_gauge_with_registry!(
543 "authority_load_shedding_percentage",
544 "The percentage of transactions is shed when the authority is in load shedding mode.",
545 registry)
546 .unwrap(),
547 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
548 "transaction_manager_transaction_queue_age_s",
549 "Time spent in waiting for transaction in the queue",
550 LATENCY_SEC_BUCKETS.to_vec(),
551 registry,
552 )
553 .unwrap(),
554 transaction_overload_sources: register_int_counter_vec_with_registry!(
555 "transaction_overload_sources",
556 "Number of times each source indicates transaction overload.",
557 &["source"],
558 registry)
559 .unwrap(),
560 execution_driver_executed_transactions: register_int_counter_with_registry!(
561 "execution_driver_executed_transactions",
562 "Cumulative number of transaction executed by execution driver",
563 registry,
564 )
565 .unwrap(),
566 execution_driver_paused_transactions: register_int_counter_with_registry!(
567 "execution_driver_paused_transactions",
568 "Cumulative number of transactions paused by execution driver",
569 registry,
570 )
571 .unwrap(),
572 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
573 "execution_driver_dispatch_queue",
574 "Number of transaction pending in execution driver dispatch queue",
575 registry,
576 )
577 .unwrap(),
578 execution_queueing_delay_s: register_histogram_with_registry!(
579 "execution_queueing_delay_s",
580 "Queueing delay between a transaction is ready for execution until it starts executing.",
581 LATENCY_SEC_BUCKETS.to_vec(),
582 registry
583 )
584 .unwrap(),
585 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
586 "prepare_cert_gas_latency_ratio",
587 "The ratio of computation gas divided by VM execution latency.",
588 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
589 registry
590 )
591 .unwrap(),
592 execution_gas_latency_ratio: register_histogram_with_registry!(
593 "execution_gas_latency_ratio",
594 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
595 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
596 registry
597 )
598 .unwrap(),
599 skipped_consensus_txns: register_int_counter_with_registry!(
600 "skipped_consensus_txns",
601 "Total number of consensus transactions skipped",
602 registry,
603 )
604 .unwrap(),
605 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
606 "skipped_consensus_txns_cache_hit",
607 "Total number of consensus transactions skipped because of local cache hit",
608 registry,
609 )
610 .unwrap(),
611 consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
612 "consensus_handler_duplicate_tx_count",
613 "Number of times each transaction appears in its first consensus commit",
614 POSITIVE_INT_BUCKETS.to_vec(),
615 registry,
616 )
617 .unwrap(),
618 post_processing_total_events_emitted: register_int_counter_with_registry!(
619 "post_processing_total_events_emitted",
620 "Total number of events emitted in post processing",
621 registry,
622 )
623 .unwrap(),
624 post_processing_total_tx_indexed: register_int_counter_with_registry!(
625 "post_processing_total_tx_indexed",
626 "Total number of txes indexed in post processing",
627 registry,
628 )
629 .unwrap(),
630 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
631 "post_processing_total_tx_had_event_processed",
632 "Total number of txes finished event processing in post processing",
633 registry,
634 )
635 .unwrap(),
636 post_processing_total_failures: register_int_counter_with_registry!(
637 "post_processing_total_failures",
638 "Total number of failure in post processing",
639 registry,
640 )
641 .unwrap(),
642 consensus_handler_processed: register_int_counter_vec_with_registry!(
643 "consensus_handler_processed",
644 "Number of transactions processed by consensus handler, sliced by class and commit outcome (accepted/rejected)",
645 &["class", "outcome"],
646 registry
647 ).unwrap(),
648 consensus_handler_processed_user_transactions: register_int_counter_vec_with_registry!(
649 "consensus_handler_processed_user_transactions",
650 "Number of user transactions processed by consensus handler, sliced by commit outcome (accepted/rejected) and block author",
651 &["outcome", "authority"],
652 registry
653 ).unwrap(),
654 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
655 "consensus_handler_transaction_sizes",
656 "Sizes of each type of transactions processed by consensus handler, sliced by class and commit outcome (accepted/rejected)",
657 &["class", "outcome"],
658 POSITIVE_INT_BUCKETS.to_vec(),
659 registry
660 ).unwrap(),
661 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
662 "consensus_handler_deferred_transactions",
663 "Number of transactions deferred by consensus handler",
664 registry,
665 ).unwrap(),
666 consensus_handler_congested_transactions: register_int_counter_with_registry!(
667 "consensus_handler_congested_transactions",
668 "Number of transactions deferred by consensus handler due to congestion",
669 registry,
670 ).unwrap(),
671 consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
672 "consensus_handler_unpaid_amplification_deferrals",
673 "Number of transactions deferred due to unpaid consensus amplification",
674 registry,
675 ).unwrap(),
676 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
677 "consensus_handler_cancelled_transactions",
678 "Number of transactions cancelled by consensus handler",
679 registry,
680 ).unwrap(),
681 consensus_handler_dropped_transactions: register_int_counter_with_registry!(
682 "consensus_handler_dropped_transactions",
683 "Number of transactions dropped by consensus handler",
684 registry,
685 ).unwrap(),
686 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
687 "consensus_handler_max_congestion_control_object_costs",
688 "Max object costs for congestion control in the current consensus commit",
689 &["commit_type"],
690 registry,
691 ).unwrap(),
692 consensus_committed_subdags: register_int_counter_vec_with_registry!(
693 "consensus_committed_subdags",
694 "Number of committed subdags, sliced by author",
695 &["authority"],
696 registry,
697 ).unwrap(),
698 accumulator_deposits: register_int_counter_with_registry!(
699 "accumulator_deposits",
700 "Total accumulator deposit (merge) events processed in settlement",
701 registry,
702 ).unwrap(),
703 accumulator_withdrawals: register_int_counter_with_registry!(
704 "accumulator_withdrawals",
705 "Total accumulator withdrawal (split) events processed in settlement",
706 registry,
707 ).unwrap(),
708 consensus_committed_messages: register_int_gauge_vec_with_registry!(
709 "consensus_committed_messages",
710 "Total number of committed consensus messages, sliced by author",
711 &["authority"],
712 registry,
713 ).unwrap(),
714 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
715 "consensus_committed_user_transactions",
716 "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
717 &["authority"],
718 registry,
719 ).unwrap(),
720 consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
721 "consensus_finalized_user_transactions",
722 "Number of user transactions finalized, sliced by submitter",
723 &["authority"],
724 registry,
725 ).unwrap(),
726 consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
727 "consensus_rejected_user_transactions",
728 "Number of user transactions rejected, sliced by submitter",
729 &["authority"],
730 registry,
731 ).unwrap(),
732 execution_metrics: Arc::new(ExecutionMetrics::new(registry)),
733 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
734 zklogin_sig_count: register_int_counter_with_registry!(
735 "zklogin_sig_count",
736 "Count of zkLogin signatures",
737 registry,
738 )
739 .unwrap(),
740 multisig_sig_count: register_int_counter_with_registry!(
741 "multisig_sig_count",
742 "Count of zkLogin signatures",
743 registry,
744 )
745 .unwrap(),
746 consensus_calculated_throughput: register_int_gauge_with_registry!(
747 "consensus_calculated_throughput",
748 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
749 registry,
750 ).unwrap(),
751 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
752 "consensus_calculated_throughput_profile",
753 "The current active calculated throughput profile",
754 registry
755 ).unwrap(),
756 consensus_block_handler_block_processed: register_int_counter_with_registry!(
757 "consensus_block_handler_block_processed",
758 "Number of blocks processed by consensus block handler.",
759 registry
760 ).unwrap(),
761 consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
762 "consensus_block_handler_txn_processed",
763 "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
764 &["outcome"],
765 registry
766 ).unwrap(),
767 consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
768 "consensus_block_handler_fastpath_executions",
769 "Number of fastpath transactions sent for execution by consensus transaction handler",
770 registry,
771 ).unwrap(),
772 consensus_timestamp_bias: register_histogram_with_registry!(
773 "consensus_timestamp_bias",
774 "Bias/delay from consensus timestamp to system time",
775 TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
776 registry
777 ).unwrap(),
778 execution_queueing_latency: LatencyObserver::new(),
779 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
780 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
781 }
782 }
783}
784
785pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
792
793#[derive(Debug, Clone)]
796pub struct ExecutionEnv {
797 pub assigned_versions: AssignedVersions,
799 pub expected_effects_digest: Option<TransactionEffectsDigest>,
802 pub funds_withdraw_status: FundsWithdrawStatus,
805 pub barrier_dependencies: Vec<TransactionDigest>,
808}
809
810impl Default for ExecutionEnv {
811 fn default() -> Self {
812 Self {
813 assigned_versions: Default::default(),
814 expected_effects_digest: None,
815 funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
816 barrier_dependencies: Default::default(),
817 }
818 }
819}
820
821impl ExecutionEnv {
822 pub fn new() -> Self {
823 Default::default()
824 }
825
826 pub fn with_expected_effects_digest(
827 mut self,
828 expected_effects_digest: TransactionEffectsDigest,
829 ) -> Self {
830 self.expected_effects_digest = Some(expected_effects_digest);
831 self
832 }
833
834 pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
835 self.assigned_versions = assigned_versions;
836 self
837 }
838
839 pub fn with_insufficient_funds(mut self) -> Self {
840 self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
841 self
842 }
843
844 pub fn with_barrier_dependencies(
845 mut self,
846 barrier_dependencies: BTreeSet<TransactionDigest>,
847 ) -> Self {
848 self.barrier_dependencies = barrier_dependencies.into_iter().collect();
849 self
850 }
851}
852
/// Holds per-transaction effects-digest overrides used for fork recovery.
#[derive(Debug)]
pub struct ForkRecoveryState {
    /// Maps a transaction digest to the effects digest that should be expected
    /// for it; guarded by a `parking_lot` read-write lock.
    transaction_overrides:
        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
}
859
860impl Default for ForkRecoveryState {
861 fn default() -> Self {
862 Self {
863 transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
864 }
865 }
866}
867
868impl ForkRecoveryState {
869 pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
870 let Some(config) = config else {
871 return Ok(Self::default());
872 };
873
874 let mut transaction_overrides = HashMap::new();
875 for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
876 let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
877 SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
878 })?;
879 let effects_digest =
880 TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
881 SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
882 })?;
883 transaction_overrides.insert(tx_digest, effects_digest);
884 }
885
886 Ok(Self {
887 transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
888 })
889 }
890
891 pub fn get_transaction_override(
892 &self,
893 tx_digest: &TransactionDigest,
894 ) -> Option<TransactionEffectsDigest> {
895 self.transaction_overrides.read().get(tx_digest).copied()
896 }
897}
898
/// Pair produced by transaction post-processing: a staged batch together with
/// the index-store cache updates.
pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
900
/// Central state object of an authority node: stores, caches, schedulers,
/// metrics and configuration used when validating and executing transactions.
pub struct AuthorityState {
    /// Name of this authority.
    pub name: AuthorityName,
    /// Signer held by this authority (see `StableSyncAuthoritySigner`).
    pub secret: StableSyncAuthoritySigner,

    /// Loads transaction input objects, both for signing and for execution.
    input_loader: TransactionInputLoader,
    /// Bundle of execution-cache handles; `account_funds_read` is read from here.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    /// Resolver passed to funds-withdrawal processing and coin-reservation rewriting.
    coin_reservation_resolver: Arc<CachingCoinReservationResolver>,

    /// Atomically swappable handle to the per-epoch store.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// Read-write lock guarding an `EpochId`; execution compares the guarded
    /// value with the epoch store's epoch before proceeding.
    execution_lock: RwLock<EpochId>,

    /// Optional index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional RPC index store.
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    /// Shared subscription handler.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; used here for epoch state commitments and for
    /// recording detected transaction forks.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Shared committee store (exposed through `committee_store()`).
    committee_store: Arc<CommitteeStore>,

    /// Scheduler consulted for execution-overload checks.
    execution_scheduler: Arc<ExecutionScheduler>,

    /// One-shot sender kept behind a mutex; marked unused.
    /// NOTE(review): presumably signals shutdown of transaction execution — confirm.
    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Authority metrics.
    pub metrics: Arc<AuthorityMetrics>,
    /// Store pruner; held but never read in this section.
    _pruner: AuthorityStorePruner,
    /// Per-epoch store pruner; held but never read in this section.
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Database checkpoint configuration.
    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration (deny configs, overload config, debug-dump config, ...).
    pub config: NodeConfig,

    /// Overload status; its `is_overload` and `load_shedding_percentage` atomics
    /// are read when deciding whether to shed a transaction.
    pub overload_info: AuthorityOverloadInfo,

    /// Identifier of the chain this authority runs on.
    chain_identifier: ChainIdentifier,

    /// Shared congestion tracker.
    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Shared counter for gasless transactions seen through consensus.
    /// NOTE(review): semantics inferred from the type name — confirm.
    pub(crate) consensus_gasless_counter: Arc<ConsensusGaslessCounter>,

    /// Optional traffic controller; admin reconfiguration fails when `None`.
    pub traffic_controller: Option<Arc<TrafficController>>,

    /// Optional effects-digest overrides applied in `try_execute_immediately`.
    fork_recovery_state: Option<ForkRecoveryState>,

    /// Watch-channel sender carrying an `EpochId`.
    notify_epoch: tokio::sync::watch::Sender<EpochId>,

    /// Optionally-present object funds checker; loaded during execution and
    /// after accumulator barrier settlement transactions.
    pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
    /// Metrics for the object funds checker.
    object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,

    /// Per-transaction receivers for pending `PostProcessingOutput` values.
    pending_post_processing:
        Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,

    /// Semaphore associated with post-processing work.
    /// NOTE(review): presumably bounds concurrent post-processing tasks — confirm.
    post_processing_semaphore: Arc<tokio::sync::Semaphore>,
}
978
979impl AuthorityState {
    /// Returns the node role reported by `epoch_store`; no state of `self` is read.
    pub fn node_role(&self, epoch_store: &AuthorityPerEpochStore) -> NodeRole {
        epoch_store.node_role()
    }
989
990 pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
991 epoch_store.node_role().is_validator()
992 }
993
994 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
995 epoch_store.node_role().is_fullnode()
996 }
997
    /// Borrows the shared committee store without bumping its reference count.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
1001
1002 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1003 self.committee_store.clone()
1004 }
1005
    /// Borrows the authority overload configuration from the node config.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
1009
    /// Looks up the state commitments recorded for `epoch` in the checkpoint store.
    ///
    /// The result is whatever the checkpoint store returns: `Ok(None)` when it
    /// has no commitments for the epoch, or an error from the store.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
1016
1017 fn pre_object_load_checks(
1020 &self,
1021 tx_data: &TransactionData,
1022 tx_signatures: &[GenericSignature],
1023 input_object_kinds: &[InputObjectKind],
1024 receiving_objects_refs: &[ObjectRef],
1025 protocol_config: &ProtocolConfig,
1026 ) -> SuiResult<BTreeMap<AccumulatorObjId, (u64, TypeTag)>> {
1027 sui_transaction_checks::deny::check_transaction_for_signing(
1031 tx_data,
1032 tx_signatures,
1033 input_object_kinds,
1034 receiving_objects_refs,
1035 &self.config.transaction_deny_config,
1036 self.get_backing_package_store().as_ref(),
1037 )?;
1038
1039 let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1040 self.chain_identifier,
1041 self.coin_reservation_resolver.as_ref(),
1042 )?;
1043
1044 self.execution_cache_trait_pointers
1045 .account_funds_read
1046 .check_amounts_available(&declared_withdrawals)?;
1047
1048 if protocol_config.gasless_verify_remaining_balance() && tx_data.is_gasless_transaction() {
1049 let min_amounts =
1050 sui_types::transaction::get_gasless_allowed_token_types(protocol_config);
1051 self.execution_cache_trait_pointers
1052 .account_funds_read
1053 .check_remaining_amounts_after_withdrawal(&declared_withdrawals, &min_amounts)?;
1054 }
1055
1056 Ok(declared_withdrawals)
1057 }
1058
1059 fn handle_transaction_deny_checks(
1060 &self,
1061 transaction: &VerifiedTransaction,
1062 epoch_store: &Arc<AuthorityPerEpochStore>,
1063 ) -> SuiResult<CheckedInputObjects> {
1064 let tx_digest = transaction.digest();
1065 let tx_data = transaction.data().transaction_data();
1066
1067 let input_object_kinds = tx_data.input_objects()?;
1068 let receiving_objects_refs = tx_data.receiving_objects();
1069
1070 self.pre_object_load_checks(
1071 tx_data,
1072 transaction.tx_signatures(),
1073 &input_object_kinds,
1074 &receiving_objects_refs,
1075 epoch_store.protocol_config(),
1076 )?;
1077
1078 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1079 Some(tx_digest),
1080 &input_object_kinds,
1081 &receiving_objects_refs,
1082 epoch_store.epoch(),
1083 )?;
1084
1085 let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1086 epoch_store.protocol_config(),
1087 epoch_store.reference_gas_price(),
1088 tx_data,
1089 input_objects,
1090 &receiving_objects,
1091 &self.metrics.bytecode_verifier_metrics,
1092 &self.config.verifier_signing_config,
1093 )?;
1094
1095 self.handle_coin_deny_list_checks(
1096 tx_data,
1097 &checked_input_objects,
1098 &receiving_objects,
1099 epoch_store,
1100 )?;
1101
1102 Ok(checked_input_objects)
1103 }
1104
1105 fn handle_coin_deny_list_checks(
1106 &self,
1107 tx_data: &TransactionData,
1108 checked_input_objects: &CheckedInputObjects,
1109 receiving_objects: &ReceivingObjects,
1110 epoch_store: &Arc<AuthorityPerEpochStore>,
1111 ) -> SuiResult<()> {
1112 use sui_types::balance::Balance;
1113
1114 let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1115 self.chain_identifier,
1116 self.coin_reservation_resolver.as_ref(),
1117 )?;
1118
1119 let funds_withdraw_types = declared_withdrawals
1120 .values()
1121 .filter_map(|(_, type_tag)| {
1122 Balance::maybe_get_balance_type_param(type_tag)
1123 .map(|ty| ty.to_canonical_string(false))
1124 })
1125 .collect::<BTreeSet<_>>();
1126
1127 if epoch_store.coin_deny_list_v1_enabled() {
1128 check_coin_deny_list_v1(
1129 tx_data.sender(),
1130 checked_input_objects,
1131 receiving_objects,
1132 funds_withdraw_types.clone(),
1133 &self.get_object_store(),
1134 )?;
1135 }
1136
1137 if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1138 check_coin_deny_list_v2_during_signing(
1139 tx_data.sender(),
1140 checked_input_objects,
1141 receiving_objects,
1142 funds_withdraw_types.clone(),
1143 &self.get_object_store(),
1144 )?;
1145 }
1146
1147 Ok(())
1148 }
1149
1150 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1160 pub fn handle_vote_transaction(
1161 &self,
1162 epoch_store: &Arc<AuthorityPerEpochStore>,
1163 transaction: VerifiedTransaction,
1164 ) -> SuiResult<()> {
1165 debug!("handle_vote_transaction");
1166
1167 let _metrics_guard = self
1168 .metrics
1169 .authority_state_handle_vote_transaction_latency
1170 .start_timer();
1171 self.metrics.tx_orders.inc();
1172
1173 if !epoch_store
1177 .get_reconfig_state_read_lock_guard()
1178 .should_accept_user_certs()
1179 {
1180 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1181 }
1182
1183 let tx_digest = *transaction.digest();
1188 if epoch_store.is_recently_finalized(&tx_digest)
1189 || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
1190 {
1191 assert_reachable!("transaction recently executed");
1192 return Ok(());
1193 }
1194
1195 if self
1196 .get_transaction_cache_reader()
1197 .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
1198 {
1199 return Err(SuiErrorKind::TransactionAlreadyExecuted {
1200 digest: (*transaction.digest()),
1201 }
1202 .into());
1203 }
1204
1205 let _execution_lock = self.execution_lock_for_validation()?;
1207
1208 let checked_input_objects =
1209 self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1210
1211 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1212
1213 self.get_cache_writer()
1217 .validate_owned_object_versions(&owned_objects)
1218 }
1219
1220 pub fn check_transaction_validity(
1229 &self,
1230 epoch_store: &Arc<AuthorityPerEpochStore>,
1231 transaction: &VerifiedTransaction,
1232 enforce_live_input_objects: bool,
1233 ) -> SuiResult<()> {
1234 if !epoch_store
1235 .get_reconfig_state_read_lock_guard()
1236 .should_accept_user_certs()
1237 {
1238 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1239 }
1240
1241 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1242
1243 let checked_input_objects =
1244 self.handle_transaction_deny_checks(transaction, epoch_store)?;
1245
1246 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1248 let cache_reader = self.get_object_cache_reader();
1249
1250 for obj_ref in &owned_objects {
1251 if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1252 if obj_ref.1 < live_object.version() {
1255 return Err(SuiErrorKind::UserInputError {
1256 error: UserInputError::ObjectVersionUnavailableForConsumption {
1257 provided_obj_ref: *obj_ref,
1258 current_version: live_object.version(),
1259 },
1260 }
1261 .into());
1262 }
1263
1264 if enforce_live_input_objects && live_object.version() < obj_ref.1 {
1265 return Err(SuiErrorKind::UserInputError {
1267 error: UserInputError::ObjectVersionUnavailableForConsumption {
1268 provided_obj_ref: *obj_ref,
1269 current_version: live_object.version(),
1270 },
1271 }
1272 .into());
1273 }
1274
1275 if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1277 return Err(SuiErrorKind::UserInputError {
1278 error: UserInputError::InvalidObjectDigest {
1279 object_id: obj_ref.0,
1280 expected_digest: live_object.digest(),
1281 },
1282 }
1283 .into());
1284 }
1285 }
1286 }
1287
1288 Ok(())
1289 }
1290
    /// Returns the `check_system_overload_at_signing` flag from the authority
    /// overload configuration.
    pub fn check_system_overload_at_signing(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_signing
    }
1296
1297 pub(crate) fn check_system_overload(
1298 &self,
1299 tx_data: &SenderSignedData,
1300 do_authority_overload_check: bool,
1301 ) -> SuiResult {
1302 if do_authority_overload_check {
1303 self.check_authority_overload(tx_data).tap_err(|_| {
1304 self.update_overload_metrics("execution_queue");
1305 })?;
1306 }
1307 self.execution_scheduler
1308 .check_execution_overload(self.overload_config(), tx_data)
1309 .tap_err(|_| {
1310 self.update_overload_metrics("execution_pending");
1311 })?;
1312 let pending_tx_count = self
1315 .get_cache_commit()
1316 .approximate_pending_transaction_count();
1317 if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1318 return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1319 retry_after_secs: 10,
1320 }
1321 .into());
1322 }
1323
1324 Ok(())
1325 }
1326
1327 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1328 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1329 return Ok(());
1330 }
1331
1332 let load_shedding_percentage = self
1333 .overload_info
1334 .load_shedding_percentage
1335 .load(Ordering::Relaxed);
1336 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1337 }
1338
1339 pub(crate) fn update_overload_metrics(&self, source: &str) {
1340 self.metrics
1341 .transaction_overload_sources
1342 .with_label_values(&[source])
1343 .inc();
1344 }
1345
1346 pub(crate) async fn wait_for_fastpath_dependency_objects(
1350 &self,
1351 transaction: &VerifiedTransaction,
1352 epoch: EpochId,
1353 ) -> SuiResult<bool> {
1354 let txn_data = transaction.data().transaction_data();
1355 let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1356
1357 let fastpath_dependency_objects: Vec<_> = move_objects
1359 .into_iter()
1360 .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1361 .chain(
1362 packages
1363 .into_iter()
1364 .map(|package_id| InputKey::Package { id: package_id }),
1365 )
1366 .collect();
1367 let receiving_keys: HashSet<_> = receiving_objects
1368 .into_iter()
1369 .filter_map(|receiving_obj_ref| {
1370 self.should_wait_for_dependency_object(receiving_obj_ref)
1371 })
1372 .collect();
1373 if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1374 return Ok(true);
1375 }
1376
1377 let max_wait = if cfg!(msim) {
1380 Duration::from_millis(200)
1381 } else {
1382 WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1383 };
1384
1385 match timeout(
1386 max_wait,
1387 self.get_object_cache_reader().notify_read_input_objects(
1388 &fastpath_dependency_objects,
1389 &receiving_keys,
1390 epoch,
1391 ),
1392 )
1393 .await
1394 {
1395 Ok(()) => Ok(true),
1396 Err(_) => Ok(false),
1399 }
1400 }
1401
1402 fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1409 let (obj_id, cur_version, _digest) = obj_ref;
1410 let Some(latest_obj_ref) = self
1411 .get_object_cache_reader()
1412 .get_latest_object_ref_or_tombstone(obj_id)
1413 else {
1414 return Some(InputKey::VersionedObject {
1416 id: FullObjectID::new(obj_id, None),
1417 version: cur_version,
1418 });
1419 };
1420 let latest_digest = latest_obj_ref.2;
1421 if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1422 return None;
1424 }
1425 let latest_version = latest_obj_ref.1;
1426 if cur_version <= latest_version {
1427 return None;
1430 }
1431 Some(InputKey::VersionedObject {
1433 id: FullObjectID::new(obj_id, None),
1434 version: cur_version,
1435 })
1436 }
1437
1438 #[instrument(level = "trace", skip_all)]
1443 pub async fn wait_for_transaction_execution_for_testing(
1444 &self,
1445 transaction: &VerifiedExecutableTransaction,
1446 ) -> TransactionEffects {
1447 self.notify_read_effects_for_testing(
1448 "AuthorityState::wait_for_transaction_execution_for_testing",
1449 *transaction.digest(),
1450 )
1451 .await
1452 }
1453
    /// Executes `certificate` synchronously and commits its outputs.
    ///
    /// Outcomes visible in this function:
    /// - `Success((effects, None))` when effects are already present in the
    ///   transaction cache (nothing is re-executed);
    /// - `EpochEnded` when the execution lock cannot be obtained or guards a
    ///   different epoch than `epoch_store`;
    /// - whatever non-success output `process_certificate` produced;
    /// - `Fatal(err)` when committing the outputs fails;
    /// - `Success((effects, execution_error_opt))` otherwise.
    ///
    /// # Panics
    /// Panics when cached effects do not match the expected effects digest, and
    /// when an accumulator barrier settlement transaction has no assigned
    /// accumulator version while an object funds checker is installed. In debug
    /// builds it also asserts that the on-chain JWKs match the signature
    /// verifier's after an authenticator state update.
    #[instrument(level = "trace", skip_all)]
    pub fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        mut execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // A configured fork-recovery override replaces the expected effects digest.
        if let Some(fork_recovery) = &self.fork_recovery_state
            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
        {
            warn!(
                ?tx_digest,
                original_digest = ?execution_env.expected_effects_digest,
                override_digest = ?override_digest,
                "Applying fork recovery override for transaction effects digest"
            );
            execution_env.expected_effects_digest = Some(override_digest);
        }

        let tx_guard = epoch_store.acquire_tx_guard(certificate);

        // Already executed: verify the digest (if one is expected) and return
        // the cached effects.
        let tx_cache_reader = self.get_transaction_cache_reader();
        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest,
                    "Unexpected effects digest for transaction {:?}",
                    tx_digest
                );
            }
            tx_guard.release();
            return ExecutionOutput::Success((effects, None));
        }

        let execution_start_time = Instant::now();

        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
        else {
            tx_guard.release();
            return ExecutionOutput::EpochEnded;
        };
        // The lock guards an epoch id; it must match the epoch store in use.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return ExecutionOutput::EpochEnded;
        }

        // Read before `execution_env` is moved into `process_certificate`.
        let accumulator_version = execution_env.assigned_versions.accumulator_version;

        let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
            &tx_guard,
            &execution_guard,
            certificate,
            execution_env,
            epoch_store,
        ) {
            ExecutionOutput::Success(result) => result,
            // Any non-success output is returned to the caller as is.
            output => return output.unwrap_err(),
        };
        let transaction_outputs = Arc::new(transaction_outputs);

        fail_point!("crash");

        let effects = transaction_outputs.effects.clone();
        debug!(
            ?tx_digest,
            fx_digest=?effects.digest(),
            "process_certificate succeeded in {:.3}ms",
            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
        );

        let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
        if let Err(err) = commit_result {
            error!(?tx_digest, "Error committing transaction: {err}");
            tx_guard.release();
            return ExecutionOutput::Fatal(err);
        }

        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // Debug-build consistency check: the JWKs stored on chain must
            // equal the ones held by the signature verifier.
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        // After a barrier settlement transaction, tell the object funds checker
        // (if installed) that the next accumulator version is settled.
        if certificate
            .transaction_data()
            .kind()
            .is_accumulator_barrier_settle_tx()
        {
            let object_funds_checker = self.object_funds_checker.load();
            if let Some(object_funds_checker) = object_funds_checker.as_ref() {
                let next_accumulator_version = accumulator_version.unwrap().next();
                object_funds_checker.settle_accumulator_version(next_accumulator_version);
            }
        }

        tx_guard.commit_tx();

        let elapsed = execution_start_time.elapsed();
        epoch_store.record_local_execution_time(
            certificate.data().transaction_data(),
            &effects,
            timings,
            elapsed,
        );

        // Skip the ratio when no time was measured, to avoid dividing by zero.
        let elapsed_us = elapsed.as_micros() as f64;
        if elapsed_us > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
        };
        ExecutionOutput::Success((effects, execution_error_opt))
    }
1621
1622 pub fn read_objects_for_execution(
1623 &self,
1624 tx_lock: &CertLockGuard,
1625 certificate: &VerifiedExecutableTransaction,
1626 assigned_shared_object_versions: &AssignedVersions,
1627 epoch_store: &Arc<AuthorityPerEpochStore>,
1628 ) -> SuiResult<InputObjects> {
1629 let _scope = monitored_scope("Execution::load_input_objects");
1630 let _metrics_guard = self
1631 .metrics
1632 .execution_load_input_objects_latency
1633 .start_timer();
1634 let input_objects = &certificate.data().transaction_data().input_objects()?;
1635 self.input_loader.read_objects_for_execution(
1636 &certificate.key(),
1637 tx_lock,
1638 input_objects,
1639 assigned_shared_object_versions,
1640 epoch_store.epoch(),
1641 )
1642 }
1643
1644 pub async fn try_execute_executable_for_test(
1645 &self,
1646 executable: &VerifiedExecutableTransaction,
1647 execution_env: ExecutionEnv,
1648 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1649 let epoch_store = self.epoch_store_for_testing();
1650 let (effects, execution_error_opt) = self
1651 .try_execute_immediately(executable, execution_env, &epoch_store)
1652 .unwrap();
1653 self.flush_post_processing(executable.digest()).await;
1654 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1655 (signed_effects, execution_error_opt)
1656 }
1657
1658 pub async fn notify_read_effects_for_testing(
1663 &self,
1664 task_name: &'static str,
1665 digest: TransactionDigest,
1666 ) -> TransactionEffects {
1667 self.get_transaction_cache_reader()
1668 .notify_read_executed_effects(task_name, &[digest])
1669 .await
1670 .pop()
1671 .expect("must return correct number of effects")
1672 }
1673
    /// Asks the object cache reader to confirm that every owned object
    /// reference in `owned_object_refs` is live, returning its result unchanged.
    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
        self.get_object_cache_reader()
            .check_owned_objects_are_live(owned_object_refs)
    }
1678
1679 pub(crate) fn debug_dump_transaction_state(
1684 &self,
1685 tx_digest: &TransactionDigest,
1686 effects: &TransactionEffects,
1687 expected_effects_digest: TransactionEffectsDigest,
1688 inner_temporary_store: &InnerTemporaryStore,
1689 certificate: &VerifiedExecutableTransaction,
1690 debug_dump_config: &StateDebugDumpConfig,
1691 ) -> SuiResult<PathBuf> {
1692 let dump_dir = debug_dump_config
1693 .dump_file_directory
1694 .as_ref()
1695 .cloned()
1696 .unwrap_or(std::env::temp_dir());
1697 let epoch_store = self.load_epoch_store_one_call_per_task();
1698
1699 NodeStateDump::new(
1700 tx_digest,
1701 effects,
1702 expected_effects_digest,
1703 self.get_object_store().as_ref(),
1704 &epoch_store,
1705 inner_temporary_store,
1706 certificate,
1707 )?
1708 .write_to_file(&dump_dir)
1709 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1710 }
1711
1712 #[instrument(level = "trace", skip_all)]
1713 pub(crate) fn process_certificate(
1714 &self,
1715 tx_guard: &CertTxGuard,
1716 execution_guard: &ExecutionLockReadGuard<'_>,
1717 certificate: &VerifiedExecutableTransaction,
1718 execution_env: ExecutionEnv,
1719 epoch_store: &Arc<AuthorityPerEpochStore>,
1720 ) -> ExecutionOutput<(
1721 TransactionOutputs,
1722 Vec<ExecutionTiming>,
1723 Option<ExecutionError>,
1724 )> {
1725 let _scope = monitored_scope("Execution::process_certificate");
1726 let tx_digest = *certificate.digest();
1727
1728 let input_objects = match self.read_objects_for_execution(
1729 tx_guard.as_lock_guard(),
1730 certificate,
1731 &execution_env.assigned_versions,
1732 epoch_store,
1733 ) {
1734 Ok(objects) => objects,
1735 Err(e) => return ExecutionOutput::Fatal(e),
1736 };
1737
1738 let expected_effects_digest = match execution_env.expected_effects_digest {
1739 Some(expected_effects_digest) => Some(expected_effects_digest),
1740 None => {
1741 match epoch_store.get_signed_effects_digest(&tx_digest) {
1745 Ok(digest) => digest,
1746 Err(e) => return ExecutionOutput::Fatal(e),
1747 }
1748 }
1749 };
1750
1751 fail_point_if!("correlated-crash-process-certificate", || {
1752 if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1753 sui_simulator::task::kill_current_node(None);
1754 }
1755 });
1756
1757 self.execute_certificate(
1762 execution_guard,
1763 certificate,
1764 input_objects,
1765 expected_effects_digest,
1766 execution_env,
1767 epoch_store,
1768 )
1769 }
1770
1771 pub async fn reconfigure_traffic_control(
1772 &self,
1773 params: TrafficControlReconfigParams,
1774 ) -> Result<TrafficControlReconfigParams, SuiError> {
1775 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1776 traffic_controller.admin_reconfigure(params).await
1777 } else {
1778 Err(SuiErrorKind::InvalidAdminRequest(
1779 "Traffic controller is not configured on this node".to_string(),
1780 )
1781 .into())
1782 }
1783 }
1784
    /// Records `certificate` as executed and writes its outputs to the cache.
    ///
    /// The only fallible step visible here is `insert_tx_key`, which runs for
    /// transaction keys that are not plain digests.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        transaction_outputs: Arc<TransactionOutputs>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_digest = certificate.digest();

        // Mark the transaction as executed in this epoch before writing outputs.
        epoch_store.insert_executed_in_epoch(tx_digest);
        let key = certificate.key();
        // Non-digest keys additionally get a key -> digest mapping.
        if !matches!(key, TransactionKey::Digest(_)) {
            epoch_store.insert_tx_key(key, *tx_digest)?;
        }

        // Fail point placed between the epoch-store updates and the output write.
        fail_point!("crash");

        self.get_cache_writer()
            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);

        // Notify the epoch store once a barrier settlement transaction's outputs
        // have been written.
        if let Some(settlement_key) = certificate
            .transaction_data()
            .kind()
            .accumulator_barrier_settlement_key()
        {
            epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
        }

        // After an end-of-epoch transaction, force the object cache to reload
        // all built-in framework packages.
        if certificate.transaction_data().is_end_of_epoch_tx() {
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        Ok(())
    }
1833
1834 fn update_metrics(
1835 &self,
1836 certificate: &VerifiedExecutableTransaction,
1837 inner_temporary_store: &InnerTemporaryStore,
1838 effects: &TransactionEffects,
1839 ) {
1840 if certificate.has_zklogin_sig() {
1842 self.metrics.zklogin_sig_count.inc();
1843 } else if certificate.has_upgraded_multisig() {
1844 self.metrics.multisig_sig_count.inc();
1845 }
1846
1847 self.metrics.total_effects.inc();
1848 self.metrics.total_certs.inc();
1849
1850 let consensus_object_count = effects.input_consensus_objects().len();
1851 if consensus_object_count > 0 {
1852 self.metrics.shared_obj_tx.inc();
1853 }
1854
1855 if certificate.is_sponsored_tx() {
1856 self.metrics.sponsored_tx.inc();
1857 }
1858
1859 let input_object_count = inner_temporary_store.input_objects.len();
1860 self.metrics
1861 .num_input_objs
1862 .observe(input_object_count as f64);
1863 self.metrics
1864 .num_shared_objects
1865 .observe(consensus_object_count as f64);
1866 self.metrics.batch_size.observe(
1867 certificate
1868 .data()
1869 .intent_message()
1870 .value
1871 .kind()
1872 .num_commands() as f64,
1873 );
1874 }
1875
1876 fn execute_transaction_to_effects(
1879 &self,
1880 executor: &dyn Executor,
1881 store: &dyn BackingStore,
1882 protocol_config: &ProtocolConfig,
1883 enable_expensive_checks: bool,
1884 execution_params: ExecutionOrEarlyError,
1885 epoch_id: &EpochId,
1886 epoch_timestamp_ms: u64,
1887 input_objects: CheckedInputObjects,
1888 gas_data: GasData,
1889 gas_status: SuiGasStatus,
1890 kind: TransactionKind,
1891 rewritten_inputs: Option<Vec<bool>>,
1892 signer: SuiAddress,
1893 tx_digest: TransactionDigest,
1894 ) -> (
1895 InnerTemporaryStore,
1896 SuiGasStatus,
1897 TransactionEffects,
1898 Vec<ExecutionTiming>,
1899 Result<(), ExecutionError>,
1900 ) {
1901 let (inner_temp_store, gas_status, effects, timings, execution_error) = executor
1902 .execute_transaction_to_effects_and_execution_error(
1904 store,
1905 protocol_config,
1906 self.metrics.execution_metrics.clone(),
1907 enable_expensive_checks,
1908 execution_params,
1909 epoch_id,
1910 epoch_timestamp_ms,
1911 input_objects,
1912 gas_data,
1913 gas_status,
1914 kind,
1915 rewritten_inputs,
1916 signer,
1917 tx_digest,
1918 &mut None,
1919 );
1920
1921 (
1922 inner_temp_store,
1923 gas_status,
1924 effects,
1925 timings,
1926 execution_error,
1927 )
1928 }
1929
1930 #[instrument(level = "trace", skip_all)]
1939 fn execute_certificate(
1940 &self,
1941 _execution_guard: &ExecutionLockReadGuard<'_>,
1942 certificate: &VerifiedExecutableTransaction,
1943 input_objects: InputObjects,
1944 expected_effects_digest: Option<TransactionEffectsDigest>,
1945 execution_env: ExecutionEnv,
1946 epoch_store: &Arc<AuthorityPerEpochStore>,
1947 ) -> ExecutionOutput<(
1948 TransactionOutputs,
1949 Vec<ExecutionTiming>,
1950 Option<ExecutionError>,
1951 )> {
1952 let _scope = monitored_scope("Execution::prepare_certificate");
1953 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1954 let prepare_certificate_start_time = tokio::time::Instant::now();
1955
1956 let tx_data = certificate.data().transaction_data();
1958
1959 if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
1960 return ExecutionOutput::Fatal(e);
1961 }
1962
1963 let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
1967 certificate,
1968 input_objects,
1969 epoch_store.protocol_config(),
1970 epoch_store.reference_gas_price(),
1971 ) {
1972 Ok(result) => result,
1973 Err(e) => return ExecutionOutput::Fatal(e),
1974 };
1975
1976 let owned_object_refs = input_objects.inner().filter_owned_objects();
1977 if let Err(e) = self.check_owned_locks(&owned_object_refs) {
1978 return ExecutionOutput::Fatal(e);
1979 }
1980 let tx_digest = *certificate.digest();
1981 let protocol_config = epoch_store.protocol_config();
1982 let transaction_data = &certificate.data().intent_message().value;
1983 let sender = transaction_data.sender();
1984 let (mut kind, signer, gas_data) = transaction_data.execution_parts();
1985 let early_execution_error = get_early_execution_error(
1986 &tx_digest,
1987 &input_objects,
1988 self.config.certificate_deny_config.certificate_deny_set(),
1989 &execution_env.funds_withdraw_status,
1990 );
1991 let accumulator_version = execution_env.assigned_versions.accumulator_version;
1992 let execution_params = match early_execution_error {
1993 None => ExecutionOrEarlyError::ok(accumulator_version),
1994 Some(errors) => ExecutionOrEarlyError::failed(errors, accumulator_version),
1995 };
1996
1997 let rewritten_inputs = if execution_params.is_ok() {
2000 rewrite_transaction_for_coin_reservations(
2001 self.chain_identifier,
2002 &*self.coin_reservation_resolver,
2003 sender,
2004 &mut kind,
2005 execution_env.assigned_versions.accumulator_version,
2006 )
2007 .expect("rewriting must succeed for a certified transaction")
2008 } else {
2009 None
2010 };
2011
2012 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2013
2014 #[allow(unused_mut)]
2015 let (inner_temp_store, _, mut effects, timings, execution_error_opt) = self
2016 .execute_transaction_to_effects(
2017 &**epoch_store.executor(),
2018 &tracking_store,
2019 protocol_config,
2020 self.config
2023 .expensive_safety_check_config
2024 .enable_deep_per_tx_sui_conservation_check(),
2025 execution_params,
2026 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2027 epoch_store
2028 .epoch_start_config()
2029 .epoch_data()
2030 .epoch_start_timestamp(),
2031 input_objects,
2032 gas_data,
2033 gas_status,
2034 kind,
2035 rewritten_inputs,
2036 signer,
2037 tx_digest,
2038 );
2039
2040 let object_funds_checker = self.object_funds_checker.load();
2041 if let Some(object_funds_checker) = object_funds_checker.as_ref()
2042 && !object_funds_checker.should_commit_object_funds_withdraws(
2043 certificate,
2044 effects.status(),
2045 &inner_temp_store.accumulator_running_max_withdraws,
2046 &execution_env,
2047 self.get_account_funds_read(),
2048 &self.execution_scheduler,
2049 epoch_store,
2050 )
2051 {
2052 assert_reachable!("retry object withdraw later");
2053 return ExecutionOutput::RetryLater;
2054 }
2055
2056 if let Some(expected_effects_digest) = expected_effects_digest
2057 && effects.digest() != expected_effects_digest
2058 {
2059 match self.debug_dump_transaction_state(
2061 &tx_digest,
2062 &effects,
2063 expected_effects_digest,
2064 &inner_temp_store,
2065 certificate,
2066 &self.config.state_debug_dump_config,
2067 ) {
2068 Ok(out_path) => {
2069 info!(
2070 "Dumped node state for transaction {} to {}",
2071 tx_digest,
2072 out_path.as_path().display().to_string()
2073 );
2074 }
2075 Err(e) => {
2076 error!("Error dumping state for transaction {}: {e}", tx_digest);
2077 }
2078 }
2079 let expected_effects = self
2080 .get_transaction_cache_reader()
2081 .get_effects(&expected_effects_digest);
2082 error!(
2083 ?tx_digest,
2084 ?expected_effects_digest,
2085 actual_effects = ?effects,
2086 expected_effects = ?expected_effects,
2087 "fork detected!"
2088 );
2089 if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2090 tx_digest,
2091 expected_effects_digest,
2092 effects.digest(),
2093 ) {
2094 error!("Failed to record transaction fork: {e}");
2095 }
2096
2097 fail_point_if!("kill_transaction_fork_node", || {
2098 #[cfg(msim)]
2099 {
2100 tracing::error!(
2101 fatal = true,
2102 "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2103 tx_digest
2104 );
2105 sui_simulator::task::shutdown_current_node();
2106 }
2107 });
2108
2109 fatal!(
2110 "Transaction {} is expected to have effects digest {}, but got {}!",
2111 tx_digest,
2112 expected_effects_digest,
2113 effects.digest()
2114 );
2115 }
2116
2117 fail_point_arg!("simulate_fork_during_execution", |(
2118 forked_validators,
2119 full_halt,
2120 effects_overrides,
2121 fork_probability,
2122 ): (
2123 std::sync::Arc<
2124 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2125 >,
2126 bool,
2127 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2128 f32,
2129 )| {
2130 #[cfg(msim)]
2131 self.simulate_fork_during_execution(
2132 certificate,
2133 epoch_store,
2134 &mut effects,
2135 forked_validators,
2136 full_halt,
2137 effects_overrides,
2138 fork_probability,
2139 );
2140 });
2141
2142 let unchanged_loaded_runtime_objects =
2143 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2144 certificate.transaction_data(),
2145 &effects,
2146 &tracking_store.into_read_objects(),
2147 );
2148
2149 let _ = self
2151 .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2152 .tap_err(|e| {
2153 self.metrics.post_processing_total_failures.inc();
2154 error!(?tx_digest, "tx post processing failed: {e}");
2155 });
2156
2157 self.update_metrics(certificate, &inner_temp_store, &effects);
2158
2159 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2160 certificate.clone().into_unsigned(),
2161 effects,
2162 inner_temp_store,
2163 unchanged_loaded_runtime_objects,
2164 );
2165
2166 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2167 if elapsed > 0.0 {
2168 self.metrics.prepare_cert_gas_latency_ratio.observe(
2169 transaction_outputs
2170 .effects
2171 .gas_cost_summary()
2172 .computation_cost as f64
2173 / elapsed,
2174 );
2175 }
2176
2177 ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2178 }
2179
2180 pub fn prepare_certificate_for_benchmark(
2181 &self,
2182 certificate: &VerifiedExecutableTransaction,
2183 input_objects: InputObjects,
2184 epoch_store: &Arc<AuthorityPerEpochStore>,
2185 ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2186 let lock = RwLock::new(epoch_store.epoch());
2187 let execution_guard = lock.try_read().unwrap();
2188
2189 let (transaction_outputs, _timings, execution_error_opt) = self
2190 .execute_certificate(
2191 &execution_guard,
2192 certificate,
2193 input_objects,
2194 None,
2195 ExecutionEnv::default(),
2196 epoch_store,
2197 )
2198 .unwrap();
2199 Ok((transaction_outputs, execution_error_opt))
2200 }
2201
2202 #[instrument(skip_all)]
2203 #[allow(clippy::type_complexity)]
2204 pub async fn dry_exec_transaction(
2205 &self,
2206 transaction: TransactionData,
2207 ) -> SuiResult<(
2208 DryRunTransactionBlockResponse,
2209 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2210 TransactionEffects,
2211 Option<ObjectID>,
2212 )> {
2213 let epoch_store = self.load_epoch_store_one_call_per_task();
2214 if !self.is_fullnode(&epoch_store) {
2215 return Err(SuiErrorKind::UnsupportedFeatureError {
2216 error: "dry-exec is only supported on fullnodes".to_string(),
2217 }
2218 .into());
2219 }
2220
2221 if transaction.kind().is_system_tx() {
2222 return Err(SuiErrorKind::UnsupportedFeatureError {
2223 error: "dry-exec does not support system transactions".to_string(),
2224 }
2225 .into());
2226 }
2227
2228 self.dry_exec_transaction_impl(&epoch_store, transaction)
2229 }
2230
2231 #[allow(clippy::type_complexity)]
2232 fn dry_exec_transaction_impl(
2233 &self,
2234 epoch_store: &AuthorityPerEpochStore,
2235 transaction: TransactionData,
2236 ) -> SuiResult<(
2237 DryRunTransactionBlockResponse,
2238 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2239 TransactionEffects,
2240 Option<ObjectID>,
2241 )> {
2242 let sim = self.simulate_transaction(
2245 transaction.clone(),
2246 TransactionChecks::Enabled,
2247 true,
2248 )?;
2249
2250 self.build_dry_run_response(epoch_store, transaction, sim)
2251 }
2252
2253 #[allow(clippy::type_complexity)]
2265 fn build_dry_run_response(
2266 &self,
2267 epoch_store: &AuthorityPerEpochStore,
2268 transaction: TransactionData,
2269 sim: SimulateTransactionResult,
2270 ) -> SuiResult<(
2271 DryRunTransactionBlockResponse,
2272 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2273 TransactionEffects,
2274 Option<ObjectID>,
2275 )> {
2276 let SimulateTransactionResult {
2277 effects,
2278 events,
2279 objects,
2280 execution_result,
2281 mock_gas_id,
2282 suggested_gas_price,
2283 ..
2284 } = sim;
2285
2286 let tx_digest = *effects.transaction_digest();
2287
2288 let written_with_kind: BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)> = effects
2293 .created()
2294 .into_iter()
2295 .map(|(oref, _)| (oref, WriteKind::Create))
2296 .chain(
2297 effects
2298 .unwrapped()
2299 .into_iter()
2300 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2301 )
2302 .chain(
2303 effects
2304 .mutated()
2305 .into_iter()
2306 .map(|(oref, _)| (oref, WriteKind::Mutate)),
2307 )
2308 .filter_map(|(oref, kind)| {
2309 objects
2310 .get(&ObjectKey(oref.0, oref.1))
2311 .map(|obj| (oref.0, (oref, obj.clone(), kind)))
2312 })
2313 .collect();
2314
2315 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2319 epoch_store.protocol_config(),
2320 Box::new(OverlayBackingPackageStore::new(
2321 &objects,
2322 self.get_backing_package_store(),
2323 )),
2324 );
2325
2326 let execution_error_source = execution_result
2327 .as_ref()
2328 .err()
2329 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2330
2331 let response = DryRunTransactionBlockResponse {
2332 suggested_gas_price,
2333 input: SuiTransactionBlockData::try_from_with_module_cache(
2334 transaction,
2335 &epoch_store.module_cache().clone(),
2336 )
2337 .map_err(|e| SuiErrorKind::TransactionSerializationError {
2338 error: format!("Failed to convert transaction to SuiTransactionBlockData: {e}"),
2339 })?,
2340 effects: effects.clone().try_into()?,
2341 events: SuiTransactionBlockEvents::try_from(
2342 events.unwrap_or_default(),
2343 tx_digest,
2344 None,
2345 layout_resolver.as_mut(),
2346 )?,
2347 object_changes: Vec::new(),
2350 balance_changes: Vec::new(),
2351 execution_error_source,
2352 };
2353
2354 Ok((response, written_with_kind, effects, mock_gas_id))
2355 }
2356
2357 pub fn simulate_transaction(
2358 &self,
2359 mut transaction: TransactionData,
2360 checks: TransactionChecks,
2361 allow_mock_gas_coin: bool,
2362 ) -> SuiResult<SimulateTransactionResult> {
2363 if transaction.kind().is_system_tx() {
2364 return Err(SuiErrorKind::UnsupportedFeatureError {
2365 error: "simulate does not support system transactions".to_string(),
2366 }
2367 .into());
2368 }
2369
2370 let epoch_store = self.load_epoch_store_one_call_per_task();
2371 if !self.is_fullnode(&epoch_store) {
2372 return Err(SuiErrorKind::UnsupportedFeatureError {
2373 error: "simulate is only supported on fullnodes".to_string(),
2374 }
2375 .into());
2376 }
2377
2378 let dev_inspect = checks.disabled();
2379 if dev_inspect && self.config.dev_inspect_disabled {
2380 return Err(SuiErrorKind::UnsupportedFeatureError {
2381 error: "simulate with checks disabled is not allowed on this node".to_string(),
2382 }
2383 .into());
2384 }
2385
2386 let protocol_config = epoch_store.protocol_config();
2389 if !protocol_config.enable_coin_reservation_obj_refs()
2390 && transaction.gas().iter().any(|obj_ref| {
2391 sui_types::coin_reservation::ParsedDigest::is_coin_reservation_digest(&obj_ref.2)
2392 })
2393 {
2394 return Err(SuiErrorKind::UnsupportedFeatureError {
2395 error:
2396 "coin reservations in gas payment are not supported at this protocol version"
2397 .to_string(),
2398 }
2399 .into());
2400 }
2401
2402 let input_object_kinds = transaction.input_objects()?;
2406 let receiving_object_refs = transaction.receiving_objects();
2407
2408 let is_gasless = protocol_config.enable_gasless() && transaction.is_gasless_transaction();
2417 let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() && !is_gasless
2418 {
2419 let obj = Object::new_move(
2420 MoveObject::new_gas_coin(
2421 OBJECT_START_VERSION,
2422 ObjectID::MAX,
2423 DEV_INSPECT_GAS_COIN_VALUE,
2424 ),
2425 Owner::AddressOwner(transaction.gas_data().owner),
2426 TransactionDigest::genesis_marker(),
2427 );
2428 transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
2429 Some(obj)
2430 } else {
2431 None
2432 };
2433
2434 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
2436
2437 let declared_withdrawals = self.pre_object_load_checks(
2438 &transaction,
2439 &[],
2440 &input_object_kinds,
2441 &receiving_object_refs,
2442 epoch_store.protocol_config(),
2443 )?;
2444 let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();
2445
2446 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2447 None,
2449 &input_object_kinds,
2450 &receiving_object_refs,
2451 epoch_store.epoch(),
2452 )?;
2453
2454 let mock_gas_id = mock_gas_object.map(|obj| {
2456 let id = obj.id();
2457 input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
2458 id
2459 });
2460
2461 let protocol_config = epoch_store.protocol_config();
2462
2463 let (gas_status, checked_input_objects) = if dev_inspect {
2464 sui_transaction_checks::check_dev_inspect_input(
2465 protocol_config,
2466 &transaction,
2467 input_objects,
2468 receiving_objects,
2469 epoch_store.reference_gas_price(),
2470 )?
2471 } else {
2472 sui_transaction_checks::check_transaction_input(
2473 epoch_store.protocol_config(),
2474 epoch_store.reference_gas_price(),
2475 &transaction,
2476 input_objects,
2477 &receiving_objects,
2478 &self.metrics.bytecode_verifier_metrics,
2479 &self.config.verifier_signing_config,
2480 )?
2481 };
2482
2483 let executor = sui_execution::executor(
2485 protocol_config,
2486 true, )
2488 .expect("Creating an executor should not fail here");
2489
2490 let (mut kind, signer, gas_data) = transaction.execution_parts();
2491 let rewritten_inputs = rewrite_transaction_for_coin_reservations(
2492 self.chain_identifier,
2493 &*self.coin_reservation_resolver,
2494 signer,
2495 &mut kind,
2496 None,
2497 )?;
2498 let early_execution_error = get_early_execution_error(
2499 &transaction.digest(),
2500 &checked_input_objects,
2501 self.config.certificate_deny_config.certificate_deny_set(),
2502 &FundsWithdrawStatus::MaybeSufficient,
2503 );
2504 let execution_params = match early_execution_error {
2507 None => ExecutionOrEarlyError::ok(None),
2508 Some(errors) => ExecutionOrEarlyError::failed(errors, None),
2509 };
2510
2511 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2512
2513 let cloned_input_objects = checked_input_objects.clone();
2515 let cloned_gas = gas_data.clone();
2516 let cloned_kind = kind.clone();
2517 let tx_digest = transaction.digest();
2518 let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
2519 let epoch_timestamp_ms = epoch_store
2520 .epoch_start_config()
2521 .epoch_data()
2522 .epoch_start_timestamp();
2523 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2524 &tracking_store,
2525 protocol_config,
2526 self.metrics.execution_metrics.clone(),
2527 false, execution_params,
2529 &epoch_id,
2530 epoch_timestamp_ms,
2531 checked_input_objects,
2532 gas_data,
2533 gas_status,
2534 kind,
2535 rewritten_inputs.clone(),
2536 signer,
2537 tx_digest,
2538 dev_inspect,
2539 );
2540
2541 let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
2543 let has_insufficient_object_funds = inner_temp_store
2544 .accumulator_running_max_withdraws
2545 .iter()
2546 .filter(|(id, _)| !address_funds.contains(id))
2547 .any(|(id, max_withdraw)| {
2548 let balance = self.get_account_funds_read().get_latest_account_amount(id);
2549 balance < *max_withdraw
2550 });
2551
2552 if has_insufficient_object_funds {
2553 let retry_gas_status = SuiGasStatus::new(
2554 cloned_gas.budget,
2555 cloned_gas.price,
2556 epoch_store.reference_gas_price(),
2557 protocol_config,
2558 )?;
2559 let (store, _, effects, result) = executor.dev_inspect_transaction(
2560 &tracking_store,
2561 protocol_config,
2562 self.metrics.execution_metrics.clone(),
2563 false,
2564 ExecutionOrEarlyError::failed(
2565 NonEmpty::new(ExecutionErrorKind::InsufficientFundsForWithdraw),
2566 None,
2567 ),
2568 &epoch_id,
2569 epoch_timestamp_ms,
2570 cloned_input_objects,
2571 cloned_gas,
2572 retry_gas_status,
2573 cloned_kind,
2574 rewritten_inputs,
2575 signer,
2576 tx_digest,
2577 dev_inspect,
2578 );
2579 (store, effects, result)
2580 } else {
2581 (inner_temp_store, effects, execution_result)
2582 }
2583 } else {
2584 (inner_temp_store, effects, execution_result)
2585 };
2586
2587 let loaded_runtime_objects = tracking_store.into_read_objects();
2588 let unchanged_loaded_runtime_objects =
2589 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2590 &transaction,
2591 &effects,
2592 &loaded_runtime_objects,
2593 );
2594
2595 let object_set = {
2596 let objects = {
2597 let mut objects = loaded_runtime_objects;
2598
2599 for o in inner_temp_store
2600 .input_objects
2601 .into_values()
2602 .chain(inner_temp_store.written.into_values())
2603 {
2604 objects.insert(o);
2605 }
2606
2607 objects
2608 };
2609
2610 let object_keys = sui_types::storage::get_transaction_object_set(
2611 &transaction,
2612 &effects,
2613 &unchanged_loaded_runtime_objects,
2614 );
2615
2616 let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2617 for k in object_keys {
2618 if let Some(o) = objects.get(&k) {
2619 set.insert(o.clone());
2620 }
2621 }
2622
2623 set
2624 };
2625
2626 Ok(SimulateTransactionResult {
2627 objects: object_set,
2628 events: effects.events_digest().map(|_| inner_temp_store.events),
2629 effects,
2630 execution_result,
2631 mock_gas_id,
2632 unchanged_loaded_runtime_objects,
2633 suggested_gas_price: self
2634 .congestion_tracker
2635 .get_suggested_gas_prices(&transaction),
2636 })
2637 }
2638
2639 #[instrument(skip_all)]
2641 pub async fn dev_inspect_transaction_block(
2642 &self,
2643 sender: SuiAddress,
2644 transaction_kind: TransactionKind,
2645 gas_price: Option<u64>,
2646 gas_budget: Option<u64>,
2647 gas_sponsor: Option<SuiAddress>,
2648 gas_objects: Option<Vec<ObjectRef>>,
2649 show_raw_txn_data_and_effects: Option<bool>,
2650 skip_checks: Option<bool>,
2651 ) -> SuiResult<DevInspectResults> {
2652 let epoch_store = self.load_epoch_store_one_call_per_task();
2653 let protocol_config = epoch_store.protocol_config();
2654 let reference_gas_price = epoch_store.reference_gas_price();
2655
2656 let skip_checks = skip_checks.unwrap_or(true);
2657 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2658
2659 let price = gas_price.unwrap_or(reference_gas_price);
2661 let budget = gas_budget.unwrap_or(protocol_config.max_tx_gas());
2662 let owner = gas_sponsor.unwrap_or(sender);
2663 let payment = gas_objects.unwrap_or_default();
2664 let transaction = TransactionData::V1(TransactionDataV1 {
2665 kind: transaction_kind,
2666 sender,
2667 gas_data: GasData {
2668 payment,
2669 owner,
2670 price,
2671 budget,
2672 },
2673 expiration: TransactionExpiration::None,
2674 });
2675
2676 let raw_txn_data = if show_raw_txn_data_and_effects {
2679 bcs::to_bytes(&transaction).map_err(|_| {
2680 SuiErrorKind::TransactionSerializationError {
2681 error: "Failed to serialize transaction during dev inspect".to_string(),
2682 }
2683 })?
2684 } else {
2685 vec![]
2686 };
2687
2688 let checks = if skip_checks {
2692 TransactionChecks::Disabled
2693 } else {
2694 TransactionChecks::Enabled
2695 };
2696 let sim =
2697 self.simulate_transaction(transaction, checks, true)?;
2698
2699 let raw_effects = if show_raw_txn_data_and_effects {
2700 bcs::to_bytes(&sim.effects).map_err(|_| {
2701 SuiErrorKind::TransactionSerializationError {
2702 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2703 }
2704 })?
2705 } else {
2706 vec![]
2707 };
2708
2709 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2713 epoch_store.protocol_config(),
2714 Box::new(OverlayBackingPackageStore::new(
2715 &sim.objects,
2716 self.get_backing_package_store(),
2717 )),
2718 );
2719
2720 DevInspectResults::new(
2721 sim.effects,
2722 sim.events.unwrap_or_default(),
2723 sim.execution_result,
2724 raw_txn_data,
2725 raw_effects,
2726 layout_resolver.as_mut(),
2727 )
2728 }
2729
2730 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2732 let epoch_store = self.epoch_store_for_testing();
2733 Ok(epoch_store.reference_gas_price())
2734 }
2735
2736 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2737 self.get_transaction_cache_reader()
2738 .is_tx_already_executed(digest)
2739 }
2740
2741 #[instrument(level = "debug", skip_all, err(level = "debug"))]
2742 fn index_tx(
2743 sequence: u64,
2744 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2745 object_store: &Arc<dyn ObjectStore + Send + Sync>,
2746 indexes: &IndexStore,
2747 digest: &TransactionDigest,
2748 cert: &VerifiedExecutableTransaction,
2750 effects: &TransactionEffects,
2751 events: &TransactionEvents,
2752 timestamp_ms: u64,
2753 tx_coins: Option<TxCoins>,
2754 written: &WrittenObjects,
2755 inner_temporary_store: &InnerTemporaryStore,
2756 epoch_store: &Arc<AuthorityPerEpochStore>,
2757 acquire_locks: bool,
2758 ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
2759 let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
2760 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2761
2762 indexes.index_tx(
2763 sequence,
2764 cert.data().intent_message().value.sender(),
2765 cert.data()
2766 .intent_message()
2767 .value
2768 .input_objects()?
2769 .iter()
2770 .map(|o| o.object_id()),
2771 effects
2772 .all_changed_objects()
2773 .into_iter()
2774 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2775 cert.data()
2776 .intent_message()
2777 .value
2778 .move_calls()
2779 .into_iter()
2780 .map(|(_cmd_idx, package, module, function)| {
2781 (*package, module.to_owned(), function.to_owned())
2782 }),
2783 events,
2784 changes,
2785 digest,
2786 timestamp_ms,
2787 tx_coins,
2788 effects.accumulator_events(),
2789 acquire_locks,
2790 )
2791 }
2792
/// Simulator-only hook that perturbs `effects` on selected validators to
/// provoke a fork.
///
/// A thread-local counter accumulates the stake of validators that have been
/// selected to fork. This validator is added to `forked_validators` while the
/// accumulated stake stays within the bound chosen by `full_halt`:
/// * `full_halt == true`: selected while the accumulated stake is at most the
///   committee's validity threshold;
/// * `full_halt == false`: selected only if adding its stake keeps the total
///   strictly below the validity threshold.
///
/// For a forked validator, the transaction's effects are perturbed (the
/// computation cost is bumped by one) when the transaction digest is already
/// in `effects_overrides`, or when `effects_overrides` is empty and the
/// deterministic coin flip with `fork_probability` succeeds. The original
/// effects digest is recorded in `effects_overrides` under the transaction
/// digest before the perturbation.
///
/// System transactions and validators with zero stake are left untouched.
#[cfg(msim)]
fn simulate_fork_during_execution(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
    forked_validators: std::sync::Arc<
        std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
    >,
    full_halt: bool,
    effects_overrides: std::sync::Arc<
        std::sync::Mutex<std::collections::BTreeMap<String, String>>,
    >,
    fork_probability: f32,
) {
    use std::cell::RefCell;
    thread_local! {
        static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
    }

    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake == 0 {
        return;
    }

    TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
        // A poisoned lock is treated as "not forked yet".
        let already_forked = forked_validators
            .lock()
            .is_ok_and(|set| set.contains(&self.name));

        if !already_forked {
            let threshold = committee.validity_threshold();
            let should_fork = if full_halt {
                *total_stake <= threshold
            } else {
                *total_stake + cur_stake < threshold
            };

            if should_fork {
                *total_stake += cur_stake;

                if let Ok(mut external_set) = forked_validators.lock() {
                    external_set.insert(self.name);
                    info!("forked_validators: {:?}", external_set);
                }
            }
        }

        // From here on only validators marked as forked do anything. The set's
        // lock stays held until the end of the closure.
        let Ok(external_set) = forked_validators.lock() else {
            return;
        };
        if !external_set.contains(&self.name) {
            return;
        }

        let tx_digest = certificate.digest().to_string();
        let Ok(mut overrides) = effects_overrides.lock() else {
            return;
        };

        let should_override = overrides.contains_key(&tx_digest)
            || (overrides.is_empty()
                && sui_simulator::random::deterministic_probability(
                    &tx_digest,
                    fork_probability,
                ));
        if !should_override {
            return;
        }

        let original_effects_digest = effects.digest().to_string();
        overrides.insert(tx_digest.clone(), original_effects_digest.clone());
        info!(
            ?tx_digest,
            ?original_effects_digest,
            "Captured forked effects digest for transaction"
        );
        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
    });
}
2876
2877 fn process_object_index(
2878 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2879 object_store: &Arc<dyn ObjectStore + Send + Sync>,
2880 effects: &TransactionEffects,
2881 written: &WrittenObjects,
2882 inner_temporary_store: &InnerTemporaryStore,
2883 epoch_store: &Arc<AuthorityPerEpochStore>,
2884 ) -> SuiResult<ObjectIndexChanges> {
2885 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2886 epoch_store.protocol_config(),
2887 Box::new(PackageStoreWithFallback::new(
2888 inner_temporary_store,
2889 backing_package_store,
2890 )),
2891 );
2892
2893 let modified_at_version = effects
2894 .modified_at_versions()
2895 .into_iter()
2896 .collect::<HashMap<_, _>>();
2897
2898 let tx_digest = effects.transaction_digest();
2899 let mut deleted_owners = vec![];
2900 let mut deleted_dynamic_fields = vec![];
2901 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2902 let old_version = modified_at_version.get(&id).unwrap();
2903 match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
2906 |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
2907 ) {
2908 Owner::AddressOwner(addr)
2909 | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
2910 Owner::ObjectOwner(object_id) => {
2911 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2912 }
2913 _ => {}
2914 }
2915 }
2916
2917 let mut new_owners = vec![];
2918 let mut new_dynamic_fields = vec![];
2919
2920 for (oref, owner, kind) in effects.all_changed_objects() {
2921 let id = &oref.0;
2922 if let WriteKind::Mutate = kind {
2924 let Some(old_version) = modified_at_version.get(id) else {
2925 panic!(
2926 "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
2927 tx_digest
2928 );
2929 };
2930 let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
2933 panic!(
2934 "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
2935 tx_digest, id, old_version
2936 );
2937 };
2938 if old_object.owner != owner {
2939 match old_object.owner {
2940 Owner::AddressOwner(addr)
2941 | Owner::ConsensusAddressOwner { owner: addr, .. } => {
2942 deleted_owners.push((addr, *id));
2943 }
2944 Owner::ObjectOwner(object_id) => {
2945 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2946 }
2947 _ => {}
2948 }
2949 }
2950 }
2951
2952 match owner {
2953 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
2954 let new_object = written.get(id).unwrap_or_else(
2956 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2957 );
2958 assert_eq!(
2959 new_object.version(),
2960 oref.1,
2961 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2962 tx_digest,
2963 id,
2964 new_object.version(),
2965 oref.1
2966 );
2967
2968 let type_ = new_object
2969 .type_()
2970 .map(|type_| ObjectType::Struct(type_.clone()))
2971 .unwrap_or(ObjectType::Package);
2972
2973 new_owners.push((
2974 (addr, *id),
2975 ObjectInfo {
2976 object_id: *id,
2977 version: oref.1,
2978 digest: oref.2,
2979 type_,
2980 owner,
2981 previous_transaction: *effects.transaction_digest(),
2982 },
2983 ));
2984 }
2985 Owner::ObjectOwner(owner) => {
2986 let new_object = written.get(id).unwrap_or_else(
2987 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2988 );
2989 assert_eq!(
2990 new_object.version(),
2991 oref.1,
2992 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2993 tx_digest,
2994 id,
2995 new_object.version(),
2996 oref.1
2997 );
2998
2999 let Some(df_info) = Self::try_create_dynamic_field_info(
3000 object_store,
3001 new_object,
3002 written,
3003 layout_resolver.as_mut(),
3004 )
3005 .unwrap_or_else(|e| {
3006 error!(
3007 "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
3008 e, new_object
3009 );
3010 None
3011 }) else {
3012 continue;
3014 };
3015 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3016 }
3017 _ => {}
3018 }
3019 }
3020
3021 Ok(ObjectIndexChanges {
3022 deleted_owners,
3023 deleted_dynamic_fields,
3024 new_owners,
3025 new_dynamic_fields,
3026 })
3027 }
3028
3029 fn try_create_dynamic_field_info(
3030 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3031 o: &Object,
3032 written: &WrittenObjects,
3033 resolver: &mut dyn LayoutResolver,
3034 ) -> SuiResult<Option<DynamicFieldInfo>> {
3035 let Some(move_object) = o.data.try_as_move().cloned() else {
3037 return Ok(None);
3038 };
3039
3040 if !move_object.type_().is_dynamic_field() {
3042 return Ok(None);
3043 }
3044
3045 let layout = resolver
3046 .get_annotated_layout(&move_object.type_().clone().into())?
3047 .into_layout();
3048
3049 let field =
3050 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3051 SuiErrorKind::ObjectDeserializationError {
3052 error: e.to_string(),
3053 }
3054 })?;
3055
3056 let type_ = field.kind;
3057 let name_type: TypeTag = field.name_layout.into();
3058 let bcs_name = field.name_bytes.to_owned();
3059
3060 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3061 .map_err(|e| {
3062 warn!("{e}");
3063 SuiErrorKind::ObjectDeserializationError {
3064 error: e.to_string(),
3065 }
3066 })?;
3067
3068 let name = DynamicFieldName {
3069 type_: name_type,
3070 value: SuiMoveValue::from(name_value).to_json_value(),
3071 };
3072
3073 let value_metadata = field.value_metadata().map_err(|e| {
3074 warn!("{e}");
3075 SuiErrorKind::ObjectDeserializationError {
3076 error: e.to_string(),
3077 }
3078 })?;
3079
3080 Ok(Some(match value_metadata {
3081 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3082 name,
3083 bcs_name,
3084 type_,
3085 object_type: object_type.to_canonical_string(true),
3086 object_id: o.id(),
3087 version: o.version(),
3088 digest: o.digest(),
3089 },
3090
3091 DFV::ValueMetadata::DynamicObjectField(object_id) => {
3092 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3096 let version = object.version();
3097 let digest = object.digest();
3098 let object_type = object.data.type_().unwrap().clone();
3099 (version, digest, object_type)
3100 } else {
3101 let object = object_store
3103 .get_object_by_key(&object_id, o.version())
3104 .ok_or_else(|| UserInputError::ObjectNotFound {
3105 object_id,
3106 version: Some(o.version()),
3107 })?;
3108 let version = object.version();
3109 let digest = object.digest();
3110 let object_type = object.data.type_().unwrap().clone();
3111 (version, digest, object_type)
3112 };
3113
3114 DynamicFieldInfo {
3115 name,
3116 bcs_name,
3117 type_,
3118 object_type: object_type.to_string(),
3119 object_id,
3120 version,
3121 digest,
3122 }
3123 }
3124 }))
3125 }
3126
3127 #[instrument(level = "trace", skip_all, err(level = "debug"))]
3128 fn post_process_one_tx(
3129 &self,
3130 certificate: &VerifiedExecutableTransaction,
3131 effects: &TransactionEffects,
3132 inner_temporary_store: &InnerTemporaryStore,
3133 epoch_store: &Arc<AuthorityPerEpochStore>,
3134 ) -> SuiResult {
3135 let Some(indexes) = &self.indexes else {
3136 return Ok(());
3137 };
3138
3139 let tx_digest = *certificate.digest();
3140
3141 let sequence = indexes.allocate_sequence_number();
3143
3144 if self.config.sync_post_process_one_tx {
3145 let result = Self::post_process_one_tx_impl(
3150 sequence,
3151 indexes,
3152 &self.subscription_handler,
3153 &self.metrics,
3154 self.name,
3155 self.get_backing_package_store(),
3156 self.get_object_store(),
3157 certificate,
3158 effects,
3159 inner_temporary_store,
3160 epoch_store,
3161 true, );
3163
3164 match result {
3165 Ok((raw_batch, cache_updates_with_locks)) => {
3166 let mut db_batch = indexes.new_db_batch();
3167 db_batch
3168 .concat(vec![raw_batch])
3169 .expect("failed to absorb raw index batch");
3170 let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
3172 cache_updates_with_locks;
3173 indexes
3174 .commit_index_batch(db_batch, vec![inner])
3175 .expect("failed to commit index batch");
3176 }
3177 Err(e) => {
3178 self.metrics.post_processing_total_failures.inc();
3179 error!(?tx_digest, "tx post processing failed: {e}");
3180 return Err(e);
3181 }
3182 }
3183
3184 return Ok(());
3185 }
3186
3187 let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
3188 self.pending_post_processing.insert(tx_digest, done_rx);
3189
3190 let indexes = indexes.clone();
3191 let subscription_handler = self.subscription_handler.clone();
3192 let metrics = self.metrics.clone();
3193 let name = self.name;
3194 let backing_package_store = self.get_backing_package_store().clone();
3195 let object_store = self.get_object_store().clone();
3196 let semaphore = self.post_processing_semaphore.clone();
3197
3198 let certificate = certificate.clone();
3199 let effects = effects.clone();
3200 let inner_temporary_store = inner_temporary_store.clone();
3201 let epoch_store = epoch_store.clone();
3202
3203 tokio::spawn(async move {
3205 let permit = {
3206 let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
3207 semaphore
3208 .acquire_owned()
3209 .await
3210 .expect("post-processing semaphore should not be closed")
3211 };
3212
3213 let _ = tokio::task::spawn_blocking(move || {
3214 let _permit = permit;
3215
3216 let result = Self::post_process_one_tx_impl(
3217 sequence,
3218 &indexes,
3219 &subscription_handler,
3220 &metrics,
3221 name,
3222 &backing_package_store,
3223 &object_store,
3224 &certificate,
3225 &effects,
3226 &inner_temporary_store,
3227 &epoch_store,
3228 false, );
3230
3231 match result {
3232 Ok((raw_batch, cache_updates_with_locks)) => {
3233 fail_point!("crash-after-post-process-one-tx");
3234 let output = (raw_batch, cache_updates_with_locks.into_inner());
3235 let _ = done_tx.send(output);
3236 }
3237 Err(e) => {
3238 metrics.post_processing_total_failures.inc();
3239 error!(?tx_digest, "tx post processing failed: {e}");
3240 }
3241 }
3242 })
3243 .await;
3244 });
3245
3246 Ok(())
3247 }
3248
3249 fn post_process_one_tx_impl(
3250 sequence: u64,
3251 indexes: &Arc<IndexStore>,
3252 subscription_handler: &Arc<SubscriptionHandler>,
3253 metrics: &Arc<AuthorityMetrics>,
3254 name: AuthorityName,
3255 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3256 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3257 certificate: &VerifiedExecutableTransaction,
3258 effects: &TransactionEffects,
3259 inner_temporary_store: &InnerTemporaryStore,
3260 epoch_store: &Arc<AuthorityPerEpochStore>,
3261 acquire_locks: bool,
3262 ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
3263 let _scope = monitored_scope("Execution::post_process_one_tx");
3264
3265 let tx_digest = certificate.digest();
3266 let timestamp_ms = Self::unixtime_now_ms();
3267 let events = &inner_temporary_store.events;
3268 let written = &inner_temporary_store.written;
3269 let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
3270 name,
3271 object_store,
3272 effects,
3273 inner_temporary_store,
3274 epoch_store,
3275 );
3276
3277 let (raw_batch, cache_updates) = Self::index_tx(
3278 sequence,
3279 backing_package_store,
3280 object_store,
3281 indexes,
3282 tx_digest,
3283 certificate,
3284 effects,
3285 events,
3286 timestamp_ms,
3287 tx_coins,
3288 written,
3289 inner_temporary_store,
3290 epoch_store,
3291 acquire_locks,
3292 )
3293 .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
3294 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3295 .expect("Indexing tx should not fail");
3296
3297 let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3298 let events = Self::make_transaction_block_events(
3299 backing_package_store,
3300 events.clone(),
3301 *tx_digest,
3302 timestamp_ms,
3303 epoch_store,
3304 inner_temporary_store,
3305 )?;
3306 subscription_handler
3308 .process_tx(certificate.data().transaction_data(), &effects, &events)
3309 .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
3310 .tap_err(|e| {
3311 warn!(
3312 ?tx_digest,
3313 "Post processing - Couldn't process events for tx: {}", e
3314 )
3315 })?;
3316
3317 metrics
3318 .post_processing_total_events_emitted
3319 .inc_by(events.data.len() as u64);
3320
3321 Ok((raw_batch, cache_updates))
3322 }
3323
3324 fn make_transaction_block_events(
3325 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3326 transaction_events: TransactionEvents,
3327 digest: TransactionDigest,
3328 timestamp_ms: u64,
3329 epoch_store: &Arc<AuthorityPerEpochStore>,
3330 inner_temporary_store: &InnerTemporaryStore,
3331 ) -> SuiResult<SuiTransactionBlockEvents> {
3332 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3333 epoch_store.protocol_config(),
3334 Box::new(PackageStoreWithFallback::new(
3335 inner_temporary_store,
3336 backing_package_store,
3337 )),
3338 );
3339 SuiTransactionBlockEvents::try_from(
3340 transaction_events,
3341 digest,
3342 Some(timestamp_ms),
3343 layout_resolver.as_mut(),
3344 )
3345 }
3346
3347 pub fn unixtime_now_ms() -> u64 {
3348 let now = SystemTime::now()
3349 .duration_since(UNIX_EPOCH)
3350 .expect("Time went backwards")
3351 .as_millis();
3352 u64::try_from(now).expect("Travelling in time machine")
3353 }
3354
3355 #[instrument(level = "trace", skip_all)]
3359 pub async fn handle_transaction_info_request(
3360 &self,
3361 request: TransactionInfoRequest,
3362 ) -> SuiResult<TransactionInfoResponse> {
3363 let epoch_store = self.load_epoch_store_one_call_per_task();
3364 let (transaction, status) = self
3365 .get_transaction_status(&request.transaction_digest, &epoch_store)?
3366 .ok_or(SuiErrorKind::TransactionNotFound {
3367 digest: request.transaction_digest,
3368 })?;
3369 Ok(TransactionInfoResponse {
3370 transaction,
3371 status,
3372 })
3373 }
3374
3375 #[instrument(level = "trace", skip_all)]
3376 pub async fn handle_object_info_request(
3377 &self,
3378 request: ObjectInfoRequest,
3379 ) -> SuiResult<ObjectInfoResponse> {
3380 let epoch_store = self.load_epoch_store_one_call_per_task();
3381
3382 let requested_object_seq = match request.request_kind {
3383 ObjectInfoRequestKind::LatestObjectInfo => {
3384 let (_, seq, _) =
3385 self.get_object_or_tombstone(request.object_id)
3386 .ok_or_else(|| {
3387 SuiError::from(UserInputError::ObjectNotFound {
3388 object_id: request.object_id,
3389 version: None,
3390 })
3391 })?;
3392 seq
3393 }
3394 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3395 };
3396
3397 let object = self
3398 .get_object_store()
3399 .get_object_by_key(&request.object_id, requested_object_seq)
3400 .ok_or_else(|| {
3401 SuiError::from(UserInputError::ObjectNotFound {
3402 object_id: request.object_id,
3403 version: Some(requested_object_seq),
3404 })
3405 })?;
3406
3407 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3408 (request.generate_layout, object.data.try_as_move())
3409 {
3410 let epoch_store = self.load_epoch_store_one_call_per_task();
3411 Some(into_struct_layout(
3412 epoch_store
3413 .executor()
3414 .type_layout_resolver(
3415 epoch_store.protocol_config(),
3416 Box::new(self.get_backing_package_store().as_ref()),
3417 )
3418 .get_annotated_layout(&move_obj.type_().clone().into())?,
3419 )?)
3420 } else {
3421 None
3422 };
3423
3424 let lock = if !object.is_address_owned() {
3425 None
3427 } else {
3428 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)?
3429 .map(|s| s.into_inner())
3430 };
3431
3432 Ok(ObjectInfoResponse {
3433 object,
3434 layout,
3435 lock_for_debugging: lock,
3436 })
3437 }
3438
3439 #[instrument(level = "trace", skip_all)]
3440 pub fn handle_checkpoint_request(
3441 &self,
3442 request: &CheckpointRequest,
3443 ) -> SuiResult<CheckpointResponse> {
3444 let summary = match request.sequence_number {
3445 Some(seq) => self
3446 .checkpoint_store
3447 .get_checkpoint_by_sequence_number(seq)?,
3448 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3449 }
3450 .map(|v| v.into_inner());
3451 let contents = match &summary {
3452 Some(s) => self
3453 .checkpoint_store
3454 .get_checkpoint_contents(&s.content_digest)?,
3455 None => None,
3456 };
3457 Ok(CheckpointResponse {
3458 checkpoint: summary,
3459 contents,
3460 })
3461 }
3462
3463 #[instrument(level = "trace", skip_all)]
3464 pub fn handle_checkpoint_request_v2(
3465 &self,
3466 request: &CheckpointRequestV2,
3467 ) -> SuiResult<CheckpointResponseV2> {
3468 let summary = if request.certified {
3469 let summary = match request.sequence_number {
3470 Some(seq) => self
3471 .checkpoint_store
3472 .get_checkpoint_by_sequence_number(seq)?,
3473 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3474 }
3475 .map(|v| v.into_inner());
3476 summary.map(CheckpointSummaryResponse::Certified)
3477 } else {
3478 let summary = match request.sequence_number {
3479 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3480 None => self
3481 .checkpoint_store
3482 .get_latest_locally_computed_checkpoint()?,
3483 };
3484 summary.map(CheckpointSummaryResponse::Pending)
3485 };
3486 let contents = match &summary {
3487 Some(s) => self
3488 .checkpoint_store
3489 .get_checkpoint_contents(&s.content_digest())?,
3490 None => None,
3491 };
3492 Ok(CheckpointResponseV2 {
3493 checkpoint: summary,
3494 contents,
3495 })
3496 }
3497
3498 fn check_protocol_version(
3499 supported_protocol_versions: SupportedProtocolVersions,
3500 current_version: ProtocolVersion,
3501 ) {
3502 info!("current protocol version is now {:?}", current_version);
3503 info!("supported versions are: {:?}", supported_protocol_versions);
3504 if !supported_protocol_versions.is_version_supported(current_version) {
3505 let msg = format!(
3506 "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3507 current_version, supported_protocol_versions,
3508 );
3509
3510 error!("{}", msg);
3511 eprintln!("{}", msg);
3512
3513 #[cfg(not(msim))]
3514 std::process::exit(1);
3515
3516 #[cfg(msim)]
3517 sui_simulator::task::shutdown_current_node();
3518 }
3519 }
3520
    /// Builds an `AuthorityState`, spawns its execution task, and returns it.
    ///
    /// In order, this:
    /// 1. Checks that the epoch's protocol version is supported (see
    ///    `check_protocol_version`, which stops the node otherwise).
    /// 2. Creates metrics, the execution scheduler, both pruners, the input
    ///    loader, the optional traffic controller, and the optional fork
    ///    recovery state.
    /// 3. Assembles the state and initializes the object funds checker.
    /// 4. Spawns `execution_process` with a weak handle to the state.
    /// 5. Populates the owner index from `genesis_objects` if the index store is
    ///    present and empty.
    /// 6. When multi-epoch transaction expiration is enabled and the epoch is
    ///    greater than 0, verifies that executed-transaction data exists for the
    ///    previous epoch.
    ///
    /// # Panics
    /// Panics if fork recovery state cannot be initialized, if indexing the
    /// genesis objects fails, or if the previous-epoch data check in step 6 fails.
    // NOTE(review): `disallowed_methods` is presumably allowed for the
    // `unbounded_channel()` call below — TODO confirm.
    #[allow(clippy::disallowed_methods)]
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        rpc_store: Option<RpcStore>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        chain_identifier: ChainIdentifier,
        policy_config: Option<PolicyConfig>,
        firewall_config: Option<RemoteFirewallConfig>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Arc<Self> {
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // The sender goes to the scheduler; the receiver is handed to
        // `execution_process` further down.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let execution_scheduler = Arc::new(ExecutionScheduler::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers.account_funds_read.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            tx_ready_certificates,
            &epoch_store,
            config.funds_withdraw_scheduler_type,
            metrics.clone(),
            prometheus_registry,
        ));
        // The sender is stored in the state; the receiver goes to `execution_process`.
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // Both pruner handles are stored in the state below.
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            rpc_index.clone(),
            rpc_store,
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            pruner_watermarks,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let traffic_controller_metrics =
            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
        // A traffic controller is only created when a policy config was supplied.
        let traffic_controller = if let Some(policy_config) = policy_config {
            Some(Arc::new(
                TrafficController::init(
                    policy_config,
                    traffic_controller_metrics,
                    firewall_config.clone(),
                )
                .await,
            ))
        } else {
            None
        };

        let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
            ForkRecoveryState::new(Some(fork_config))
                .expect("Failed to initialize fork recovery state")
        });

        let coin_reservation_resolver = Arc::new(CachingCoinReservationResolver::new(
            execution_cache_trait_pointers.child_object_resolver.clone(),
        ));

        let object_funds_checker_metrics =
            Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            coin_reservation_resolver,
            indexes,
            rpc_index,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            execution_scheduler,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new()),
            consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
            traffic_controller,
            fork_recovery_state,
            // Only the sender half of the watch channel is kept.
            notify_epoch: tokio::sync::watch::channel(epoch).0,
            // Starts empty; filled in by `init_object_funds_checker` below.
            object_funds_checker: ArcSwapOption::empty(),
            object_funds_checker_metrics,
            pending_post_processing: Arc::new(DashMap::new()),
            // One permit per CPU reported by `num_cpus::get()`.
            post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
        });
        state.init_object_funds_checker().await;

        // The execution task receives a `Weak` handle rather than a strong `Arc`.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        if epoch_store
            .protocol_config()
            .enable_multi_epoch_transaction_expiration()
            && epoch_store.epoch() > 0
        {
            use typed_store::Map;
            // Probe for at least one executed-transaction entry keyed by the
            // previous epoch.
            let previous_epoch = epoch_store.epoch() - 1;
            let start_key = (previous_epoch, TransactionDigest::ZERO);
            let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
            let has_previous_epoch_data = store
                .perpetual_tables
                .executed_transaction_digests
                .safe_range_iter(start_key..end_key)
                .next()
                .is_some();

            if !has_previous_epoch_data {
                panic!(
                    "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
                    This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
                    Please restore from a snapshot using the latest version of sui-tool.",
                    previous_epoch
                );
            }
        }

        state
    }
3681
3682 async fn init_object_funds_checker(&self) {
3683 let epoch_store = self.epoch_store.load();
3684 if self.node_role(&epoch_store).runs_consensus()
3685 && epoch_store.protocol_config().enable_object_funds_withdraw()
3686 {
3687 if self.object_funds_checker.load().is_none() {
3688 let inner = self.get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID).map(|o| {
3689 Arc::new(ObjectFundsChecker::new(
3690 o.version(),
3691 self.object_funds_checker_metrics.clone(),
3692 ))
3693 });
3694 self.object_funds_checker.store(inner);
3695 }
3696 } else {
3697 self.object_funds_checker.store(None);
3698 }
3699 }
3700
3701 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3703 &self.execution_cache_trait_pointers.object_cache_reader
3704 }
3705
3706 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3707 &self.execution_cache_trait_pointers.transaction_cache_reader
3708 }
3709
3710 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3711 &self.execution_cache_trait_pointers.cache_writer
3712 }
3713
3714 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3715 &self.execution_cache_trait_pointers.backing_store
3716 }
3717
3718 pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3719 &self.execution_cache_trait_pointers.child_object_resolver
3720 }
3721
3722 pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
3723 &self.execution_cache_trait_pointers.account_funds_read
3724 }
3725
3726 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3727 &self.execution_cache_trait_pointers.backing_package_store
3728 }
3729
3730 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3731 &self.execution_cache_trait_pointers.object_store
3732 }
3733
3734 pub async fn await_post_processing(
3735 &self,
3736 tx_digest: &TransactionDigest,
3737 ) -> Option<PostProcessingOutput> {
3738 if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
3739 rx.await.ok()
3741 } else {
3742 None
3744 }
3745 }
3746
3747 pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
3751 if let Some(indexes) = &self.indexes
3752 && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
3753 {
3754 let mut db_batch = indexes.new_db_batch();
3755 db_batch
3756 .concat(vec![raw_batch])
3757 .expect("failed to build index batch");
3758 indexes
3759 .commit_index_batch(db_batch, vec![cache_updates])
3760 .expect("failed to commit index batch");
3761 }
3762 }
3763
3764 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3765 &self.execution_cache_trait_pointers.reconfig_api
3766 }
3767
3768 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3769 &self.execution_cache_trait_pointers.global_state_hash_store
3770 }
3771
3772 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3773 &self.execution_cache_trait_pointers.checkpoint_cache
3774 }
3775
3776 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3777 &self.execution_cache_trait_pointers.state_sync_store
3778 }
3779
3780 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3781 &self.execution_cache_trait_pointers.cache_commit
3782 }
3783
3784 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3785 self.execution_cache_trait_pointers
3786 .testing_api
3787 .database_for_testing()
3788 }
3789
3790 pub fn authority_store(&self) -> Arc<AuthorityStore> {
3797 self.execution_cache_trait_pointers
3798 .testing_api
3799 .database_for_testing()
3800 }
3801
3802 pub fn cache_for_testing(&self) -> &WritebackCache {
3803 self.execution_cache_trait_pointers
3804 .testing_api
3805 .cache_for_testing()
3806 }
3807
3808 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3809 &self,
3810 config: NodeConfig,
3811 metrics: Arc<AuthorityStorePruningMetrics>,
3812 ) -> anyhow::Result<()> {
3813 use crate::authority::authority_store_pruner::PrunerWatermarks;
3814 let watermarks = Arc::new(PrunerWatermarks::default());
3815 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3816 &self.database_for_testing().perpetual_tables,
3817 &self.checkpoint_store,
3818 self.rpc_index.as_deref(),
3819 None,
3820 config.authority_store_pruning_config,
3821 metrics,
3822 EPOCH_DURATION_MS_FOR_TESTING,
3823 &watermarks,
3824 )
3825 .await
3826 }
3827
    /// Returns the execution scheduler owned by this authority state.
    pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
        &self.execution_scheduler
    }
3831
3832 fn create_owner_index_if_empty(
3833 &self,
3834 genesis_objects: &[Object],
3835 epoch_store: &Arc<AuthorityPerEpochStore>,
3836 ) -> SuiResult {
3837 let Some(index_store) = &self.indexes else {
3838 return Ok(());
3839 };
3840 if !index_store.is_empty() {
3841 return Ok(());
3842 }
3843
3844 let mut new_owners = vec![];
3845 let mut new_dynamic_fields = vec![];
3846 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3847 epoch_store.protocol_config(),
3848 Box::new(self.get_backing_package_store().as_ref()),
3849 );
3850 for o in genesis_objects.iter() {
3851 match o.owner {
3852 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3853 new_owners.push((
3854 (addr, o.id()),
3855 ObjectInfo::new(&o.compute_object_reference(), o),
3856 ))
3857 }
3858 Owner::ObjectOwner(object_id) => {
3859 let id = o.id();
3860 let Some(info) = Self::try_create_dynamic_field_info(
3861 self.get_object_store(),
3862 o,
3863 &BTreeMap::new(),
3864 layout_resolver.as_mut(),
3865 )?
3866 else {
3867 continue;
3868 };
3869 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3870 }
3871 _ => {}
3872 }
3873 }
3874
3875 index_store.insert_genesis_objects(ObjectIndexChanges {
3876 deleted_owners: vec![],
3877 deleted_dynamic_fields: vec![],
3878 new_owners,
3879 new_dynamic_fields,
3880 })
3881 }
3882
3883 pub fn execution_lock_for_executable_transaction(
3887 &self,
3888 transaction: &VerifiedExecutableTransaction,
3889 ) -> Option<ExecutionLockReadGuard<'_>> {
3890 let lock = self.execution_lock.try_read().ok()?;
3891 if *lock == transaction.auth_sig().epoch() {
3892 Some(lock)
3893 } else {
3894 None
3896 }
3897 }
3898
3899 fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
3902 self.execution_lock
3903 .try_read()
3904 .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
3905 }
3906
    /// Waits for and returns the write guard on the execution lock.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
3910
    /// Transitions this authority from `cur_epoch_store`'s epoch to the epoch
    /// of `new_committee` and returns the new epoch store.
    ///
    /// The execution write lock is held from shortly after the committee is
    /// recorded until the function returns. Under the lock the current epoch is
    /// terminated, end-of-epoch state is cleared and checked, optional DB
    /// checkpoints are taken, the cache and epoch DB are reconfigured, and the
    /// scheduler and object funds checker are re-initialized. The lock's epoch
    /// value is updated and epoch watchers are notified last.
    ///
    /// # Errors
    /// Propagates errors from inserting the committee, from `checkpoint_all_dbs`,
    /// and from `reopen_epoch_db`.
    ///
    /// # Panics
    /// Panics if the reopened epoch store's epoch differs from `new_committee.epoch`.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        // Stops the node if the next epoch's protocol version is unsupported.
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );

        self.committee_store.insert_new_committee(&new_committee)?;

        // Held until the end of the function; its value is set to the new epoch below.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        cur_epoch_store.epoch_terminated().await;

        {
            // Close user certs if the current epoch is still accepting them.
            let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                cur_epoch_store.close_user_certs(state);
            }
        }

        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
        self.maybe_reaccumulate_state_hash(
            cur_epoch_store,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.get_reconfig_api()
            .set_epoch_start_configuration(&epoch_start_configuration);
        // DB checkpoints are taken only when a path is configured and
        // end-of-epoch checkpoints are enabled.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
            && self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
        {
            // Index DB checkpointing defaults to off when unset.
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
            self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        self.init_object_funds_checker().await;
        // Record the new epoch under the write lock before notifying watchers.
        *execution_lock = new_epoch;

        self.notify_epoch(new_epoch);
        Ok(new_epoch_store)
    }
4005
4006 fn notify_epoch(&self, new_epoch: EpochId) {
4007 self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
4008 }
4009
4010 pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4011 let mut rx = self.notify_epoch.subscribe();
4012 loop {
4013 let epoch = *rx.borrow();
4014 if epoch >= target_epoch {
4015 return Ok(epoch);
4016 }
4017 rx.changed().await?;
4018 }
4019 }
4020
    /// Test helper that builds and executes accumulator settlement transactions
    /// for `effects`, followed by a barrier transaction, and then settles
    /// address funds in the execution scheduler.
    ///
    /// `checkpoint_seq` defaults to the current accumulator root object version
    /// when `None`.
    ///
    /// Returns every executed transaction (settlements first, barrier last)
    /// paired with the `ExecutionEnv` it ran under.
    ///
    /// # Panics
    /// Panics if the accumulator root object or its initial shared version is
    /// missing, if version assignment or execution fails, or if any executed
    /// transaction's status is not ok.
    pub async fn settle_accumulator_for_testing(
        &self,
        effects: &[TransactionEffects],
        checkpoint_seq: Option<u64>,
    ) -> Vec<(VerifiedExecutableTransaction, ExecutionEnv)> {
        let accumulator_version = self
            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
            .unwrap()
            .version();
        let ckpt_seq = checkpoint_seq.unwrap_or_else(|| accumulator_version.value());
        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.get_transaction_cache_reader().as_ref()),
            effects,
            ckpt_seq,
            0,
        );
        // Collected before `build_tx` and handed to the scheduler at the end.
        let balance_changes = builder.collect_funds_changes();
        let epoch_store = self.epoch_store_for_testing();
        let epoch = epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .unwrap();
        let settlements = builder.build_tx(
            epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            ckpt_seq,
        );

        // Wrap each settlement as a system transaction for the current epoch.
        let settlements: Vec<_> = settlements
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    epoch,
                )
            })
            .collect();

        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                &settlements,
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        let mut replay_txns = Vec::new();
        let mut settlement_effects = Vec::with_capacity(settlements.len());
        // Execute each settlement with its assigned versions and record it for replay.
        for tx in settlements {
            let assigned = version_map.get(&tx.key()).unwrap().clone();
            let env = ExecutionEnv::new().with_assigned_versions(assigned);
            let (effects, _) = self
                .try_execute_immediately(&tx.clone(), env.clone(), &epoch_store)
                .unwrap();
            assert!(effects.status().is_ok());
            replay_txns.push((tx, env));
            settlement_effects.push(effects);
        }

        // The barrier transaction is built from the effects of the settlements above.
        let barrier = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            &settlement_effects,
        );
        let barrier = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier),
            epoch,
        );

        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                std::slice::from_ref(&barrier),
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        let barrier_assigned = version_map.get(&barrier.key()).unwrap().clone();
        let env = ExecutionEnv::new().with_assigned_versions(barrier_assigned);
        let (effects, _) = self
            .try_execute_immediately(&barrier.clone(), env.clone(), &epoch_store)
            .unwrap();
        assert!(effects.status().is_ok());
        replay_txns.push((barrier, env));

        // Settle against the version following the one read at the start.
        let next_accumulator_version = accumulator_version.next();
        self.execution_scheduler
            .settle_address_funds(FundsSettlement {
                funds_changes: balance_changes,
                next_accumulator_version,
            });
        replay_txns
    }
4124
4125 pub async fn replay_settlement_for_testing(
4128 &self,
4129 txns: &[(VerifiedExecutableTransaction, ExecutionEnv)],
4130 ) {
4131 let epoch_store = self.epoch_store_for_testing();
4132 for (tx, env) in txns {
4133 let (effects, _) = self
4134 .try_execute_immediately(tx, env.clone(), &epoch_store)
4135 .unwrap();
4136 assert!(effects.status().is_ok());
4137 }
4138 }
4139
    /// Test helper that advances this authority to the next epoch.
    ///
    /// While holding the execution write lock it creates the next epoch store
    /// with `new_at_next_epoch_for_testing`, reconfigures the scheduler, swaps
    /// in the new epoch store, terminates the old epoch, and finally records the
    /// new epoch under the lock.
    ///
    /// # Panics
    /// Panics if reading the last checkpoint of the current epoch fails.
    pub async fn reconfigure_for_testing(&self) {
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The override returns a clone of the current protocol config; `_guard`
        // is held until the end of the function.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            // Falls back to the default sequence number when the current epoch
            // has no last checkpoint recorded.
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        let new_epoch = new_epoch_store.epoch();
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;

        *execution_lock = new_epoch;
    }
4173
4174 #[instrument(level = "error", skip_all)]
4177 fn maybe_reaccumulate_state_hash(
4178 &self,
4179 cur_epoch_store: &AuthorityPerEpochStore,
4180 new_protocol_version: ProtocolVersion,
4181 ) {
4182 self.get_reconfig_api()
4183 .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4184 }
4185
4186 #[instrument(level = "error", skip_all)]
4187 fn check_system_consistency(
4188 &self,
4189 cur_epoch_store: &AuthorityPerEpochStore,
4190 state_hasher: Arc<GlobalStateHasher>,
4191 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4192 ) {
4193 info!(
4194 "Performing sui conservation consistency check for epoch {}",
4195 cur_epoch_store.epoch()
4196 );
4197
4198 if let Err(err) = self
4199 .get_reconfig_api()
4200 .expensive_check_sui_conservation(cur_epoch_store)
4201 {
4202 if cfg!(debug_assertions) {
4203 panic!("{}", err);
4204 } else {
4205 warn!("Sui conservation consistency check failed: {}", err);
4208 }
4209 } else {
4210 info!("Sui conservation consistency check passed");
4211 }
4212
4213 if expensive_safety_check_config.enable_state_consistency_check() {
4215 info!(
4216 "Performing state consistency check for epoch {}",
4217 cur_epoch_store.epoch()
4218 );
4219 self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4220 }
4221
4222 if expensive_safety_check_config.enable_secondary_index_checks()
4226 && let Some(indexes) = self.indexes.clone()
4227 {
4228 let epoch = cur_epoch_store.epoch();
4229 let first_checkpoint = if epoch == 0 {
4233 0
4234 } else {
4235 self.checkpoint_store
4236 .get_epoch_last_checkpoint_seq_number(epoch - 1)
4237 .expect("Failed to get previous epoch's last checkpoint")
4238 .expect("Previous epoch's last checkpoint missing")
4239 + 1
4240 };
4241 let highest_executed = self
4242 .checkpoint_store
4243 .get_highest_executed_checkpoint_seq_number()
4244 .expect("Failed to get highest executed checkpoint")
4245 .expect("No executed checkpoints");
4246
4247 info!(
4248 "Verifying checkpointed transactions are in transactions_seq \
4249 (checkpoints {first_checkpoint}..={highest_executed})"
4250 );
4251 for seq in first_checkpoint..=highest_executed {
4252 let checkpoint = self
4253 .checkpoint_store
4254 .get_checkpoint_by_sequence_number(seq)
4255 .expect("Failed to get checkpoint")
4256 .expect("Checkpoint missing");
4257 let contents = self
4258 .checkpoint_store
4259 .get_checkpoint_contents(&checkpoint.content_digest)
4260 .expect("Failed to get checkpoint contents")
4261 .expect("Checkpoint contents missing");
4262 for digests in contents.iter() {
4263 let tx_digest = digests.transaction;
4264 assert!(
4265 indexes
4266 .get_transaction_seq(&tx_digest)
4267 .expect("Failed to read transactions_seq")
4268 .is_some(),
4269 "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
4270 );
4271 }
4272 }
4273 info!("All checkpointed transactions verified in transactions_seq");
4274 }
4275 }
4276
4277 fn expensive_check_is_consistent_state(
4278 &self,
4279 state_hasher: Arc<GlobalStateHasher>,
4280 cur_epoch_store: &AuthorityPerEpochStore,
4281 ) {
4282 let live_object_set_hash = state_hasher.digest_live_object_set(
4283 !cur_epoch_store
4284 .protocol_config()
4285 .simplified_unwrap_then_delete(),
4286 );
4287
4288 let root_state_hash: ECMHLiveObjectSetDigest = self
4289 .get_global_state_hash_store()
4290 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4291 .expect("Retrieving root state hash cannot fail")
4292 .expect("Root state hash for epoch must exist")
4293 .1
4294 .digest()
4295 .into();
4296
4297 let is_inconsistent = root_state_hash != live_object_set_hash;
4298 if is_inconsistent {
4299 debug_fatal!(
4300 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4301 root_state_hash,
4302 live_object_set_hash
4303 );
4304 } else {
4305 info!("State consistency check passed");
4306 }
4307
4308 state_hasher.set_inconsistent_state(is_inconsistent);
4309 }
4310
4311 pub fn current_epoch_for_testing(&self) -> EpochId {
4312 self.epoch_store_for_testing().epoch()
4313 }
4314
4315 #[instrument(level = "error", skip_all)]
4316 pub fn checkpoint_all_dbs(
4317 &self,
4318 checkpoint_path: &Path,
4319 cur_epoch_store: &AuthorityPerEpochStore,
4320 checkpoint_indexes: bool,
4321 ) -> SuiResult {
4322 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4323 let current_epoch = cur_epoch_store.epoch();
4324
4325 if checkpoint_path.exists() {
4326 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4327 return Ok(());
4328 }
4329
4330 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4331 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4332
4333 if checkpoint_path_tmp.exists() {
4334 fs::remove_dir_all(&checkpoint_path_tmp)
4335 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4336 }
4337
4338 fs::create_dir_all(&checkpoint_path_tmp)
4339 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4340 fs::create_dir(&store_checkpoint_path_tmp)
4341 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4342
4343 self.checkpoint_store
4346 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4347
4348 self.get_reconfig_api()
4349 .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4350
4351 self.committee_store
4352 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4353
4354 if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4355 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4356 }
4357
4358 fs::rename(checkpoint_path_tmp, checkpoint_path)
4359 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4360 Ok(())
4361 }
4362
/// Loads the current value of `self.epoch_store` and returns the resulting guard.
///
/// NOTE(review): the name suggests callers should load once per task and
/// reuse the returned guard rather than calling repeatedly — confirm.
pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
    self.epoch_store.load()
}
4371
/// Test helper returning the same guard as `load_epoch_store_one_call_per_task`.
pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
    self.load_epoch_store_one_call_per_task()
}
4376
4377 pub fn clone_committee_for_testing(&self) -> Committee {
4378 Committee::clone(self.epoch_store_for_testing().committee())
4379 }
4380
/// Looks up `object_id` through the object store returned by
/// `get_object_store`, returning `None` when the store has no such object.
#[instrument(level = "trace", skip_all)]
pub fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
    self.get_object_store().get_object(object_id)
}
4385
4386 pub fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4387 Ok(self
4388 .get_object(&SUI_SYSTEM_ADDRESS.into())
4389 .expect("framework object should always exist")
4390 .compute_object_reference())
4391 }
4392
/// Test helper that reads the Sui system state object through the object
/// cache reader's `get_sui_system_state_object_unsafe`.
pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
    self.get_object_cache_reader()
        .get_sui_system_state_object_unsafe()
}
4398
/// Returns the checkpoint sequence number that `epoch_store` has recorded
/// for transaction `digest`, if any.
#[instrument(level = "trace", skip_all)]
fn get_transaction_checkpoint_sequence(
    &self,
    digest: &TransactionDigest,
    epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<Option<CheckpointSequenceNumber>> {
    epoch_store.get_transaction_checkpoint(digest)
}
4407
4408 #[instrument(level = "trace", skip_all)]
4409 pub fn get_checkpoint_by_sequence_number(
4410 &self,
4411 sequence_number: CheckpointSequenceNumber,
4412 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4413 Ok(self
4414 .checkpoint_store
4415 .get_checkpoint_by_sequence_number(sequence_number)?)
4416 }
4417
4418 #[instrument(level = "trace", skip_all)]
4419 pub fn get_transaction_checkpoint_for_tests(
4420 &self,
4421 digest: &TransactionDigest,
4422 epoch_store: &AuthorityPerEpochStore,
4423 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4424 let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4425 let Some(checkpoint) = checkpoint else {
4426 return Ok(None);
4427 };
4428 let checkpoint = self
4429 .checkpoint_store
4430 .get_checkpoint_by_sequence_number(checkpoint)?;
4431 Ok(checkpoint)
4432 }
4433
4434 #[instrument(level = "trace", skip_all)]
4435 pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4436 Ok(
4437 match self
4438 .get_object_cache_reader()
4439 .get_latest_object_or_tombstone(*object_id)
4440 {
4441 Some((_, ObjectOrTombstone::Object(object))) => {
4442 let layout = self.get_object_layout(&object)?;
4443 ObjectRead::Exists(object.compute_object_reference(), object, layout)
4444 }
4445 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4446 None => ObjectRead::NotExists(*object_id),
4447 },
4448 )
4449 }
4450
/// Returns the chain identifier stored on this state.
pub fn get_chain_identifier(&self) -> ChainIdentifier {
    self.chain_identifier
}
4455
/// Reads `object_id` as of `version` and classifies the outcome as a
/// [`PastObjectRead`].
///
/// Outcomes, in the order they are checked:
/// - `ObjectNotExists`: the cache reader has no latest ref or tombstone for the id.
/// - `VersionTooHigh`: `version` is greater than the latest known version.
/// - `VersionFound` / `VersionNotFound`: `version` is older than the latest
///   one and the object at that version is or is not available.
/// - `ObjectDeleted`: `version` equals the latest version and the latest
///   reference is not alive.
/// - `VersionFound`: `version` equals the latest version and the object is live.
///
/// # Errors
/// Returns `UserInputError::ObjectNotFound` (after a `debug_fatal!`) if the
/// latest live version cannot be read back from the store.
#[instrument(level = "trace", skip_all)]
pub fn get_past_object_read(
    &self,
    object_id: &ObjectID,
    version: SequenceNumber,
) -> SuiResult<PastObjectRead> {
    // Look up the latest reference (or tombstone) to learn whether the
    // object is known at all and what its latest version is.
    let Some(obj_ref) = self
        .get_object_cache_reader()
        .get_latest_object_ref_or_tombstone(*object_id)
    else {
        return Ok(PastObjectRead::ObjectNotExists(*object_id));
    };

    // The requested version is newer than anything known for this object.
    if version > obj_ref.1 {
        return Ok(PastObjectRead::VersionTooHigh {
            object_id: *object_id,
            asked_version: version,
            latest_version: obj_ref.1,
        });
    }

    // The requested version is older than the latest one: read it by key.
    if version < obj_ref.1 {
        return Ok(match self.read_object_at_version(object_id, version)? {
            Some((object, layout)) => {
                // Use the reference of the object actually read, not the latest one.
                let obj_ref = object.compute_object_reference();
                PastObjectRead::VersionFound(obj_ref, object, layout)
            }

            None => PastObjectRead::VersionNotFound(*object_id, version),
        });
    }

    // From here on `version` equals the latest version.
    if !obj_ref.2.is_alive() {
        return Ok(PastObjectRead::ObjectDeleted(obj_ref));
    }

    match self.read_object_at_version(object_id, obj_ref.1)? {
        Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
        None => {
            // A live latest reference without a readable object is treated
            // as a store inconsistency.
            debug_fatal!(
                "Object with in parent_entry is missing from object store, datastore is \
                inconsistent"
            );
            Err(UserInputError::ObjectNotFound {
                object_id: *object_id,
                version: Some(obj_ref.1),
            }
            .into())
        }
    }
}
4514
4515 #[instrument(level = "trace", skip_all)]
4516 fn read_object_at_version(
4517 &self,
4518 object_id: &ObjectID,
4519 version: SequenceNumber,
4520 ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4521 let Some(object) = self
4522 .get_object_cache_reader()
4523 .get_object_by_key(object_id, version)
4524 else {
4525 return Ok(None);
4526 };
4527
4528 let layout = self.get_object_layout(&object)?;
4529 Ok(Some((object, layout)))
4530 }
4531
/// Resolves the annotated struct layout of `object`.
///
/// Returns `Ok(None)` when `object.data.try_as_move()` yields nothing;
/// otherwise the layout is resolved with the current epoch's executor,
/// protocol config and the backing package store.
///
/// # Errors
/// Propagates failures from layout resolution and from `into_struct_layout`.
pub fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
    let layout = object
        .data
        .try_as_move()
        .map(|object| {
            // The epoch store is loaded only when there is a Move object to resolve.
            let epoch_store = self.load_epoch_store_one_call_per_task();
            into_struct_layout(
                epoch_store
                    .executor()
                    .type_layout_resolver(
                        epoch_store.protocol_config(),
                        Box::new(self.get_backing_package_store().as_ref()),
                    )
                    .get_annotated_layout(&object.type_().clone().into())?,
            )
        })
        // Turn Option<Result<..>> into Result<Option<..>> so `?` can propagate the error.
        .transpose()?;
    Ok(layout)
}
4552
4553 #[instrument(level = "trace", skip_all)]
4557 pub fn get_address_balance_coin_info(
4558 &self,
4559 owner: SuiAddress,
4560 balance_type: TypeTag,
4561 ) -> SuiResult<Option<(ObjectRef, u64, TransactionDigest)>> {
4562 let accumulator_id = AccumulatorValue::get_field_id(owner, &balance_type)?;
4563 let accumulator_obj = AccumulatorValue::load_object_by_id(
4564 self.get_child_object_resolver().as_ref(),
4565 None,
4566 *accumulator_id.inner(),
4567 )?;
4568
4569 let Some(accumulator_obj) = accumulator_obj else {
4570 return Ok(None);
4571 };
4572
4573 let currency_type =
4576 Balance::maybe_get_balance_type_param(&balance_type).unwrap_or(balance_type);
4577
4578 let balance = crate::accumulators::balances::get_balance(
4579 owner,
4580 self.get_child_object_resolver().as_ref(),
4581 currency_type,
4582 )?;
4583
4584 if balance == 0 {
4585 return Ok(None);
4586 };
4587
4588 let object_ref = coin_reservation::encode_object_ref(
4589 accumulator_obj.id(),
4590 accumulator_obj.version(),
4591 self.load_epoch_store_one_call_per_task().epoch(),
4592 balance,
4593 self.get_chain_identifier(),
4594 );
4595
4596 Ok(Some((
4597 object_ref,
4598 balance,
4599 accumulator_obj.previous_transaction,
4600 )))
4601 }
4602
4603 #[instrument(level = "trace", skip_all)]
4606 pub fn get_all_address_balance_coin_infos(
4607 &self,
4608 owner: SuiAddress,
4609 ) -> SuiResult<std::collections::HashMap<String, (ObjectRef, u64, TransactionDigest)>> {
4610 let indexes = self
4611 .indexes
4612 .as_ref()
4613 .ok_or(SuiErrorKind::IndexStoreNotAvailable)?;
4614
4615 let mut result = std::collections::HashMap::new();
4616 for currency_type in indexes.get_address_balance_coin_types_iter(owner) {
4617 let balance_type = sui_types::balance::Balance::type_tag(currency_type.clone());
4618 if let Some((obj_ref, balance, prev_tx)) =
4619 self.get_address_balance_coin_info(owner, balance_type)?
4620 {
4621 result.insert(currency_type.to_string(), (obj_ref, balance, prev_tx));
4624 }
4625 }
4626 Ok(result)
4627 }
4628
4629 fn get_owner_at_version(
4630 object_store: &Arc<dyn ObjectStore + Send + Sync>,
4631 object_id: &ObjectID,
4632 version: SequenceNumber,
4633 ) -> SuiResult<Owner> {
4634 object_store
4635 .get_object_by_key(object_id, version)
4636 .ok_or_else(|| {
4637 SuiError::from(UserInputError::ObjectNotFound {
4638 object_id: *object_id,
4639 version: Some(version),
4640 })
4641 })
4642 .map(|o| o.owner.clone())
4643 }
4644
4645 #[instrument(level = "trace", skip_all)]
4646 pub fn get_owner_objects(
4647 &self,
4648 owner: SuiAddress,
4649 cursor: Option<ObjectID>,
4651 limit: usize,
4652 filter: Option<SuiObjectDataFilter>,
4653 ) -> SuiResult<Vec<ObjectInfo>> {
4654 if let Some(indexes) = &self.indexes {
4655 indexes.get_owner_objects(owner, cursor, limit, filter)
4656 } else {
4657 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4658 }
4659 }
4660
4661 #[instrument(level = "trace", skip_all)]
4662 pub fn get_owned_coins_iterator_with_cursor(
4663 &self,
4664 owner: SuiAddress,
4665 cursor: (String, u64, ObjectID),
4667 limit: usize,
4668 one_coin_type_only: bool,
4669 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4670 if let Some(indexes) = &self.indexes {
4671 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4672 } else {
4673 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4674 }
4675 }
4676
4677 #[instrument(level = "trace", skip_all)]
4678 pub fn get_owner_objects_iterator(
4679 &self,
4680 owner: SuiAddress,
4681 cursor: Option<ObjectID>,
4683 filter: Option<SuiObjectDataFilter>,
4684 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4685 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4686 if let Some(indexes) = &self.indexes {
4687 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4688 } else {
4689 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4690 }
4691 }
4692
4693 #[instrument(level = "trace", skip_all)]
4694 pub fn get_move_objects<T>(&self, owner: SuiAddress, type_: MoveObjectType) -> SuiResult<Vec<T>>
4695 where
4696 T: DeserializeOwned,
4697 {
4698 let object_ids = self
4699 .get_owner_objects_iterator(owner, None, None)?
4700 .filter(|o| match &o.type_ {
4701 ObjectType::Struct(s) => &type_ == s,
4702 ObjectType::Package => false,
4703 })
4704 .map(|info| ObjectKey(info.object_id, info.version))
4705 .collect::<Vec<_>>();
4706 let mut move_objects = vec![];
4707
4708 let objects = self
4709 .get_object_store()
4710 .multi_get_objects_by_key(&object_ids);
4711
4712 for (o, id) in objects.into_iter().zip_debug_eq(object_ids) {
4713 let object = o.ok_or_else(|| {
4714 SuiError::from(UserInputError::ObjectNotFound {
4715 object_id: id.0,
4716 version: Some(id.1),
4717 })
4718 })?;
4719 let move_object = object.data.try_as_move().ok_or_else(|| {
4720 SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4721 })?;
4722 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4723 SuiErrorKind::ObjectDeserializationError {
4724 error: format!("{e}"),
4725 }
4726 })?);
4727 }
4728 Ok(move_objects)
4729 }
4730
4731 #[instrument(level = "trace", skip_all)]
4732 pub fn get_dynamic_fields(
4733 &self,
4734 owner: ObjectID,
4735 cursor: Option<ObjectID>,
4737 limit: usize,
4738 ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4739 Ok(self
4740 .get_dynamic_fields_iterator(owner, cursor)?
4741 .take(limit)
4742 .collect::<Result<Vec<_>, _>>()?)
4743 }
4744
4745 fn get_dynamic_fields_iterator(
4746 &self,
4747 owner: ObjectID,
4748 cursor: Option<ObjectID>,
4750 ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4751 {
4752 if let Some(indexes) = &self.indexes {
4753 indexes.get_dynamic_fields_iterator(owner, cursor)
4754 } else {
4755 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4756 }
4757 }
4758
4759 #[instrument(level = "trace", skip_all)]
4760 pub fn get_dynamic_field_object_id(
4761 &self,
4762 owner: ObjectID,
4763 name_type: TypeTag,
4764 name_bcs_bytes: &[u8],
4765 ) -> SuiResult<Option<ObjectID>> {
4766 if let Some(indexes) = &self.indexes {
4767 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4768 } else {
4769 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4770 }
4771 }
4772
4773 #[instrument(level = "trace", skip_all)]
4774 pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
4775 Ok(self.get_indexes()?.next_sequence_number())
4776 }
4777
4778 #[instrument(level = "trace", skip_all)]
4779 pub async fn get_executed_transaction_and_effects(
4780 &self,
4781 digest: TransactionDigest,
4782 kv_store: Arc<TransactionKeyValueStore>,
4783 ) -> SuiResult<(Transaction, TransactionEffects)> {
4784 let transaction = kv_store.get_tx(digest).await?;
4785 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4786 Ok((transaction, effects))
4787 }
4788
4789 #[instrument(level = "trace", skip_all)]
4790 pub fn multi_get_checkpoint_by_sequence_number(
4791 &self,
4792 sequence_numbers: &[CheckpointSequenceNumber],
4793 ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
4794 Ok(self
4795 .checkpoint_store
4796 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4797 }
4798
4799 #[instrument(level = "trace", skip_all)]
4800 pub fn get_transaction_events(
4801 &self,
4802 digest: &TransactionDigest,
4803 ) -> SuiResult<TransactionEvents> {
4804 self.get_transaction_cache_reader()
4805 .get_events(digest)
4806 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4807 }
4808
4809 pub fn get_transaction_input_objects(
4810 &self,
4811 effects: &TransactionEffects,
4812 ) -> SuiResult<Vec<Object>> {
4813 sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4814 .map_err(Into::into)
4815 }
4816
4817 pub fn get_transaction_output_objects(
4818 &self,
4819 effects: &TransactionEffects,
4820 ) -> SuiResult<Vec<Object>> {
4821 sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4822 .map_err(Into::into)
4823 }
4824
4825 fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4826 match &self.indexes {
4827 Some(i) => Ok(i.clone()),
4828 None => Err(SuiErrorKind::UnsupportedFeatureError {
4829 error: "extended object indexing is not enabled on this server".into(),
4830 }
4831 .into()),
4832 }
4833 }
4834
4835 pub async fn get_transactions_for_tests(
4836 self: &Arc<Self>,
4837 filter: Option<TransactionFilter>,
4838 cursor: Option<TransactionDigest>,
4839 limit: Option<usize>,
4840 reverse: bool,
4841 ) -> SuiResult<Vec<TransactionDigest>> {
4842 let metrics = KeyValueStoreMetrics::new_for_tests();
4843 let kv_store = Arc::new(TransactionKeyValueStore::new(
4844 "rocksdb",
4845 metrics,
4846 self.clone(),
4847 ));
4848 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4849 .await
4850 }
4851
4852 #[instrument(level = "trace", skip_all)]
4853 pub async fn get_transactions(
4854 &self,
4855 kv_store: &Arc<TransactionKeyValueStore>,
4856 filter: Option<TransactionFilter>,
4857 cursor: Option<TransactionDigest>,
4859 limit: Option<usize>,
4860 reverse: bool,
4861 ) -> SuiResult<Vec<TransactionDigest>> {
4862 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4863 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4864 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4865 if reverse {
4866 let iter = iter
4867 .rev()
4868 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4869 .skip(usize::from(cursor.is_some()));
4870 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4871 } else {
4872 let iter = iter
4873 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4874 .skip(usize::from(cursor.is_some()));
4875 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4876 }
4877 }
4878 self.get_indexes()?
4879 .get_transactions(filter, cursor, limit, reverse)
4880 }
4881
/// Returns a reference to the checkpoint store handle held by this state.
pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
    &self.checkpoint_store
}
4885
4886 pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4887 self.get_checkpoint_store()
4888 .get_highest_executed_checkpoint_seq_number()?
4889 .ok_or(
4890 SuiErrorKind::UserInputError {
4891 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4892 }
4893 .into(),
4894 )
4895 }
4896
/// Simulator-only test helper returning the highest pruned checkpoint from
/// the perpetual tables, or `0` when none is recorded.
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
    match self
        .database_for_testing()
        .perpetual_tables
        .get_highest_pruned_checkpoint()
    {
        Ok(highest) => Ok(highest.unwrap_or(0)),
        Err(err) => Err(err.into()),
    }
}
4905
4906 #[instrument(level = "trace", skip_all)]
4907 pub fn get_checkpoint_summary_by_sequence_number(
4908 &self,
4909 sequence_number: CheckpointSequenceNumber,
4910 ) -> SuiResult<CheckpointSummary> {
4911 let verified_checkpoint = self
4912 .get_checkpoint_store()
4913 .get_checkpoint_by_sequence_number(sequence_number)?;
4914 match verified_checkpoint {
4915 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4916 None => Err(SuiErrorKind::UserInputError {
4917 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4918 }
4919 .into()),
4920 }
4921 }
4922
4923 #[instrument(level = "trace", skip_all)]
4924 pub fn get_checkpoint_summary_by_digest(
4925 &self,
4926 digest: CheckpointDigest,
4927 ) -> SuiResult<CheckpointSummary> {
4928 let verified_checkpoint = self
4929 .get_checkpoint_store()
4930 .get_checkpoint_by_digest(&digest)?;
4931 match verified_checkpoint {
4932 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4933 None => Err(SuiErrorKind::UserInputError {
4934 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4935 }
4936 .into()),
4937 }
4938 }
4939
4940 #[instrument(level = "trace", skip_all)]
4941 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
4942 if is_system_package(package_id) {
4943 return self.find_genesis_txn_digest();
4944 }
4945 Ok(self
4946 .get_object_read(&package_id)?
4947 .into_object()?
4948 .previous_transaction)
4949 }
4950
4951 #[instrument(level = "trace", skip_all)]
4952 pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
4953 let summary = self
4954 .get_verified_checkpoint_by_sequence_number(0)?
4955 .into_message();
4956 let content = self.get_checkpoint_contents(summary.content_digest)?;
4957 let genesis_transaction = content.enumerate_transactions(&summary).next();
4958 Ok(genesis_transaction
4959 .ok_or(SuiErrorKind::UserInputError {
4960 error: UserInputError::GenesisTransactionNotFound,
4961 })?
4962 .1
4963 .transaction)
4964 }
4965
4966 #[instrument(level = "trace", skip_all)]
4967 pub fn get_verified_checkpoint_by_sequence_number(
4968 &self,
4969 sequence_number: CheckpointSequenceNumber,
4970 ) -> SuiResult<VerifiedCheckpoint> {
4971 let verified_checkpoint = self
4972 .get_checkpoint_store()
4973 .get_checkpoint_by_sequence_number(sequence_number)?;
4974 match verified_checkpoint {
4975 Some(verified_checkpoint) => Ok(verified_checkpoint),
4976 None => Err(SuiErrorKind::UserInputError {
4977 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4978 }
4979 .into()),
4980 }
4981 }
4982
4983 #[instrument(level = "trace", skip_all)]
4984 pub fn get_verified_checkpoint_summary_by_digest(
4985 &self,
4986 digest: CheckpointDigest,
4987 ) -> SuiResult<VerifiedCheckpoint> {
4988 let verified_checkpoint = self
4989 .get_checkpoint_store()
4990 .get_checkpoint_by_digest(&digest)?;
4991 match verified_checkpoint {
4992 Some(verified_checkpoint) => Ok(verified_checkpoint),
4993 None => Err(SuiErrorKind::UserInputError {
4994 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4995 }
4996 .into()),
4997 }
4998 }
4999
5000 #[instrument(level = "trace", skip_all)]
5001 pub fn get_checkpoint_contents(
5002 &self,
5003 digest: CheckpointContentsDigest,
5004 ) -> SuiResult<CheckpointContents> {
5005 self.get_checkpoint_store()
5006 .get_checkpoint_contents(&digest)?
5007 .ok_or(
5008 SuiErrorKind::UserInputError {
5009 error: UserInputError::CheckpointContentsNotFound(digest),
5010 }
5011 .into(),
5012 )
5013 }
5014
5015 #[instrument(level = "trace", skip_all)]
5016 pub fn get_checkpoint_contents_by_sequence_number(
5017 &self,
5018 sequence_number: CheckpointSequenceNumber,
5019 ) -> SuiResult<CheckpointContents> {
5020 let verified_checkpoint = self
5021 .get_checkpoint_store()
5022 .get_checkpoint_by_sequence_number(sequence_number)?;
5023 match verified_checkpoint {
5024 Some(verified_checkpoint) => {
5025 let content_digest = verified_checkpoint.into_inner().content_digest;
5026 self.get_checkpoint_contents(content_digest)
5027 }
5028 None => Err(SuiErrorKind::UserInputError {
5029 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5030 }
5031 .into()),
5032 }
5033 }
5034
5035 #[instrument(level = "trace", skip_all)]
5036 pub async fn query_events(
5037 &self,
5038 kv_store: &Arc<TransactionKeyValueStore>,
5039 query: EventFilter,
5040 cursor: Option<EventID>,
5042 limit: usize,
5043 descending: bool,
5044 ) -> SuiResult<Vec<SuiEvent>> {
5045 let index_store = self.get_indexes()?;
5046
5047 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
5049 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
5050 SuiErrorKind::TransactionNotFound {
5051 digest: cursor.tx_digest,
5052 },
5053 )?;
5054 (tx_seq, cursor.event_seq as usize)
5055 } else if descending {
5056 (u64::MAX, usize::MAX)
5057 } else {
5058 (0, 0)
5059 };
5060
5061 let limit = limit + 1;
5062 let mut event_keys = match query {
5063 EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
5064 EventFilter::Transaction(digest) => {
5065 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
5066 }
5067 EventFilter::MoveModule { package, module } => {
5068 let module_id = ModuleId::new(package.into(), module);
5069 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
5070 }
5071 EventFilter::MoveEventType(struct_name) => index_store
5072 .events_by_move_event_struct_name(
5073 &struct_name,
5074 tx_num,
5075 event_num,
5076 limit,
5077 descending,
5078 )?,
5079 EventFilter::Sender(sender) => {
5080 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
5081 }
5082 EventFilter::TimeRange {
5083 start_time,
5084 end_time,
5085 } => index_store
5086 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
5087 EventFilter::MoveEventModule { package, module } => index_store
5088 .events_by_move_event_module(
5089 &ModuleId::new(package.into(), module),
5090 tx_num,
5091 event_num,
5092 limit,
5093 descending,
5094 )?,
5095 EventFilter::Any(_) => {
5097 return Err(SuiErrorKind::UserInputError {
5098 error: UserInputError::Unsupported(
5099 "'Any' queries are not supported by the fullnode.".to_string(),
5100 ),
5101 }
5102 .into());
5103 }
5104 };
5105
5106 if cursor.is_some() {
5109 if !event_keys.is_empty() {
5110 event_keys.remove(0);
5111 }
5112 } else {
5113 event_keys.truncate(limit - 1);
5114 }
5115
5116 let transaction_digests = event_keys
5118 .iter()
5119 .map(|(_, digest, _, _)| *digest)
5120 .collect::<HashSet<_>>()
5121 .into_iter()
5122 .collect::<Vec<_>>();
5123
5124 let events = kv_store
5125 .multi_get_events_by_tx_digests(&transaction_digests)
5126 .await?;
5127
5128 let events_map: HashMap<_, _> = transaction_digests
5129 .iter()
5130 .zip_debug_eq(events.into_iter())
5131 .collect();
5132
5133 let stored_events = event_keys
5134 .into_iter()
5135 .map(|k| {
5136 (
5137 k,
5138 events_map
5139 .get(&k.1)
5140 .expect("fetched digest is missing")
5141 .clone()
5142 .and_then(|e| e.data.get(k.2).cloned()),
5143 )
5144 })
5145 .map(
5146 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
5147 event
5148 .map(|e| (e, tx_digest, event_seq, timestamp))
5149 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
5150 },
5151 )
5152 .collect::<Result<Vec<_>, _>>()?;
5153
5154 let epoch_store = self.load_epoch_store_one_call_per_task();
5155 let backing_store = self.get_backing_package_store().as_ref();
5156 let mut layout_resolver = epoch_store
5157 .executor()
5158 .type_layout_resolver(epoch_store.protocol_config(), Box::new(backing_store));
5159 let mut events = vec![];
5160 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
5161 events.push(SuiEvent::try_from(
5162 e.clone(),
5163 tx_digest,
5164 event_seq as u64,
5165 Some(timestamp),
5166 layout_resolver.get_annotated_layout(&e.type_)?,
5167 )?)
5168 }
5169 Ok(events)
5170 }
5171
/// Inserts `object` as a genesis object through the reconfig API.
pub fn insert_genesis_object(&self, object: Object) {
    self.get_reconfig_api().insert_genesis_object(object);
}
5175
5176 pub fn insert_genesis_objects(&self, objects: &[Object]) {
5177 for o in objects {
5178 self.insert_genesis_object(o.clone());
5179 }
5180 }
5181
5182 #[instrument(level = "trace", skip_all)]
5184 pub fn get_transaction_status(
5185 &self,
5186 transaction_digest: &TransactionDigest,
5187 epoch_store: &Arc<AuthorityPerEpochStore>,
5188 ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
5189 if let Some(effects) =
5191 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
5192 {
5193 if let Some(transaction) = self
5194 .get_transaction_cache_reader()
5195 .get_transaction_block(transaction_digest)
5196 {
5197 let events = if effects.events_digest().is_some() {
5198 self.get_transaction_events(effects.transaction_digest())?
5199 } else {
5200 TransactionEvents::default()
5201 };
5202 return Ok(Some((
5205 (*transaction).clone().into_message(),
5206 TransactionStatus::Executed(None, effects.into_inner(), events),
5207 )));
5208 } else {
5209 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
5213 }
5214 }
5215 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
5216 self.metrics.tx_already_processed.inc();
5217 let (transaction, sig) = signed.into_inner().into_data_and_sig();
5218 Ok(Some((transaction, TransactionStatus::Signed(sig))))
5219 } else {
5220 Ok(None)
5221 }
5222 }
5223
5224 #[instrument(level = "trace", skip_all)]
5228 pub fn get_signed_effects_and_maybe_resign(
5229 &self,
5230 transaction_digest: &TransactionDigest,
5231 epoch_store: &Arc<AuthorityPerEpochStore>,
5232 ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
5233 let effects = self
5234 .get_transaction_cache_reader()
5235 .get_executed_effects(transaction_digest);
5236 match effects {
5237 Some(effects) => {
5238 if effects.executed_epoch() != epoch_store.epoch() {
5258 debug!(
5259 tx_digest=?transaction_digest,
5260 effects_epoch=?effects.executed_epoch(),
5261 epoch=?epoch_store.epoch(),
5262 "Re-signing the effects with the current epoch"
5263 );
5264 }
5265 Ok(Some(self.sign_effects(effects, epoch_store)?))
5266 }
5267 None => Ok(None),
5268 }
5269 }
5270
/// Produces signed effects for `effects` in the epoch of `epoch_store`.
///
/// If `epoch_store` already holds an effects signature for the transaction,
/// that signature is reused. Otherwise a new signature is created with this
/// authority's name and secret, and the effects digest and signature are
/// stored in `epoch_store` before returning.
///
/// # Errors
/// Propagates failures from reading or inserting the signature in `epoch_store`.
#[instrument(level = "trace", skip_all)]
pub(crate) fn sign_effects(
    &self,
    effects: TransactionEffects,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<VerifiedSignedTransactionEffects> {
    let tx_digest = *effects.transaction_digest();
    let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
        Some(sig) => {
            // A stored signature is expected to belong to the current epoch
            // (checked in debug builds only).
            debug_assert!(sig.epoch == epoch_store.epoch());
            SignedTransactionEffects::new_from_data_and_sig(effects, sig)
        }
        _ => {
            // No stored signature: sign the effects for the current epoch.
            let sig = AuthoritySignInfo::new(
                epoch_store.epoch(),
                &effects,
                Intent::sui_app(IntentScope::TransactionEffects),
                self.name,
                &*self.secret,
            );

            let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());

            // Persist the digest and signature so later calls can reuse them.
            epoch_store.insert_effects_digest_and_signature(
                &tx_digest,
                effects.digest(),
                &sig,
            )?;

            effects
        }
    };

    // `new_unchecked` is used: the signature was either just created here or
    // read back from `epoch_store`.
    Ok(VerifiedSignedTransactionEffects::new_unchecked(
        signed_effects,
    ))
}
5308
5309 #[instrument(level = "trace", skip_all)]
5311 fn fullnode_only_get_tx_coins_for_indexing(
5312 name: AuthorityName,
5313 object_store: &Arc<dyn ObjectStore + Send + Sync>,
5314 effects: &TransactionEffects,
5315 inner_temporary_store: &InnerTemporaryStore,
5316 epoch_store: &Arc<AuthorityPerEpochStore>,
5317 ) -> Option<TxCoins> {
5318 if epoch_store.committee().authority_exists(&name) {
5319 return None;
5320 }
5321 let written_coin_objects = inner_temporary_store
5322 .written
5323 .iter()
5324 .filter_map(|(k, v)| {
5325 if v.is_coin() {
5326 Some((*k, v.clone()))
5327 } else {
5328 None
5329 }
5330 })
5331 .collect();
5332 let mut input_coin_objects = inner_temporary_store
5333 .input_objects
5334 .iter()
5335 .filter_map(|(k, v)| {
5336 if v.is_coin() {
5337 Some((*k, v.clone()))
5338 } else {
5339 None
5340 }
5341 })
5342 .collect::<ObjectMap>();
5343
5344 for (object_id, version) in effects.modified_at_versions() {
5348 if inner_temporary_store
5349 .loaded_runtime_objects
5350 .contains_key(&object_id)
5351 && let Some(object) = object_store.get_object_by_key(&object_id, version)
5352 && object.is_coin()
5353 {
5354 input_coin_objects.insert(object_id, object);
5355 }
5356 }
5357
5358 Some((input_coin_objects, written_coin_objects))
5359 }
5360
5361 #[instrument(level = "trace", skip_all)]
5369 pub fn get_transaction_lock(
5370 &self,
5371 object_ref: &ObjectRef,
5372 epoch_store: &AuthorityPerEpochStore,
5373 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5374 let lock_info = self
5375 .get_object_cache_reader()
5376 .get_lock(*object_ref, epoch_store)?;
5377 let lock_info = match lock_info {
5378 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5379 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5380 provided_obj_ref: *object_ref,
5381 current_version: locked_ref.1,
5382 }
5383 .into());
5384 }
5385 ObjectLockStatus::Initialized => {
5386 return Ok(None);
5387 }
5388 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5389 };
5390
5391 epoch_store.get_signed_transaction(&lock_info.tx_digest)
5392 }
5393
5394 pub fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5395 self.get_object_cache_reader().get_objects(objects)
5396 }
5397
5398 pub fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5399 self.get_object_cache_reader()
5400 .get_latest_object_ref_or_tombstone(object_id)
5401 }
5402
5403 pub fn set_override_protocol_upgrade_buffer_stake(
5412 &self,
5413 expected_epoch: EpochId,
5414 buffer_stake_bps: u64,
5415 ) -> SuiResult {
5416 let epoch_store = self.load_epoch_store_one_call_per_task();
5417 let actual_epoch = epoch_store.epoch();
5418 if actual_epoch != expected_epoch {
5419 return Err(SuiErrorKind::WrongEpoch {
5420 expected_epoch,
5421 actual_epoch,
5422 }
5423 .into());
5424 }
5425
5426 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5427 }
5428
5429 pub fn clear_override_protocol_upgrade_buffer_stake(
5430 &self,
5431 expected_epoch: EpochId,
5432 ) -> SuiResult {
5433 let epoch_store = self.load_epoch_store_one_call_per_task();
5434 let actual_epoch = epoch_store.epoch();
5435 if actual_epoch != expected_epoch {
5436 return Err(SuiErrorKind::WrongEpoch {
5437 expected_epoch,
5438 actual_epoch,
5439 }
5440 .into());
5441 }
5442
5443 epoch_store.clear_override_protocol_upgrade_buffer_stake()
5444 }
5445
5446 pub async fn get_available_system_packages(
5449 &self,
5450 binary_config: &BinaryConfig,
5451 ) -> Vec<ObjectRef> {
5452 let mut results = vec![];
5453
5454 let system_packages = BuiltInFramework::iter_system_packages();
5455
5456 #[cfg(msim)]
5458 let extra_packages = framework_injection::get_extra_packages(self.name);
5459 #[cfg(msim)]
5460 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
5461
5462 for system_package in system_packages {
5463 let modules = system_package.modules().to_vec();
5464 #[cfg(msim)]
5466 let modules =
5467 match framework_injection::get_override_modules(&system_package.id, self.name) {
5468 Some(overrides) if overrides.is_empty() => continue,
5469 Some(overrides) => overrides,
5470 None => modules,
5471 };
5472
5473 let Some(obj_ref) = sui_framework::compare_system_package(
5474 &self.get_object_store(),
5475 &system_package.id,
5476 &modules,
5477 system_package.dependencies.to_vec(),
5478 binary_config,
5479 )
5480 .await
5481 else {
5482 return vec![];
5483 };
5484 results.push(obj_ref);
5485 }
5486
5487 results
5488 }
5489
5490 async fn get_system_package_bytes(
5504 &self,
5505 system_packages: Vec<ObjectRef>,
5506 binary_config: &BinaryConfig,
5507 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5508 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5509 let objects = self.get_objects(&ids);
5510
5511 let mut res = Vec::with_capacity(system_packages.len());
5512 for (system_package_ref, object) in system_packages.into_iter().zip_debug_eq(objects.iter())
5513 {
5514 let prev_transaction = match object {
5515 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5516 info!("Framework {} does not need updating", system_package_ref.0);
5518 continue;
5519 }
5520
5521 Some(cur_object) => cur_object.previous_transaction,
5522 None => TransactionDigest::genesis_marker(),
5523 };
5524
5525 #[cfg(msim)]
5526 let SystemPackage {
5527 id: _,
5528 bytes,
5529 dependencies,
5530 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5531 .unwrap_or_else(|| {
5532 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5533 });
5534
5535 #[cfg(not(msim))]
5536 let SystemPackage {
5537 id: _,
5538 bytes,
5539 dependencies,
5540 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5541
5542 let modules: Vec<_> = bytes
5543 .iter()
5544 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5545 .collect();
5546
5547 let new_object = Object::new_system_package(
5548 &modules,
5549 system_package_ref.1,
5550 dependencies.clone(),
5551 prev_transaction,
5552 );
5553
5554 let new_ref = new_object.compute_object_reference();
5555 if new_ref != system_package_ref {
5556 if cfg!(msim) {
5557 debug_fatal!(
5559 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5560 );
5561 } else {
5562 error!(
5563 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5564 );
5565 }
5566 return None;
5567 }
5568
5569 res.push((system_package_ref.1, bytes, dependencies));
5570 }
5571
5572 Some(res)
5573 }
5574
5575 fn is_protocol_version_supported_v2(
5576 current_protocol_version: ProtocolVersion,
5577 proposed_protocol_version: ProtocolVersion,
5578 protocol_config: &ProtocolConfig,
5579 committee: &Committee,
5580 capabilities: Vec<AuthorityCapabilitiesV2>,
5581 mut buffer_stake_bps: u64,
5582 ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5583 if proposed_protocol_version > current_protocol_version + 1
5584 && !protocol_config.advance_to_highest_supported_protocol_version()
5585 {
5586 return None;
5587 }
5588
5589 if buffer_stake_bps > 10000 {
5590 warn!("clamping buffer_stake_bps to 10000");
5591 buffer_stake_bps = 10000;
5592 }
5593
5594 let mut desired_upgrades: Vec<_> = capabilities
5597 .into_iter()
5598 .filter_map(|mut cap| {
5599 if cap.available_system_packages.is_empty() {
5601 return None;
5602 }
5603
5604 cap.available_system_packages.sort();
5605
5606 info!(
5607 "validator {:?} supports {:?} with system packages: {:?}",
5608 cap.authority.concise(),
5609 cap.supported_protocol_versions,
5610 cap.available_system_packages,
5611 );
5612
5613 cap.supported_protocol_versions
5617 .get_version_digest(proposed_protocol_version)
5618 .map(|digest| (digest, cap.available_system_packages, cap.authority))
5619 })
5620 .collect();
5621
5622 desired_upgrades.sort();
5624 desired_upgrades
5625 .into_iter()
5626 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5627 .into_iter()
5628 .find_map(|((digest, packages), group)| {
5629 assert!(!packages.is_empty());
5631
5632 let mut stake_aggregator: StakeAggregator<(), true> =
5633 StakeAggregator::new(Arc::new(committee.clone()));
5634
5635 for (_, _, authority) in group {
5636 stake_aggregator.insert_generic(authority, ());
5637 }
5638
5639 let total_votes = stake_aggregator.total_votes();
5640 let quorum_threshold = committee.quorum_threshold();
5641 let f = committee.total_votes() - committee.quorum_threshold();
5642
5643 let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5645 let effective_threshold = quorum_threshold + buffer_stake;
5646
5647 info!(
5648 protocol_config_digest = ?digest,
5649 ?total_votes,
5650 ?quorum_threshold,
5651 ?buffer_stake_bps,
5652 ?effective_threshold,
5653 ?proposed_protocol_version,
5654 ?packages,
5655 "support for upgrade"
5656 );
5657
5658 let has_support = total_votes >= effective_threshold;
5659 has_support.then_some((proposed_protocol_version, packages))
5660 })
5661 }
5662
5663 fn choose_protocol_version_and_system_packages_v2(
5664 current_protocol_version: ProtocolVersion,
5665 protocol_config: &ProtocolConfig,
5666 committee: &Committee,
5667 capabilities: Vec<AuthorityCapabilitiesV2>,
5668 buffer_stake_bps: u64,
5669 ) -> (ProtocolVersion, Vec<ObjectRef>) {
5670 assert!(protocol_config.authority_capabilities_v2());
5671 let mut next_protocol_version = current_protocol_version;
5672 let mut system_packages = vec![];
5673
5674 while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5675 current_protocol_version,
5676 next_protocol_version + 1,
5677 protocol_config,
5678 committee,
5679 capabilities.clone(),
5680 buffer_stake_bps,
5681 ) {
5682 next_protocol_version = version;
5683 system_packages = packages;
5684 }
5685
5686 (next_protocol_version, system_packages)
5687 }
5688
5689 #[instrument(level = "debug", skip_all)]
5690 fn create_authenticator_state_tx(
5691 &self,
5692 epoch_store: &Arc<AuthorityPerEpochStore>,
5693 ) -> Option<EndOfEpochTransactionKind> {
5694 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5695 info!("authenticator state transactions not enabled");
5696 return None;
5697 }
5698
5699 let authenticator_state_exists = epoch_store.authenticator_state_exists();
5700 let tx = if authenticator_state_exists {
5701 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5702 let min_epoch =
5703 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5704 let authenticator_obj_initial_shared_version = epoch_store
5705 .epoch_start_config()
5706 .authenticator_obj_initial_shared_version()
5707 .expect("initial version must exist");
5708
5709 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5710 min_epoch,
5711 authenticator_obj_initial_shared_version,
5712 );
5713
5714 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5715
5716 tx
5717 } else {
5718 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5719 info!("Creating AuthenticatorStateCreate tx");
5720 tx
5721 };
5722 Some(tx)
5723 }
5724
5725 #[instrument(level = "debug", skip_all)]
5726 fn create_randomness_state_tx(
5727 &self,
5728 epoch_store: &Arc<AuthorityPerEpochStore>,
5729 ) -> Option<EndOfEpochTransactionKind> {
5730 if !epoch_store.protocol_config().random_beacon() {
5731 info!("randomness state transactions not enabled");
5732 return None;
5733 }
5734
5735 if epoch_store.randomness_state_exists() {
5736 return None;
5737 }
5738
5739 let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5740 info!("Creating RandomnessStateCreate tx");
5741 Some(tx)
5742 }
5743
5744 #[instrument(level = "debug", skip_all)]
5745 fn create_accumulator_root_tx(
5746 &self,
5747 epoch_store: &Arc<AuthorityPerEpochStore>,
5748 ) -> Option<EndOfEpochTransactionKind> {
5749 if !epoch_store
5750 .protocol_config()
5751 .create_root_accumulator_object()
5752 {
5753 info!("accumulator root creation not enabled");
5754 return None;
5755 }
5756
5757 if epoch_store.accumulator_root_exists() {
5758 return None;
5759 }
5760
5761 let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5762 info!("Creating AccumulatorRootCreate tx");
5763 Some(tx)
5764 }
5765
5766 #[instrument(level = "debug", skip_all)]
5767 fn create_write_accumulator_storage_cost_tx(
5768 &self,
5769 epoch_store: &Arc<AuthorityPerEpochStore>,
5770 ) -> Option<EndOfEpochTransactionKind> {
5771 if !epoch_store.accumulator_root_exists() {
5772 info!("accumulator root does not exist yet");
5773 return None;
5774 }
5775 if !epoch_store.protocol_config().enable_accumulators() {
5776 info!("accumulators not enabled");
5777 return None;
5778 }
5779
5780 let object_store = self.get_object_store();
5781 let object_count =
5782 match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
5783 Ok(Some(count)) => count,
5784 Ok(None) => return None,
5785 Err(e) => {
5786 fatal!("failed to read accumulator object count: {e}");
5787 }
5788 };
5789
5790 let storage_cost = object_count.saturating_mul(
5791 epoch_store
5792 .protocol_config()
5793 .accumulator_object_storage_cost(),
5794 );
5795
5796 let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
5797 info!(
5798 object_count,
5799 storage_cost, "Creating WriteAccumulatorStorageCost tx"
5800 );
5801 Some(tx)
5802 }
5803
5804 #[instrument(level = "debug", skip_all)]
5805 fn create_coin_registry_tx(
5806 &self,
5807 epoch_store: &Arc<AuthorityPerEpochStore>,
5808 ) -> Option<EndOfEpochTransactionKind> {
5809 if !epoch_store.protocol_config().enable_coin_registry() {
5810 info!("coin registry not enabled");
5811 return None;
5812 }
5813
5814 if epoch_store.coin_registry_exists() {
5815 return None;
5816 }
5817
5818 let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5819 info!("Creating CoinRegistryCreate tx");
5820 Some(tx)
5821 }
5822
5823 #[instrument(level = "debug", skip_all)]
5824 fn create_display_registry_tx(
5825 &self,
5826 epoch_store: &Arc<AuthorityPerEpochStore>,
5827 ) -> Option<EndOfEpochTransactionKind> {
5828 if !epoch_store.protocol_config().enable_display_registry() {
5829 info!("display registry not enabled");
5830 return None;
5831 }
5832
5833 if epoch_store.display_registry_exists() {
5834 return None;
5835 }
5836
5837 let tx = EndOfEpochTransactionKind::new_display_registry_create();
5838 info!("Creating DisplayRegistryCreate tx");
5839 Some(tx)
5840 }
5841
5842 #[instrument(level = "debug", skip_all)]
5843 fn create_bridge_tx(
5844 &self,
5845 epoch_store: &Arc<AuthorityPerEpochStore>,
5846 ) -> Option<EndOfEpochTransactionKind> {
5847 if !epoch_store.protocol_config().enable_bridge() {
5848 info!("bridge not enabled");
5849 return None;
5850 }
5851 if epoch_store.bridge_exists() {
5852 return None;
5853 }
5854 let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5855 info!("Creating Bridge Create tx");
5856 Some(tx)
5857 }
5858
5859 #[instrument(level = "debug", skip_all)]
5860 fn init_bridge_committee_tx(
5861 &self,
5862 epoch_store: &Arc<AuthorityPerEpochStore>,
5863 ) -> Option<EndOfEpochTransactionKind> {
5864 if !epoch_store.protocol_config().enable_bridge() {
5865 info!("bridge not enabled");
5866 return None;
5867 }
5868 if !epoch_store
5869 .protocol_config()
5870 .should_try_to_finalize_bridge_committee()
5871 {
5872 info!("should not try to finalize bridge committee yet");
5873 return None;
5874 }
5875 if !epoch_store.bridge_exists() {
5877 return None;
5878 }
5879
5880 if epoch_store.bridge_committee_initiated() {
5881 return None;
5882 }
5883
5884 let bridge_initial_shared_version = epoch_store
5885 .epoch_start_config()
5886 .bridge_obj_initial_shared_version()
5887 .expect("initial version must exist");
5888 let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5889 info!("Init Bridge committee tx");
5890 Some(tx)
5891 }
5892
5893 #[instrument(level = "debug", skip_all)]
5894 fn create_deny_list_state_tx(
5895 &self,
5896 epoch_store: &Arc<AuthorityPerEpochStore>,
5897 ) -> Option<EndOfEpochTransactionKind> {
5898 if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5899 return None;
5900 }
5901
5902 if epoch_store.coin_deny_list_state_exists() {
5903 return None;
5904 }
5905
5906 let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5907 info!("Creating DenyListStateCreate tx");
5908 Some(tx)
5909 }
5910
5911 #[instrument(level = "debug", skip_all)]
5912 fn create_address_alias_state_tx(
5913 &self,
5914 epoch_store: &Arc<AuthorityPerEpochStore>,
5915 ) -> Option<EndOfEpochTransactionKind> {
5916 if !epoch_store.protocol_config().address_aliases() {
5917 info!("address aliases not enabled");
5918 return None;
5919 }
5920
5921 if epoch_store.address_alias_state_exists() {
5922 return None;
5923 }
5924
5925 let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
5926 info!("Creating AddressAliasStateCreate tx");
5927 Some(tx)
5928 }
5929
5930 #[instrument(level = "debug", skip_all)]
5931 fn create_execution_time_observations_tx(
5932 &self,
5933 epoch_store: &Arc<AuthorityPerEpochStore>,
5934 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5935 last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
5936 ) -> Option<EndOfEpochTransactionKind> {
5937 let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
5938 .protocol_config()
5939 .per_object_congestion_control_mode()
5940 else {
5941 return None;
5942 };
5943
5944 let start_checkpoint = std::cmp::max(
5947 last_checkpoint_before_end_of_epoch
5948 .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
5949 epoch_store
5951 .epoch()
5952 .checked_sub(1)
5953 .map(|prev_epoch| {
5954 self.checkpoint_store
5955 .get_epoch_last_checkpoint_seq_number(prev_epoch)
5956 .expect("typed store must not fail")
5957 .expect(
5958 "sequence number of last checkpoint of preceding epoch must be saved",
5959 )
5960 + 1
5961 })
5962 .unwrap_or(1),
5963 );
5964 info!(
5965 "reading checkpoint range {:?}..={:?}",
5966 start_checkpoint, last_checkpoint_before_end_of_epoch
5967 );
5968 let sequence_numbers =
5969 (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
5970 let contents_digests: Vec<_> = self
5971 .checkpoint_store
5972 .multi_get_locally_computed_checkpoints(&sequence_numbers)
5973 .expect("typed store must not fail")
5974 .into_iter()
5975 .zip_debug_eq(sequence_numbers)
5976 .map(|(maybe_checkpoint, sequence_number)| {
5977 if let Some(checkpoint) = maybe_checkpoint {
5978 checkpoint.content_digest
5979 } else {
5980 self.checkpoint_store
5983 .get_checkpoint_by_sequence_number(sequence_number)
5984 .expect("typed store must not fail")
5985 .unwrap_or_else(|| {
5986 fatal!("preceding checkpoints must exist by end of epoch")
5987 })
5988 .data()
5989 .content_digest
5990 }
5991 })
5992 .collect();
5993 let tx_digests: Vec<_> = self
5994 .checkpoint_store
5995 .multi_get_checkpoint_content(&contents_digests)
5996 .expect("typed store must not fail")
5997 .into_iter()
5998 .flat_map(|maybe_contents| {
5999 maybe_contents
6000 .expect("preceding checkpoint contents must exist by end of epoch")
6001 .into_inner()
6002 .into_iter()
6003 .map(|digests| digests.transaction)
6004 })
6005 .collect();
6006 let included_execution_time_observations: HashSet<_> = self
6007 .get_transaction_cache_reader()
6008 .multi_get_transaction_blocks(&tx_digests)
6009 .into_iter()
6010 .flat_map(|maybe_tx| {
6011 if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
6012 .expect("preceding transaction must exist by end of epoch")
6013 .transaction_data()
6014 .kind()
6015 {
6016 #[allow(clippy::unnecessary_to_owned)]
6017 itertools::Either::Left(
6018 ptb.commands
6019 .to_owned()
6020 .into_iter()
6021 .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
6022 )
6023 } else {
6024 itertools::Either::Right(std::iter::empty())
6025 }
6026 })
6027 .chain(end_of_epoch_observation_keys.into_iter())
6028 .collect();
6029
6030 let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
6031 epoch_store
6032 .get_end_of_epoch_execution_time_observations()
6033 .filter_and_sort_v1(
6034 |(key, _)| included_execution_time_observations.contains(key),
6035 params.stored_observations_limit.try_into().unwrap(),
6036 ),
6037 );
6038 info!("Creating StoreExecutionTimeObservations tx");
6039 Some(tx)
6040 }
6041
6042 #[instrument(level = "error", skip_all)]
6053 pub async fn create_and_execute_advance_epoch_tx(
6054 &self,
6055 epoch_store: &Arc<AuthorityPerEpochStore>,
6056 gas_cost_summary: &GasCostSummary,
6057 checkpoint: CheckpointSequenceNumber,
6058 epoch_start_timestamp_ms: CheckpointTimestamp,
6059 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
6060 last_checkpoint: CheckpointSequenceNumber,
6063 ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
6064 let mut txns = Vec::new();
6065
6066 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
6067 txns.push(tx);
6068 }
6069 if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
6070 txns.push(tx);
6071 }
6072 if let Some(tx) = self.create_bridge_tx(epoch_store) {
6073 txns.push(tx);
6074 }
6075 if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
6076 txns.push(tx);
6077 }
6078 if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
6079 txns.push(tx);
6080 }
6081 if let Some(tx) = self.create_execution_time_observations_tx(
6082 epoch_store,
6083 end_of_epoch_observation_keys,
6084 last_checkpoint,
6085 ) {
6086 txns.push(tx);
6087 }
6088 if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
6089 txns.push(tx);
6090 }
6091
6092 if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
6093 txns.push(tx);
6094 }
6095 if let Some(tx) = self.create_display_registry_tx(epoch_store) {
6096 txns.push(tx);
6097 }
6098 if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
6099 txns.push(tx);
6100 }
6101 if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
6102 txns.push(tx);
6103 }
6104
6105 let next_epoch = epoch_store.epoch() + 1;
6106
6107 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
6108
6109 let (next_epoch_protocol_version, next_epoch_system_packages) =
6110 Self::choose_protocol_version_and_system_packages_v2(
6111 epoch_store.protocol_version(),
6112 epoch_store.protocol_config(),
6113 epoch_store.committee(),
6114 epoch_store
6115 .get_capabilities_v2()
6116 .expect("read capabilities from db cannot fail"),
6117 buffer_stake_bps,
6118 );
6119
6120 let config = epoch_store.protocol_config();
6123 let binary_config = config.binary_config(None);
6124 let Some(next_epoch_system_package_bytes) = self
6125 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
6126 .await
6127 else {
6128 if next_epoch_protocol_version <= ProtocolVersion::MAX {
6129 debug_fatal!(
6133 "upgraded system packages {:?} are not locally available, cannot create \
6134 ChangeEpochTx. validator binary must be upgraded to the correct version!",
6135 next_epoch_system_packages
6136 );
6137 } else {
6138 error!(
6139 "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
6140 next_epoch_protocol_version
6141 );
6142 }
6143 return Err(CheckpointBuilderError::SystemPackagesMissing);
6152 };
6153
6154 let tx = if epoch_store
6155 .protocol_config()
6156 .end_of_epoch_transaction_supported()
6157 {
6158 txns.push(EndOfEpochTransactionKind::new_change_epoch(
6159 next_epoch,
6160 next_epoch_protocol_version,
6161 gas_cost_summary.storage_cost,
6162 gas_cost_summary.computation_cost,
6163 gas_cost_summary.storage_rebate,
6164 gas_cost_summary.non_refundable_storage_fee,
6165 epoch_start_timestamp_ms,
6166 next_epoch_system_package_bytes,
6167 ));
6168
6169 VerifiedTransaction::new_end_of_epoch_transaction(txns)
6170 } else {
6171 VerifiedTransaction::new_change_epoch(
6172 next_epoch,
6173 next_epoch_protocol_version,
6174 gas_cost_summary.storage_cost,
6175 gas_cost_summary.computation_cost,
6176 gas_cost_summary.storage_rebate,
6177 gas_cost_summary.non_refundable_storage_fee,
6178 epoch_start_timestamp_ms,
6179 next_epoch_system_package_bytes,
6180 )
6181 };
6182
6183 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
6184 tx.clone(),
6185 epoch_store.epoch(),
6186 checkpoint,
6187 );
6188
6189 let tx_digest = executable_tx.digest();
6190
6191 info!(
6192 ?next_epoch,
6193 ?next_epoch_protocol_version,
6194 ?next_epoch_system_packages,
6195 computation_cost=?gas_cost_summary.computation_cost,
6196 storage_cost=?gas_cost_summary.storage_cost,
6197 storage_rebate=?gas_cost_summary.storage_rebate,
6198 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
6199 ?tx_digest,
6200 "Creating advance epoch transaction"
6201 );
6202
6203 fail_point_async!("change_epoch_tx_delay");
6204 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
6205
6206 if self
6209 .get_transaction_cache_reader()
6210 .is_tx_already_executed(tx_digest)
6211 {
6212 warn!("change epoch tx has already been executed via state sync");
6213 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6214 }
6215
6216 let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
6217 else {
6218 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6219 };
6220
6221 let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
6224 self.get_object_cache_reader().as_ref(),
6225 std::iter::once(&Schedulable::Transaction(&executable_tx)),
6226 )?;
6227
6228 assert_eq!(assigned_versions.0.len(), 1);
6229 let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;
6230
6231 let input_objects = self.read_objects_for_execution(
6232 &tx_lock,
6233 &executable_tx,
6234 &assigned_versions,
6235 epoch_store,
6236 )?;
6237
6238 let (transaction_outputs, _timings, _execution_error_opt) = self
6239 .execute_certificate(
6240 &execution_guard,
6241 &executable_tx,
6242 input_objects,
6243 None,
6244 ExecutionEnv::default(),
6245 epoch_store,
6246 )
6247 .unwrap();
6248 let system_obj = get_sui_system_state(&transaction_outputs.written)
6249 .expect("change epoch tx must write to system object");
6250
6251 let effects = transaction_outputs.effects;
6252 self.get_state_sync_store()
6256 .insert_transaction_and_effects(&tx, &effects);
6257
6258 info!(
6259 "Effects summary of the change epoch transaction: {:?}",
6260 effects.summary_for_debug()
6261 );
6262 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
6263 assert!(effects.status().is_ok());
6265 Ok((system_obj, effects))
6266 }
6267
6268 #[instrument(level = "error", skip_all)]
6269 async fn reopen_epoch_db(
6270 &self,
6271 cur_epoch_store: &AuthorityPerEpochStore,
6272 new_committee: Committee,
6273 epoch_start_configuration: EpochStartConfiguration,
6274 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6275 epoch_last_checkpoint: CheckpointSequenceNumber,
6276 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6277 let new_epoch = new_committee.epoch;
6278 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6279 assert_eq!(
6280 epoch_start_configuration.epoch_start_state().epoch(),
6281 new_committee.epoch
6282 );
6283 fail_point!("before-open-new-epoch-store");
6284 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6285 self.name,
6286 new_committee,
6287 epoch_start_configuration,
6288 self.get_backing_package_store().clone(),
6289 self.get_object_store().clone(),
6290 expensive_safety_check_config,
6291 epoch_last_checkpoint,
6292 self.config.fullnode_sync_mode,
6293 )?;
6294 self.epoch_store.store(new_epoch_store.clone());
6295 Ok(new_epoch_store)
6296 }
6297
/// Test-only: iterates the cached live object set from the global state hash
/// store.
///
/// Wrapped objects are included exactly when the protocol config's
/// `simplified_unwrap_then_delete` flag is off.
#[cfg(test)]
pub(crate) fn iter_live_object_set_for_testing(
    &self,
) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
    let simplified_unwrap_then_delete = self
        .epoch_store_for_testing()
        .protocol_config()
        .simplified_unwrap_then_delete();
    let state_hash_store = self.get_global_state_hash_store();
    state_hash_store.iter_cached_live_object_set_for_testing(!simplified_unwrap_then_delete)
}
6309
/// Test-only: takes the stored execution shutdown sender and signals it.
///
/// # Panics
/// Panics if the sender was already taken, or if sending fails.
#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
    let mut shutdown_slot = self.tx_execution_shutdown.lock();
    let shutdown_sender = shutdown_slot.take().unwrap();
    shutdown_sender.send(()).unwrap();
}
6319
6320 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6322 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6323 self.get_object_cache_reader()
6324 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6325 self.get_reconfig_api()
6326 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6327 Ok(())
6328 }
6329}
6330
6331#[async_trait]
6332impl TransactionKeyValueStoreTrait for AuthorityState {
6333 #[instrument(skip(self))]
6334 async fn multi_get(
6335 &self,
6336 transactions: &[TransactionDigest],
6337 effects: &[TransactionDigest],
6338 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6339 let txns = if !transactions.is_empty() {
6340 self.get_transaction_cache_reader()
6341 .multi_get_transaction_blocks(transactions)
6342 .into_iter()
6343 .map(|t| t.map(|t| (*t).clone().into_inner()))
6344 .collect()
6345 } else {
6346 vec![]
6347 };
6348
6349 let fx = if !effects.is_empty() {
6350 self.get_transaction_cache_reader()
6351 .multi_get_executed_effects(effects)
6352 } else {
6353 vec![]
6354 };
6355
6356 Ok((txns, fx))
6357 }
6358
6359 #[instrument(skip(self))]
6360 async fn multi_get_checkpoints(
6361 &self,
6362 checkpoint_summaries: &[CheckpointSequenceNumber],
6363 checkpoint_contents: &[CheckpointSequenceNumber],
6364 checkpoint_summaries_by_digest: &[CheckpointDigest],
6365 ) -> SuiResult<(
6366 Vec<Option<CertifiedCheckpointSummary>>,
6367 Vec<Option<CheckpointContents>>,
6368 Vec<Option<CertifiedCheckpointSummary>>,
6369 )> {
6370 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6372 let store = self.get_checkpoint_store();
6373 for seq in checkpoint_summaries {
6374 let checkpoint = store
6375 .get_checkpoint_by_sequence_number(*seq)?
6376 .map(|c| c.into_inner());
6377
6378 summaries.push(checkpoint);
6379 }
6380
6381 let mut contents = Vec::with_capacity(checkpoint_contents.len());
6382 for seq in checkpoint_contents {
6383 let checkpoint = store
6384 .get_checkpoint_by_sequence_number(*seq)?
6385 .and_then(|summary| {
6386 store
6387 .get_checkpoint_contents(&summary.content_digest)
6388 .expect("db read cannot fail")
6389 });
6390 contents.push(checkpoint);
6391 }
6392
6393 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6394 for digest in checkpoint_summaries_by_digest {
6395 let checkpoint = store
6396 .get_checkpoint_by_digest(digest)?
6397 .map(|c| c.into_inner());
6398 summaries_by_digest.push(checkpoint);
6399 }
6400 Ok((summaries, contents, summaries_by_digest))
6401 }
6402
6403 #[instrument(skip(self))]
6404 async fn deprecated_get_transaction_checkpoint(
6405 &self,
6406 digest: TransactionDigest,
6407 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6408 Ok(self
6409 .get_checkpoint_cache()
6410 .deprecated_get_transaction_checkpoint(&digest)
6411 .map(|(_epoch, checkpoint)| checkpoint))
6412 }
6413
6414 #[instrument(skip(self))]
6415 async fn get_object(
6416 &self,
6417 object_id: ObjectID,
6418 version: VersionNumber,
6419 ) -> SuiResult<Option<Object>> {
6420 Ok(self
6421 .get_object_cache_reader()
6422 .get_object_by_key(&object_id, version))
6423 }
6424
6425 #[instrument(skip_all)]
6426 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6427 Ok(self
6428 .get_object_cache_reader()
6429 .multi_get_objects_by_key(object_keys))
6430 }
6431
6432 #[instrument(skip(self))]
6433 async fn multi_get_transaction_checkpoint(
6434 &self,
6435 digests: &[TransactionDigest],
6436 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6437 let res = self
6438 .get_checkpoint_cache()
6439 .deprecated_multi_get_transaction_checkpoint(digests);
6440
6441 Ok(res
6442 .into_iter()
6443 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6444 .collect())
6445 }
6446
6447 #[instrument(skip(self))]
6448 async fn multi_get_events_by_tx_digests(
6449 &self,
6450 digests: &[TransactionDigest],
6451 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6452 if digests.is_empty() {
6453 return Ok(vec![]);
6454 }
6455
6456 Ok(self
6457 .get_transaction_cache_reader()
6458 .multi_get_events(digests))
6459 }
6460}
6461
/// Simulator-only (`cfg(msim)`) registry of package overrides.
///
/// Overrides live in a thread-local map keyed by package id. Each entry is either a fixed
/// list of modules handed to every validator, or a callback that decides per validator.
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::collections::BTreeMap;
    use std::{cell::RefCell, collections::BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    /// Map from package id to the override registered for it.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    thread_local! {
        /// Overrides registered on the current thread.
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    /// The modules making up one package.
    type Framework = Vec<CompiledModule>;

    /// Callback invoked with a validator's name. It returns the replacement modules for that
    /// validator, or `None` when that validator should get no override.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package override is resolved.
    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen per validator by a callback.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes every module using the module's own `version`.
    ///
    /// # Panics
    /// Panics if a module fails to serialize.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        modules
            .iter()
            .map(|module| {
                let mut bytes = Vec::new();
                module
                    .serialize_with_version(module.version, &mut bytes)
                    .unwrap();
                bytes
            })
            .collect()
    }

    /// Registers `modules` as the override for `package_id` for all validators, replacing any
    /// previous entry for that id.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules))
        });
    }

    /// Registers `func` as the per-validator override for `package_id`, replacing any previous
    /// entry for that id.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func))
        });
    }

    /// Registers a global override for every package in `packages`.
    ///
    /// Every built-in package id that does not appear in `packages` is registered with an
    /// empty module list.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        OVERRIDE.with(|bs| {
            // One mutable borrow for the whole update instead of one per insert.
            let mut overrides = bs.borrow_mut();

            // Starts as "all built-in ids"; ids present in `packages` are struck off below.
            let mut omitted_built_ins: BTreeSet<_> =
                BuiltInFramework::all_package_ids().into_iter().collect();

            for pkg in packages {
                omitted_built_ins.remove(&pkg.id);
                overrides.insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
            }
            for package_id in omitted_built_ins {
                overrides.insert(package_id, PackageOverrideConfig::Global(vec![]));
            }
        });
    }

    /// Returns the serialized override modules for `package_id` as seen by validator `name`.
    ///
    /// Returns `None` when no override is registered, or when the registered callback returns
    /// `None` for `name`.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|framework| compiled_modules_to_bytes(&framework))
                }
            })
        })
    }

    /// Returns the override modules for `package_id` as seen by validator `name`.
    ///
    /// Returns `None` when no override is registered, or when the registered callback returns
    /// `None` for `name`.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            })
        })
    }

    /// Builds a [`SystemPackage`] from the override for `package_id`, if validator `name` has
    /// one.
    ///
    /// For ids accepted by `is_system_package`, the dependencies are those of the built-in
    /// package with the same id. Any other id is given every built-in package id as its
    /// dependencies.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if is_system_package(*package_id) {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns every overridden package whose id is not a built-in package id, as seen by
    /// validator `name`. Each returned package depends on all built-in package ids.
    ///
    /// Packages whose per-validator callback returns `None` for `name` are left out. This
    /// matches `get_override_system_package`, which also treats that case as "no override",
    /// and avoids panicking on it.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<_> = BuiltInFramework::all_package_ids().into_iter().collect();

        // Collect the ids up front so no `RefCell` borrow is held across the per-package
        // lookups below.
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        extra
            .into_iter()
            .filter_map(|package| {
                let bytes = get_override_bytes(&package, name)?;
                Some(SystemPackage {
                    id: package,
                    bytes,
                    dependencies: BuiltInFramework::all_package_ids(),
                })
            })
            .collect()
    }
}
6595
/// An object stored together with the three components of its object reference, in the
/// shape used when serializing a [`NodeStateDump`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// First component of the object's computed reference.
    pub id: ObjectID,
    /// Second component of the object's computed reference.
    pub version: VersionNumber,
    /// Third component of the object's computed reference.
    pub digest: ObjectDigest,
    /// The object itself.
    pub object: Object,
}
6603
6604impl ObjDumpFormat {
6605 fn new(object: Object) -> Self {
6606 let oref = object.compute_object_reference();
6607 Self {
6608 id: oref.0,
6609 version: oref.1,
6610 digest: oref.2,
6611 object,
6612 }
6613 }
6614}
6615
/// Serializable snapshot of a transaction, its effects, and the objects gathered around its
/// execution. Built by [`NodeStateDump::new`].
// NOTE(review): presumably used to debug a mismatch between computed and expected effects;
// confirm against the call sites.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the dumped transaction.
    pub tx_digest: TransactionDigest,
    /// Message extracted from the executed certificate.
    pub sender_signed_data: SenderSignedData,
    /// Epoch reported by the epoch store when the dump was built.
    pub executed_epoch: u64,
    /// Reference gas price reported by the epoch store.
    pub reference_gas_price: u64,
    /// Protocol version reported by the epoch store, as a `u64`.
    pub protocol_version: u64,
    /// Epoch start timestamp taken from the epoch start config.
    pub epoch_start_timestamp_ms: u64,
    /// Copy of the effects passed to `new`.
    pub computed_effects: TransactionEffects,
    /// Effects digest passed to `new` as the expected one.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages that were found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Consensus input objects (`Mutate` / `ReadOnly`) found in the object store at the
    /// referenced version.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Entries of the temporary store's `loaded_runtime_objects` found in the object store.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects found in the object store at the versions listed by the effects'
    /// `modified_at_versions`.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages from the temporary store's `runtime_packages_loaded_from_db`.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// The temporary store's input objects.
    pub input_objects: Vec<ObjDumpFormat>,
}
6633
6634impl NodeStateDump {
6635 pub fn new(
6636 tx_digest: &TransactionDigest,
6637 effects: &TransactionEffects,
6638 expected_effects_digest: TransactionEffectsDigest,
6639 object_store: &dyn ObjectStore,
6640 epoch_store: &Arc<AuthorityPerEpochStore>,
6641 inner_temporary_store: &InnerTemporaryStore,
6642 certificate: &VerifiedExecutableTransaction,
6643 ) -> SuiResult<Self> {
6644 let executed_epoch = epoch_store.epoch();
6646 let reference_gas_price = epoch_store.reference_gas_price();
6647 let epoch_start_config = epoch_store.epoch_start_config();
6648 let protocol_version = epoch_store.protocol_version().as_u64();
6649 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6650
6651 let mut relevant_system_packages = Vec::new();
6653 for sys_package_id in BuiltInFramework::all_package_ids() {
6654 if let Some(w) = object_store.get_object(&sys_package_id) {
6655 relevant_system_packages.push(ObjDumpFormat::new(w))
6656 }
6657 }
6658
6659 let mut shared_objects = Vec::new();
6661 for kind in effects.input_consensus_objects() {
6662 match kind {
6663 InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6664 if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6665 shared_objects.push(ObjDumpFormat::new(w))
6666 }
6667 }
6668 InputConsensusObject::ReadConsensusStreamEnded(..)
6669 | InputConsensusObject::MutateConsensusStreamEnded(..)
6670 | InputConsensusObject::Cancelled(..) => (), }
6672 }
6673
6674 let mut loaded_child_objects = Vec::new();
6677 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6678 if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6679 loaded_child_objects.push(ObjDumpFormat::new(w))
6680 }
6681 }
6682
6683 let mut modified_at_versions = Vec::new();
6685 for (id, ver) in effects.modified_at_versions() {
6686 if let Some(w) = object_store.get_object_by_key(&id, ver) {
6687 modified_at_versions.push(ObjDumpFormat::new(w))
6688 }
6689 }
6690
6691 let mut runtime_reads = Vec::new();
6694 for obj in inner_temporary_store
6695 .runtime_packages_loaded_from_db
6696 .values()
6697 {
6698 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6699 }
6700
6701 Ok(Self {
6704 tx_digest: *tx_digest,
6705 executed_epoch,
6706 reference_gas_price,
6707 epoch_start_timestamp_ms,
6708 protocol_version,
6709 relevant_system_packages,
6710 shared_objects,
6711 loaded_child_objects,
6712 modified_at_versions,
6713 runtime_reads,
6714 sender_signed_data: certificate.clone().into_message(),
6715 input_objects: inner_temporary_store
6716 .input_objects
6717 .values()
6718 .map(|o| ObjDumpFormat::new(o.clone()))
6719 .collect(),
6720 computed_effects: effects.clone(),
6721 expected_effects_digest,
6722 })
6723 }
6724
6725 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6726 let mut objects = Vec::new();
6727 objects.extend(self.relevant_system_packages.clone());
6728 objects.extend(self.shared_objects.clone());
6729 objects.extend(self.loaded_child_objects.clone());
6730 objects.extend(self.modified_at_versions.clone());
6731 objects.extend(self.runtime_reads.clone());
6732 objects.extend(self.input_objects.clone());
6733 objects
6734 }
6735
6736 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6737 let file_name = format!(
6738 "{}_{}_NODE_DUMP.json",
6739 self.tx_digest,
6740 AuthorityState::unixtime_now_ms()
6741 );
6742 let mut path = path.to_path_buf();
6743 path.push(&file_name);
6744 let mut file = File::create(path.clone())?;
6745 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6746 Ok(path)
6747 }
6748
6749 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6750 let file = File::open(path)?;
6751 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6752 }
6753}