1use crate::accumulators::coin_reservations::CachingCoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::transaction_rewriting::rewrite_transaction_for_coin_reservations;
10use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
11use crate::checkpoints::CheckpointBuilderError;
12use crate::checkpoints::CheckpointBuilderResult;
13use crate::congestion_tracker::CongestionTracker;
14use crate::execution_cache::ExecutionCacheTraitPointers;
15use crate::execution_cache::TransactionCacheRead;
16use crate::execution_cache::writeback_cache::WritebackCache;
17use crate::execution_scheduler::ExecutionScheduler;
18use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
19use crate::gasless_rate_limiter::ConsensusGaslessCounter;
20use crate::jsonrpc_index::CoinIndexKey2;
21use crate::rpc_index::RpcIndexStore;
22use crate::traffic_controller::TrafficController;
23use crate::traffic_controller::metrics::TrafficControllerMetrics;
24use crate::transaction_outputs::TransactionOutputs;
25use arc_swap::{ArcSwap, ArcSwapOption, Guard};
26use async_trait::async_trait;
27use authority_per_epoch_store::CertLockGuard;
28use dashmap::DashMap;
29use fastcrypto::encoding::Base58;
30use fastcrypto::encoding::Encoding;
31use fastcrypto::hash::MultisetHash;
32use itertools::Itertools;
33use move_binary_format::CompiledModule;
34use move_binary_format::binary_config::BinaryConfig;
35use move_core_types::annotated_value::MoveStructLayout;
36use move_core_types::language_storage::ModuleId;
37use mysten_common::ZipDebugEqIteratorExt;
38use mysten_common::{assert_reachable, fatal};
39use nonempty::NonEmpty;
40use parking_lot::Mutex;
41use prometheus::{
42 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
43 register_histogram_vec_with_registry, register_histogram_with_registry,
44 register_int_counter_vec_with_registry, register_int_counter_with_registry,
45 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
46};
47use serde::de::DeserializeOwned;
48use serde::{Deserialize, Serialize};
49use shared_object_version_manager::AssignedVersions;
50use shared_object_version_manager::Schedulable;
51use std::collections::BTreeMap;
52use std::collections::BTreeSet;
53use std::fs::File;
54use std::io::Write;
55use std::path::{Path, PathBuf};
56use std::sync::atomic::Ordering;
57use std::time::Duration;
58use std::time::Instant;
59use std::time::SystemTime;
60use std::time::UNIX_EPOCH;
61use std::{
62 collections::{HashMap, HashSet},
63 fs,
64 pin::Pin,
65 str::FromStr,
66 sync::Arc,
67 vec,
68};
69use sui_config::NodeConfig;
70use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
71use sui_execution::Executor;
72use sui_protocol_config::Chain;
73use sui_protocol_config::PerObjectCongestionControlMode;
74use sui_types::accumulator_root::AccumulatorObjId;
75use sui_types::dynamic_field::visitor as DFV;
76use sui_types::execution::ExecutionOutput;
77use sui_types::execution::ExecutionTimeObservationKey;
78use sui_types::execution::ExecutionTiming;
79use sui_types::execution_params::ExecutionOrEarlyError;
80use sui_types::execution_params::FundsWithdrawStatus;
81use sui_types::execution_params::get_early_execution_error;
82use sui_types::inner_temporary_store::PackageStoreWithFallback;
83use sui_types::layout_resolver::LayoutResolver;
84use sui_types::layout_resolver::into_struct_layout;
85use sui_types::messages_consensus::AuthorityCapabilitiesV2;
86use sui_types::node_role::NodeRole;
87use sui_types::object::bounded_visitor::BoundedVisitor;
88use sui_types::storage::ChildObjectResolver;
89use sui_types::storage::InputKey;
90use sui_types::storage::OverlayBackingPackageStore;
91use sui_types::storage::TrackingBackingStore;
92use sui_types::traffic_control::{
93 PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
94};
95use sui_types::transaction_executor::SimulateTransactionResult;
96use sui_types::transaction_executor::TransactionChecks;
97use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
98use tap::TapFallible;
99use tokio::sync::RwLock;
100use tokio::sync::mpsc::unbounded_channel;
101use tokio::sync::oneshot;
102use tokio::sync::watch::error::RecvError;
103use tokio::time::timeout;
104use tracing::{debug, error, info, instrument, warn};
105
106use self::authority_store::ExecutionLockWriteGuard;
107use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
108pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
109use mysten_metrics::{monitored_scope, spawn_monitored_task};
110
111use crate::jsonrpc_index::IndexStore;
112use crate::jsonrpc_index::{
113 CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
114};
115use mysten_common::debug_fatal;
116use shared_crypto::intent::{Intent, IntentScope};
117use sui_config::genesis::Genesis;
118use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
119use sui_framework::{BuiltInFramework, SystemPackage};
120use sui_json_rpc_types::{
121 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
122 SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
123 SuiTransactionBlockEvents, TransactionFilter,
124};
125use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
126use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
127use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
128use sui_types::accumulator_root::AccumulatorValue;
129use sui_types::authenticator_state::get_authenticator_state;
130use sui_types::balance::Balance;
131use sui_types::coin_reservation;
132use sui_types::committee::{EpochId, ProtocolVersion};
133use sui_types::crypto::{AuthoritySignInfo, Signer};
134use sui_types::deny_list_v1::check_coin_deny_list_v1;
135use sui_types::digests::ChainIdentifier;
136use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
137use sui_types::effects::{
138 InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
139 TransactionEvents, VerifiedSignedTransactionEffects,
140};
141use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
142use sui_types::event::EventID;
143use sui_types::executable_transaction::VerifiedExecutableTransaction;
144use sui_types::execution_status::ExecutionErrorKind;
145use sui_types::gas::{GasCostSummary, SuiGasStatus};
146use sui_types::inner_temporary_store::{InnerTemporaryStore, ObjectMap, TxCoins, WrittenObjects};
147use sui_types::message_envelope::Message;
148use sui_types::messages_checkpoint::{
149 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
150 CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
151 CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
152 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
153};
154use sui_types::messages_grpc::{
155 LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
156 TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
157};
158use sui_types::metrics::{BytecodeVerifierMetrics, ExecutionMetrics};
159use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
160use sui_types::signature::GenericSignature;
161use sui_types::storage::{
162 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
163};
164use sui_types::sui_system_state::SuiSystemStateTrait;
165use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
166use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
167use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
168use sui_types::{
169 SUI_SYSTEM_ADDRESS,
170 base_types::*,
171 committee::Committee,
172 crypto::AuthoritySignature,
173 error::{SuiError, SuiResult},
174 object::{Object, ObjectRead},
175 transaction::*,
176};
177use sui_types::{TypeTag, is_system_package};
178use typed_store::TypedStoreError;
179use typed_store::rocks::StagedBatch;
180
181use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
182use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
183use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
184use crate::authority::authority_store_pruner::{
185 AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
186};
187use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
188use crate::authority::epoch_start_configuration::EpochStartConfiguration;
189use crate::checkpoints::CheckpointStore;
190use crate::epoch::committee_store::CommitteeStore;
191use crate::execution_cache::{
192 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
193 ObjectCacheRead, StateSyncAPI,
194};
195use crate::execution_driver::execution_process;
196use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
197use crate::metrics::LatencyObserver;
198use crate::metrics::RateTracker;
199use crate::module_cache_metrics::ResolverMetrics;
200use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
201use crate::stake_aggregator::StakeAggregator;
202use crate::subscription_handler::SubscriptionHandler;
203use crate::transaction_input_loader::TransactionInputLoader;
204
205#[cfg(msim)]
206pub use crate::checkpoints::checkpoint_executor::utils::{
207 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
208};
209
210#[cfg(msim)]
211use sui_types::committee::CommitteeTrait;
212use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
213
214#[cfg(test)]
215#[path = "unit_tests/authority_tests.rs"]
216pub mod authority_tests;
217
218#[cfg(test)]
219#[path = "unit_tests/transaction_tests.rs"]
220pub mod transaction_tests;
221
222#[cfg(test)]
223#[path = "unit_tests/batch_transaction_tests.rs"]
224mod batch_transaction_tests;
225
226#[cfg(test)]
227#[path = "unit_tests/move_integration_tests.rs"]
228pub mod move_integration_tests;
229
230#[cfg(test)]
231#[path = "unit_tests/gas_tests.rs"]
232mod gas_tests;
233
234#[cfg(test)]
235#[path = "unit_tests/gas_data_tests.rs"]
236mod gas_data_tests;
237
238#[cfg(test)]
239#[path = "unit_tests/batch_verification_tests.rs"]
240mod batch_verification_tests;
241
242#[cfg(test)]
243#[path = "unit_tests/coin_deny_list_tests.rs"]
244mod coin_deny_list_tests;
245
246#[cfg(test)]
247#[path = "unit_tests/auth_unit_test_utils.rs"]
248pub mod auth_unit_test_utils;
249
250pub mod authority_test_utils;
251
252pub mod authority_per_epoch_store;
253pub mod authority_per_epoch_store_pruner;
254
255pub mod authority_store_pruner;
256pub mod authority_store_tables;
257pub mod authority_store_types;
258pub mod congestion_log;
259pub mod consensus_tx_status_cache;
260pub(crate) mod epoch_marker_key;
261pub mod epoch_start_configuration;
262pub mod execution_time_estimator;
263pub mod shared_object_congestion_tracker;
264pub mod shared_object_version_manager;
265pub mod submitted_transaction_cache;
266pub mod test_authority_builder;
267pub mod transaction_deferral;
268pub mod transaction_reject_reason_cache;
269mod weighted_moving_average;
270
271pub(crate) mod authority_store;
272pub mod backpressure;
273
274pub struct AuthorityMetrics {
276 tx_orders: IntCounter,
277 total_certs: IntCounter,
278 total_effects: IntCounter,
279 pub shared_obj_tx: IntCounter,
281 sponsored_tx: IntCounter,
282 tx_already_processed: IntCounter,
283 num_input_objs: Histogram,
284 num_shared_objects: Histogram,
286 batch_size: Histogram,
287
288 authority_state_handle_vote_transaction_latency: Histogram,
289
290 internal_execution_latency: Histogram,
291 execution_load_input_objects_latency: Histogram,
292 prepare_certificate_latency: Histogram,
293 commit_certificate_latency: Histogram,
294 db_checkpoint_latency: Histogram,
295
296 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
298 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
299 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
300 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
301
302 pub(crate) execution_driver_executed_transactions: IntCounter,
303 pub(crate) execution_driver_paused_transactions: IntCounter,
304 pub(crate) execution_driver_dispatch_queue: IntGauge,
305 pub(crate) execution_queueing_delay_s: Histogram,
306 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
307 pub(crate) execution_gas_latency_ratio: Histogram,
308
309 pub(crate) skipped_consensus_txns: IntCounter,
310 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
311 pub(crate) consensus_handler_duplicate_tx_count: Histogram,
312
313 pub(crate) authority_overload_status: IntGauge,
314 pub(crate) authority_load_shedding_percentage: IntGauge,
315
316 pub(crate) transaction_overload_sources: IntCounterVec,
317
318 post_processing_total_events_emitted: IntCounter,
320 post_processing_total_tx_indexed: IntCounter,
321 post_processing_total_tx_had_event_processed: IntCounter,
322 post_processing_total_failures: IntCounter,
323
324 pub consensus_handler_processed: IntCounterVec,
326 pub consensus_handler_transaction_sizes: HistogramVec,
327 pub consensus_handler_deferred_transactions: IntCounter,
328 pub consensus_handler_congested_transactions: IntCounter,
329 pub consensus_handler_unpaid_amplification_deferrals: IntCounter,
330 pub consensus_handler_cancelled_transactions: IntCounter,
331 pub consensus_handler_max_object_costs: IntGaugeVec,
332 pub consensus_committed_subdags: IntCounterVec,
333 pub accumulator_deposits: IntCounter,
334 pub accumulator_withdrawals: IntCounter,
335 pub consensus_committed_messages: IntGaugeVec,
336 pub consensus_committed_user_transactions: IntGaugeVec,
337 pub consensus_finalized_user_transactions: IntGaugeVec,
338 pub consensus_rejected_user_transactions: IntGaugeVec,
339 pub consensus_calculated_throughput: IntGauge,
340 pub consensus_calculated_throughput_profile: IntGauge,
341 pub consensus_block_handler_block_processed: IntCounter,
342 pub consensus_block_handler_txn_processed: IntCounterVec,
343 pub consensus_block_handler_fastpath_executions: IntCounter,
344 pub consensus_timestamp_bias: Histogram,
345
346 pub execution_metrics: Arc<ExecutionMetrics>,
347
348 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
350
351 pub zklogin_sig_count: IntCounter,
353 pub multisig_sig_count: IntCounter,
355
356 pub execution_queueing_latency: LatencyObserver,
359
360 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
367
368 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
371}
372
373const POSITIVE_INT_BUCKETS: &[f64] = &[
375 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
376 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
377 7000000., 10000000.,
378];
379
380const LATENCY_SEC_BUCKETS: &[f64] = &[
381 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
382 10., 20., 30., 60., 90.,
383];
384
385const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
387 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
388 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
389];
390
391const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
394 -2.0, 0.0, 0.2, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40, 0.45, 0.50, 0.80,
395 2.0, 10.0, 30.0,
396];
397
398const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
399 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
400 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
401];
402
403pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;
404
405pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_secs(2);
409
410impl AuthorityMetrics {
411 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
412 Self {
413 tx_orders: register_int_counter_with_registry!(
414 "total_transaction_orders",
415 "Total number of transaction orders",
416 registry,
417 )
418 .unwrap(),
419 total_certs: register_int_counter_with_registry!(
420 "total_transaction_certificates",
421 "Total number of transaction certificates handled",
422 registry,
423 )
424 .unwrap(),
425 total_effects: register_int_counter_with_registry!(
427 "total_transaction_effects",
428 "Total number of transaction effects produced",
429 registry,
430 )
431 .unwrap(),
432
433 shared_obj_tx: register_int_counter_with_registry!(
434 "num_shared_obj_tx",
435 "Number of transactions involving shared objects",
436 registry,
437 )
438 .unwrap(),
439
440 sponsored_tx: register_int_counter_with_registry!(
441 "num_sponsored_tx",
442 "Number of sponsored transactions",
443 registry,
444 )
445 .unwrap(),
446
447 tx_already_processed: register_int_counter_with_registry!(
448 "num_tx_already_processed",
449 "Number of transaction orders already processed previously",
450 registry,
451 )
452 .unwrap(),
453 num_input_objs: register_histogram_with_registry!(
454 "num_input_objects",
455 "Distribution of number of input TX objects per TX",
456 POSITIVE_INT_BUCKETS.to_vec(),
457 registry,
458 )
459 .unwrap(),
460 num_shared_objects: register_histogram_with_registry!(
461 "num_shared_objects",
462 "Number of shared input objects per TX",
463 POSITIVE_INT_BUCKETS.to_vec(),
464 registry,
465 )
466 .unwrap(),
467 batch_size: register_histogram_with_registry!(
468 "batch_size",
469 "Distribution of size of transaction batch",
470 POSITIVE_INT_BUCKETS.to_vec(),
471 registry,
472 )
473 .unwrap(),
474 authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
475 "authority_state_handle_vote_transaction_latency",
476 "Latency of voting on transactions without signing",
477 LATENCY_SEC_BUCKETS.to_vec(),
478 registry,
479 )
480 .unwrap(),
481 internal_execution_latency: register_histogram_with_registry!(
482 "authority_state_internal_execution_latency",
483 "Latency of actual certificate executions",
484 LATENCY_SEC_BUCKETS.to_vec(),
485 registry,
486 )
487 .unwrap(),
488 execution_load_input_objects_latency: register_histogram_with_registry!(
489 "authority_state_execution_load_input_objects_latency",
490 "Latency of loading input objects for execution",
491 LOW_LATENCY_SEC_BUCKETS.to_vec(),
492 registry,
493 )
494 .unwrap(),
495 prepare_certificate_latency: register_histogram_with_registry!(
496 "authority_state_prepare_certificate_latency",
497 "Latency of executing certificates, before committing the results",
498 LATENCY_SEC_BUCKETS.to_vec(),
499 registry,
500 )
501 .unwrap(),
502 commit_certificate_latency: register_histogram_with_registry!(
503 "authority_state_commit_certificate_latency",
504 "Latency of committing certificate execution results",
505 LATENCY_SEC_BUCKETS.to_vec(),
506 registry,
507 )
508 .unwrap(),
509 db_checkpoint_latency: register_histogram_with_registry!(
510 "db_checkpoint_latency",
511 "Latency of checkpointing dbs",
512 LATENCY_SEC_BUCKETS.to_vec(),
513 registry,
514 ).unwrap(),
515 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
516 "transaction_manager_num_enqueued_certificates",
517 "Current number of certificates enqueued to ExecutionScheduler",
518 &["result"],
519 registry,
520 )
521 .unwrap(),
522 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
523 "transaction_manager_num_pending_certificates",
524 "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
525 registry,
526 )
527 .unwrap(),
528 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
529 "transaction_manager_num_executing_certificates",
530 "Number of executing certificates, including queued and actually running certificates",
531 registry,
532 )
533 .unwrap(),
534 authority_overload_status: register_int_gauge_with_registry!(
535 "authority_overload_status",
536 "Whether authority is current experiencing overload and enters load shedding mode.",
537 registry)
538 .unwrap(),
539 authority_load_shedding_percentage: register_int_gauge_with_registry!(
540 "authority_load_shedding_percentage",
541 "The percentage of transactions is shed when the authority is in load shedding mode.",
542 registry)
543 .unwrap(),
544 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
545 "transaction_manager_transaction_queue_age_s",
546 "Time spent in waiting for transaction in the queue",
547 LATENCY_SEC_BUCKETS.to_vec(),
548 registry,
549 )
550 .unwrap(),
551 transaction_overload_sources: register_int_counter_vec_with_registry!(
552 "transaction_overload_sources",
553 "Number of times each source indicates transaction overload.",
554 &["source"],
555 registry)
556 .unwrap(),
557 execution_driver_executed_transactions: register_int_counter_with_registry!(
558 "execution_driver_executed_transactions",
559 "Cumulative number of transaction executed by execution driver",
560 registry,
561 )
562 .unwrap(),
563 execution_driver_paused_transactions: register_int_counter_with_registry!(
564 "execution_driver_paused_transactions",
565 "Cumulative number of transactions paused by execution driver",
566 registry,
567 )
568 .unwrap(),
569 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
570 "execution_driver_dispatch_queue",
571 "Number of transaction pending in execution driver dispatch queue",
572 registry,
573 )
574 .unwrap(),
575 execution_queueing_delay_s: register_histogram_with_registry!(
576 "execution_queueing_delay_s",
577 "Queueing delay between a transaction is ready for execution until it starts executing.",
578 LATENCY_SEC_BUCKETS.to_vec(),
579 registry
580 )
581 .unwrap(),
582 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
583 "prepare_cert_gas_latency_ratio",
584 "The ratio of computation gas divided by VM execution latency.",
585 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
586 registry
587 )
588 .unwrap(),
589 execution_gas_latency_ratio: register_histogram_with_registry!(
590 "execution_gas_latency_ratio",
591 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
592 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
593 registry
594 )
595 .unwrap(),
596 skipped_consensus_txns: register_int_counter_with_registry!(
597 "skipped_consensus_txns",
598 "Total number of consensus transactions skipped",
599 registry,
600 )
601 .unwrap(),
602 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
603 "skipped_consensus_txns_cache_hit",
604 "Total number of consensus transactions skipped because of local cache hit",
605 registry,
606 )
607 .unwrap(),
608 consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
609 "consensus_handler_duplicate_tx_count",
610 "Number of times each transaction appears in its first consensus commit",
611 POSITIVE_INT_BUCKETS.to_vec(),
612 registry,
613 )
614 .unwrap(),
615 post_processing_total_events_emitted: register_int_counter_with_registry!(
616 "post_processing_total_events_emitted",
617 "Total number of events emitted in post processing",
618 registry,
619 )
620 .unwrap(),
621 post_processing_total_tx_indexed: register_int_counter_with_registry!(
622 "post_processing_total_tx_indexed",
623 "Total number of txes indexed in post processing",
624 registry,
625 )
626 .unwrap(),
627 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
628 "post_processing_total_tx_had_event_processed",
629 "Total number of txes finished event processing in post processing",
630 registry,
631 )
632 .unwrap(),
633 post_processing_total_failures: register_int_counter_with_registry!(
634 "post_processing_total_failures",
635 "Total number of failure in post processing",
636 registry,
637 )
638 .unwrap(),
639 consensus_handler_processed: register_int_counter_vec_with_registry!(
640 "consensus_handler_processed",
641 "Number of transactions processed by consensus handler",
642 &["class"],
643 registry
644 ).unwrap(),
645 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
646 "consensus_handler_transaction_sizes",
647 "Sizes of each type of transactions processed by consensus handler",
648 &["class"],
649 POSITIVE_INT_BUCKETS.to_vec(),
650 registry
651 ).unwrap(),
652 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
653 "consensus_handler_deferred_transactions",
654 "Number of transactions deferred by consensus handler",
655 registry,
656 ).unwrap(),
657 consensus_handler_congested_transactions: register_int_counter_with_registry!(
658 "consensus_handler_congested_transactions",
659 "Number of transactions deferred by consensus handler due to congestion",
660 registry,
661 ).unwrap(),
662 consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
663 "consensus_handler_unpaid_amplification_deferrals",
664 "Number of transactions deferred due to unpaid consensus amplification",
665 registry,
666 ).unwrap(),
667 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
668 "consensus_handler_cancelled_transactions",
669 "Number of transactions cancelled by consensus handler",
670 registry,
671 ).unwrap(),
672 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
673 "consensus_handler_max_congestion_control_object_costs",
674 "Max object costs for congestion control in the current consensus commit",
675 &["commit_type"],
676 registry,
677 ).unwrap(),
678 consensus_committed_subdags: register_int_counter_vec_with_registry!(
679 "consensus_committed_subdags",
680 "Number of committed subdags, sliced by author",
681 &["authority"],
682 registry,
683 ).unwrap(),
684 accumulator_deposits: register_int_counter_with_registry!(
685 "accumulator_deposits",
686 "Total accumulator deposit (merge) events processed in settlement",
687 registry,
688 ).unwrap(),
689 accumulator_withdrawals: register_int_counter_with_registry!(
690 "accumulator_withdrawals",
691 "Total accumulator withdrawal (split) events processed in settlement",
692 registry,
693 ).unwrap(),
694 consensus_committed_messages: register_int_gauge_vec_with_registry!(
695 "consensus_committed_messages",
696 "Total number of committed consensus messages, sliced by author",
697 &["authority"],
698 registry,
699 ).unwrap(),
700 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
701 "consensus_committed_user_transactions",
702 "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
703 &["authority"],
704 registry,
705 ).unwrap(),
706 consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
707 "consensus_finalized_user_transactions",
708 "Number of user transactions finalized, sliced by submitter",
709 &["authority"],
710 registry,
711 ).unwrap(),
712 consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
713 "consensus_rejected_user_transactions",
714 "Number of user transactions rejected, sliced by submitter",
715 &["authority"],
716 registry,
717 ).unwrap(),
718 execution_metrics: Arc::new(ExecutionMetrics::new(registry)),
719 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
720 zklogin_sig_count: register_int_counter_with_registry!(
721 "zklogin_sig_count",
722 "Count of zkLogin signatures",
723 registry,
724 )
725 .unwrap(),
726 multisig_sig_count: register_int_counter_with_registry!(
727 "multisig_sig_count",
728 "Count of zkLogin signatures",
729 registry,
730 )
731 .unwrap(),
732 consensus_calculated_throughput: register_int_gauge_with_registry!(
733 "consensus_calculated_throughput",
734 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
735 registry,
736 ).unwrap(),
737 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
738 "consensus_calculated_throughput_profile",
739 "The current active calculated throughput profile",
740 registry
741 ).unwrap(),
742 consensus_block_handler_block_processed: register_int_counter_with_registry!(
743 "consensus_block_handler_block_processed",
744 "Number of blocks processed by consensus block handler.",
745 registry
746 ).unwrap(),
747 consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
748 "consensus_block_handler_txn_processed",
749 "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
750 &["outcome"],
751 registry
752 ).unwrap(),
753 consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
754 "consensus_block_handler_fastpath_executions",
755 "Number of fastpath transactions sent for execution by consensus transaction handler",
756 registry,
757 ).unwrap(),
758 consensus_timestamp_bias: register_histogram_with_registry!(
759 "consensus_timestamp_bias",
760 "Bias/delay from consensus timestamp to system time",
761 TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
762 registry
763 ).unwrap(),
764 execution_queueing_latency: LatencyObserver::new(),
765 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
766 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
767 }
768 }
769}
770
771pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
778
779#[derive(Debug, Clone)]
782pub struct ExecutionEnv {
783 pub assigned_versions: AssignedVersions,
785 pub expected_effects_digest: Option<TransactionEffectsDigest>,
788 pub funds_withdraw_status: FundsWithdrawStatus,
791 pub barrier_dependencies: Vec<TransactionDigest>,
794}
795
796impl Default for ExecutionEnv {
797 fn default() -> Self {
798 Self {
799 assigned_versions: Default::default(),
800 expected_effects_digest: None,
801 funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
802 barrier_dependencies: Default::default(),
803 }
804 }
805}
806
807impl ExecutionEnv {
808 pub fn new() -> Self {
809 Default::default()
810 }
811
812 pub fn with_expected_effects_digest(
813 mut self,
814 expected_effects_digest: TransactionEffectsDigest,
815 ) -> Self {
816 self.expected_effects_digest = Some(expected_effects_digest);
817 self
818 }
819
820 pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
821 self.assigned_versions = assigned_versions;
822 self
823 }
824
825 pub fn with_insufficient_funds(mut self) -> Self {
826 self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
827 self
828 }
829
830 pub fn with_barrier_dependencies(
831 mut self,
832 barrier_dependencies: BTreeSet<TransactionDigest>,
833 ) -> Self {
834 self.barrier_dependencies = barrier_dependencies.into_iter().collect();
835 self
836 }
837}
838
839#[derive(Debug)]
840pub struct ForkRecoveryState {
841 transaction_overrides:
843 parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
844}
845
846impl Default for ForkRecoveryState {
847 fn default() -> Self {
848 Self {
849 transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
850 }
851 }
852}
853
854impl ForkRecoveryState {
855 pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
856 let Some(config) = config else {
857 return Ok(Self::default());
858 };
859
860 let mut transaction_overrides = HashMap::new();
861 for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
862 let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
863 SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
864 })?;
865 let effects_digest =
866 TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
867 SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
868 })?;
869 transaction_overrides.insert(tx_digest, effects_digest);
870 }
871
872 Ok(Self {
873 transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
874 })
875 }
876
877 pub fn get_transaction_override(
878 &self,
879 tx_digest: &TransactionDigest,
880 ) -> Option<TransactionEffectsDigest> {
881 self.transaction_overrides.read().get(tx_digest).copied()
882 }
883}
884
885pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
886
887pub struct AuthorityState {
888 pub name: AuthorityName,
891 pub secret: StableSyncAuthoritySigner,
893
894 input_loader: TransactionInputLoader,
896 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
897 coin_reservation_resolver: Arc<CachingCoinReservationResolver>,
898
899 epoch_store: ArcSwap<AuthorityPerEpochStore>,
900
901 execution_lock: RwLock<EpochId>,
906
907 pub indexes: Option<Arc<IndexStore>>,
908 pub rpc_index: Option<Arc<RpcIndexStore>>,
909
910 pub subscription_handler: Arc<SubscriptionHandler>,
911 pub checkpoint_store: Arc<CheckpointStore>,
912
913 committee_store: Arc<CommitteeStore>,
914
915 execution_scheduler: Arc<ExecutionScheduler>,
917
918 #[allow(unused)]
920 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
921
922 pub metrics: Arc<AuthorityMetrics>,
923 _pruner: AuthorityStorePruner,
924 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
925
926 db_checkpoint_config: DBCheckpointConfig,
928
929 pub config: NodeConfig,
930
931 pub overload_info: AuthorityOverloadInfo,
933
934 chain_identifier: ChainIdentifier,
936
937 pub(crate) congestion_tracker: Arc<CongestionTracker>,
938
939 pub(crate) consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
941
942 pub traffic_controller: Option<Arc<TrafficController>>,
944
945 fork_recovery_state: Option<ForkRecoveryState>,
947
948 notify_epoch: tokio::sync::watch::Sender<EpochId>,
950
951 pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
952 object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,
953
954 pending_post_processing:
958 Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,
959
960 post_processing_semaphore: Arc<tokio::sync::Semaphore>,
963}
964
965impl AuthorityState {
972 pub fn node_role(&self, epoch_store: &AuthorityPerEpochStore) -> NodeRole {
973 epoch_store.node_role()
974 }
975
976 pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
977 epoch_store.node_role().is_validator()
978 }
979
980 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
981 epoch_store.node_role().is_fullnode()
982 }
983
984 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
985 &self.committee_store
986 }
987
988 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
989 self.committee_store.clone()
990 }
991
992 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
993 &self.config.authority_overload_config
994 }
995
996 pub fn get_epoch_state_commitments(
997 &self,
998 epoch: EpochId,
999 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1000 self.checkpoint_store.get_epoch_state_commitments(epoch)
1001 }
1002
1003 fn pre_object_load_checks(
1006 &self,
1007 tx_data: &TransactionData,
1008 tx_signatures: &[GenericSignature],
1009 input_object_kinds: &[InputObjectKind],
1010 receiving_objects_refs: &[ObjectRef],
1011 protocol_config: &ProtocolConfig,
1012 ) -> SuiResult<BTreeMap<AccumulatorObjId, (u64, TypeTag)>> {
1013 sui_transaction_checks::deny::check_transaction_for_signing(
1017 tx_data,
1018 tx_signatures,
1019 input_object_kinds,
1020 receiving_objects_refs,
1021 &self.config.transaction_deny_config,
1022 self.get_backing_package_store().as_ref(),
1023 )?;
1024
1025 let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1026 self.chain_identifier,
1027 self.coin_reservation_resolver.as_ref(),
1028 )?;
1029
1030 self.execution_cache_trait_pointers
1031 .account_funds_read
1032 .check_amounts_available(&declared_withdrawals)?;
1033
1034 if protocol_config.gasless_verify_remaining_balance() && tx_data.is_gasless_transaction() {
1035 let min_amounts =
1036 sui_types::transaction::get_gasless_allowed_token_types(protocol_config);
1037 self.execution_cache_trait_pointers
1038 .account_funds_read
1039 .check_remaining_amounts_after_withdrawal(&declared_withdrawals, &min_amounts)?;
1040 }
1041
1042 Ok(declared_withdrawals)
1043 }
1044
1045 fn handle_transaction_deny_checks(
1046 &self,
1047 transaction: &VerifiedTransaction,
1048 epoch_store: &Arc<AuthorityPerEpochStore>,
1049 ) -> SuiResult<CheckedInputObjects> {
1050 let tx_digest = transaction.digest();
1051 let tx_data = transaction.data().transaction_data();
1052
1053 let input_object_kinds = tx_data.input_objects()?;
1054 let receiving_objects_refs = tx_data.receiving_objects();
1055
1056 self.pre_object_load_checks(
1057 tx_data,
1058 transaction.tx_signatures(),
1059 &input_object_kinds,
1060 &receiving_objects_refs,
1061 epoch_store.protocol_config(),
1062 )?;
1063
1064 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1065 Some(tx_digest),
1066 &input_object_kinds,
1067 &receiving_objects_refs,
1068 epoch_store.epoch(),
1069 )?;
1070
1071 let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1072 epoch_store.protocol_config(),
1073 epoch_store.reference_gas_price(),
1074 tx_data,
1075 input_objects,
1076 &receiving_objects,
1077 &self.metrics.bytecode_verifier_metrics,
1078 &self.config.verifier_signing_config,
1079 )?;
1080
1081 self.handle_coin_deny_list_checks(
1082 tx_data,
1083 &checked_input_objects,
1084 &receiving_objects,
1085 epoch_store,
1086 )?;
1087
1088 Ok(checked_input_objects)
1089 }
1090
1091 fn handle_coin_deny_list_checks(
1092 &self,
1093 tx_data: &TransactionData,
1094 checked_input_objects: &CheckedInputObjects,
1095 receiving_objects: &ReceivingObjects,
1096 epoch_store: &Arc<AuthorityPerEpochStore>,
1097 ) -> SuiResult<()> {
1098 use sui_types::balance::Balance;
1099
1100 let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1101 self.chain_identifier,
1102 self.coin_reservation_resolver.as_ref(),
1103 )?;
1104
1105 let funds_withdraw_types = declared_withdrawals
1106 .values()
1107 .filter_map(|(_, type_tag)| {
1108 Balance::maybe_get_balance_type_param(type_tag)
1109 .map(|ty| ty.to_canonical_string(false))
1110 })
1111 .collect::<BTreeSet<_>>();
1112
1113 if epoch_store.coin_deny_list_v1_enabled() {
1114 check_coin_deny_list_v1(
1115 tx_data.sender(),
1116 checked_input_objects,
1117 receiving_objects,
1118 funds_withdraw_types.clone(),
1119 &self.get_object_store(),
1120 )?;
1121 }
1122
1123 if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1124 check_coin_deny_list_v2_during_signing(
1125 tx_data.sender(),
1126 checked_input_objects,
1127 receiving_objects,
1128 funds_withdraw_types.clone(),
1129 &self.get_object_store(),
1130 )?;
1131 }
1132
1133 Ok(())
1134 }
1135
1136 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1146 pub fn handle_vote_transaction(
1147 &self,
1148 epoch_store: &Arc<AuthorityPerEpochStore>,
1149 transaction: VerifiedTransaction,
1150 ) -> SuiResult<()> {
1151 debug!("handle_vote_transaction");
1152
1153 let _metrics_guard = self
1154 .metrics
1155 .authority_state_handle_vote_transaction_latency
1156 .start_timer();
1157 self.metrics.tx_orders.inc();
1158
1159 if !epoch_store
1163 .get_reconfig_state_read_lock_guard()
1164 .should_accept_user_certs()
1165 {
1166 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1167 }
1168
1169 let tx_digest = *transaction.digest();
1174 if epoch_store.is_recently_finalized(&tx_digest)
1175 || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
1176 {
1177 assert_reachable!("transaction recently executed");
1178 return Ok(());
1179 }
1180
1181 if self
1182 .get_transaction_cache_reader()
1183 .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
1184 {
1185 return Err(SuiErrorKind::TransactionAlreadyExecuted {
1186 digest: (*transaction.digest()),
1187 }
1188 .into());
1189 }
1190
1191 let _execution_lock = self.execution_lock_for_validation()?;
1193
1194 let checked_input_objects =
1195 self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1196
1197 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1198
1199 self.get_cache_writer()
1203 .validate_owned_object_versions(&owned_objects)
1204 }
1205
1206 pub fn check_transaction_validity(
1215 &self,
1216 epoch_store: &Arc<AuthorityPerEpochStore>,
1217 transaction: &VerifiedTransaction,
1218 enforce_live_input_objects: bool,
1219 ) -> SuiResult<()> {
1220 if !epoch_store
1221 .get_reconfig_state_read_lock_guard()
1222 .should_accept_user_certs()
1223 {
1224 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1225 }
1226
1227 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1228
1229 let checked_input_objects =
1230 self.handle_transaction_deny_checks(transaction, epoch_store)?;
1231
1232 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1234 let cache_reader = self.get_object_cache_reader();
1235
1236 for obj_ref in &owned_objects {
1237 if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1238 if obj_ref.1 < live_object.version() {
1241 return Err(SuiErrorKind::UserInputError {
1242 error: UserInputError::ObjectVersionUnavailableForConsumption {
1243 provided_obj_ref: *obj_ref,
1244 current_version: live_object.version(),
1245 },
1246 }
1247 .into());
1248 }
1249
1250 if enforce_live_input_objects && live_object.version() < obj_ref.1 {
1251 return Err(SuiErrorKind::UserInputError {
1253 error: UserInputError::ObjectVersionUnavailableForConsumption {
1254 provided_obj_ref: *obj_ref,
1255 current_version: live_object.version(),
1256 },
1257 }
1258 .into());
1259 }
1260
1261 if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1263 return Err(SuiErrorKind::UserInputError {
1264 error: UserInputError::InvalidObjectDigest {
1265 object_id: obj_ref.0,
1266 expected_digest: live_object.digest(),
1267 },
1268 }
1269 .into());
1270 }
1271 }
1272 }
1273
1274 Ok(())
1275 }
1276
1277 pub fn check_system_overload_at_signing(&self) -> bool {
1278 self.config
1279 .authority_overload_config
1280 .check_system_overload_at_signing
1281 }
1282
1283 pub fn check_system_overload_at_execution(&self) -> bool {
1284 self.config
1285 .authority_overload_config
1286 .check_system_overload_at_execution
1287 }
1288
1289 pub(crate) fn check_system_overload(
1290 &self,
1291 tx_data: &SenderSignedData,
1292 do_authority_overload_check: bool,
1293 ) -> SuiResult {
1294 if do_authority_overload_check {
1295 self.check_authority_overload(tx_data).tap_err(|_| {
1296 self.update_overload_metrics("execution_queue");
1297 })?;
1298 }
1299 self.execution_scheduler
1300 .check_execution_overload(self.overload_config(), tx_data)
1301 .tap_err(|_| {
1302 self.update_overload_metrics("execution_pending");
1303 })?;
1304 let pending_tx_count = self
1307 .get_cache_commit()
1308 .approximate_pending_transaction_count();
1309 if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1310 return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1311 retry_after_secs: 10,
1312 }
1313 .into());
1314 }
1315
1316 Ok(())
1317 }
1318
1319 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1320 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1321 return Ok(());
1322 }
1323
1324 let load_shedding_percentage = self
1325 .overload_info
1326 .load_shedding_percentage
1327 .load(Ordering::Relaxed);
1328 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1329 }
1330
1331 pub(crate) fn update_overload_metrics(&self, source: &str) {
1332 self.metrics
1333 .transaction_overload_sources
1334 .with_label_values(&[source])
1335 .inc();
1336 }
1337
1338 pub(crate) async fn wait_for_fastpath_dependency_objects(
1342 &self,
1343 transaction: &VerifiedTransaction,
1344 epoch: EpochId,
1345 ) -> SuiResult<bool> {
1346 let txn_data = transaction.data().transaction_data();
1347 let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1348
1349 let fastpath_dependency_objects: Vec<_> = move_objects
1351 .into_iter()
1352 .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1353 .chain(
1354 packages
1355 .into_iter()
1356 .map(|package_id| InputKey::Package { id: package_id }),
1357 )
1358 .collect();
1359 let receiving_keys: HashSet<_> = receiving_objects
1360 .into_iter()
1361 .filter_map(|receiving_obj_ref| {
1362 self.should_wait_for_dependency_object(receiving_obj_ref)
1363 })
1364 .collect();
1365 if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1366 return Ok(true);
1367 }
1368
1369 let max_wait = if cfg!(msim) {
1372 Duration::from_millis(200)
1373 } else {
1374 WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1375 };
1376
1377 match timeout(
1378 max_wait,
1379 self.get_object_cache_reader().notify_read_input_objects(
1380 &fastpath_dependency_objects,
1381 &receiving_keys,
1382 epoch,
1383 ),
1384 )
1385 .await
1386 {
1387 Ok(()) => Ok(true),
1388 Err(_) => Ok(false),
1391 }
1392 }
1393
1394 fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1401 let (obj_id, cur_version, _digest) = obj_ref;
1402 let Some(latest_obj_ref) = self
1403 .get_object_cache_reader()
1404 .get_latest_object_ref_or_tombstone(obj_id)
1405 else {
1406 return Some(InputKey::VersionedObject {
1408 id: FullObjectID::new(obj_id, None),
1409 version: cur_version,
1410 });
1411 };
1412 let latest_digest = latest_obj_ref.2;
1413 if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1414 return None;
1416 }
1417 let latest_version = latest_obj_ref.1;
1418 if cur_version <= latest_version {
1419 return None;
1422 }
1423 Some(InputKey::VersionedObject {
1425 id: FullObjectID::new(obj_id, None),
1426 version: cur_version,
1427 })
1428 }
1429
1430 #[instrument(level = "trace", skip_all)]
1435 pub async fn wait_for_transaction_execution_for_testing(
1436 &self,
1437 transaction: &VerifiedExecutableTransaction,
1438 ) -> TransactionEffects {
1439 self.notify_read_effects_for_testing(
1440 "AuthorityState::wait_for_transaction_execution_for_testing",
1441 *transaction.digest(),
1442 )
1443 .await
1444 }
1445
1446 #[instrument(level = "trace", skip_all)]
1458 pub fn try_execute_immediately(
1459 &self,
1460 certificate: &VerifiedExecutableTransaction,
1461 mut execution_env: ExecutionEnv,
1462 epoch_store: &Arc<AuthorityPerEpochStore>,
1463 ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
1464 let _scope = monitored_scope("Execution::try_execute_immediately");
1465 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1466
1467 let tx_digest = certificate.digest();
1468
1469 if let Some(fork_recovery) = &self.fork_recovery_state
1470 && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
1471 {
1472 warn!(
1473 ?tx_digest,
1474 original_digest = ?execution_env.expected_effects_digest,
1475 override_digest = ?override_digest,
1476 "Applying fork recovery override for transaction effects digest"
1477 );
1478 execution_env.expected_effects_digest = Some(override_digest);
1479 }
1480
1481 let tx_guard = epoch_store.acquire_tx_guard(certificate);
1483
1484 let tx_cache_reader = self.get_transaction_cache_reader();
1485 if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
1486 if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
1487 assert_eq!(
1488 effects.digest(),
1489 expected_effects_digest,
1490 "Unexpected effects digest for transaction {:?}",
1491 tx_digest
1492 );
1493 }
1494 tx_guard.release();
1495 return ExecutionOutput::Success((effects, None));
1496 }
1497
1498 let execution_start_time = Instant::now();
1499
1500 let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
1505 else {
1506 tx_guard.release();
1507 return ExecutionOutput::EpochEnded;
1508 };
1509 if *execution_guard != epoch_store.epoch() {
1514 tx_guard.release();
1515 info!("The epoch of the execution_guard doesn't match the epoch store");
1516 return ExecutionOutput::EpochEnded;
1517 }
1518
1519 let accumulator_version = execution_env.assigned_versions.accumulator_version;
1520
1521 let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
1522 &tx_guard,
1523 &execution_guard,
1524 certificate,
1525 execution_env,
1526 epoch_store,
1527 ) {
1528 ExecutionOutput::Success(result) => result,
1529 output => return output.unwrap_err(),
1530 };
1531 let transaction_outputs = Arc::new(transaction_outputs);
1532
1533 fail_point!("crash");
1534
1535 let effects = transaction_outputs.effects.clone();
1536 debug!(
1537 ?tx_digest,
1538 fx_digest=?effects.digest(),
1539 "process_certificate succeeded in {:.3}ms",
1540 (execution_start_time.elapsed().as_micros() as f64) / 1000.0
1541 );
1542
1543 let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
1544 if let Err(err) = commit_result {
1545 error!(?tx_digest, "Error committing transaction: {err}");
1546 tx_guard.release();
1547 return ExecutionOutput::Fatal(err);
1548 }
1549
1550 if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
1551 certificate.data().transaction_data().kind()
1552 {
1553 if let Some(err) = &execution_error_opt {
1554 debug_fatal!("Authenticator state update failed: {:?}", err);
1555 }
1556 epoch_store.update_authenticator_state(auth_state);
1557
1558 if cfg!(debug_assertions) {
1560 let authenticator_state = get_authenticator_state(self.get_object_store())
1561 .expect("Read cannot fail")
1562 .expect("Authenticator state must exist");
1563
1564 let mut sys_jwks: Vec<_> = authenticator_state
1565 .active_jwks
1566 .into_iter()
1567 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1568 .collect();
1569 let mut active_jwks: Vec<_> = epoch_store
1570 .signature_verifier
1571 .get_jwks()
1572 .into_iter()
1573 .collect();
1574 sys_jwks.sort();
1575 active_jwks.sort();
1576
1577 assert_eq!(sys_jwks, active_jwks);
1578 }
1579 }
1580
1581 if certificate
1582 .transaction_data()
1583 .kind()
1584 .is_accumulator_barrier_settle_tx()
1585 {
1586 let object_funds_checker = self.object_funds_checker.load();
1587 if let Some(object_funds_checker) = object_funds_checker.as_ref() {
1588 let next_accumulator_version = accumulator_version.unwrap().next();
1591 object_funds_checker.settle_accumulator_version(next_accumulator_version);
1592 }
1593 }
1594
1595 tx_guard.commit_tx();
1596
1597 let elapsed = execution_start_time.elapsed();
1598 epoch_store.record_local_execution_time(
1599 certificate.data().transaction_data(),
1600 &effects,
1601 timings,
1602 elapsed,
1603 );
1604
1605 let elapsed_us = elapsed.as_micros() as f64;
1606 if elapsed_us > 0.0 {
1607 self.metrics
1608 .execution_gas_latency_ratio
1609 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
1610 };
1611 ExecutionOutput::Success((effects, execution_error_opt))
1612 }
1613
1614 pub fn read_objects_for_execution(
1615 &self,
1616 tx_lock: &CertLockGuard,
1617 certificate: &VerifiedExecutableTransaction,
1618 assigned_shared_object_versions: &AssignedVersions,
1619 epoch_store: &Arc<AuthorityPerEpochStore>,
1620 ) -> SuiResult<InputObjects> {
1621 let _scope = monitored_scope("Execution::load_input_objects");
1622 let _metrics_guard = self
1623 .metrics
1624 .execution_load_input_objects_latency
1625 .start_timer();
1626 let input_objects = &certificate.data().transaction_data().input_objects()?;
1627 self.input_loader.read_objects_for_execution(
1628 &certificate.key(),
1629 tx_lock,
1630 input_objects,
1631 assigned_shared_object_versions,
1632 epoch_store.epoch(),
1633 )
1634 }
1635
1636 pub async fn try_execute_for_test(
1639 &self,
1640 certificate: &VerifiedCertificate,
1641 execution_env: ExecutionEnv,
1642 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1643 let epoch_store = self.epoch_store_for_testing();
1644 let (effects, execution_error_opt) = self
1645 .try_execute_immediately(
1646 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1647 execution_env,
1648 &epoch_store,
1649 )
1650 .unwrap();
1651 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1652 (signed_effects, execution_error_opt)
1653 }
1654
1655 pub async fn try_execute_executable_for_test(
1656 &self,
1657 executable: &VerifiedExecutableTransaction,
1658 execution_env: ExecutionEnv,
1659 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1660 let epoch_store = self.epoch_store_for_testing();
1661 let (effects, execution_error_opt) = self
1662 .try_execute_immediately(executable, execution_env, &epoch_store)
1663 .unwrap();
1664 self.flush_post_processing(executable.digest()).await;
1665 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1666 (signed_effects, execution_error_opt)
1667 }
1668
1669 pub async fn notify_read_effects_for_testing(
1674 &self,
1675 task_name: &'static str,
1676 digest: TransactionDigest,
1677 ) -> TransactionEffects {
1678 self.get_transaction_cache_reader()
1679 .notify_read_executed_effects(task_name, &[digest])
1680 .await
1681 .pop()
1682 .expect("must return correct number of effects")
1683 }
1684
1685 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1686 self.get_object_cache_reader()
1687 .check_owned_objects_are_live(owned_object_refs)
1688 }
1689
1690 pub(crate) fn debug_dump_transaction_state(
1695 &self,
1696 tx_digest: &TransactionDigest,
1697 effects: &TransactionEffects,
1698 expected_effects_digest: TransactionEffectsDigest,
1699 inner_temporary_store: &InnerTemporaryStore,
1700 certificate: &VerifiedExecutableTransaction,
1701 debug_dump_config: &StateDebugDumpConfig,
1702 ) -> SuiResult<PathBuf> {
1703 let dump_dir = debug_dump_config
1704 .dump_file_directory
1705 .as_ref()
1706 .cloned()
1707 .unwrap_or(std::env::temp_dir());
1708 let epoch_store = self.load_epoch_store_one_call_per_task();
1709
1710 NodeStateDump::new(
1711 tx_digest,
1712 effects,
1713 expected_effects_digest,
1714 self.get_object_store().as_ref(),
1715 &epoch_store,
1716 inner_temporary_store,
1717 certificate,
1718 )?
1719 .write_to_file(&dump_dir)
1720 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1721 }
1722
1723 #[instrument(level = "trace", skip_all)]
1724 pub(crate) fn process_certificate(
1725 &self,
1726 tx_guard: &CertTxGuard,
1727 execution_guard: &ExecutionLockReadGuard<'_>,
1728 certificate: &VerifiedExecutableTransaction,
1729 execution_env: ExecutionEnv,
1730 epoch_store: &Arc<AuthorityPerEpochStore>,
1731 ) -> ExecutionOutput<(
1732 TransactionOutputs,
1733 Vec<ExecutionTiming>,
1734 Option<ExecutionError>,
1735 )> {
1736 let _scope = monitored_scope("Execution::process_certificate");
1737 let tx_digest = *certificate.digest();
1738
1739 let input_objects = match self.read_objects_for_execution(
1740 tx_guard.as_lock_guard(),
1741 certificate,
1742 &execution_env.assigned_versions,
1743 epoch_store,
1744 ) {
1745 Ok(objects) => objects,
1746 Err(e) => return ExecutionOutput::Fatal(e),
1747 };
1748
1749 let expected_effects_digest = match execution_env.expected_effects_digest {
1750 Some(expected_effects_digest) => Some(expected_effects_digest),
1751 None => {
1752 match epoch_store.get_signed_effects_digest(&tx_digest) {
1756 Ok(digest) => digest,
1757 Err(e) => return ExecutionOutput::Fatal(e),
1758 }
1759 }
1760 };
1761
1762 fail_point_if!("correlated-crash-process-certificate", || {
1763 if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1764 sui_simulator::task::kill_current_node(None);
1765 }
1766 });
1767
1768 self.execute_certificate(
1773 execution_guard,
1774 certificate,
1775 input_objects,
1776 expected_effects_digest,
1777 execution_env,
1778 epoch_store,
1779 )
1780 }
1781
1782 pub async fn reconfigure_traffic_control(
1783 &self,
1784 params: TrafficControlReconfigParams,
1785 ) -> Result<TrafficControlReconfigParams, SuiError> {
1786 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1787 traffic_controller.admin_reconfigure(params).await
1788 } else {
1789 Err(SuiErrorKind::InvalidAdminRequest(
1790 "Traffic controller is not configured on this node".to_string(),
1791 )
1792 .into())
1793 }
1794 }
1795
1796 #[instrument(level = "trace", skip_all)]
1797 fn commit_certificate(
1798 &self,
1799 certificate: &VerifiedExecutableTransaction,
1800 transaction_outputs: Arc<TransactionOutputs>,
1801 epoch_store: &Arc<AuthorityPerEpochStore>,
1802 ) -> SuiResult {
1803 let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
1804 monitored_scope("Execution::commit_certificate");
1805 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1806
1807 let tx_digest = certificate.digest();
1808
1809 epoch_store.insert_executed_in_epoch(tx_digest);
1812 let key = certificate.key();
1813 if !matches!(key, TransactionKey::Digest(_)) {
1814 epoch_store.insert_tx_key(key, *tx_digest)?;
1815 }
1816
1817 fail_point!("crash");
1819
1820 self.get_cache_writer()
1821 .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);
1822
1823 if let Some(settlement_key) = certificate
1828 .transaction_data()
1829 .kind()
1830 .accumulator_barrier_settlement_key()
1831 {
1832 epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
1833 }
1834
1835 if certificate.transaction_data().is_end_of_epoch_tx() {
1836 self.get_object_cache_reader()
1839 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1840 }
1841
1842 Ok(())
1843 }
1844
1845 fn update_metrics(
1846 &self,
1847 certificate: &VerifiedExecutableTransaction,
1848 inner_temporary_store: &InnerTemporaryStore,
1849 effects: &TransactionEffects,
1850 ) {
1851 if certificate.has_zklogin_sig() {
1853 self.metrics.zklogin_sig_count.inc();
1854 } else if certificate.has_upgraded_multisig() {
1855 self.metrics.multisig_sig_count.inc();
1856 }
1857
1858 self.metrics.total_effects.inc();
1859 self.metrics.total_certs.inc();
1860
1861 let consensus_object_count = effects.input_consensus_objects().len();
1862 if consensus_object_count > 0 {
1863 self.metrics.shared_obj_tx.inc();
1864 }
1865
1866 if certificate.is_sponsored_tx() {
1867 self.metrics.sponsored_tx.inc();
1868 }
1869
1870 let input_object_count = inner_temporary_store.input_objects.len();
1871 self.metrics
1872 .num_input_objs
1873 .observe(input_object_count as f64);
1874 self.metrics
1875 .num_shared_objects
1876 .observe(consensus_object_count as f64);
1877 self.metrics.batch_size.observe(
1878 certificate
1879 .data()
1880 .intent_message()
1881 .value
1882 .kind()
1883 .num_commands() as f64,
1884 );
1885 }
1886
1887 fn execute_transaction_to_effects(
1890 &self,
1891 executor: &dyn Executor,
1892 store: &dyn BackingStore,
1893 protocol_config: &ProtocolConfig,
1894 enable_expensive_checks: bool,
1895 execution_params: ExecutionOrEarlyError,
1896 epoch_id: &EpochId,
1897 epoch_timestamp_ms: u64,
1898 input_objects: CheckedInputObjects,
1899 gas_data: GasData,
1900 gas_status: SuiGasStatus,
1901 kind: TransactionKind,
1902 rewritten_inputs: Option<Vec<bool>>,
1903 signer: SuiAddress,
1904 tx_digest: TransactionDigest,
1905 ) -> (
1906 InnerTemporaryStore,
1907 SuiGasStatus,
1908 TransactionEffects,
1909 Vec<ExecutionTiming>,
1910 Result<(), ExecutionError>,
1911 ) {
1912 let (inner_temp_store, gas_status, effects, timings, execution_error) = executor
1913 .execute_transaction_to_effects_and_execution_error(
1915 store,
1916 protocol_config,
1917 self.metrics.execution_metrics.clone(),
1918 enable_expensive_checks,
1919 execution_params,
1920 epoch_id,
1921 epoch_timestamp_ms,
1922 input_objects,
1923 gas_data,
1924 gas_status,
1925 kind,
1926 rewritten_inputs,
1927 signer,
1928 tx_digest,
1929 &mut None,
1930 );
1931
1932 (
1933 inner_temp_store,
1934 gas_status,
1935 effects,
1936 timings,
1937 execution_error,
1938 )
1939 }
1940
1941 #[instrument(level = "trace", skip_all)]
1950 fn execute_certificate(
1951 &self,
1952 _execution_guard: &ExecutionLockReadGuard<'_>,
1953 certificate: &VerifiedExecutableTransaction,
1954 input_objects: InputObjects,
1955 expected_effects_digest: Option<TransactionEffectsDigest>,
1956 execution_env: ExecutionEnv,
1957 epoch_store: &Arc<AuthorityPerEpochStore>,
1958 ) -> ExecutionOutput<(
1959 TransactionOutputs,
1960 Vec<ExecutionTiming>,
1961 Option<ExecutionError>,
1962 )> {
1963 let _scope = monitored_scope("Execution::prepare_certificate");
1964 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1965 let prepare_certificate_start_time = tokio::time::Instant::now();
1966
1967 let tx_data = certificate.data().transaction_data();
1969
1970 if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
1971 return ExecutionOutput::Fatal(e);
1972 }
1973
1974 let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
1978 certificate,
1979 input_objects,
1980 epoch_store.protocol_config(),
1981 epoch_store.reference_gas_price(),
1982 ) {
1983 Ok(result) => result,
1984 Err(e) => return ExecutionOutput::Fatal(e),
1985 };
1986
1987 let owned_object_refs = input_objects.inner().filter_owned_objects();
1988 if let Err(e) = self.check_owned_locks(&owned_object_refs) {
1989 return ExecutionOutput::Fatal(e);
1990 }
1991 let tx_digest = *certificate.digest();
1992 let protocol_config = epoch_store.protocol_config();
1993 let transaction_data = &certificate.data().intent_message().value;
1994 let sender = transaction_data.sender();
1995 let (mut kind, signer, gas_data) = transaction_data.execution_parts();
1996 let early_execution_error = get_early_execution_error(
1997 &tx_digest,
1998 &input_objects,
1999 self.config.certificate_deny_config.certificate_deny_set(),
2000 &execution_env.funds_withdraw_status,
2001 );
2002 let accumulator_version = if self.chain_identifier.chain() == Chain::Mainnet {
2006 execution_env.assigned_versions.accumulator_version
2007 } else {
2008 None
2009 };
2010 let execution_params = match early_execution_error {
2011 None => ExecutionOrEarlyError::ok(accumulator_version),
2012 Some(errors) => ExecutionOrEarlyError::failed(errors, accumulator_version),
2013 };
2014
2015 let rewritten_inputs = if execution_params.is_ok() {
2018 rewrite_transaction_for_coin_reservations(
2019 self.chain_identifier,
2020 &*self.coin_reservation_resolver,
2021 sender,
2022 &mut kind,
2023 execution_env.assigned_versions.accumulator_version,
2024 )
2025 .expect("rewriting must succeed for a certified transaction")
2026 } else {
2027 None
2028 };
2029
2030 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2031
2032 #[allow(unused_mut)]
2033 let (inner_temp_store, _, mut effects, timings, execution_error_opt) = self
2034 .execute_transaction_to_effects(
2035 &**epoch_store.executor(),
2036 &tracking_store,
2037 protocol_config,
2038 self.config
2041 .expensive_safety_check_config
2042 .enable_deep_per_tx_sui_conservation_check(),
2043 execution_params,
2044 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2045 epoch_store
2046 .epoch_start_config()
2047 .epoch_data()
2048 .epoch_start_timestamp(),
2049 input_objects,
2050 gas_data,
2051 gas_status,
2052 kind,
2053 rewritten_inputs,
2054 signer,
2055 tx_digest,
2056 );
2057
2058 let object_funds_checker = self.object_funds_checker.load();
2059 if let Some(object_funds_checker) = object_funds_checker.as_ref()
2060 && !object_funds_checker.should_commit_object_funds_withdraws(
2061 certificate,
2062 effects.status(),
2063 &inner_temp_store.accumulator_running_max_withdraws,
2064 &execution_env,
2065 self.get_account_funds_read(),
2066 &self.execution_scheduler,
2067 epoch_store,
2068 )
2069 {
2070 assert_reachable!("retry object withdraw later");
2071 return ExecutionOutput::RetryLater;
2072 }
2073
2074 if let Some(expected_effects_digest) = expected_effects_digest
2075 && effects.digest() != expected_effects_digest
2076 {
2077 match self.debug_dump_transaction_state(
2079 &tx_digest,
2080 &effects,
2081 expected_effects_digest,
2082 &inner_temp_store,
2083 certificate,
2084 &self.config.state_debug_dump_config,
2085 ) {
2086 Ok(out_path) => {
2087 info!(
2088 "Dumped node state for transaction {} to {}",
2089 tx_digest,
2090 out_path.as_path().display().to_string()
2091 );
2092 }
2093 Err(e) => {
2094 error!("Error dumping state for transaction {}: {e}", tx_digest);
2095 }
2096 }
2097 let expected_effects = self
2098 .get_transaction_cache_reader()
2099 .get_effects(&expected_effects_digest);
2100 error!(
2101 ?tx_digest,
2102 ?expected_effects_digest,
2103 actual_effects = ?effects,
2104 expected_effects = ?expected_effects,
2105 "fork detected!"
2106 );
2107 if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2108 tx_digest,
2109 expected_effects_digest,
2110 effects.digest(),
2111 ) {
2112 error!("Failed to record transaction fork: {e}");
2113 }
2114
2115 fail_point_if!("kill_transaction_fork_node", || {
2116 #[cfg(msim)]
2117 {
2118 tracing::error!(
2119 fatal = true,
2120 "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2121 tx_digest
2122 );
2123 sui_simulator::task::shutdown_current_node();
2124 }
2125 });
2126
2127 fatal!(
2128 "Transaction {} is expected to have effects digest {}, but got {}!",
2129 tx_digest,
2130 expected_effects_digest,
2131 effects.digest()
2132 );
2133 }
2134
2135 fail_point_arg!("simulate_fork_during_execution", |(
2136 forked_validators,
2137 full_halt,
2138 effects_overrides,
2139 fork_probability,
2140 ): (
2141 std::sync::Arc<
2142 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2143 >,
2144 bool,
2145 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2146 f32,
2147 )| {
2148 #[cfg(msim)]
2149 self.simulate_fork_during_execution(
2150 certificate,
2151 epoch_store,
2152 &mut effects,
2153 forked_validators,
2154 full_halt,
2155 effects_overrides,
2156 fork_probability,
2157 );
2158 });
2159
2160 let unchanged_loaded_runtime_objects =
2161 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2162 certificate.transaction_data(),
2163 &effects,
2164 &tracking_store.into_read_objects(),
2165 );
2166
2167 let _ = self
2169 .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2170 .tap_err(|e| {
2171 self.metrics.post_processing_total_failures.inc();
2172 error!(?tx_digest, "tx post processing failed: {e}");
2173 });
2174
2175 self.update_metrics(certificate, &inner_temp_store, &effects);
2176
2177 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2178 certificate.clone().into_unsigned(),
2179 effects,
2180 inner_temp_store,
2181 unchanged_loaded_runtime_objects,
2182 );
2183
2184 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2185 if elapsed > 0.0 {
2186 self.metrics.prepare_cert_gas_latency_ratio.observe(
2187 transaction_outputs
2188 .effects
2189 .gas_cost_summary()
2190 .computation_cost as f64
2191 / elapsed,
2192 );
2193 }
2194
2195 ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2196 }
2197
2198 pub fn prepare_certificate_for_benchmark(
2199 &self,
2200 certificate: &VerifiedExecutableTransaction,
2201 input_objects: InputObjects,
2202 epoch_store: &Arc<AuthorityPerEpochStore>,
2203 ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2204 let lock = RwLock::new(epoch_store.epoch());
2205 let execution_guard = lock.try_read().unwrap();
2206
2207 let (transaction_outputs, _timings, execution_error_opt) = self
2208 .execute_certificate(
2209 &execution_guard,
2210 certificate,
2211 input_objects,
2212 None,
2213 ExecutionEnv::default(),
2214 epoch_store,
2215 )
2216 .unwrap();
2217 Ok((transaction_outputs, execution_error_opt))
2218 }
2219
2220 #[instrument(skip_all)]
2221 #[allow(clippy::type_complexity)]
2222 pub async fn dry_exec_transaction(
2223 &self,
2224 transaction: TransactionData,
2225 ) -> SuiResult<(
2226 DryRunTransactionBlockResponse,
2227 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2228 TransactionEffects,
2229 Option<ObjectID>,
2230 )> {
2231 let epoch_store = self.load_epoch_store_one_call_per_task();
2232 if !self.is_fullnode(&epoch_store) {
2233 return Err(SuiErrorKind::UnsupportedFeatureError {
2234 error: "dry-exec is only supported on fullnodes".to_string(),
2235 }
2236 .into());
2237 }
2238
2239 if transaction.kind().is_system_tx() {
2240 return Err(SuiErrorKind::UnsupportedFeatureError {
2241 error: "dry-exec does not support system transactions".to_string(),
2242 }
2243 .into());
2244 }
2245
2246 self.dry_exec_transaction_impl(&epoch_store, transaction)
2247 }
2248
2249 #[allow(clippy::type_complexity)]
2250 fn dry_exec_transaction_impl(
2251 &self,
2252 epoch_store: &AuthorityPerEpochStore,
2253 transaction: TransactionData,
2254 ) -> SuiResult<(
2255 DryRunTransactionBlockResponse,
2256 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2257 TransactionEffects,
2258 Option<ObjectID>,
2259 )> {
2260 let sim = self.simulate_transaction(
2263 transaction.clone(),
2264 TransactionChecks::Enabled,
2265 true,
2266 )?;
2267
2268 self.build_dry_run_response(epoch_store, transaction, sim)
2269 }
2270
2271 #[allow(clippy::type_complexity)]
2283 fn build_dry_run_response(
2284 &self,
2285 epoch_store: &AuthorityPerEpochStore,
2286 transaction: TransactionData,
2287 sim: SimulateTransactionResult,
2288 ) -> SuiResult<(
2289 DryRunTransactionBlockResponse,
2290 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2291 TransactionEffects,
2292 Option<ObjectID>,
2293 )> {
2294 let SimulateTransactionResult {
2295 effects,
2296 events,
2297 objects,
2298 execution_result,
2299 mock_gas_id,
2300 suggested_gas_price,
2301 ..
2302 } = sim;
2303
2304 let tx_digest = *effects.transaction_digest();
2305
2306 let written_with_kind: BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)> = effects
2311 .created()
2312 .into_iter()
2313 .map(|(oref, _)| (oref, WriteKind::Create))
2314 .chain(
2315 effects
2316 .unwrapped()
2317 .into_iter()
2318 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2319 )
2320 .chain(
2321 effects
2322 .mutated()
2323 .into_iter()
2324 .map(|(oref, _)| (oref, WriteKind::Mutate)),
2325 )
2326 .filter_map(|(oref, kind)| {
2327 objects
2328 .get(&ObjectKey(oref.0, oref.1))
2329 .map(|obj| (oref.0, (oref, obj.clone(), kind)))
2330 })
2331 .collect();
2332
2333 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2337 epoch_store.protocol_config(),
2338 Box::new(OverlayBackingPackageStore::new(
2339 &objects,
2340 self.get_backing_package_store(),
2341 )),
2342 );
2343
2344 let execution_error_source = execution_result
2345 .as_ref()
2346 .err()
2347 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2348
2349 let response = DryRunTransactionBlockResponse {
2350 suggested_gas_price,
2351 input: SuiTransactionBlockData::try_from_with_module_cache(
2352 transaction,
2353 &epoch_store.module_cache().clone(),
2354 )
2355 .map_err(|e| SuiErrorKind::TransactionSerializationError {
2356 error: format!("Failed to convert transaction to SuiTransactionBlockData: {e}"),
2357 })?,
2358 effects: effects.clone().try_into()?,
2359 events: SuiTransactionBlockEvents::try_from(
2360 events.unwrap_or_default(),
2361 tx_digest,
2362 None,
2363 layout_resolver.as_mut(),
2364 )?,
2365 object_changes: Vec::new(),
2368 balance_changes: Vec::new(),
2369 execution_error_source,
2370 };
2371
2372 Ok((response, written_with_kind, effects, mock_gas_id))
2373 }
2374
2375 pub fn simulate_transaction(
2376 &self,
2377 mut transaction: TransactionData,
2378 checks: TransactionChecks,
2379 allow_mock_gas_coin: bool,
2380 ) -> SuiResult<SimulateTransactionResult> {
2381 if transaction.kind().is_system_tx() {
2382 return Err(SuiErrorKind::UnsupportedFeatureError {
2383 error: "simulate does not support system transactions".to_string(),
2384 }
2385 .into());
2386 }
2387
2388 let epoch_store = self.load_epoch_store_one_call_per_task();
2389 if !self.is_fullnode(&epoch_store) {
2390 return Err(SuiErrorKind::UnsupportedFeatureError {
2391 error: "simulate is only supported on fullnodes".to_string(),
2392 }
2393 .into());
2394 }
2395
2396 let dev_inspect = checks.disabled();
2397 if dev_inspect && self.config.dev_inspect_disabled {
2398 return Err(SuiErrorKind::UnsupportedFeatureError {
2399 error: "simulate with checks disabled is not allowed on this node".to_string(),
2400 }
2401 .into());
2402 }
2403
2404 let protocol_config = epoch_store.protocol_config();
2407 if !protocol_config.enable_coin_reservation_obj_refs()
2408 && transaction.gas().iter().any(|obj_ref| {
2409 sui_types::coin_reservation::ParsedDigest::is_coin_reservation_digest(&obj_ref.2)
2410 })
2411 {
2412 return Err(SuiErrorKind::UnsupportedFeatureError {
2413 error:
2414 "coin reservations in gas payment are not supported at this protocol version"
2415 .to_string(),
2416 }
2417 .into());
2418 }
2419
2420 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2422
2423 let input_object_kinds = transaction.input_objects()?;
2424 let receiving_object_refs = transaction.receiving_objects();
2425
2426 let is_gasless = protocol_config.enable_gasless() && transaction.is_gasless_transaction();
2431 let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() && !is_gasless
2432 {
2433 let obj = Object::new_move(
2434 MoveObject::new_gas_coin(
2435 OBJECT_START_VERSION,
2436 ObjectID::MAX,
2437 DEV_INSPECT_GAS_COIN_VALUE,
2438 ),
2439 Owner::AddressOwner(transaction.gas_data().owner),
2440 TransactionDigest::genesis_marker(),
2441 );
2442 transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
2443 Some(obj)
2444 } else {
2445 None
2446 };
2447
2448 let declared_withdrawals = self.pre_object_load_checks(
2449 &transaction,
2450 &[],
2451 &input_object_kinds,
2452 &receiving_object_refs,
2453 epoch_store.protocol_config(),
2454 )?;
2455 let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();
2456
2457 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2458 None,
2460 &input_object_kinds,
2461 &receiving_object_refs,
2462 epoch_store.epoch(),
2463 )?;
2464
2465 let mock_gas_id = mock_gas_object.map(|obj| {
2467 let id = obj.id();
2468 input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
2469 id
2470 });
2471
2472 let protocol_config = epoch_store.protocol_config();
2473
2474 let (gas_status, checked_input_objects) = if dev_inspect {
2475 sui_transaction_checks::check_dev_inspect_input(
2476 protocol_config,
2477 &transaction,
2478 input_objects,
2479 receiving_objects,
2480 epoch_store.reference_gas_price(),
2481 )?
2482 } else {
2483 sui_transaction_checks::check_transaction_input(
2484 epoch_store.protocol_config(),
2485 epoch_store.reference_gas_price(),
2486 &transaction,
2487 input_objects,
2488 &receiving_objects,
2489 &self.metrics.bytecode_verifier_metrics,
2490 &self.config.verifier_signing_config,
2491 )?
2492 };
2493
2494 let executor = sui_execution::executor(
2496 protocol_config,
2497 true, )
2499 .expect("Creating an executor should not fail here");
2500
2501 let (mut kind, signer, gas_data) = transaction.execution_parts();
2502 let rewritten_inputs = rewrite_transaction_for_coin_reservations(
2503 self.chain_identifier,
2504 &*self.coin_reservation_resolver,
2505 signer,
2506 &mut kind,
2507 None,
2508 )?;
2509 let early_execution_error = get_early_execution_error(
2510 &transaction.digest(),
2511 &checked_input_objects,
2512 self.config.certificate_deny_config.certificate_deny_set(),
2513 &FundsWithdrawStatus::MaybeSufficient,
2514 );
2515 let execution_params = match early_execution_error {
2518 None => ExecutionOrEarlyError::ok(None),
2519 Some(errors) => ExecutionOrEarlyError::failed(errors, None),
2520 };
2521
2522 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2523
2524 let cloned_input_objects = checked_input_objects.clone();
2526 let cloned_gas = gas_data.clone();
2527 let cloned_kind = kind.clone();
2528 let tx_digest = transaction.digest();
2529 let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
2530 let epoch_timestamp_ms = epoch_store
2531 .epoch_start_config()
2532 .epoch_data()
2533 .epoch_start_timestamp();
2534 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2535 &tracking_store,
2536 protocol_config,
2537 self.metrics.execution_metrics.clone(),
2538 false, execution_params,
2540 &epoch_id,
2541 epoch_timestamp_ms,
2542 checked_input_objects,
2543 gas_data,
2544 gas_status,
2545 kind,
2546 rewritten_inputs.clone(),
2547 signer,
2548 tx_digest,
2549 dev_inspect,
2550 );
2551
2552 let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
2554 let has_insufficient_object_funds = inner_temp_store
2555 .accumulator_running_max_withdraws
2556 .iter()
2557 .filter(|(id, _)| !address_funds.contains(id))
2558 .any(|(id, max_withdraw)| {
2559 let (balance, _) = self.get_account_funds_read().get_latest_account_amount(id);
2560 balance < *max_withdraw
2561 });
2562
2563 if has_insufficient_object_funds {
2564 let retry_gas_status = SuiGasStatus::new(
2565 cloned_gas.budget,
2566 cloned_gas.price,
2567 epoch_store.reference_gas_price(),
2568 protocol_config,
2569 )?;
2570 let (store, _, effects, result) = executor.dev_inspect_transaction(
2571 &tracking_store,
2572 protocol_config,
2573 self.metrics.execution_metrics.clone(),
2574 false,
2575 ExecutionOrEarlyError::failed(
2576 NonEmpty::new(ExecutionErrorKind::InsufficientFundsForWithdraw),
2577 None,
2578 ),
2579 &epoch_id,
2580 epoch_timestamp_ms,
2581 cloned_input_objects,
2582 cloned_gas,
2583 retry_gas_status,
2584 cloned_kind,
2585 rewritten_inputs,
2586 signer,
2587 tx_digest,
2588 dev_inspect,
2589 );
2590 (store, effects, result)
2591 } else {
2592 (inner_temp_store, effects, execution_result)
2593 }
2594 } else {
2595 (inner_temp_store, effects, execution_result)
2596 };
2597
2598 let loaded_runtime_objects = tracking_store.into_read_objects();
2599 let unchanged_loaded_runtime_objects =
2600 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2601 &transaction,
2602 &effects,
2603 &loaded_runtime_objects,
2604 );
2605
2606 let object_set = {
2607 let objects = {
2608 let mut objects = loaded_runtime_objects;
2609
2610 for o in inner_temp_store
2611 .input_objects
2612 .into_values()
2613 .chain(inner_temp_store.written.into_values())
2614 {
2615 objects.insert(o);
2616 }
2617
2618 objects
2619 };
2620
2621 let object_keys = sui_types::storage::get_transaction_object_set(
2622 &transaction,
2623 &effects,
2624 &unchanged_loaded_runtime_objects,
2625 );
2626
2627 let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2628 for k in object_keys {
2629 if let Some(o) = objects.get(&k) {
2630 set.insert(o.clone());
2631 }
2632 }
2633
2634 set
2635 };
2636
2637 Ok(SimulateTransactionResult {
2638 objects: object_set,
2639 events: effects.events_digest().map(|_| inner_temp_store.events),
2640 effects,
2641 execution_result,
2642 mock_gas_id,
2643 unchanged_loaded_runtime_objects,
2644 suggested_gas_price: self
2645 .congestion_tracker
2646 .get_suggested_gas_prices(&transaction),
2647 })
2648 }
2649
2650 #[instrument(skip_all)]
2652 pub async fn dev_inspect_transaction_block(
2653 &self,
2654 sender: SuiAddress,
2655 transaction_kind: TransactionKind,
2656 gas_price: Option<u64>,
2657 gas_budget: Option<u64>,
2658 gas_sponsor: Option<SuiAddress>,
2659 gas_objects: Option<Vec<ObjectRef>>,
2660 show_raw_txn_data_and_effects: Option<bool>,
2661 skip_checks: Option<bool>,
2662 ) -> SuiResult<DevInspectResults> {
2663 let epoch_store = self.load_epoch_store_one_call_per_task();
2664 let protocol_config = epoch_store.protocol_config();
2665 let reference_gas_price = epoch_store.reference_gas_price();
2666
2667 let skip_checks = skip_checks.unwrap_or(true);
2668 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2669
2670 let price = gas_price.unwrap_or(reference_gas_price);
2672 let budget = gas_budget.unwrap_or(protocol_config.max_tx_gas());
2673 let owner = gas_sponsor.unwrap_or(sender);
2674 let payment = gas_objects.unwrap_or_default();
2675 let transaction = TransactionData::V1(TransactionDataV1 {
2676 kind: transaction_kind,
2677 sender,
2678 gas_data: GasData {
2679 payment,
2680 owner,
2681 price,
2682 budget,
2683 },
2684 expiration: TransactionExpiration::None,
2685 });
2686
2687 let raw_txn_data = if show_raw_txn_data_and_effects {
2690 bcs::to_bytes(&transaction).map_err(|_| {
2691 SuiErrorKind::TransactionSerializationError {
2692 error: "Failed to serialize transaction during dev inspect".to_string(),
2693 }
2694 })?
2695 } else {
2696 vec![]
2697 };
2698
2699 let checks = if skip_checks {
2703 TransactionChecks::Disabled
2704 } else {
2705 TransactionChecks::Enabled
2706 };
2707 let sim =
2708 self.simulate_transaction(transaction, checks, true)?;
2709
2710 let raw_effects = if show_raw_txn_data_and_effects {
2711 bcs::to_bytes(&sim.effects).map_err(|_| {
2712 SuiErrorKind::TransactionSerializationError {
2713 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2714 }
2715 })?
2716 } else {
2717 vec![]
2718 };
2719
2720 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2724 epoch_store.protocol_config(),
2725 Box::new(OverlayBackingPackageStore::new(
2726 &sim.objects,
2727 self.get_backing_package_store(),
2728 )),
2729 );
2730
2731 DevInspectResults::new(
2732 sim.effects,
2733 sim.events.unwrap_or_default(),
2734 sim.execution_result,
2735 raw_txn_data,
2736 raw_effects,
2737 layout_resolver.as_mut(),
2738 )
2739 }
2740
2741 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2743 let epoch_store = self.epoch_store_for_testing();
2744 Ok(epoch_store.reference_gas_price())
2745 }
2746
2747 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2748 self.get_transaction_cache_reader()
2749 .is_tx_already_executed(digest)
2750 }
2751
2752 #[instrument(level = "debug", skip_all, err(level = "debug"))]
2753 fn index_tx(
2754 sequence: u64,
2755 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2756 object_store: &Arc<dyn ObjectStore + Send + Sync>,
2757 indexes: &IndexStore,
2758 digest: &TransactionDigest,
2759 cert: &VerifiedExecutableTransaction,
2761 effects: &TransactionEffects,
2762 events: &TransactionEvents,
2763 timestamp_ms: u64,
2764 tx_coins: Option<TxCoins>,
2765 written: &WrittenObjects,
2766 inner_temporary_store: &InnerTemporaryStore,
2767 epoch_store: &Arc<AuthorityPerEpochStore>,
2768 acquire_locks: bool,
2769 ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
2770 let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
2771 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2772
2773 indexes.index_tx(
2774 sequence,
2775 cert.data().intent_message().value.sender(),
2776 cert.data()
2777 .intent_message()
2778 .value
2779 .input_objects()?
2780 .iter()
2781 .map(|o| o.object_id()),
2782 effects
2783 .all_changed_objects()
2784 .into_iter()
2785 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2786 cert.data()
2787 .intent_message()
2788 .value
2789 .move_calls()
2790 .into_iter()
2791 .map(|(_cmd_idx, package, module, function)| {
2792 (*package, module.to_owned(), function.to_owned())
2793 }),
2794 events,
2795 changes,
2796 digest,
2797 timestamp_ms,
2798 tx_coins,
2799 effects.accumulator_events(),
2800 acquire_locks,
2801 )
2802 }
2803
2804 #[cfg(msim)]
2805 fn simulate_fork_during_execution(
2806 &self,
2807 certificate: &VerifiedExecutableTransaction,
2808 epoch_store: &Arc<AuthorityPerEpochStore>,
2809 effects: &mut TransactionEffects,
2810 forked_validators: std::sync::Arc<
2811 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2812 >,
2813 full_halt: bool,
2814 effects_overrides: std::sync::Arc<
2815 std::sync::Mutex<std::collections::BTreeMap<String, String>>,
2816 >,
2817 fork_probability: f32,
2818 ) {
2819 use std::cell::RefCell;
2820 thread_local! {
2821 static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
2822 }
2823 if !certificate.data().intent_message().value.is_system_tx() {
2824 let committee = epoch_store.committee();
2825 let cur_stake = (**committee).weight(&self.name);
2826 if cur_stake > 0 {
2827 TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
2828 let already_forked = forked_validators
2829 .lock()
2830 .ok()
2831 .map(|set| set.contains(&self.name))
2832 .unwrap_or(false);
2833
2834 if !already_forked {
2835 let should_fork = if full_halt {
2836 *total_stake <= committee.validity_threshold()
2838 } else {
2839 *total_stake + cur_stake < committee.validity_threshold()
2841 };
2842
2843 if should_fork {
2844 *total_stake += cur_stake;
2845
2846 if let Ok(mut external_set) = forked_validators.lock() {
2847 external_set.insert(self.name);
2848 info!("forked_validators: {:?}", external_set);
2849 }
2850 }
2851 }
2852
2853 if let Ok(external_set) = forked_validators.lock() {
2854 if external_set.contains(&self.name) {
2855 let tx_digest = certificate.digest().to_string();
2861 if let Ok(mut overrides) = effects_overrides.lock() {
2862 if overrides.contains_key(&tx_digest)
2863 || overrides.is_empty()
2864 && sui_simulator::random::deterministic_probability(
2865 &tx_digest,
2866 fork_probability,
2867 )
2868 {
2869 let original_effects_digest = effects.digest().to_string();
2870 overrides
2871 .insert(tx_digest.clone(), original_effects_digest.clone());
2872 info!(
2873 ?tx_digest,
2874 ?original_effects_digest,
2875 "Captured forked effects digest for transaction"
2876 );
2877 effects.gas_cost_summary_mut_for_testing().computation_cost +=
2878 1;
2879 }
2880 }
2881 }
2882 }
2883 });
2884 }
2885 }
2886 }
2887
2888 fn process_object_index(
2889 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2890 object_store: &Arc<dyn ObjectStore + Send + Sync>,
2891 effects: &TransactionEffects,
2892 written: &WrittenObjects,
2893 inner_temporary_store: &InnerTemporaryStore,
2894 epoch_store: &Arc<AuthorityPerEpochStore>,
2895 ) -> SuiResult<ObjectIndexChanges> {
2896 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2897 epoch_store.protocol_config(),
2898 Box::new(PackageStoreWithFallback::new(
2899 inner_temporary_store,
2900 backing_package_store,
2901 )),
2902 );
2903
2904 let modified_at_version = effects
2905 .modified_at_versions()
2906 .into_iter()
2907 .collect::<HashMap<_, _>>();
2908
2909 let tx_digest = effects.transaction_digest();
2910 let mut deleted_owners = vec![];
2911 let mut deleted_dynamic_fields = vec![];
2912 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2913 let old_version = modified_at_version.get(&id).unwrap();
2914 match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
2917 |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
2918 ) {
2919 Owner::AddressOwner(addr)
2920 | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
2921 Owner::ObjectOwner(object_id) => {
2922 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2923 }
2924 _ => {}
2925 }
2926 }
2927
2928 let mut new_owners = vec![];
2929 let mut new_dynamic_fields = vec![];
2930
2931 for (oref, owner, kind) in effects.all_changed_objects() {
2932 let id = &oref.0;
2933 if let WriteKind::Mutate = kind {
2935 let Some(old_version) = modified_at_version.get(id) else {
2936 panic!(
2937 "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
2938 tx_digest
2939 );
2940 };
2941 let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
2944 panic!(
2945 "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
2946 tx_digest, id, old_version
2947 );
2948 };
2949 if old_object.owner != owner {
2950 match old_object.owner {
2951 Owner::AddressOwner(addr)
2952 | Owner::ConsensusAddressOwner { owner: addr, .. } => {
2953 deleted_owners.push((addr, *id));
2954 }
2955 Owner::ObjectOwner(object_id) => {
2956 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2957 }
2958 _ => {}
2959 }
2960 }
2961 }
2962
2963 match owner {
2964 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
2965 let new_object = written.get(id).unwrap_or_else(
2967 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2968 );
2969 assert_eq!(
2970 new_object.version(),
2971 oref.1,
2972 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2973 tx_digest,
2974 id,
2975 new_object.version(),
2976 oref.1
2977 );
2978
2979 let type_ = new_object
2980 .type_()
2981 .map(|type_| ObjectType::Struct(type_.clone()))
2982 .unwrap_or(ObjectType::Package);
2983
2984 new_owners.push((
2985 (addr, *id),
2986 ObjectInfo {
2987 object_id: *id,
2988 version: oref.1,
2989 digest: oref.2,
2990 type_,
2991 owner,
2992 previous_transaction: *effects.transaction_digest(),
2993 },
2994 ));
2995 }
2996 Owner::ObjectOwner(owner) => {
2997 let new_object = written.get(id).unwrap_or_else(
2998 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2999 );
3000 assert_eq!(
3001 new_object.version(),
3002 oref.1,
3003 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3004 tx_digest,
3005 id,
3006 new_object.version(),
3007 oref.1
3008 );
3009
3010 let Some(df_info) = Self::try_create_dynamic_field_info(
3011 object_store,
3012 new_object,
3013 written,
3014 layout_resolver.as_mut(),
3015 )
3016 .unwrap_or_else(|e| {
3017 error!(
3018 "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
3019 e, new_object
3020 );
3021 None
3022 }) else {
3023 continue;
3025 };
3026 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3027 }
3028 _ => {}
3029 }
3030 }
3031
3032 Ok(ObjectIndexChanges {
3033 deleted_owners,
3034 deleted_dynamic_fields,
3035 new_owners,
3036 new_dynamic_fields,
3037 })
3038 }
3039
3040 fn try_create_dynamic_field_info(
3041 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3042 o: &Object,
3043 written: &WrittenObjects,
3044 resolver: &mut dyn LayoutResolver,
3045 ) -> SuiResult<Option<DynamicFieldInfo>> {
3046 let Some(move_object) = o.data.try_as_move().cloned() else {
3048 return Ok(None);
3049 };
3050
3051 if !move_object.type_().is_dynamic_field() {
3053 return Ok(None);
3054 }
3055
3056 let layout = resolver
3057 .get_annotated_layout(&move_object.type_().clone().into())?
3058 .into_layout();
3059
3060 let field =
3061 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3062 SuiErrorKind::ObjectDeserializationError {
3063 error: e.to_string(),
3064 }
3065 })?;
3066
3067 let type_ = field.kind;
3068 let name_type: TypeTag = field.name_layout.into();
3069 let bcs_name = field.name_bytes.to_owned();
3070
3071 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3072 .map_err(|e| {
3073 warn!("{e}");
3074 SuiErrorKind::ObjectDeserializationError {
3075 error: e.to_string(),
3076 }
3077 })?;
3078
3079 let name = DynamicFieldName {
3080 type_: name_type,
3081 value: SuiMoveValue::from(name_value).to_json_value(),
3082 };
3083
3084 let value_metadata = field.value_metadata().map_err(|e| {
3085 warn!("{e}");
3086 SuiErrorKind::ObjectDeserializationError {
3087 error: e.to_string(),
3088 }
3089 })?;
3090
3091 Ok(Some(match value_metadata {
3092 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3093 name,
3094 bcs_name,
3095 type_,
3096 object_type: object_type.to_canonical_string(true),
3097 object_id: o.id(),
3098 version: o.version(),
3099 digest: o.digest(),
3100 },
3101
3102 DFV::ValueMetadata::DynamicObjectField(object_id) => {
3103 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3107 let version = object.version();
3108 let digest = object.digest();
3109 let object_type = object.data.type_().unwrap().clone();
3110 (version, digest, object_type)
3111 } else {
3112 let object = object_store
3114 .get_object_by_key(&object_id, o.version())
3115 .ok_or_else(|| UserInputError::ObjectNotFound {
3116 object_id,
3117 version: Some(o.version()),
3118 })?;
3119 let version = object.version();
3120 let digest = object.digest();
3121 let object_type = object.data.type_().unwrap().clone();
3122 (version, digest, object_type)
3123 };
3124
3125 DynamicFieldInfo {
3126 name,
3127 bcs_name,
3128 type_,
3129 object_type: object_type.to_string(),
3130 object_id,
3131 version,
3132 digest,
3133 }
3134 }
3135 }))
3136 }
3137
3138 #[instrument(level = "trace", skip_all, err(level = "debug"))]
3139 fn post_process_one_tx(
3140 &self,
3141 certificate: &VerifiedExecutableTransaction,
3142 effects: &TransactionEffects,
3143 inner_temporary_store: &InnerTemporaryStore,
3144 epoch_store: &Arc<AuthorityPerEpochStore>,
3145 ) -> SuiResult {
3146 let Some(indexes) = &self.indexes else {
3147 return Ok(());
3148 };
3149
3150 let tx_digest = *certificate.digest();
3151
3152 let sequence = indexes.allocate_sequence_number();
3154
3155 if self.config.sync_post_process_one_tx {
3156 let result = Self::post_process_one_tx_impl(
3161 sequence,
3162 indexes,
3163 &self.subscription_handler,
3164 &self.metrics,
3165 self.name,
3166 self.get_backing_package_store(),
3167 self.get_object_store(),
3168 certificate,
3169 effects,
3170 inner_temporary_store,
3171 epoch_store,
3172 true, );
3174
3175 match result {
3176 Ok((raw_batch, cache_updates_with_locks)) => {
3177 let mut db_batch = indexes.new_db_batch();
3178 db_batch
3179 .concat(vec![raw_batch])
3180 .expect("failed to absorb raw index batch");
3181 let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
3183 cache_updates_with_locks;
3184 indexes
3185 .commit_index_batch(db_batch, vec![inner])
3186 .expect("failed to commit index batch");
3187 }
3188 Err(e) => {
3189 self.metrics.post_processing_total_failures.inc();
3190 error!(?tx_digest, "tx post processing failed: {e}");
3191 return Err(e);
3192 }
3193 }
3194
3195 return Ok(());
3196 }
3197
3198 let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
3199 self.pending_post_processing.insert(tx_digest, done_rx);
3200
3201 let indexes = indexes.clone();
3202 let subscription_handler = self.subscription_handler.clone();
3203 let metrics = self.metrics.clone();
3204 let name = self.name;
3205 let backing_package_store = self.get_backing_package_store().clone();
3206 let object_store = self.get_object_store().clone();
3207 let semaphore = self.post_processing_semaphore.clone();
3208
3209 let certificate = certificate.clone();
3210 let effects = effects.clone();
3211 let inner_temporary_store = inner_temporary_store.clone();
3212 let epoch_store = epoch_store.clone();
3213
3214 tokio::spawn(async move {
3216 let permit = {
3217 let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
3218 semaphore
3219 .acquire_owned()
3220 .await
3221 .expect("post-processing semaphore should not be closed")
3222 };
3223
3224 let _ = tokio::task::spawn_blocking(move || {
3225 let _permit = permit;
3226
3227 let result = Self::post_process_one_tx_impl(
3228 sequence,
3229 &indexes,
3230 &subscription_handler,
3231 &metrics,
3232 name,
3233 &backing_package_store,
3234 &object_store,
3235 &certificate,
3236 &effects,
3237 &inner_temporary_store,
3238 &epoch_store,
3239 false, );
3241
3242 match result {
3243 Ok((raw_batch, cache_updates_with_locks)) => {
3244 fail_point!("crash-after-post-process-one-tx");
3245 let output = (raw_batch, cache_updates_with_locks.into_inner());
3246 let _ = done_tx.send(output);
3247 }
3248 Err(e) => {
3249 metrics.post_processing_total_failures.inc();
3250 error!(?tx_digest, "tx post processing failed: {e}");
3251 }
3252 }
3253 })
3254 .await;
3255 });
3256
3257 Ok(())
3258 }
3259
3260 fn post_process_one_tx_impl(
3261 sequence: u64,
3262 indexes: &Arc<IndexStore>,
3263 subscription_handler: &Arc<SubscriptionHandler>,
3264 metrics: &Arc<AuthorityMetrics>,
3265 name: AuthorityName,
3266 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3267 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3268 certificate: &VerifiedExecutableTransaction,
3269 effects: &TransactionEffects,
3270 inner_temporary_store: &InnerTemporaryStore,
3271 epoch_store: &Arc<AuthorityPerEpochStore>,
3272 acquire_locks: bool,
3273 ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
3274 let _scope = monitored_scope("Execution::post_process_one_tx");
3275
3276 let tx_digest = certificate.digest();
3277 let timestamp_ms = Self::unixtime_now_ms();
3278 let events = &inner_temporary_store.events;
3279 let written = &inner_temporary_store.written;
3280 let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
3281 name,
3282 object_store,
3283 effects,
3284 inner_temporary_store,
3285 epoch_store,
3286 );
3287
3288 let (raw_batch, cache_updates) = Self::index_tx(
3289 sequence,
3290 backing_package_store,
3291 object_store,
3292 indexes,
3293 tx_digest,
3294 certificate,
3295 effects,
3296 events,
3297 timestamp_ms,
3298 tx_coins,
3299 written,
3300 inner_temporary_store,
3301 epoch_store,
3302 acquire_locks,
3303 )
3304 .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
3305 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3306 .expect("Indexing tx should not fail");
3307
3308 let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3309 let events = Self::make_transaction_block_events(
3310 backing_package_store,
3311 events.clone(),
3312 *tx_digest,
3313 timestamp_ms,
3314 epoch_store,
3315 inner_temporary_store,
3316 )?;
3317 subscription_handler
3319 .process_tx(certificate.data().transaction_data(), &effects, &events)
3320 .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
3321 .tap_err(|e| {
3322 warn!(
3323 ?tx_digest,
3324 "Post processing - Couldn't process events for tx: {}", e
3325 )
3326 })?;
3327
3328 metrics
3329 .post_processing_total_events_emitted
3330 .inc_by(events.data.len() as u64);
3331
3332 Ok((raw_batch, cache_updates))
3333 }
3334
3335 fn make_transaction_block_events(
3336 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3337 transaction_events: TransactionEvents,
3338 digest: TransactionDigest,
3339 timestamp_ms: u64,
3340 epoch_store: &Arc<AuthorityPerEpochStore>,
3341 inner_temporary_store: &InnerTemporaryStore,
3342 ) -> SuiResult<SuiTransactionBlockEvents> {
3343 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3344 epoch_store.protocol_config(),
3345 Box::new(PackageStoreWithFallback::new(
3346 inner_temporary_store,
3347 backing_package_store,
3348 )),
3349 );
3350 SuiTransactionBlockEvents::try_from(
3351 transaction_events,
3352 digest,
3353 Some(timestamp_ms),
3354 layout_resolver.as_mut(),
3355 )
3356 }
3357
3358 pub fn unixtime_now_ms() -> u64 {
3359 let now = SystemTime::now()
3360 .duration_since(UNIX_EPOCH)
3361 .expect("Time went backwards")
3362 .as_millis();
3363 u64::try_from(now).expect("Travelling in time machine")
3364 }
3365
3366 #[instrument(level = "trace", skip_all)]
3370 pub async fn handle_transaction_info_request(
3371 &self,
3372 request: TransactionInfoRequest,
3373 ) -> SuiResult<TransactionInfoResponse> {
3374 let epoch_store = self.load_epoch_store_one_call_per_task();
3375 let (transaction, status) = self
3376 .get_transaction_status(&request.transaction_digest, &epoch_store)?
3377 .ok_or(SuiErrorKind::TransactionNotFound {
3378 digest: request.transaction_digest,
3379 })?;
3380 Ok(TransactionInfoResponse {
3381 transaction,
3382 status,
3383 })
3384 }
3385
3386 #[instrument(level = "trace", skip_all)]
3387 pub async fn handle_object_info_request(
3388 &self,
3389 request: ObjectInfoRequest,
3390 ) -> SuiResult<ObjectInfoResponse> {
3391 let epoch_store = self.load_epoch_store_one_call_per_task();
3392
3393 let requested_object_seq = match request.request_kind {
3394 ObjectInfoRequestKind::LatestObjectInfo => {
3395 let (_, seq, _) =
3396 self.get_object_or_tombstone(request.object_id)
3397 .ok_or_else(|| {
3398 SuiError::from(UserInputError::ObjectNotFound {
3399 object_id: request.object_id,
3400 version: None,
3401 })
3402 })?;
3403 seq
3404 }
3405 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3406 };
3407
3408 let object = self
3409 .get_object_store()
3410 .get_object_by_key(&request.object_id, requested_object_seq)
3411 .ok_or_else(|| {
3412 SuiError::from(UserInputError::ObjectNotFound {
3413 object_id: request.object_id,
3414 version: Some(requested_object_seq),
3415 })
3416 })?;
3417
3418 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3419 (request.generate_layout, object.data.try_as_move())
3420 {
3421 let epoch_store = self.load_epoch_store_one_call_per_task();
3422 Some(into_struct_layout(
3423 epoch_store
3424 .executor()
3425 .type_layout_resolver(
3426 epoch_store.protocol_config(),
3427 Box::new(self.get_backing_package_store().as_ref()),
3428 )
3429 .get_annotated_layout(&move_obj.type_().clone().into())?,
3430 )?)
3431 } else {
3432 None
3433 };
3434
3435 let lock = if !object.is_address_owned() {
3436 None
3438 } else {
3439 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)?
3440 .map(|s| s.into_inner())
3441 };
3442
3443 Ok(ObjectInfoResponse {
3444 object,
3445 layout,
3446 lock_for_debugging: lock,
3447 })
3448 }
3449
3450 #[instrument(level = "trace", skip_all)]
3451 pub fn handle_checkpoint_request(
3452 &self,
3453 request: &CheckpointRequest,
3454 ) -> SuiResult<CheckpointResponse> {
3455 let summary = match request.sequence_number {
3456 Some(seq) => self
3457 .checkpoint_store
3458 .get_checkpoint_by_sequence_number(seq)?,
3459 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3460 }
3461 .map(|v| v.into_inner());
3462 let contents = match &summary {
3463 Some(s) => self
3464 .checkpoint_store
3465 .get_checkpoint_contents(&s.content_digest)?,
3466 None => None,
3467 };
3468 Ok(CheckpointResponse {
3469 checkpoint: summary,
3470 contents,
3471 })
3472 }
3473
3474 #[instrument(level = "trace", skip_all)]
3475 pub fn handle_checkpoint_request_v2(
3476 &self,
3477 request: &CheckpointRequestV2,
3478 ) -> SuiResult<CheckpointResponseV2> {
3479 let summary = if request.certified {
3480 let summary = match request.sequence_number {
3481 Some(seq) => self
3482 .checkpoint_store
3483 .get_checkpoint_by_sequence_number(seq)?,
3484 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3485 }
3486 .map(|v| v.into_inner());
3487 summary.map(CheckpointSummaryResponse::Certified)
3488 } else {
3489 let summary = match request.sequence_number {
3490 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3491 None => self
3492 .checkpoint_store
3493 .get_latest_locally_computed_checkpoint()?,
3494 };
3495 summary.map(CheckpointSummaryResponse::Pending)
3496 };
3497 let contents = match &summary {
3498 Some(s) => self
3499 .checkpoint_store
3500 .get_checkpoint_contents(&s.content_digest())?,
3501 None => None,
3502 };
3503 Ok(CheckpointResponseV2 {
3504 checkpoint: summary,
3505 contents,
3506 })
3507 }
3508
3509 fn check_protocol_version(
3510 supported_protocol_versions: SupportedProtocolVersions,
3511 current_version: ProtocolVersion,
3512 ) {
3513 info!("current protocol version is now {:?}", current_version);
3514 info!("supported versions are: {:?}", supported_protocol_versions);
3515 if !supported_protocol_versions.is_version_supported(current_version) {
3516 let msg = format!(
3517 "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3518 current_version, supported_protocol_versions,
3519 );
3520
3521 error!("{}", msg);
3522 eprintln!("{}", msg);
3523
3524 #[cfg(not(msim))]
3525 std::process::exit(1);
3526
3527 #[cfg(msim)]
3528 sui_simulator::task::shutdown_current_node();
3529 }
3530 }
3531
3532 #[allow(clippy::disallowed_methods)] #[allow(clippy::too_many_arguments)]
3534 pub async fn new(
3535 name: AuthorityName,
3536 secret: StableSyncAuthoritySigner,
3537 supported_protocol_versions: SupportedProtocolVersions,
3538 store: Arc<AuthorityStore>,
3539 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3540 epoch_store: Arc<AuthorityPerEpochStore>,
3541 committee_store: Arc<CommitteeStore>,
3542 indexes: Option<Arc<IndexStore>>,
3543 rpc_index: Option<Arc<RpcIndexStore>>,
3544 checkpoint_store: Arc<CheckpointStore>,
3545 prometheus_registry: &Registry,
3546 genesis_objects: &[Object],
3547 db_checkpoint_config: &DBCheckpointConfig,
3548 config: NodeConfig,
3549 chain_identifier: ChainIdentifier,
3550 policy_config: Option<PolicyConfig>,
3551 firewall_config: Option<RemoteFirewallConfig>,
3552 pruner_watermarks: Arc<PrunerWatermarks>,
3553 ) -> Arc<Self> {
3554 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3555
3556 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3557 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3558 let execution_scheduler = Arc::new(ExecutionScheduler::new(
3559 execution_cache_trait_pointers.object_cache_reader.clone(),
3560 execution_cache_trait_pointers.account_funds_read.clone(),
3561 execution_cache_trait_pointers
3562 .transaction_cache_reader
3563 .clone(),
3564 tx_ready_certificates,
3565 &epoch_store,
3566 config.funds_withdraw_scheduler_type,
3567 metrics.clone(),
3568 prometheus_registry,
3569 ));
3570 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3571
3572 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3573 epoch_store.get_parent_path(),
3574 &config.authority_store_pruning_config,
3575 );
3576 let _pruner = AuthorityStorePruner::new(
3577 store.perpetual_tables.clone(),
3578 checkpoint_store.clone(),
3579 rpc_index.clone(),
3580 indexes.clone(),
3581 config.authority_store_pruning_config.clone(),
3582 epoch_store.committee().authority_exists(&name),
3583 epoch_store.epoch_start_state().epoch_duration_ms(),
3584 prometheus_registry,
3585 pruner_watermarks,
3586 );
3587 let input_loader =
3588 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3589 let epoch = epoch_store.epoch();
3590 let traffic_controller_metrics =
3591 Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3592 let traffic_controller = if let Some(policy_config) = policy_config {
3593 Some(Arc::new(
3594 TrafficController::init(
3595 policy_config,
3596 traffic_controller_metrics,
3597 firewall_config.clone(),
3598 )
3599 .await,
3600 ))
3601 } else {
3602 None
3603 };
3604
3605 let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3606 ForkRecoveryState::new(Some(fork_config))
3607 .expect("Failed to initialize fork recovery state")
3608 });
3609
3610 let coin_reservation_resolver = Arc::new(CachingCoinReservationResolver::new(
3611 execution_cache_trait_pointers.child_object_resolver.clone(),
3612 ));
3613
3614 let object_funds_checker_metrics =
3615 Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
3616 let state = Arc::new(AuthorityState {
3617 name,
3618 secret,
3619 execution_lock: RwLock::new(epoch),
3620 epoch_store: ArcSwap::new(epoch_store.clone()),
3621 input_loader,
3622 execution_cache_trait_pointers,
3623 coin_reservation_resolver,
3624 indexes,
3625 rpc_index,
3626 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3627 checkpoint_store,
3628 committee_store,
3629 execution_scheduler,
3630 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3631 metrics,
3632 _pruner,
3633 _authority_per_epoch_pruner,
3634 db_checkpoint_config: db_checkpoint_config.clone(),
3635 config,
3636 overload_info: AuthorityOverloadInfo::default(),
3637 chain_identifier,
3638 congestion_tracker: Arc::new(CongestionTracker::new()),
3639 consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
3640 traffic_controller,
3641 fork_recovery_state,
3642 notify_epoch: tokio::sync::watch::channel(epoch).0,
3643 object_funds_checker: ArcSwapOption::empty(),
3644 object_funds_checker_metrics,
3645 pending_post_processing: Arc::new(DashMap::new()),
3646 post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
3647 });
3648 state.init_object_funds_checker().await;
3649
3650 let authority_state = Arc::downgrade(&state);
3652 spawn_monitored_task!(execution_process(
3653 authority_state,
3654 rx_ready_certificates,
3655 rx_execution_shutdown,
3656 ));
3657 state
3659 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3660 .expect("Error indexing genesis objects.");
3661
3662 if epoch_store
3663 .protocol_config()
3664 .enable_multi_epoch_transaction_expiration()
3665 && epoch_store.epoch() > 0
3666 {
3667 use typed_store::Map;
3668 let previous_epoch = epoch_store.epoch() - 1;
3669 let start_key = (previous_epoch, TransactionDigest::ZERO);
3670 let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
3671 let has_previous_epoch_data = store
3672 .perpetual_tables
3673 .executed_transaction_digests
3674 .safe_range_iter(start_key..end_key)
3675 .next()
3676 .is_some();
3677
3678 if !has_previous_epoch_data {
3679 panic!(
3680 "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
3681 This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
3682 Please restore from a snapshot using the latest version of sui-tool.",
3683 previous_epoch
3684 );
3685 }
3686 }
3687
3688 state
3689 }
3690
3691 async fn init_object_funds_checker(&self) {
3692 let epoch_store = self.epoch_store.load();
3693 if self.is_validator(&epoch_store)
3694 && epoch_store.protocol_config().enable_object_funds_withdraw()
3695 {
3696 if self.object_funds_checker.load().is_none() {
3697 let inner = self.get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID).map(|o| {
3698 Arc::new(ObjectFundsChecker::new(
3699 o.version(),
3700 self.object_funds_checker_metrics.clone(),
3701 ))
3702 });
3703 self.object_funds_checker.store(inner);
3704 }
3705 } else {
3706 self.object_funds_checker.store(None);
3707 }
3708 }
3709
3710 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3712 &self.execution_cache_trait_pointers.object_cache_reader
3713 }
3714
3715 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3716 &self.execution_cache_trait_pointers.transaction_cache_reader
3717 }
3718
3719 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3720 &self.execution_cache_trait_pointers.cache_writer
3721 }
3722
3723 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3724 &self.execution_cache_trait_pointers.backing_store
3725 }
3726
3727 pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3728 &self.execution_cache_trait_pointers.child_object_resolver
3729 }
3730
3731 pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
3732 &self.execution_cache_trait_pointers.account_funds_read
3733 }
3734
3735 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3736 &self.execution_cache_trait_pointers.backing_package_store
3737 }
3738
3739 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3740 &self.execution_cache_trait_pointers.object_store
3741 }
3742
3743 pub fn pending_post_processing(
3744 &self,
3745 ) -> &Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>> {
3746 &self.pending_post_processing
3747 }
3748
3749 pub async fn await_post_processing(
3750 &self,
3751 tx_digest: &TransactionDigest,
3752 ) -> Option<PostProcessingOutput> {
3753 if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
3754 rx.await.ok()
3756 } else {
3757 None
3759 }
3760 }
3761
3762 pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
3766 if let Some(indexes) = &self.indexes
3767 && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
3768 {
3769 let mut db_batch = indexes.new_db_batch();
3770 db_batch
3771 .concat(vec![raw_batch])
3772 .expect("failed to build index batch");
3773 indexes
3774 .commit_index_batch(db_batch, vec![cache_updates])
3775 .expect("failed to commit index batch");
3776 }
3777 }
3778
3779 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3780 &self.execution_cache_trait_pointers.reconfig_api
3781 }
3782
3783 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3784 &self.execution_cache_trait_pointers.global_state_hash_store
3785 }
3786
3787 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3788 &self.execution_cache_trait_pointers.checkpoint_cache
3789 }
3790
3791 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3792 &self.execution_cache_trait_pointers.state_sync_store
3793 }
3794
3795 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3796 &self.execution_cache_trait_pointers.cache_commit
3797 }
3798
3799 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3800 self.execution_cache_trait_pointers
3801 .testing_api
3802 .database_for_testing()
3803 }
3804
3805 pub fn cache_for_testing(&self) -> &WritebackCache {
3806 self.execution_cache_trait_pointers
3807 .testing_api
3808 .cache_for_testing()
3809 }
3810
3811 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3812 &self,
3813 config: NodeConfig,
3814 metrics: Arc<AuthorityStorePruningMetrics>,
3815 ) -> anyhow::Result<()> {
3816 use crate::authority::authority_store_pruner::PrunerWatermarks;
3817 let watermarks = Arc::new(PrunerWatermarks::default());
3818 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3819 &self.database_for_testing().perpetual_tables,
3820 &self.checkpoint_store,
3821 self.rpc_index.as_deref(),
3822 config.authority_store_pruning_config,
3823 metrics,
3824 EPOCH_DURATION_MS_FOR_TESTING,
3825 &watermarks,
3826 )
3827 .await
3828 }
3829
3830 pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
3831 &self.execution_scheduler
3832 }
3833
3834 fn create_owner_index_if_empty(
3835 &self,
3836 genesis_objects: &[Object],
3837 epoch_store: &Arc<AuthorityPerEpochStore>,
3838 ) -> SuiResult {
3839 let Some(index_store) = &self.indexes else {
3840 return Ok(());
3841 };
3842 if !index_store.is_empty() {
3843 return Ok(());
3844 }
3845
3846 let mut new_owners = vec![];
3847 let mut new_dynamic_fields = vec![];
3848 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3849 epoch_store.protocol_config(),
3850 Box::new(self.get_backing_package_store().as_ref()),
3851 );
3852 for o in genesis_objects.iter() {
3853 match o.owner {
3854 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3855 new_owners.push((
3856 (addr, o.id()),
3857 ObjectInfo::new(&o.compute_object_reference(), o),
3858 ))
3859 }
3860 Owner::ObjectOwner(object_id) => {
3861 let id = o.id();
3862 let Some(info) = Self::try_create_dynamic_field_info(
3863 self.get_object_store(),
3864 o,
3865 &BTreeMap::new(),
3866 layout_resolver.as_mut(),
3867 )?
3868 else {
3869 continue;
3870 };
3871 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3872 }
3873 _ => {}
3874 }
3875 }
3876
3877 index_store.insert_genesis_objects(ObjectIndexChanges {
3878 deleted_owners: vec![],
3879 deleted_dynamic_fields: vec![],
3880 new_owners,
3881 new_dynamic_fields,
3882 })
3883 }
3884
3885 pub fn execution_lock_for_executable_transaction(
3889 &self,
3890 transaction: &VerifiedExecutableTransaction,
3891 ) -> Option<ExecutionLockReadGuard<'_>> {
3892 let lock = self.execution_lock.try_read().ok()?;
3893 if *lock == transaction.auth_sig().epoch() {
3894 Some(lock)
3895 } else {
3896 None
3898 }
3899 }
3900
3901 fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
3904 self.execution_lock
3905 .try_read()
3906 .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
3907 }
3908
3909 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3910 self.execution_lock.write().await
3911 }
3912
3913 #[instrument(level = "error", skip_all)]
3914 pub async fn reconfigure(
3915 &self,
3916 cur_epoch_store: &AuthorityPerEpochStore,
3917 supported_protocol_versions: SupportedProtocolVersions,
3918 new_committee: Committee,
3919 epoch_start_configuration: EpochStartConfiguration,
3920 state_hasher: Arc<GlobalStateHasher>,
3921 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3922 epoch_last_checkpoint: CheckpointSequenceNumber,
3923 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
3924 Self::check_protocol_version(
3925 supported_protocol_versions,
3926 epoch_start_configuration
3927 .epoch_start_state()
3928 .protocol_version(),
3929 );
3930
3931 self.committee_store.insert_new_committee(&new_committee)?;
3932
3933 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3935
3936 cur_epoch_store.epoch_terminated().await;
3938
3939 {
3943 let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
3944 if state.should_accept_user_certs() {
3945 cur_epoch_store.close_user_certs(state);
3952 }
3953 }
3955
3956 self.get_reconfig_api()
3957 .clear_state_end_of_epoch(&execution_lock);
3958 self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
3959 self.maybe_reaccumulate_state_hash(
3960 cur_epoch_store,
3961 epoch_start_configuration
3962 .epoch_start_state()
3963 .protocol_version(),
3964 );
3965 self.get_reconfig_api()
3966 .set_epoch_start_configuration(&epoch_start_configuration);
3967 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
3968 && self
3969 .db_checkpoint_config
3970 .perform_db_checkpoints_at_epoch_end
3971 {
3972 let checkpoint_indexes = self
3973 .db_checkpoint_config
3974 .perform_index_db_checkpoints_at_epoch_end
3975 .unwrap_or(false);
3976 let current_epoch = cur_epoch_store.epoch();
3977 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
3978 self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
3979 }
3980
3981 self.get_reconfig_api()
3982 .reconfigure_cache(&epoch_start_configuration)
3983 .await;
3984
3985 let new_epoch = new_committee.epoch;
3986 let new_epoch_store = self
3987 .reopen_epoch_db(
3988 cur_epoch_store,
3989 new_committee,
3990 epoch_start_configuration,
3991 expensive_safety_check_config,
3992 epoch_last_checkpoint,
3993 )
3994 .await?;
3995 assert_eq!(new_epoch_store.epoch(), new_epoch);
3996 self.execution_scheduler
3997 .reconfigure(&new_epoch_store, self.get_account_funds_read());
3998 self.init_object_funds_checker().await;
3999 *execution_lock = new_epoch;
4000
4001 self.notify_epoch(new_epoch);
4002 Ok(new_epoch_store)
4006 }
4007
4008 fn notify_epoch(&self, new_epoch: EpochId) {
4009 self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
4010 }
4011
4012 pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4013 let mut rx = self.notify_epoch.subscribe();
4014 loop {
4015 let epoch = *rx.borrow();
4016 if epoch >= target_epoch {
4017 return Ok(epoch);
4018 }
4019 rx.changed().await?;
4020 }
4021 }
4022
4023 pub async fn settle_accumulator_for_testing(
4027 &self,
4028 effects: &[TransactionEffects],
4029 checkpoint_seq: Option<u64>,
4030 ) -> Vec<(VerifiedExecutableTransaction, ExecutionEnv)> {
4031 let accumulator_version = self
4032 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
4033 .unwrap()
4034 .version();
4035 let ckpt_seq = checkpoint_seq.unwrap_or_else(|| accumulator_version.value());
4037 let builder = AccumulatorSettlementTxBuilder::new(
4038 Some(self.get_transaction_cache_reader().as_ref()),
4039 effects,
4040 ckpt_seq,
4041 0,
4042 );
4043 let balance_changes = builder.collect_funds_changes();
4044 let epoch_store = self.epoch_store_for_testing();
4045 let epoch = epoch_store.epoch();
4046 let accumulator_root_obj_initial_shared_version = epoch_store
4047 .epoch_start_config()
4048 .accumulator_root_obj_initial_shared_version()
4049 .unwrap();
4050 let settlements = builder.build_tx(
4051 epoch_store.protocol_config(),
4052 epoch,
4053 accumulator_root_obj_initial_shared_version,
4054 ckpt_seq,
4055 ckpt_seq,
4056 );
4057
4058 let settlements: Vec<_> = settlements
4059 .into_iter()
4060 .map(|tx| {
4061 VerifiedExecutableTransaction::new_system(
4062 VerifiedTransaction::new_system_transaction(tx),
4063 epoch,
4064 )
4065 })
4066 .collect();
4067
4068 let assigned_versions = epoch_store
4069 .assign_shared_object_versions_for_tests(
4070 self.get_object_cache_reader().as_ref(),
4071 &settlements,
4072 )
4073 .unwrap();
4074 let version_map = assigned_versions.into_map();
4075
4076 let mut replay_txns = Vec::new();
4077 let mut settlement_effects = Vec::with_capacity(settlements.len());
4078 for tx in settlements {
4079 let assigned = version_map.get(&tx.key()).unwrap().clone();
4080 let env = ExecutionEnv::new().with_assigned_versions(assigned);
4081 let (effects, _) = self
4082 .try_execute_immediately(&tx.clone(), env.clone(), &epoch_store)
4083 .unwrap();
4084 assert!(effects.status().is_ok());
4085 replay_txns.push((tx, env));
4086 settlement_effects.push(effects);
4087 }
4088
4089 let barrier = accumulators::build_accumulator_barrier_tx(
4090 epoch,
4091 accumulator_root_obj_initial_shared_version,
4092 ckpt_seq,
4093 &settlement_effects,
4094 );
4095 let barrier = VerifiedExecutableTransaction::new_system(
4096 VerifiedTransaction::new_system_transaction(barrier),
4097 epoch,
4098 );
4099
4100 let assigned_versions = epoch_store
4101 .assign_shared_object_versions_for_tests(
4102 self.get_object_cache_reader().as_ref(),
4103 std::slice::from_ref(&barrier),
4104 )
4105 .unwrap();
4106 let version_map = assigned_versions.into_map();
4107
4108 let barrier_assigned = version_map.get(&barrier.key()).unwrap().clone();
4109 let env = ExecutionEnv::new().with_assigned_versions(barrier_assigned);
4110 let (effects, _) = self
4111 .try_execute_immediately(&barrier.clone(), env.clone(), &epoch_store)
4112 .unwrap();
4113 assert!(effects.status().is_ok());
4114 replay_txns.push((barrier, env));
4115
4116 let next_accumulator_version = accumulator_version.next();
4117 self.execution_scheduler
4118 .settle_address_funds(FundsSettlement {
4119 funds_changes: balance_changes,
4120 next_accumulator_version,
4121 });
4122 replay_txns
4125 }
4126
4127 pub async fn replay_settlement_for_testing(
4130 &self,
4131 txns: &[(VerifiedExecutableTransaction, ExecutionEnv)],
4132 ) {
4133 let epoch_store = self.epoch_store_for_testing();
4134 for (tx, env) in txns {
4135 let (effects, _) = self
4136 .try_execute_immediately(tx, env.clone(), &epoch_store)
4137 .unwrap();
4138 assert!(effects.status().is_ok());
4139 }
4140 }
4141
4142 pub async fn reconfigure_for_testing(&self) {
4146 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
4147 let epoch_store = self.epoch_store_for_testing().clone();
4148 let protocol_config = epoch_store.protocol_config().clone();
4149 let _guard =
4156 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
4157 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
4158 self.get_backing_package_store().clone(),
4159 self.get_object_store().clone(),
4160 &self.config.expensive_safety_check_config,
4161 self.checkpoint_store
4162 .get_epoch_last_checkpoint(epoch_store.epoch())
4163 .unwrap()
4164 .map(|c| *c.sequence_number())
4165 .unwrap_or_default(),
4166 );
4167 self.execution_scheduler
4168 .reconfigure(&new_epoch_store, self.get_account_funds_read());
4169 let new_epoch = new_epoch_store.epoch();
4170 self.epoch_store.store(new_epoch_store);
4171 epoch_store.epoch_terminated().await;
4172
4173 *execution_lock = new_epoch;
4174 }
4175
4176 #[instrument(level = "error", skip_all)]
4179 fn maybe_reaccumulate_state_hash(
4180 &self,
4181 cur_epoch_store: &AuthorityPerEpochStore,
4182 new_protocol_version: ProtocolVersion,
4183 ) {
4184 self.get_reconfig_api()
4185 .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4186 }
4187
4188 #[instrument(level = "error", skip_all)]
4189 fn check_system_consistency(
4190 &self,
4191 cur_epoch_store: &AuthorityPerEpochStore,
4192 state_hasher: Arc<GlobalStateHasher>,
4193 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4194 ) {
4195 info!(
4196 "Performing sui conservation consistency check for epoch {}",
4197 cur_epoch_store.epoch()
4198 );
4199
4200 if let Err(err) = self
4201 .get_reconfig_api()
4202 .expensive_check_sui_conservation(cur_epoch_store)
4203 {
4204 if cfg!(debug_assertions) {
4205 panic!("{}", err);
4206 } else {
4207 warn!("Sui conservation consistency check failed: {}", err);
4210 }
4211 } else {
4212 info!("Sui conservation consistency check passed");
4213 }
4214
4215 if expensive_safety_check_config.enable_state_consistency_check() {
4217 info!(
4218 "Performing state consistency check for epoch {}",
4219 cur_epoch_store.epoch()
4220 );
4221 self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4222 }
4223
4224 if expensive_safety_check_config.enable_secondary_index_checks()
4228 && let Some(indexes) = self.indexes.clone()
4229 {
4230 let epoch = cur_epoch_store.epoch();
4231 let first_checkpoint = if epoch == 0 {
4235 0
4236 } else {
4237 self.checkpoint_store
4238 .get_epoch_last_checkpoint_seq_number(epoch - 1)
4239 .expect("Failed to get previous epoch's last checkpoint")
4240 .expect("Previous epoch's last checkpoint missing")
4241 + 1
4242 };
4243 let highest_executed = self
4244 .checkpoint_store
4245 .get_highest_executed_checkpoint_seq_number()
4246 .expect("Failed to get highest executed checkpoint")
4247 .expect("No executed checkpoints");
4248
4249 info!(
4250 "Verifying checkpointed transactions are in transactions_seq \
4251 (checkpoints {first_checkpoint}..={highest_executed})"
4252 );
4253 for seq in first_checkpoint..=highest_executed {
4254 let checkpoint = self
4255 .checkpoint_store
4256 .get_checkpoint_by_sequence_number(seq)
4257 .expect("Failed to get checkpoint")
4258 .expect("Checkpoint missing");
4259 let contents = self
4260 .checkpoint_store
4261 .get_checkpoint_contents(&checkpoint.content_digest)
4262 .expect("Failed to get checkpoint contents")
4263 .expect("Checkpoint contents missing");
4264 for digests in contents.iter() {
4265 let tx_digest = digests.transaction;
4266 assert!(
4267 indexes
4268 .get_transaction_seq(&tx_digest)
4269 .expect("Failed to read transactions_seq")
4270 .is_some(),
4271 "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
4272 );
4273 }
4274 }
4275 info!("All checkpointed transactions verified in transactions_seq");
4276 }
4277 }
4278
4279 fn expensive_check_is_consistent_state(
4280 &self,
4281 state_hasher: Arc<GlobalStateHasher>,
4282 cur_epoch_store: &AuthorityPerEpochStore,
4283 ) {
4284 let live_object_set_hash = state_hasher.digest_live_object_set(
4285 !cur_epoch_store
4286 .protocol_config()
4287 .simplified_unwrap_then_delete(),
4288 );
4289
4290 let root_state_hash: ECMHLiveObjectSetDigest = self
4291 .get_global_state_hash_store()
4292 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4293 .expect("Retrieving root state hash cannot fail")
4294 .expect("Root state hash for epoch must exist")
4295 .1
4296 .digest()
4297 .into();
4298
4299 let is_inconsistent = root_state_hash != live_object_set_hash;
4300 if is_inconsistent {
4301 debug_fatal!(
4302 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4303 root_state_hash,
4304 live_object_set_hash
4305 );
4306 } else {
4307 info!("State consistency check passed");
4308 }
4309
4310 state_hasher.set_inconsistent_state(is_inconsistent);
4311 }
4312
4313 pub fn current_epoch_for_testing(&self) -> EpochId {
4314 self.epoch_store_for_testing().epoch()
4315 }
4316
4317 #[instrument(level = "error", skip_all)]
4318 pub fn checkpoint_all_dbs(
4319 &self,
4320 checkpoint_path: &Path,
4321 cur_epoch_store: &AuthorityPerEpochStore,
4322 checkpoint_indexes: bool,
4323 ) -> SuiResult {
4324 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4325 let current_epoch = cur_epoch_store.epoch();
4326
4327 if checkpoint_path.exists() {
4328 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4329 return Ok(());
4330 }
4331
4332 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4333 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4334
4335 if checkpoint_path_tmp.exists() {
4336 fs::remove_dir_all(&checkpoint_path_tmp)
4337 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4338 }
4339
4340 fs::create_dir_all(&checkpoint_path_tmp)
4341 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4342 fs::create_dir(&store_checkpoint_path_tmp)
4343 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4344
4345 self.checkpoint_store
4348 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4349
4350 self.get_reconfig_api()
4351 .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4352
4353 self.committee_store
4354 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4355
4356 if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4357 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4358 }
4359
4360 fs::rename(checkpoint_path_tmp, checkpoint_path)
4361 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4362 Ok(())
4363 }
4364
4365 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4371 self.epoch_store.load()
4372 }
4373
4374 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4376 self.load_epoch_store_one_call_per_task()
4377 }
4378
4379 pub fn clone_committee_for_testing(&self) -> Committee {
4380 Committee::clone(self.epoch_store_for_testing().committee())
4381 }
4382
4383 #[instrument(level = "trace", skip_all)]
4384 pub fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4385 self.get_object_store().get_object(object_id)
4386 }
4387
4388 pub fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4389 Ok(self
4390 .get_object(&SUI_SYSTEM_ADDRESS.into())
4391 .expect("framework object should always exist")
4392 .compute_object_reference())
4393 }
4394
4395 pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4397 self.get_object_cache_reader()
4398 .get_sui_system_state_object_unsafe()
4399 }
4400
4401 #[instrument(level = "trace", skip_all)]
4402 fn get_transaction_checkpoint_sequence(
4403 &self,
4404 digest: &TransactionDigest,
4405 epoch_store: &AuthorityPerEpochStore,
4406 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
4407 epoch_store.get_transaction_checkpoint(digest)
4408 }
4409
4410 #[instrument(level = "trace", skip_all)]
4411 pub fn get_checkpoint_by_sequence_number(
4412 &self,
4413 sequence_number: CheckpointSequenceNumber,
4414 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4415 Ok(self
4416 .checkpoint_store
4417 .get_checkpoint_by_sequence_number(sequence_number)?)
4418 }
4419
4420 #[instrument(level = "trace", skip_all)]
4421 pub fn get_transaction_checkpoint_for_tests(
4422 &self,
4423 digest: &TransactionDigest,
4424 epoch_store: &AuthorityPerEpochStore,
4425 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4426 let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4427 let Some(checkpoint) = checkpoint else {
4428 return Ok(None);
4429 };
4430 let checkpoint = self
4431 .checkpoint_store
4432 .get_checkpoint_by_sequence_number(checkpoint)?;
4433 Ok(checkpoint)
4434 }
4435
4436 #[instrument(level = "trace", skip_all)]
4437 pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4438 Ok(
4439 match self
4440 .get_object_cache_reader()
4441 .get_latest_object_or_tombstone(*object_id)
4442 {
4443 Some((_, ObjectOrTombstone::Object(object))) => {
4444 let layout = self.get_object_layout(&object)?;
4445 ObjectRead::Exists(object.compute_object_reference(), object, layout)
4446 }
4447 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4448 None => ObjectRead::NotExists(*object_id),
4449 },
4450 )
4451 }
4452
4453 pub fn get_chain_identifier(&self) -> ChainIdentifier {
4455 self.chain_identifier
4456 }
4457
4458 pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
4459 self.fork_recovery_state.as_ref()
4460 }
4461
4462 #[instrument(level = "trace", skip_all)]
4463 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4464 where
4465 T: DeserializeOwned,
4466 {
4467 let o = self.get_object_read(object_id)?.into_object()?;
4468 if let Some(move_object) = o.data.try_as_move() {
4469 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4470 SuiErrorKind::ObjectDeserializationError {
4471 error: format!("{e}"),
4472 }
4473 })?)
4474 } else {
4475 Err(SuiErrorKind::ObjectDeserializationError {
4476 error: format!("Provided object : [{object_id}] is not a Move object."),
4477 }
4478 .into())
4479 }
4480 }
4481
4482 #[instrument(level = "trace", skip_all)]
4488 pub fn get_past_object_read(
4489 &self,
4490 object_id: &ObjectID,
4491 version: SequenceNumber,
4492 ) -> SuiResult<PastObjectRead> {
4493 let Some(obj_ref) = self
4495 .get_object_cache_reader()
4496 .get_latest_object_ref_or_tombstone(*object_id)
4497 else {
4498 return Ok(PastObjectRead::ObjectNotExists(*object_id));
4499 };
4500
4501 if version > obj_ref.1 {
4502 return Ok(PastObjectRead::VersionTooHigh {
4503 object_id: *object_id,
4504 asked_version: version,
4505 latest_version: obj_ref.1,
4506 });
4507 }
4508
4509 if version < obj_ref.1 {
4510 return Ok(match self.read_object_at_version(object_id, version)? {
4512 Some((object, layout)) => {
4513 let obj_ref = object.compute_object_reference();
4514 PastObjectRead::VersionFound(obj_ref, object, layout)
4515 }
4516
4517 None => PastObjectRead::VersionNotFound(*object_id, version),
4518 });
4519 }
4520
4521 if !obj_ref.2.is_alive() {
4522 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4523 }
4524
4525 match self.read_object_at_version(object_id, obj_ref.1)? {
4526 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4527 None => {
4528 debug_fatal!(
4529 "Object with in parent_entry is missing from object store, datastore is \
4530 inconsistent"
4531 );
4532 Err(UserInputError::ObjectNotFound {
4533 object_id: *object_id,
4534 version: Some(obj_ref.1),
4535 }
4536 .into())
4537 }
4538 }
4539 }
4540
4541 #[instrument(level = "trace", skip_all)]
4542 fn read_object_at_version(
4543 &self,
4544 object_id: &ObjectID,
4545 version: SequenceNumber,
4546 ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4547 let Some(object) = self
4548 .get_object_cache_reader()
4549 .get_object_by_key(object_id, version)
4550 else {
4551 return Ok(None);
4552 };
4553
4554 let layout = self.get_object_layout(&object)?;
4555 Ok(Some((object, layout)))
4556 }
4557
4558 pub fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4559 let layout = object
4560 .data
4561 .try_as_move()
4562 .map(|object| {
4563 let epoch_store = self.load_epoch_store_one_call_per_task();
4564 into_struct_layout(
4565 epoch_store
4566 .executor()
4567 .type_layout_resolver(
4569 epoch_store.protocol_config(),
4570 Box::new(self.get_backing_package_store().as_ref()),
4571 )
4572 .get_annotated_layout(&object.type_().clone().into())?,
4573 )
4574 })
4575 .transpose()?;
4576 Ok(layout)
4577 }
4578
4579 #[instrument(level = "trace", skip_all)]
4583 pub fn get_address_balance_coin_info(
4584 &self,
4585 owner: SuiAddress,
4586 balance_type: TypeTag,
4587 ) -> SuiResult<Option<(ObjectRef, u64, TransactionDigest)>> {
4588 let accumulator_id = AccumulatorValue::get_field_id(owner, &balance_type)?;
4589 let accumulator_obj = AccumulatorValue::load_object_by_id(
4590 self.get_child_object_resolver().as_ref(),
4591 None,
4592 *accumulator_id.inner(),
4593 )?;
4594
4595 let Some(accumulator_obj) = accumulator_obj else {
4596 return Ok(None);
4597 };
4598
4599 let currency_type =
4602 Balance::maybe_get_balance_type_param(&balance_type).unwrap_or(balance_type);
4603
4604 let balance = crate::accumulators::balances::get_balance(
4605 owner,
4606 self.get_child_object_resolver().as_ref(),
4607 currency_type,
4608 )?;
4609
4610 if balance == 0 {
4611 return Ok(None);
4612 };
4613
4614 let object_ref = coin_reservation::encode_object_ref(
4615 accumulator_obj.id(),
4616 accumulator_obj.version(),
4617 self.load_epoch_store_one_call_per_task().epoch(),
4618 balance,
4619 self.get_chain_identifier(),
4620 );
4621
4622 Ok(Some((
4623 object_ref,
4624 balance,
4625 accumulator_obj.previous_transaction,
4626 )))
4627 }
4628
4629 #[instrument(level = "trace", skip_all)]
4632 pub fn get_all_address_balance_coin_infos(
4633 &self,
4634 owner: SuiAddress,
4635 ) -> SuiResult<std::collections::HashMap<String, (ObjectRef, u64, TransactionDigest)>> {
4636 let indexes = self
4637 .indexes
4638 .as_ref()
4639 .ok_or(SuiErrorKind::IndexStoreNotAvailable)?;
4640
4641 let mut result = std::collections::HashMap::new();
4642 for currency_type in indexes.get_address_balance_coin_types_iter(owner) {
4643 let balance_type = sui_types::balance::Balance::type_tag(currency_type.clone());
4644 if let Some((obj_ref, balance, prev_tx)) =
4645 self.get_address_balance_coin_info(owner, balance_type)?
4646 {
4647 result.insert(currency_type.to_string(), (obj_ref, balance, prev_tx));
4650 }
4651 }
4652 Ok(result)
4653 }
4654
4655 fn get_owner_at_version(
4656 object_store: &Arc<dyn ObjectStore + Send + Sync>,
4657 object_id: &ObjectID,
4658 version: SequenceNumber,
4659 ) -> SuiResult<Owner> {
4660 object_store
4661 .get_object_by_key(object_id, version)
4662 .ok_or_else(|| {
4663 SuiError::from(UserInputError::ObjectNotFound {
4664 object_id: *object_id,
4665 version: Some(version),
4666 })
4667 })
4668 .map(|o| o.owner.clone())
4669 }
4670
4671 #[instrument(level = "trace", skip_all)]
4672 pub fn get_owner_objects(
4673 &self,
4674 owner: SuiAddress,
4675 cursor: Option<ObjectID>,
4677 limit: usize,
4678 filter: Option<SuiObjectDataFilter>,
4679 ) -> SuiResult<Vec<ObjectInfo>> {
4680 if let Some(indexes) = &self.indexes {
4681 indexes.get_owner_objects(owner, cursor, limit, filter)
4682 } else {
4683 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4684 }
4685 }
4686
4687 #[instrument(level = "trace", skip_all)]
4688 pub fn get_owned_coins_iterator_with_cursor(
4689 &self,
4690 owner: SuiAddress,
4691 cursor: (String, u64, ObjectID),
4693 limit: usize,
4694 one_coin_type_only: bool,
4695 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4696 if let Some(indexes) = &self.indexes {
4697 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4698 } else {
4699 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4700 }
4701 }
4702
4703 #[instrument(level = "trace", skip_all)]
4704 pub fn get_owner_objects_iterator(
4705 &self,
4706 owner: SuiAddress,
4707 cursor: Option<ObjectID>,
4709 filter: Option<SuiObjectDataFilter>,
4710 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4711 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4712 if let Some(indexes) = &self.indexes {
4713 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4714 } else {
4715 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4716 }
4717 }
4718
4719 #[instrument(level = "trace", skip_all)]
4720 pub fn get_move_objects<T>(&self, owner: SuiAddress, type_: MoveObjectType) -> SuiResult<Vec<T>>
4721 where
4722 T: DeserializeOwned,
4723 {
4724 let object_ids = self
4725 .get_owner_objects_iterator(owner, None, None)?
4726 .filter(|o| match &o.type_ {
4727 ObjectType::Struct(s) => &type_ == s,
4728 ObjectType::Package => false,
4729 })
4730 .map(|info| ObjectKey(info.object_id, info.version))
4731 .collect::<Vec<_>>();
4732 let mut move_objects = vec![];
4733
4734 let objects = self
4735 .get_object_store()
4736 .multi_get_objects_by_key(&object_ids);
4737
4738 for (o, id) in objects.into_iter().zip_debug_eq(object_ids) {
4739 let object = o.ok_or_else(|| {
4740 SuiError::from(UserInputError::ObjectNotFound {
4741 object_id: id.0,
4742 version: Some(id.1),
4743 })
4744 })?;
4745 let move_object = object.data.try_as_move().ok_or_else(|| {
4746 SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4747 })?;
4748 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4749 SuiErrorKind::ObjectDeserializationError {
4750 error: format!("{e}"),
4751 }
4752 })?);
4753 }
4754 Ok(move_objects)
4755 }
4756
4757 #[instrument(level = "trace", skip_all)]
4758 pub fn get_dynamic_fields(
4759 &self,
4760 owner: ObjectID,
4761 cursor: Option<ObjectID>,
4763 limit: usize,
4764 ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4765 Ok(self
4766 .get_dynamic_fields_iterator(owner, cursor)?
4767 .take(limit)
4768 .collect::<Result<Vec<_>, _>>()?)
4769 }
4770
4771 fn get_dynamic_fields_iterator(
4772 &self,
4773 owner: ObjectID,
4774 cursor: Option<ObjectID>,
4776 ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4777 {
4778 if let Some(indexes) = &self.indexes {
4779 indexes.get_dynamic_fields_iterator(owner, cursor)
4780 } else {
4781 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4782 }
4783 }
4784
4785 #[instrument(level = "trace", skip_all)]
4786 pub fn get_dynamic_field_object_id(
4787 &self,
4788 owner: ObjectID,
4789 name_type: TypeTag,
4790 name_bcs_bytes: &[u8],
4791 ) -> SuiResult<Option<ObjectID>> {
4792 if let Some(indexes) = &self.indexes {
4793 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4794 } else {
4795 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4796 }
4797 }
4798
4799 #[instrument(level = "trace", skip_all)]
4800 pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
4801 Ok(self.get_indexes()?.next_sequence_number())
4802 }
4803
4804 #[instrument(level = "trace", skip_all)]
4805 pub async fn get_executed_transaction_and_effects(
4806 &self,
4807 digest: TransactionDigest,
4808 kv_store: Arc<TransactionKeyValueStore>,
4809 ) -> SuiResult<(Transaction, TransactionEffects)> {
4810 let transaction = kv_store.get_tx(digest).await?;
4811 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4812 Ok((transaction, effects))
4813 }
4814
4815 #[instrument(level = "trace", skip_all)]
4816 pub fn multi_get_checkpoint_by_sequence_number(
4817 &self,
4818 sequence_numbers: &[CheckpointSequenceNumber],
4819 ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
4820 Ok(self
4821 .checkpoint_store
4822 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4823 }
4824
4825 #[instrument(level = "trace", skip_all)]
4826 pub fn get_transaction_events(
4827 &self,
4828 digest: &TransactionDigest,
4829 ) -> SuiResult<TransactionEvents> {
4830 self.get_transaction_cache_reader()
4831 .get_events(digest)
4832 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4833 }
4834
4835 pub fn get_transaction_input_objects(
4836 &self,
4837 effects: &TransactionEffects,
4838 ) -> SuiResult<Vec<Object>> {
4839 sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4840 .map_err(Into::into)
4841 }
4842
4843 pub fn get_transaction_output_objects(
4844 &self,
4845 effects: &TransactionEffects,
4846 ) -> SuiResult<Vec<Object>> {
4847 sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4848 .map_err(Into::into)
4849 }
4850
4851 fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4852 match &self.indexes {
4853 Some(i) => Ok(i.clone()),
4854 None => Err(SuiErrorKind::UnsupportedFeatureError {
4855 error: "extended object indexing is not enabled on this server".into(),
4856 }
4857 .into()),
4858 }
4859 }
4860
4861 pub async fn get_transactions_for_tests(
4862 self: &Arc<Self>,
4863 filter: Option<TransactionFilter>,
4864 cursor: Option<TransactionDigest>,
4865 limit: Option<usize>,
4866 reverse: bool,
4867 ) -> SuiResult<Vec<TransactionDigest>> {
4868 let metrics = KeyValueStoreMetrics::new_for_tests();
4869 let kv_store = Arc::new(TransactionKeyValueStore::new(
4870 "rocksdb",
4871 metrics,
4872 self.clone(),
4873 ));
4874 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4875 .await
4876 }
4877
4878 #[instrument(level = "trace", skip_all)]
4879 pub async fn get_transactions(
4880 &self,
4881 kv_store: &Arc<TransactionKeyValueStore>,
4882 filter: Option<TransactionFilter>,
4883 cursor: Option<TransactionDigest>,
4885 limit: Option<usize>,
4886 reverse: bool,
4887 ) -> SuiResult<Vec<TransactionDigest>> {
4888 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4889 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4890 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4891 if reverse {
4892 let iter = iter
4893 .rev()
4894 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4895 .skip(usize::from(cursor.is_some()));
4896 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4897 } else {
4898 let iter = iter
4899 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4900 .skip(usize::from(cursor.is_some()));
4901 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4902 }
4903 }
4904 self.get_indexes()?
4905 .get_transactions(filter, cursor, limit, reverse)
4906 }
4907
4908 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4909 &self.checkpoint_store
4910 }
4911
4912 pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4913 self.get_checkpoint_store()
4914 .get_highest_executed_checkpoint_seq_number()?
4915 .ok_or(
4916 SuiErrorKind::UserInputError {
4917 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4918 }
4919 .into(),
4920 )
4921 }
4922
4923 #[cfg(msim)]
4924 pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
4925 self.database_for_testing()
4926 .perpetual_tables
4927 .get_highest_pruned_checkpoint()
4928 .map(|c| c.unwrap_or(0))
4929 .map_err(Into::into)
4930 }
4931
4932 #[instrument(level = "trace", skip_all)]
4933 pub fn get_checkpoint_summary_by_sequence_number(
4934 &self,
4935 sequence_number: CheckpointSequenceNumber,
4936 ) -> SuiResult<CheckpointSummary> {
4937 let verified_checkpoint = self
4938 .get_checkpoint_store()
4939 .get_checkpoint_by_sequence_number(sequence_number)?;
4940 match verified_checkpoint {
4941 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4942 None => Err(SuiErrorKind::UserInputError {
4943 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4944 }
4945 .into()),
4946 }
4947 }
4948
4949 #[instrument(level = "trace", skip_all)]
4950 pub fn get_checkpoint_summary_by_digest(
4951 &self,
4952 digest: CheckpointDigest,
4953 ) -> SuiResult<CheckpointSummary> {
4954 let verified_checkpoint = self
4955 .get_checkpoint_store()
4956 .get_checkpoint_by_digest(&digest)?;
4957 match verified_checkpoint {
4958 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4959 None => Err(SuiErrorKind::UserInputError {
4960 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4961 }
4962 .into()),
4963 }
4964 }
4965
4966 #[instrument(level = "trace", skip_all)]
4967 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
4968 if is_system_package(package_id) {
4969 return self.find_genesis_txn_digest();
4970 }
4971 Ok(self
4972 .get_object_read(&package_id)?
4973 .into_object()?
4974 .previous_transaction)
4975 }
4976
4977 #[instrument(level = "trace", skip_all)]
4978 pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
4979 let summary = self
4980 .get_verified_checkpoint_by_sequence_number(0)?
4981 .into_message();
4982 let content = self.get_checkpoint_contents(summary.content_digest)?;
4983 let genesis_transaction = content.enumerate_transactions(&summary).next();
4984 Ok(genesis_transaction
4985 .ok_or(SuiErrorKind::UserInputError {
4986 error: UserInputError::GenesisTransactionNotFound,
4987 })?
4988 .1
4989 .transaction)
4990 }
4991
4992 #[instrument(level = "trace", skip_all)]
4993 pub fn get_verified_checkpoint_by_sequence_number(
4994 &self,
4995 sequence_number: CheckpointSequenceNumber,
4996 ) -> SuiResult<VerifiedCheckpoint> {
4997 let verified_checkpoint = self
4998 .get_checkpoint_store()
4999 .get_checkpoint_by_sequence_number(sequence_number)?;
5000 match verified_checkpoint {
5001 Some(verified_checkpoint) => Ok(verified_checkpoint),
5002 None => Err(SuiErrorKind::UserInputError {
5003 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5004 }
5005 .into()),
5006 }
5007 }
5008
5009 #[instrument(level = "trace", skip_all)]
5010 pub fn get_verified_checkpoint_summary_by_digest(
5011 &self,
5012 digest: CheckpointDigest,
5013 ) -> SuiResult<VerifiedCheckpoint> {
5014 let verified_checkpoint = self
5015 .get_checkpoint_store()
5016 .get_checkpoint_by_digest(&digest)?;
5017 match verified_checkpoint {
5018 Some(verified_checkpoint) => Ok(verified_checkpoint),
5019 None => Err(SuiErrorKind::UserInputError {
5020 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5021 }
5022 .into()),
5023 }
5024 }
5025
5026 #[instrument(level = "trace", skip_all)]
5027 pub fn get_checkpoint_contents(
5028 &self,
5029 digest: CheckpointContentsDigest,
5030 ) -> SuiResult<CheckpointContents> {
5031 self.get_checkpoint_store()
5032 .get_checkpoint_contents(&digest)?
5033 .ok_or(
5034 SuiErrorKind::UserInputError {
5035 error: UserInputError::CheckpointContentsNotFound(digest),
5036 }
5037 .into(),
5038 )
5039 }
5040
5041 #[instrument(level = "trace", skip_all)]
5042 pub fn get_checkpoint_contents_by_sequence_number(
5043 &self,
5044 sequence_number: CheckpointSequenceNumber,
5045 ) -> SuiResult<CheckpointContents> {
5046 let verified_checkpoint = self
5047 .get_checkpoint_store()
5048 .get_checkpoint_by_sequence_number(sequence_number)?;
5049 match verified_checkpoint {
5050 Some(verified_checkpoint) => {
5051 let content_digest = verified_checkpoint.into_inner().content_digest;
5052 self.get_checkpoint_contents(content_digest)
5053 }
5054 None => Err(SuiErrorKind::UserInputError {
5055 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5056 }
5057 .into()),
5058 }
5059 }
5060
5061 #[instrument(level = "trace", skip_all)]
5062 pub async fn query_events(
5063 &self,
5064 kv_store: &Arc<TransactionKeyValueStore>,
5065 query: EventFilter,
5066 cursor: Option<EventID>,
5068 limit: usize,
5069 descending: bool,
5070 ) -> SuiResult<Vec<SuiEvent>> {
5071 let index_store = self.get_indexes()?;
5072
5073 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
5075 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
5076 SuiErrorKind::TransactionNotFound {
5077 digest: cursor.tx_digest,
5078 },
5079 )?;
5080 (tx_seq, cursor.event_seq as usize)
5081 } else if descending {
5082 (u64::MAX, usize::MAX)
5083 } else {
5084 (0, 0)
5085 };
5086
5087 let limit = limit + 1;
5088 let mut event_keys = match query {
5089 EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
5090 EventFilter::Transaction(digest) => {
5091 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
5092 }
5093 EventFilter::MoveModule { package, module } => {
5094 let module_id = ModuleId::new(package.into(), module);
5095 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
5096 }
5097 EventFilter::MoveEventType(struct_name) => index_store
5098 .events_by_move_event_struct_name(
5099 &struct_name,
5100 tx_num,
5101 event_num,
5102 limit,
5103 descending,
5104 )?,
5105 EventFilter::Sender(sender) => {
5106 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
5107 }
5108 EventFilter::TimeRange {
5109 start_time,
5110 end_time,
5111 } => index_store
5112 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
5113 EventFilter::MoveEventModule { package, module } => index_store
5114 .events_by_move_event_module(
5115 &ModuleId::new(package.into(), module),
5116 tx_num,
5117 event_num,
5118 limit,
5119 descending,
5120 )?,
5121 EventFilter::Any(_) => {
5123 return Err(SuiErrorKind::UserInputError {
5124 error: UserInputError::Unsupported(
5125 "'Any' queries are not supported by the fullnode.".to_string(),
5126 ),
5127 }
5128 .into());
5129 }
5130 };
5131
5132 if cursor.is_some() {
5135 if !event_keys.is_empty() {
5136 event_keys.remove(0);
5137 }
5138 } else {
5139 event_keys.truncate(limit - 1);
5140 }
5141
5142 let transaction_digests = event_keys
5144 .iter()
5145 .map(|(_, digest, _, _)| *digest)
5146 .collect::<HashSet<_>>()
5147 .into_iter()
5148 .collect::<Vec<_>>();
5149
5150 let events = kv_store
5151 .multi_get_events_by_tx_digests(&transaction_digests)
5152 .await?;
5153
5154 let events_map: HashMap<_, _> = transaction_digests
5155 .iter()
5156 .zip_debug_eq(events.into_iter())
5157 .collect();
5158
5159 let stored_events = event_keys
5160 .into_iter()
5161 .map(|k| {
5162 (
5163 k,
5164 events_map
5165 .get(&k.1)
5166 .expect("fetched digest is missing")
5167 .clone()
5168 .and_then(|e| e.data.get(k.2).cloned()),
5169 )
5170 })
5171 .map(
5172 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
5173 event
5174 .map(|e| (e, tx_digest, event_seq, timestamp))
5175 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
5176 },
5177 )
5178 .collect::<Result<Vec<_>, _>>()?;
5179
5180 let epoch_store = self.load_epoch_store_one_call_per_task();
5181 let backing_store = self.get_backing_package_store().as_ref();
5182 let mut layout_resolver = epoch_store
5183 .executor()
5184 .type_layout_resolver(epoch_store.protocol_config(), Box::new(backing_store));
5185 let mut events = vec![];
5186 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
5187 events.push(SuiEvent::try_from(
5188 e.clone(),
5189 tx_digest,
5190 event_seq as u64,
5191 Some(timestamp),
5192 layout_resolver.get_annotated_layout(&e.type_)?,
5193 )?)
5194 }
5195 Ok(events)
5196 }
5197
5198 pub fn insert_genesis_object(&self, object: Object) {
5199 self.get_reconfig_api().insert_genesis_object(object);
5200 }
5201
5202 pub fn insert_genesis_objects(&self, objects: &[Object]) {
5203 for o in objects {
5204 self.insert_genesis_object(o.clone());
5205 }
5206 }
5207
5208 #[instrument(level = "trace", skip_all)]
5210 pub fn get_transaction_status(
5211 &self,
5212 transaction_digest: &TransactionDigest,
5213 epoch_store: &Arc<AuthorityPerEpochStore>,
5214 ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
5215 if let Some(effects) =
5217 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
5218 {
5219 if let Some(transaction) = self
5220 .get_transaction_cache_reader()
5221 .get_transaction_block(transaction_digest)
5222 {
5223 let events = if effects.events_digest().is_some() {
5224 self.get_transaction_events(effects.transaction_digest())?
5225 } else {
5226 TransactionEvents::default()
5227 };
5228 return Ok(Some((
5232 (*transaction).clone().into_message(),
5233 TransactionStatus::Executed(None, effects.into_inner(), events),
5234 )));
5235 } else {
5236 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
5240 }
5241 }
5242 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
5243 self.metrics.tx_already_processed.inc();
5244 let (transaction, sig) = signed.into_inner().into_data_and_sig();
5245 Ok(Some((transaction, TransactionStatus::Signed(sig))))
5246 } else {
5247 Ok(None)
5248 }
5249 }
5250
5251 #[instrument(level = "trace", skip_all)]
5255 pub fn get_signed_effects_and_maybe_resign(
5256 &self,
5257 transaction_digest: &TransactionDigest,
5258 epoch_store: &Arc<AuthorityPerEpochStore>,
5259 ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
5260 let effects = self
5261 .get_transaction_cache_reader()
5262 .get_executed_effects(transaction_digest);
5263 match effects {
5264 Some(effects) => {
5265 if effects.executed_epoch() != epoch_store.epoch() {
5285 debug!(
5286 tx_digest=?transaction_digest,
5287 effects_epoch=?effects.executed_epoch(),
5288 epoch=?epoch_store.epoch(),
5289 "Re-signing the effects with the current epoch"
5290 );
5291 }
5292 Ok(Some(self.sign_effects(effects, epoch_store)?))
5293 }
5294 None => Ok(None),
5295 }
5296 }
5297
5298 #[instrument(level = "trace", skip_all)]
5299 pub(crate) fn sign_effects(
5300 &self,
5301 effects: TransactionEffects,
5302 epoch_store: &Arc<AuthorityPerEpochStore>,
5303 ) -> SuiResult<VerifiedSignedTransactionEffects> {
5304 let tx_digest = *effects.transaction_digest();
5305 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5306 Some(sig) => {
5307 debug_assert!(sig.epoch == epoch_store.epoch());
5308 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5309 }
5310 _ => {
5311 let sig = AuthoritySignInfo::new(
5312 epoch_store.epoch(),
5313 &effects,
5314 Intent::sui_app(IntentScope::TransactionEffects),
5315 self.name,
5316 &*self.secret,
5317 );
5318
5319 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5320
5321 epoch_store.insert_effects_digest_and_signature(
5322 &tx_digest,
5323 effects.digest(),
5324 &sig,
5325 )?;
5326
5327 effects
5328 }
5329 };
5330
5331 Ok(VerifiedSignedTransactionEffects::new_unchecked(
5332 signed_effects,
5333 ))
5334 }
5335
5336 #[instrument(level = "trace", skip_all)]
5338 fn fullnode_only_get_tx_coins_for_indexing(
5339 name: AuthorityName,
5340 object_store: &Arc<dyn ObjectStore + Send + Sync>,
5341 effects: &TransactionEffects,
5342 inner_temporary_store: &InnerTemporaryStore,
5343 epoch_store: &Arc<AuthorityPerEpochStore>,
5344 ) -> Option<TxCoins> {
5345 if epoch_store.committee().authority_exists(&name) {
5346 return None;
5347 }
5348 let written_coin_objects = inner_temporary_store
5349 .written
5350 .iter()
5351 .filter_map(|(k, v)| {
5352 if v.is_coin() {
5353 Some((*k, v.clone()))
5354 } else {
5355 None
5356 }
5357 })
5358 .collect();
5359 let mut input_coin_objects = inner_temporary_store
5360 .input_objects
5361 .iter()
5362 .filter_map(|(k, v)| {
5363 if v.is_coin() {
5364 Some((*k, v.clone()))
5365 } else {
5366 None
5367 }
5368 })
5369 .collect::<ObjectMap>();
5370
5371 for (object_id, version) in effects.modified_at_versions() {
5375 if inner_temporary_store
5376 .loaded_runtime_objects
5377 .contains_key(&object_id)
5378 && let Some(object) = object_store.get_object_by_key(&object_id, version)
5379 && object.is_coin()
5380 {
5381 input_coin_objects.insert(object_id, object);
5382 }
5383 }
5384
5385 Some((input_coin_objects, written_coin_objects))
5386 }
5387
5388 #[instrument(level = "trace", skip_all)]
5396 pub fn get_transaction_lock(
5397 &self,
5398 object_ref: &ObjectRef,
5399 epoch_store: &AuthorityPerEpochStore,
5400 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5401 let lock_info = self
5402 .get_object_cache_reader()
5403 .get_lock(*object_ref, epoch_store)?;
5404 let lock_info = match lock_info {
5405 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5406 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5407 provided_obj_ref: *object_ref,
5408 current_version: locked_ref.1,
5409 }
5410 .into());
5411 }
5412 ObjectLockStatus::Initialized => {
5413 return Ok(None);
5414 }
5415 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5416 };
5417
5418 epoch_store.get_signed_transaction(&lock_info.tx_digest)
5419 }
5420
5421 pub fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5422 self.get_object_cache_reader().get_objects(objects)
5423 }
5424
5425 pub fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5426 self.get_object_cache_reader()
5427 .get_latest_object_ref_or_tombstone(object_id)
5428 }
5429
5430 pub fn set_override_protocol_upgrade_buffer_stake(
5439 &self,
5440 expected_epoch: EpochId,
5441 buffer_stake_bps: u64,
5442 ) -> SuiResult {
5443 let epoch_store = self.load_epoch_store_one_call_per_task();
5444 let actual_epoch = epoch_store.epoch();
5445 if actual_epoch != expected_epoch {
5446 return Err(SuiErrorKind::WrongEpoch {
5447 expected_epoch,
5448 actual_epoch,
5449 }
5450 .into());
5451 }
5452
5453 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5454 }
5455
5456 pub fn clear_override_protocol_upgrade_buffer_stake(
5457 &self,
5458 expected_epoch: EpochId,
5459 ) -> SuiResult {
5460 let epoch_store = self.load_epoch_store_one_call_per_task();
5461 let actual_epoch = epoch_store.epoch();
5462 if actual_epoch != expected_epoch {
5463 return Err(SuiErrorKind::WrongEpoch {
5464 expected_epoch,
5465 actual_epoch,
5466 }
5467 .into());
5468 }
5469
5470 epoch_store.clear_override_protocol_upgrade_buffer_stake()
5471 }
5472
5473 pub async fn get_available_system_packages(
5476 &self,
5477 binary_config: &BinaryConfig,
5478 ) -> Vec<ObjectRef> {
5479 let mut results = vec![];
5480
5481 let system_packages = BuiltInFramework::iter_system_packages();
5482
5483 #[cfg(msim)]
5485 let extra_packages = framework_injection::get_extra_packages(self.name);
5486 #[cfg(msim)]
5487 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
5488
5489 for system_package in system_packages {
5490 let modules = system_package.modules().to_vec();
5491 #[cfg(msim)]
5493 let modules =
5494 match framework_injection::get_override_modules(&system_package.id, self.name) {
5495 Some(overrides) if overrides.is_empty() => continue,
5496 Some(overrides) => overrides,
5497 None => modules,
5498 };
5499
5500 let Some(obj_ref) = sui_framework::compare_system_package(
5501 &self.get_object_store(),
5502 &system_package.id,
5503 &modules,
5504 system_package.dependencies.to_vec(),
5505 binary_config,
5506 )
5507 .await
5508 else {
5509 return vec![];
5510 };
5511 results.push(obj_ref);
5512 }
5513
5514 results
5515 }
5516
5517 async fn get_system_package_bytes(
5531 &self,
5532 system_packages: Vec<ObjectRef>,
5533 binary_config: &BinaryConfig,
5534 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5535 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5536 let objects = self.get_objects(&ids);
5537
5538 let mut res = Vec::with_capacity(system_packages.len());
5539 for (system_package_ref, object) in system_packages.into_iter().zip_debug_eq(objects.iter())
5540 {
5541 let prev_transaction = match object {
5542 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5543 info!("Framework {} does not need updating", system_package_ref.0);
5545 continue;
5546 }
5547
5548 Some(cur_object) => cur_object.previous_transaction,
5549 None => TransactionDigest::genesis_marker(),
5550 };
5551
5552 #[cfg(msim)]
5553 let SystemPackage {
5554 id: _,
5555 bytes,
5556 dependencies,
5557 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5558 .unwrap_or_else(|| {
5559 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5560 });
5561
5562 #[cfg(not(msim))]
5563 let SystemPackage {
5564 id: _,
5565 bytes,
5566 dependencies,
5567 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5568
5569 let modules: Vec<_> = bytes
5570 .iter()
5571 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5572 .collect();
5573
5574 let new_object = Object::new_system_package(
5575 &modules,
5576 system_package_ref.1,
5577 dependencies.clone(),
5578 prev_transaction,
5579 );
5580
5581 let new_ref = new_object.compute_object_reference();
5582 if new_ref != system_package_ref {
5583 if cfg!(msim) {
5584 debug_fatal!(
5586 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5587 );
5588 } else {
5589 error!(
5590 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5591 );
5592 }
5593 return None;
5594 }
5595
5596 res.push((system_package_ref.1, bytes, dependencies));
5597 }
5598
5599 Some(res)
5600 }
5601
5602 fn is_protocol_version_supported_v2(
5603 current_protocol_version: ProtocolVersion,
5604 proposed_protocol_version: ProtocolVersion,
5605 protocol_config: &ProtocolConfig,
5606 committee: &Committee,
5607 capabilities: Vec<AuthorityCapabilitiesV2>,
5608 mut buffer_stake_bps: u64,
5609 ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5610 if proposed_protocol_version > current_protocol_version + 1
5611 && !protocol_config.advance_to_highest_supported_protocol_version()
5612 {
5613 return None;
5614 }
5615
5616 if buffer_stake_bps > 10000 {
5617 warn!("clamping buffer_stake_bps to 10000");
5618 buffer_stake_bps = 10000;
5619 }
5620
5621 let mut desired_upgrades: Vec<_> = capabilities
5624 .into_iter()
5625 .filter_map(|mut cap| {
5626 if cap.available_system_packages.is_empty() {
5628 return None;
5629 }
5630
5631 cap.available_system_packages.sort();
5632
5633 info!(
5634 "validator {:?} supports {:?} with system packages: {:?}",
5635 cap.authority.concise(),
5636 cap.supported_protocol_versions,
5637 cap.available_system_packages,
5638 );
5639
5640 cap.supported_protocol_versions
5644 .get_version_digest(proposed_protocol_version)
5645 .map(|digest| (digest, cap.available_system_packages, cap.authority))
5646 })
5647 .collect();
5648
5649 desired_upgrades.sort();
5651 desired_upgrades
5652 .into_iter()
5653 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5654 .into_iter()
5655 .find_map(|((digest, packages), group)| {
5656 assert!(!packages.is_empty());
5658
5659 let mut stake_aggregator: StakeAggregator<(), true> =
5660 StakeAggregator::new(Arc::new(committee.clone()));
5661
5662 for (_, _, authority) in group {
5663 stake_aggregator.insert_generic(authority, ());
5664 }
5665
5666 let total_votes = stake_aggregator.total_votes();
5667 let quorum_threshold = committee.quorum_threshold();
5668 let f = committee.total_votes() - committee.quorum_threshold();
5669
5670 let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5672 let effective_threshold = quorum_threshold + buffer_stake;
5673
5674 info!(
5675 protocol_config_digest = ?digest,
5676 ?total_votes,
5677 ?quorum_threshold,
5678 ?buffer_stake_bps,
5679 ?effective_threshold,
5680 ?proposed_protocol_version,
5681 ?packages,
5682 "support for upgrade"
5683 );
5684
5685 let has_support = total_votes >= effective_threshold;
5686 has_support.then_some((proposed_protocol_version, packages))
5687 })
5688 }
5689
5690 fn choose_protocol_version_and_system_packages_v2(
5691 current_protocol_version: ProtocolVersion,
5692 protocol_config: &ProtocolConfig,
5693 committee: &Committee,
5694 capabilities: Vec<AuthorityCapabilitiesV2>,
5695 buffer_stake_bps: u64,
5696 ) -> (ProtocolVersion, Vec<ObjectRef>) {
5697 assert!(protocol_config.authority_capabilities_v2());
5698 let mut next_protocol_version = current_protocol_version;
5699 let mut system_packages = vec![];
5700
5701 while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5702 current_protocol_version,
5703 next_protocol_version + 1,
5704 protocol_config,
5705 committee,
5706 capabilities.clone(),
5707 buffer_stake_bps,
5708 ) {
5709 next_protocol_version = version;
5710 system_packages = packages;
5711 }
5712
5713 (next_protocol_version, system_packages)
5714 }
5715
5716 #[instrument(level = "debug", skip_all)]
5717 fn create_authenticator_state_tx(
5718 &self,
5719 epoch_store: &Arc<AuthorityPerEpochStore>,
5720 ) -> Option<EndOfEpochTransactionKind> {
5721 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5722 info!("authenticator state transactions not enabled");
5723 return None;
5724 }
5725
5726 let authenticator_state_exists = epoch_store.authenticator_state_exists();
5727 let tx = if authenticator_state_exists {
5728 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5729 let min_epoch =
5730 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5731 let authenticator_obj_initial_shared_version = epoch_store
5732 .epoch_start_config()
5733 .authenticator_obj_initial_shared_version()
5734 .expect("initial version must exist");
5735
5736 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5737 min_epoch,
5738 authenticator_obj_initial_shared_version,
5739 );
5740
5741 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5742
5743 tx
5744 } else {
5745 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5746 info!("Creating AuthenticatorStateCreate tx");
5747 tx
5748 };
5749 Some(tx)
5750 }
5751
5752 #[instrument(level = "debug", skip_all)]
5753 fn create_randomness_state_tx(
5754 &self,
5755 epoch_store: &Arc<AuthorityPerEpochStore>,
5756 ) -> Option<EndOfEpochTransactionKind> {
5757 if !epoch_store.protocol_config().random_beacon() {
5758 info!("randomness state transactions not enabled");
5759 return None;
5760 }
5761
5762 if epoch_store.randomness_state_exists() {
5763 return None;
5764 }
5765
5766 let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5767 info!("Creating RandomnessStateCreate tx");
5768 Some(tx)
5769 }
5770
5771 #[instrument(level = "debug", skip_all)]
5772 fn create_accumulator_root_tx(
5773 &self,
5774 epoch_store: &Arc<AuthorityPerEpochStore>,
5775 ) -> Option<EndOfEpochTransactionKind> {
5776 if !epoch_store
5777 .protocol_config()
5778 .create_root_accumulator_object()
5779 {
5780 info!("accumulator root creation not enabled");
5781 return None;
5782 }
5783
5784 if epoch_store.accumulator_root_exists() {
5785 return None;
5786 }
5787
5788 let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5789 info!("Creating AccumulatorRootCreate tx");
5790 Some(tx)
5791 }
5792
5793 #[instrument(level = "debug", skip_all)]
5794 fn create_write_accumulator_storage_cost_tx(
5795 &self,
5796 epoch_store: &Arc<AuthorityPerEpochStore>,
5797 ) -> Option<EndOfEpochTransactionKind> {
5798 if !epoch_store.accumulator_root_exists() {
5799 info!("accumulator root does not exist yet");
5800 return None;
5801 }
5802 if !epoch_store.protocol_config().enable_accumulators() {
5803 info!("accumulators not enabled");
5804 return None;
5805 }
5806
5807 let object_store = self.get_object_store();
5808 let object_count =
5809 match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
5810 Ok(Some(count)) => count,
5811 Ok(None) => return None,
5812 Err(e) => {
5813 fatal!("failed to read accumulator object count: {e}");
5814 }
5815 };
5816
5817 let storage_cost = object_count.saturating_mul(
5818 epoch_store
5819 .protocol_config()
5820 .accumulator_object_storage_cost(),
5821 );
5822
5823 let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
5824 info!(
5825 object_count,
5826 storage_cost, "Creating WriteAccumulatorStorageCost tx"
5827 );
5828 Some(tx)
5829 }
5830
5831 #[instrument(level = "debug", skip_all)]
5832 fn create_coin_registry_tx(
5833 &self,
5834 epoch_store: &Arc<AuthorityPerEpochStore>,
5835 ) -> Option<EndOfEpochTransactionKind> {
5836 if !epoch_store.protocol_config().enable_coin_registry() {
5837 info!("coin registry not enabled");
5838 return None;
5839 }
5840
5841 if epoch_store.coin_registry_exists() {
5842 return None;
5843 }
5844
5845 let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5846 info!("Creating CoinRegistryCreate tx");
5847 Some(tx)
5848 }
5849
5850 #[instrument(level = "debug", skip_all)]
5851 fn create_display_registry_tx(
5852 &self,
5853 epoch_store: &Arc<AuthorityPerEpochStore>,
5854 ) -> Option<EndOfEpochTransactionKind> {
5855 if !epoch_store.protocol_config().enable_display_registry() {
5856 info!("display registry not enabled");
5857 return None;
5858 }
5859
5860 if epoch_store.display_registry_exists() {
5861 return None;
5862 }
5863
5864 let tx = EndOfEpochTransactionKind::new_display_registry_create();
5865 info!("Creating DisplayRegistryCreate tx");
5866 Some(tx)
5867 }
5868
5869 #[instrument(level = "debug", skip_all)]
5870 fn create_bridge_tx(
5871 &self,
5872 epoch_store: &Arc<AuthorityPerEpochStore>,
5873 ) -> Option<EndOfEpochTransactionKind> {
5874 if !epoch_store.protocol_config().enable_bridge() {
5875 info!("bridge not enabled");
5876 return None;
5877 }
5878 if epoch_store.bridge_exists() {
5879 return None;
5880 }
5881 let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5882 info!("Creating Bridge Create tx");
5883 Some(tx)
5884 }
5885
5886 #[instrument(level = "debug", skip_all)]
5887 fn init_bridge_committee_tx(
5888 &self,
5889 epoch_store: &Arc<AuthorityPerEpochStore>,
5890 ) -> Option<EndOfEpochTransactionKind> {
5891 if !epoch_store.protocol_config().enable_bridge() {
5892 info!("bridge not enabled");
5893 return None;
5894 }
5895 if !epoch_store
5896 .protocol_config()
5897 .should_try_to_finalize_bridge_committee()
5898 {
5899 info!("should not try to finalize bridge committee yet");
5900 return None;
5901 }
5902 if !epoch_store.bridge_exists() {
5904 return None;
5905 }
5906
5907 if epoch_store.bridge_committee_initiated() {
5908 return None;
5909 }
5910
5911 let bridge_initial_shared_version = epoch_store
5912 .epoch_start_config()
5913 .bridge_obj_initial_shared_version()
5914 .expect("initial version must exist");
5915 let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5916 info!("Init Bridge committee tx");
5917 Some(tx)
5918 }
5919
5920 #[instrument(level = "debug", skip_all)]
5921 fn create_deny_list_state_tx(
5922 &self,
5923 epoch_store: &Arc<AuthorityPerEpochStore>,
5924 ) -> Option<EndOfEpochTransactionKind> {
5925 if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5926 return None;
5927 }
5928
5929 if epoch_store.coin_deny_list_state_exists() {
5930 return None;
5931 }
5932
5933 let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5934 info!("Creating DenyListStateCreate tx");
5935 Some(tx)
5936 }
5937
5938 #[instrument(level = "debug", skip_all)]
5939 fn create_address_alias_state_tx(
5940 &self,
5941 epoch_store: &Arc<AuthorityPerEpochStore>,
5942 ) -> Option<EndOfEpochTransactionKind> {
5943 if !epoch_store.protocol_config().address_aliases() {
5944 info!("address aliases not enabled");
5945 return None;
5946 }
5947
5948 if epoch_store.address_alias_state_exists() {
5949 return None;
5950 }
5951
5952 let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
5953 info!("Creating AddressAliasStateCreate tx");
5954 Some(tx)
5955 }
5956
5957 #[instrument(level = "debug", skip_all)]
5958 fn create_execution_time_observations_tx(
5959 &self,
5960 epoch_store: &Arc<AuthorityPerEpochStore>,
5961 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5962 last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
5963 ) -> Option<EndOfEpochTransactionKind> {
5964 let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
5965 .protocol_config()
5966 .per_object_congestion_control_mode()
5967 else {
5968 return None;
5969 };
5970
5971 let start_checkpoint = std::cmp::max(
5974 last_checkpoint_before_end_of_epoch
5975 .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
5976 epoch_store
5978 .epoch()
5979 .checked_sub(1)
5980 .map(|prev_epoch| {
5981 self.checkpoint_store
5982 .get_epoch_last_checkpoint_seq_number(prev_epoch)
5983 .expect("typed store must not fail")
5984 .expect(
5985 "sequence number of last checkpoint of preceding epoch must be saved",
5986 )
5987 + 1
5988 })
5989 .unwrap_or(1),
5990 );
5991 info!(
5992 "reading checkpoint range {:?}..={:?}",
5993 start_checkpoint, last_checkpoint_before_end_of_epoch
5994 );
5995 let sequence_numbers =
5996 (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
5997 let contents_digests: Vec<_> = self
5998 .checkpoint_store
5999 .multi_get_locally_computed_checkpoints(&sequence_numbers)
6000 .expect("typed store must not fail")
6001 .into_iter()
6002 .zip_debug_eq(sequence_numbers)
6003 .map(|(maybe_checkpoint, sequence_number)| {
6004 if let Some(checkpoint) = maybe_checkpoint {
6005 checkpoint.content_digest
6006 } else {
6007 self.checkpoint_store
6010 .get_checkpoint_by_sequence_number(sequence_number)
6011 .expect("typed store must not fail")
6012 .unwrap_or_else(|| {
6013 fatal!("preceding checkpoints must exist by end of epoch")
6014 })
6015 .data()
6016 .content_digest
6017 }
6018 })
6019 .collect();
6020 let tx_digests: Vec<_> = self
6021 .checkpoint_store
6022 .multi_get_checkpoint_content(&contents_digests)
6023 .expect("typed store must not fail")
6024 .into_iter()
6025 .flat_map(|maybe_contents| {
6026 maybe_contents
6027 .expect("preceding checkpoint contents must exist by end of epoch")
6028 .into_inner()
6029 .into_iter()
6030 .map(|digests| digests.transaction)
6031 })
6032 .collect();
6033 let included_execution_time_observations: HashSet<_> = self
6034 .get_transaction_cache_reader()
6035 .multi_get_transaction_blocks(&tx_digests)
6036 .into_iter()
6037 .flat_map(|maybe_tx| {
6038 if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
6039 .expect("preceding transaction must exist by end of epoch")
6040 .transaction_data()
6041 .kind()
6042 {
6043 #[allow(clippy::unnecessary_to_owned)]
6044 itertools::Either::Left(
6045 ptb.commands
6046 .to_owned()
6047 .into_iter()
6048 .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
6049 )
6050 } else {
6051 itertools::Either::Right(std::iter::empty())
6052 }
6053 })
6054 .chain(end_of_epoch_observation_keys.into_iter())
6055 .collect();
6056
6057 let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
6058 epoch_store
6059 .get_end_of_epoch_execution_time_observations()
6060 .filter_and_sort_v1(
6061 |(key, _)| included_execution_time_observations.contains(key),
6062 params.stored_observations_limit.try_into().unwrap(),
6063 ),
6064 );
6065 info!("Creating StoreExecutionTimeObservations tx");
6066 Some(tx)
6067 }
6068
6069 #[instrument(level = "error", skip_all)]
6080 pub async fn create_and_execute_advance_epoch_tx(
6081 &self,
6082 epoch_store: &Arc<AuthorityPerEpochStore>,
6083 gas_cost_summary: &GasCostSummary,
6084 checkpoint: CheckpointSequenceNumber,
6085 epoch_start_timestamp_ms: CheckpointTimestamp,
6086 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
6087 last_checkpoint: CheckpointSequenceNumber,
6090 ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
6091 let mut txns = Vec::new();
6092
6093 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
6094 txns.push(tx);
6095 }
6096 if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
6097 txns.push(tx);
6098 }
6099 if let Some(tx) = self.create_bridge_tx(epoch_store) {
6100 txns.push(tx);
6101 }
6102 if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
6103 txns.push(tx);
6104 }
6105 if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
6106 txns.push(tx);
6107 }
6108 if let Some(tx) = self.create_execution_time_observations_tx(
6109 epoch_store,
6110 end_of_epoch_observation_keys,
6111 last_checkpoint,
6112 ) {
6113 txns.push(tx);
6114 }
6115 if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
6116 txns.push(tx);
6117 }
6118
6119 if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
6120 txns.push(tx);
6121 }
6122 if let Some(tx) = self.create_display_registry_tx(epoch_store) {
6123 txns.push(tx);
6124 }
6125 if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
6126 txns.push(tx);
6127 }
6128 if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
6129 txns.push(tx);
6130 }
6131
6132 let next_epoch = epoch_store.epoch() + 1;
6133
6134 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
6135
6136 let (next_epoch_protocol_version, next_epoch_system_packages) =
6137 Self::choose_protocol_version_and_system_packages_v2(
6138 epoch_store.protocol_version(),
6139 epoch_store.protocol_config(),
6140 epoch_store.committee(),
6141 epoch_store
6142 .get_capabilities_v2()
6143 .expect("read capabilities from db cannot fail"),
6144 buffer_stake_bps,
6145 );
6146
6147 let config = epoch_store.protocol_config();
6150 let binary_config = config.binary_config(None);
6151 let Some(next_epoch_system_package_bytes) = self
6152 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
6153 .await
6154 else {
6155 if next_epoch_protocol_version <= ProtocolVersion::MAX {
6156 debug_fatal!(
6160 "upgraded system packages {:?} are not locally available, cannot create \
6161 ChangeEpochTx. validator binary must be upgraded to the correct version!",
6162 next_epoch_system_packages
6163 );
6164 } else {
6165 error!(
6166 "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
6167 next_epoch_protocol_version
6168 );
6169 }
6170 return Err(CheckpointBuilderError::SystemPackagesMissing);
6179 };
6180
6181 let tx = if epoch_store
6182 .protocol_config()
6183 .end_of_epoch_transaction_supported()
6184 {
6185 txns.push(EndOfEpochTransactionKind::new_change_epoch(
6186 next_epoch,
6187 next_epoch_protocol_version,
6188 gas_cost_summary.storage_cost,
6189 gas_cost_summary.computation_cost,
6190 gas_cost_summary.storage_rebate,
6191 gas_cost_summary.non_refundable_storage_fee,
6192 epoch_start_timestamp_ms,
6193 next_epoch_system_package_bytes,
6194 ));
6195
6196 VerifiedTransaction::new_end_of_epoch_transaction(txns)
6197 } else {
6198 VerifiedTransaction::new_change_epoch(
6199 next_epoch,
6200 next_epoch_protocol_version,
6201 gas_cost_summary.storage_cost,
6202 gas_cost_summary.computation_cost,
6203 gas_cost_summary.storage_rebate,
6204 gas_cost_summary.non_refundable_storage_fee,
6205 epoch_start_timestamp_ms,
6206 next_epoch_system_package_bytes,
6207 )
6208 };
6209
6210 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
6211 tx.clone(),
6212 epoch_store.epoch(),
6213 checkpoint,
6214 );
6215
6216 let tx_digest = executable_tx.digest();
6217
6218 info!(
6219 ?next_epoch,
6220 ?next_epoch_protocol_version,
6221 ?next_epoch_system_packages,
6222 computation_cost=?gas_cost_summary.computation_cost,
6223 storage_cost=?gas_cost_summary.storage_cost,
6224 storage_rebate=?gas_cost_summary.storage_rebate,
6225 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
6226 ?tx_digest,
6227 "Creating advance epoch transaction"
6228 );
6229
6230 fail_point_async!("change_epoch_tx_delay");
6231 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
6232
6233 if self
6236 .get_transaction_cache_reader()
6237 .is_tx_already_executed(tx_digest)
6238 {
6239 warn!("change epoch tx has already been executed via state sync");
6240 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6241 }
6242
6243 let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
6244 else {
6245 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6246 };
6247
6248 let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
6251 self.get_object_cache_reader().as_ref(),
6252 std::iter::once(&Schedulable::Transaction(&executable_tx)),
6253 )?;
6254
6255 assert_eq!(assigned_versions.0.len(), 1);
6256 let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;
6257
6258 let input_objects = self.read_objects_for_execution(
6259 &tx_lock,
6260 &executable_tx,
6261 &assigned_versions,
6262 epoch_store,
6263 )?;
6264
6265 let (transaction_outputs, _timings, _execution_error_opt) = self
6266 .execute_certificate(
6267 &execution_guard,
6268 &executable_tx,
6269 input_objects,
6270 None,
6271 ExecutionEnv::default(),
6272 epoch_store,
6273 )
6274 .unwrap();
6275 let system_obj = get_sui_system_state(&transaction_outputs.written)
6276 .expect("change epoch tx must write to system object");
6277
6278 let effects = transaction_outputs.effects;
6279 self.get_state_sync_store()
6283 .insert_transaction_and_effects(&tx, &effects);
6284
6285 info!(
6286 "Effects summary of the change epoch transaction: {:?}",
6287 effects.summary_for_debug()
6288 );
6289 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
6290 assert!(effects.status().is_ok());
6292 Ok((system_obj, effects))
6293 }
6294
6295 #[instrument(level = "error", skip_all)]
6296 async fn reopen_epoch_db(
6297 &self,
6298 cur_epoch_store: &AuthorityPerEpochStore,
6299 new_committee: Committee,
6300 epoch_start_configuration: EpochStartConfiguration,
6301 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6302 epoch_last_checkpoint: CheckpointSequenceNumber,
6303 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6304 let new_epoch = new_committee.epoch;
6305 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6306 assert_eq!(
6307 epoch_start_configuration.epoch_start_state().epoch(),
6308 new_committee.epoch
6309 );
6310 fail_point!("before-open-new-epoch-store");
6311 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6312 self.name,
6313 new_committee,
6314 epoch_start_configuration,
6315 self.get_backing_package_store().clone(),
6316 self.get_object_store().clone(),
6317 expensive_safety_check_config,
6318 epoch_last_checkpoint,
6319 self.config.fullnode_sync_mode,
6320 )?;
6321 self.epoch_store.store(new_epoch_store.clone());
6322 Ok(new_epoch_store)
6323 }
6324
6325 #[cfg(test)]
6326 pub(crate) fn iter_live_object_set_for_testing(
6327 &self,
6328 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6329 let include_wrapped_object = !self
6330 .epoch_store_for_testing()
6331 .protocol_config()
6332 .simplified_unwrap_then_delete();
6333 self.get_global_state_hash_store()
6334 .iter_cached_live_object_set_for_testing(include_wrapped_object)
6335 }
6336
6337 #[cfg(test)]
6338 pub(crate) fn shutdown_execution_for_test(&self) {
6339 self.tx_execution_shutdown
6340 .lock()
6341 .take()
6342 .unwrap()
6343 .send(())
6344 .unwrap();
6345 }
6346
6347 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6349 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6350 self.get_object_cache_reader()
6351 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6352 self.get_reconfig_api()
6353 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6354 Ok(())
6355 }
6356}
6357
6358#[async_trait]
6359impl TransactionKeyValueStoreTrait for AuthorityState {
6360 #[instrument(skip(self))]
6361 async fn multi_get(
6362 &self,
6363 transactions: &[TransactionDigest],
6364 effects: &[TransactionDigest],
6365 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6366 let txns = if !transactions.is_empty() {
6367 self.get_transaction_cache_reader()
6368 .multi_get_transaction_blocks(transactions)
6369 .into_iter()
6370 .map(|t| t.map(|t| (*t).clone().into_inner()))
6371 .collect()
6372 } else {
6373 vec![]
6374 };
6375
6376 let fx = if !effects.is_empty() {
6377 self.get_transaction_cache_reader()
6378 .multi_get_executed_effects(effects)
6379 } else {
6380 vec![]
6381 };
6382
6383 Ok((txns, fx))
6384 }
6385
6386 #[instrument(skip(self))]
6387 async fn multi_get_checkpoints(
6388 &self,
6389 checkpoint_summaries: &[CheckpointSequenceNumber],
6390 checkpoint_contents: &[CheckpointSequenceNumber],
6391 checkpoint_summaries_by_digest: &[CheckpointDigest],
6392 ) -> SuiResult<(
6393 Vec<Option<CertifiedCheckpointSummary>>,
6394 Vec<Option<CheckpointContents>>,
6395 Vec<Option<CertifiedCheckpointSummary>>,
6396 )> {
6397 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6399 let store = self.get_checkpoint_store();
6400 for seq in checkpoint_summaries {
6401 let checkpoint = store
6402 .get_checkpoint_by_sequence_number(*seq)?
6403 .map(|c| c.into_inner());
6404
6405 summaries.push(checkpoint);
6406 }
6407
6408 let mut contents = Vec::with_capacity(checkpoint_contents.len());
6409 for seq in checkpoint_contents {
6410 let checkpoint = store
6411 .get_checkpoint_by_sequence_number(*seq)?
6412 .and_then(|summary| {
6413 store
6414 .get_checkpoint_contents(&summary.content_digest)
6415 .expect("db read cannot fail")
6416 });
6417 contents.push(checkpoint);
6418 }
6419
6420 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6421 for digest in checkpoint_summaries_by_digest {
6422 let checkpoint = store
6423 .get_checkpoint_by_digest(digest)?
6424 .map(|c| c.into_inner());
6425 summaries_by_digest.push(checkpoint);
6426 }
6427 Ok((summaries, contents, summaries_by_digest))
6428 }
6429
6430 #[instrument(skip(self))]
6431 async fn deprecated_get_transaction_checkpoint(
6432 &self,
6433 digest: TransactionDigest,
6434 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6435 Ok(self
6436 .get_checkpoint_cache()
6437 .deprecated_get_transaction_checkpoint(&digest)
6438 .map(|(_epoch, checkpoint)| checkpoint))
6439 }
6440
6441 #[instrument(skip(self))]
6442 async fn get_object(
6443 &self,
6444 object_id: ObjectID,
6445 version: VersionNumber,
6446 ) -> SuiResult<Option<Object>> {
6447 Ok(self
6448 .get_object_cache_reader()
6449 .get_object_by_key(&object_id, version))
6450 }
6451
6452 #[instrument(skip_all)]
6453 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6454 Ok(self
6455 .get_object_cache_reader()
6456 .multi_get_objects_by_key(object_keys))
6457 }
6458
6459 #[instrument(skip(self))]
6460 async fn multi_get_transaction_checkpoint(
6461 &self,
6462 digests: &[TransactionDigest],
6463 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6464 let res = self
6465 .get_checkpoint_cache()
6466 .deprecated_multi_get_transaction_checkpoint(digests);
6467
6468 Ok(res
6469 .into_iter()
6470 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6471 .collect())
6472 }
6473
6474 #[instrument(skip(self))]
6475 async fn multi_get_events_by_tx_digests(
6476 &self,
6477 digests: &[TransactionDigest],
6478 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6479 if digests.is_empty() {
6480 return Ok(vec![]);
6481 }
6482
6483 Ok(self
6484 .get_transaction_cache_reader()
6485 .multi_get_events(digests))
6486 }
6487}
6488
6489#[cfg(msim)]
6490pub mod framework_injection {
6491 use move_binary_format::CompiledModule;
6492 use std::collections::BTreeMap;
6493 use std::{cell::RefCell, collections::BTreeSet};
6494 use sui_framework::{BuiltInFramework, SystemPackage};
6495 use sui_types::base_types::{AuthorityName, ObjectID};
6496 use sui_types::is_system_package;
6497
6498 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
6499
6500 thread_local! {
6502 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
6503 }
6504
6505 type Framework = Vec<CompiledModule>;
6506
6507 pub type PackageUpgradeCallback =
6508 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
6509
6510 enum PackageOverrideConfig {
6511 Global(Framework),
6512 PerValidator(PackageUpgradeCallback),
6513 }
6514
6515 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
6516 modules
6517 .iter()
6518 .map(|m| {
6519 let mut buf = Vec::new();
6520 m.serialize_with_version(m.version, &mut buf).unwrap();
6521 buf
6522 })
6523 .collect()
6524 }
6525
6526 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
6527 OVERRIDE.with(|bs| {
6528 bs.borrow_mut()
6529 .insert(package_id, PackageOverrideConfig::Global(modules))
6530 });
6531 }
6532
6533 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
6534 OVERRIDE.with(|bs| {
6535 bs.borrow_mut()
6536 .insert(package_id, PackageOverrideConfig::PerValidator(func))
6537 });
6538 }
6539
6540 pub fn set_system_packages(packages: Vec<SystemPackage>) {
6541 OVERRIDE.with(|bs| {
6542 let mut new_packages_not_to_include: BTreeSet<_> =
6543 BuiltInFramework::all_package_ids().into_iter().collect();
6544 for pkg in &packages {
6545 new_packages_not_to_include.remove(&pkg.id);
6546 }
6547 for pkg in packages {
6548 bs.borrow_mut()
6549 .insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
6550 }
6551 for empty_pkg in new_packages_not_to_include {
6552 bs.borrow_mut()
6553 .insert(empty_pkg, PackageOverrideConfig::Global(vec![]));
6554 }
6555 });
6556 }
6557
6558 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
6559 OVERRIDE.with(|cfg| {
6560 cfg.borrow().get(package_id).and_then(|entry| match entry {
6561 PackageOverrideConfig::Global(framework) => {
6562 Some(compiled_modules_to_bytes(framework))
6563 }
6564 PackageOverrideConfig::PerValidator(func) => {
6565 func(name).map(|fw| compiled_modules_to_bytes(&fw))
6566 }
6567 })
6568 })
6569 }
6570
6571 pub fn get_override_modules(
6572 package_id: &ObjectID,
6573 name: AuthorityName,
6574 ) -> Option<Vec<CompiledModule>> {
6575 OVERRIDE.with(|cfg| {
6576 cfg.borrow().get(package_id).and_then(|entry| match entry {
6577 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
6578 PackageOverrideConfig::PerValidator(func) => func(name),
6579 })
6580 })
6581 }
6582
6583 pub fn get_override_system_package(
6584 package_id: &ObjectID,
6585 name: AuthorityName,
6586 ) -> Option<SystemPackage> {
6587 let bytes = get_override_bytes(package_id, name)?;
6588 let dependencies = if is_system_package(*package_id) {
6589 BuiltInFramework::get_package_by_id(package_id)
6590 .dependencies
6591 .to_vec()
6592 } else {
6593 BuiltInFramework::all_package_ids()
6595 };
6596 Some(SystemPackage {
6597 id: *package_id,
6598 bytes,
6599 dependencies,
6600 })
6601 }
6602
6603 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6604 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids().into_iter());
6605 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
6606 cfg.borrow()
6607 .keys()
6608 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6609 .collect()
6610 });
6611
6612 extra
6613 .into_iter()
6614 .map(|package| SystemPackage {
6615 id: package,
6616 bytes: get_override_bytes(&package, name).unwrap(),
6617 dependencies: BuiltInFramework::all_package_ids(),
6618 })
6619 .collect()
6620 }
6621}
6622
6623#[derive(Debug, Serialize, Deserialize, Clone)]
6624pub struct ObjDumpFormat {
6625 pub id: ObjectID,
6626 pub version: VersionNumber,
6627 pub digest: ObjectDigest,
6628 pub object: Object,
6629}
6630
6631impl ObjDumpFormat {
6632 fn new(object: Object) -> Self {
6633 let oref = object.compute_object_reference();
6634 Self {
6635 id: oref.0,
6636 version: oref.1,
6637 digest: oref.2,
6638 object,
6639 }
6640 }
6641}
6642
6643#[derive(Debug, Serialize, Deserialize, Clone)]
6644pub struct NodeStateDump {
6645 pub tx_digest: TransactionDigest,
6646 pub sender_signed_data: SenderSignedData,
6647 pub executed_epoch: u64,
6648 pub reference_gas_price: u64,
6649 pub protocol_version: u64,
6650 pub epoch_start_timestamp_ms: u64,
6651 pub computed_effects: TransactionEffects,
6652 pub expected_effects_digest: TransactionEffectsDigest,
6653 pub relevant_system_packages: Vec<ObjDumpFormat>,
6654 pub shared_objects: Vec<ObjDumpFormat>,
6655 pub loaded_child_objects: Vec<ObjDumpFormat>,
6656 pub modified_at_versions: Vec<ObjDumpFormat>,
6657 pub runtime_reads: Vec<ObjDumpFormat>,
6658 pub input_objects: Vec<ObjDumpFormat>,
6659}
6660
6661impl NodeStateDump {
6662 pub fn new(
6663 tx_digest: &TransactionDigest,
6664 effects: &TransactionEffects,
6665 expected_effects_digest: TransactionEffectsDigest,
6666 object_store: &dyn ObjectStore,
6667 epoch_store: &Arc<AuthorityPerEpochStore>,
6668 inner_temporary_store: &InnerTemporaryStore,
6669 certificate: &VerifiedExecutableTransaction,
6670 ) -> SuiResult<Self> {
6671 let executed_epoch = epoch_store.epoch();
6673 let reference_gas_price = epoch_store.reference_gas_price();
6674 let epoch_start_config = epoch_store.epoch_start_config();
6675 let protocol_version = epoch_store.protocol_version().as_u64();
6676 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6677
6678 let mut relevant_system_packages = Vec::new();
6680 for sys_package_id in BuiltInFramework::all_package_ids() {
6681 if let Some(w) = object_store.get_object(&sys_package_id) {
6682 relevant_system_packages.push(ObjDumpFormat::new(w))
6683 }
6684 }
6685
6686 let mut shared_objects = Vec::new();
6688 for kind in effects.input_consensus_objects() {
6689 match kind {
6690 InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6691 if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6692 shared_objects.push(ObjDumpFormat::new(w))
6693 }
6694 }
6695 InputConsensusObject::ReadConsensusStreamEnded(..)
6696 | InputConsensusObject::MutateConsensusStreamEnded(..)
6697 | InputConsensusObject::Cancelled(..) => (), }
6699 }
6700
6701 let mut loaded_child_objects = Vec::new();
6704 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6705 if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6706 loaded_child_objects.push(ObjDumpFormat::new(w))
6707 }
6708 }
6709
6710 let mut modified_at_versions = Vec::new();
6712 for (id, ver) in effects.modified_at_versions() {
6713 if let Some(w) = object_store.get_object_by_key(&id, ver) {
6714 modified_at_versions.push(ObjDumpFormat::new(w))
6715 }
6716 }
6717
6718 let mut runtime_reads = Vec::new();
6721 for obj in inner_temporary_store
6722 .runtime_packages_loaded_from_db
6723 .values()
6724 {
6725 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6726 }
6727
6728 Ok(Self {
6731 tx_digest: *tx_digest,
6732 executed_epoch,
6733 reference_gas_price,
6734 epoch_start_timestamp_ms,
6735 protocol_version,
6736 relevant_system_packages,
6737 shared_objects,
6738 loaded_child_objects,
6739 modified_at_versions,
6740 runtime_reads,
6741 sender_signed_data: certificate.clone().into_message(),
6742 input_objects: inner_temporary_store
6743 .input_objects
6744 .values()
6745 .map(|o| ObjDumpFormat::new(o.clone()))
6746 .collect(),
6747 computed_effects: effects.clone(),
6748 expected_effects_digest,
6749 })
6750 }
6751
6752 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6753 let mut objects = Vec::new();
6754 objects.extend(self.relevant_system_packages.clone());
6755 objects.extend(self.shared_objects.clone());
6756 objects.extend(self.loaded_child_objects.clone());
6757 objects.extend(self.modified_at_versions.clone());
6758 objects.extend(self.runtime_reads.clone());
6759 objects.extend(self.input_objects.clone());
6760 objects
6761 }
6762
6763 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6764 let file_name = format!(
6765 "{}_{}_NODE_DUMP.json",
6766 self.tx_digest,
6767 AuthorityState::unixtime_now_ms()
6768 );
6769 let mut path = path.to_path_buf();
6770 path.push(&file_name);
6771 let mut file = File::create(path.clone())?;
6772 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6773 Ok(path)
6774 }
6775
6776 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6777 let file = File::open(path)?;
6778 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6779 }
6780}