1use crate::accumulators::AccumulatorSettlementTxBuilder;
6use crate::accumulators::balance_read::AccountBalanceRead;
7use crate::checkpoints::CheckpointBuilderError;
8use crate::checkpoints::CheckpointBuilderResult;
9use crate::congestion_tracker::CongestionTracker;
10use crate::consensus_adapter::ConsensusOverloadChecker;
11use crate::execution_cache::ExecutionCacheTraitPointers;
12use crate::execution_cache::TransactionCacheRead;
13use crate::execution_scheduler::ExecutionScheduler;
14use crate::execution_scheduler::SchedulingSource;
15use crate::jsonrpc_index::CoinIndexKey2;
16use crate::rpc_index::RpcIndexStore;
17use crate::traffic_controller::TrafficController;
18use crate::traffic_controller::metrics::TrafficControllerMetrics;
19use crate::transaction_outputs::TransactionOutputs;
20use crate::verify_indexes::{fix_indexes, verify_indexes};
21use arc_swap::{ArcSwap, Guard};
22use async_trait::async_trait;
23use authority_per_epoch_store::CertLockGuard;
24use fastcrypto::encoding::Base58;
25use fastcrypto::encoding::Encoding;
26use fastcrypto::hash::MultisetHash;
27use itertools::Itertools;
28use move_binary_format::CompiledModule;
29use move_binary_format::binary_config::BinaryConfig;
30use move_core_types::annotated_value::MoveStructLayout;
31use move_core_types::language_storage::ModuleId;
32use mysten_common::fatal;
33use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
34use parking_lot::Mutex;
35use prometheus::{
36 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
37 register_histogram_vec_with_registry, register_histogram_with_registry,
38 register_int_counter_vec_with_registry, register_int_counter_with_registry,
39 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
40};
41use serde::de::DeserializeOwned;
42use serde::{Deserialize, Serialize};
43use shared_object_version_manager::AssignedVersions;
44use shared_object_version_manager::Schedulable;
45use std::collections::BTreeMap;
46use std::collections::BTreeSet;
47use std::fs::File;
48use std::io::Write;
49use std::path::{Path, PathBuf};
50use std::sync::atomic::Ordering;
51use std::time::Duration;
52use std::time::Instant;
53use std::time::SystemTime;
54use std::time::UNIX_EPOCH;
55use std::{
56 collections::{HashMap, HashSet},
57 fs,
58 pin::Pin,
59 str::FromStr,
60 sync::Arc,
61 vec,
62};
63use sui_config::NodeConfig;
64use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
65use sui_protocol_config::PerObjectCongestionControlMode;
66use sui_types::crypto::RandomnessRound;
67use sui_types::dynamic_field::visitor as DFV;
68use sui_types::execution::ExecutionOutput;
69use sui_types::execution::ExecutionTimeObservationKey;
70use sui_types::execution::ExecutionTiming;
71use sui_types::execution_params::BalanceWithdrawStatus;
72use sui_types::execution_params::ExecutionOrEarlyError;
73use sui_types::execution_params::get_early_execution_error;
74use sui_types::execution_status::ExecutionStatus;
75use sui_types::inner_temporary_store::PackageStoreWithFallback;
76use sui_types::layout_resolver::LayoutResolver;
77use sui_types::layout_resolver::into_struct_layout;
78use sui_types::messages_consensus::{AuthorityCapabilitiesV1, AuthorityCapabilitiesV2};
79use sui_types::object::bounded_visitor::BoundedVisitor;
80use sui_types::storage::ChildObjectResolver;
81use sui_types::storage::InputKey;
82use sui_types::storage::TrackingBackingStore;
83use sui_types::traffic_control::{
84 PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
85};
86use sui_types::transaction_executor::SimulateTransactionResult;
87use sui_types::transaction_executor::TransactionChecks;
88use tap::TapFallible;
89use tokio::sync::RwLock;
90use tokio::sync::mpsc::unbounded_channel;
91use tokio::sync::{mpsc, oneshot};
92use tokio::task::JoinHandle;
93use tokio::time::timeout;
94use tracing::trace;
95use tracing::{debug, error, info, instrument, warn};
96
97use self::authority_store::ExecutionLockWriteGuard;
98use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
99pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
100use mysten_metrics::{monitored_scope, spawn_monitored_task};
101
102use crate::jsonrpc_index::IndexStore;
103use crate::jsonrpc_index::{CoinInfo, ObjectIndexChanges};
104use mysten_common::debug_fatal;
105use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
106use sui_config::genesis::Genesis;
107use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
108use sui_framework::{BuiltInFramework, SystemPackage};
109use sui_json_rpc_types::{
110 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
111 SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
112 SuiTransactionBlockEvents, TransactionFilter,
113};
114use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
115use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
116use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
117use sui_types::authenticator_state::get_authenticator_state;
118use sui_types::committee::{EpochId, ProtocolVersion};
119use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
120use sui_types::deny_list_v1::check_coin_deny_list_v1;
121use sui_types::digests::ChainIdentifier;
122use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
123use sui_types::effects::{
124 InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
125 TransactionEvents, VerifiedSignedTransactionEffects,
126};
127use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
128use sui_types::event::{Event, EventID};
129use sui_types::executable_transaction::VerifiedExecutableTransaction;
130use sui_types::gas::{GasCostSummary, SuiGasStatus};
131use sui_types::inner_temporary_store::{
132 InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
133};
134use sui_types::message_envelope::Message;
135use sui_types::messages_checkpoint::{
136 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
137 CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
138 CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
139 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
140};
141use sui_types::messages_grpc::{
142 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind,
143 ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
144};
145use sui_types::metrics::{BytecodeVerifierMetrics, LimitsMetrics};
146use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
147use sui_types::storage::{
148 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
149};
150use sui_types::sui_system_state::SuiSystemStateTrait;
151use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
152use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
153use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
154use sui_types::{
155 SUI_SYSTEM_ADDRESS,
156 base_types::*,
157 committee::Committee,
158 crypto::AuthoritySignature,
159 error::{SuiError, SuiResult},
160 object::{Object, ObjectRead},
161 transaction::*,
162};
163use sui_types::{TypeTag, is_system_package};
164use typed_store::TypedStoreError;
165
166use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
167use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
168use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
169use crate::authority::authority_store_pruner::{
170 AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
171};
172use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
173use crate::authority::epoch_start_configuration::EpochStartConfiguration;
174use crate::checkpoints::CheckpointStore;
175use crate::epoch::committee_store::CommitteeStore;
176use crate::execution_cache::{
177 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
178 ObjectCacheRead, StateSyncAPI,
179};
180use crate::execution_driver::execution_process;
181use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
182use crate::metrics::LatencyObserver;
183use crate::metrics::RateTracker;
184use crate::module_cache_metrics::ResolverMetrics;
185use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
186use crate::stake_aggregator::StakeAggregator;
187use crate::subscription_handler::SubscriptionHandler;
188use crate::transaction_input_loader::TransactionInputLoader;
189
190#[cfg(msim)]
191pub use crate::checkpoints::checkpoint_executor::utils::{
192 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
193};
194
195use crate::authority::authority_store_tables::AuthorityPrunerTables;
196use crate::authority_client::NetworkAuthorityClient;
197use crate::validator_tx_finalizer::ValidatorTxFinalizer;
198#[cfg(msim)]
199use sui_types::committee::CommitteeTrait;
200use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
201
202#[cfg(test)]
203#[path = "unit_tests/authority_tests.rs"]
204pub mod authority_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/transaction_tests.rs"]
208pub mod transaction_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/batch_transaction_tests.rs"]
212mod batch_transaction_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/move_integration_tests.rs"]
216pub mod move_integration_tests;
217
218#[cfg(test)]
219#[path = "unit_tests/gas_tests.rs"]
220mod gas_tests;
221
222#[cfg(test)]
223#[path = "unit_tests/batch_verification_tests.rs"]
224mod batch_verification_tests;
225
226#[cfg(test)]
227#[path = "unit_tests/coin_deny_list_tests.rs"]
228mod coin_deny_list_tests;
229
230#[cfg(test)]
231#[path = "unit_tests/mysticeti_fastpath_execution_tests.rs"]
232mod mysticeti_fastpath_execution_tests;
233
234#[cfg(test)]
235#[path = "unit_tests/auth_unit_test_utils.rs"]
236pub mod auth_unit_test_utils;
237
238pub mod authority_test_utils;
239
240pub mod authority_per_epoch_store;
241pub mod authority_per_epoch_store_pruner;
242
243pub mod authority_store_pruner;
244pub mod authority_store_tables;
245pub mod authority_store_types;
246pub mod consensus_tx_status_cache;
247pub mod epoch_start_configuration;
248pub mod execution_time_estimator;
249pub mod shared_object_congestion_tracker;
250pub mod shared_object_version_manager;
251pub mod submitted_transaction_cache;
252pub mod test_authority_builder;
253pub mod transaction_deferral;
254pub mod transaction_reject_reason_cache;
255mod weighted_moving_average;
256
257pub(crate) mod authority_store;
258pub mod backpressure;
259
260pub struct AuthorityMetrics {
262 tx_orders: IntCounter,
263 total_certs: IntCounter,
264 total_cert_attempts: IntCounter,
265 total_effects: IntCounter,
266 pub shared_obj_tx: IntCounter,
268 sponsored_tx: IntCounter,
269 tx_already_processed: IntCounter,
270 num_input_objs: Histogram,
271 num_shared_objects: Histogram,
273 batch_size: Histogram,
274
275 authority_state_handle_transaction_latency: Histogram,
276 authority_state_handle_vote_transaction_latency: Histogram,
277
278 execute_certificate_latency_single_writer: Histogram,
279 execute_certificate_latency_shared_object: Histogram,
280 await_transaction_latency: Histogram,
281
282 internal_execution_latency: Histogram,
283 execution_load_input_objects_latency: Histogram,
284 prepare_certificate_latency: Histogram,
285 commit_certificate_latency: Histogram,
286 db_checkpoint_latency: Histogram,
287
288 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
290 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
291 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
292 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
293
294 pub(crate) execution_driver_executed_transactions: IntCounter,
295 pub(crate) execution_driver_paused_transactions: IntCounter,
296 pub(crate) execution_driver_dispatch_queue: IntGauge,
297 pub(crate) execution_queueing_delay_s: Histogram,
298 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
299 pub(crate) execution_gas_latency_ratio: Histogram,
300
301 pub(crate) skipped_consensus_txns: IntCounter,
302 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
303
304 pub(crate) authority_overload_status: IntGauge,
305 pub(crate) authority_load_shedding_percentage: IntGauge,
306
307 pub(crate) transaction_overload_sources: IntCounterVec,
308
309 post_processing_total_events_emitted: IntCounter,
311 post_processing_total_tx_indexed: IntCounter,
312 post_processing_total_tx_had_event_processed: IntCounter,
313 post_processing_total_failures: IntCounter,
314
315 pub consensus_handler_processed: IntCounterVec,
317 pub consensus_handler_transaction_sizes: HistogramVec,
318 pub consensus_handler_num_low_scoring_authorities: IntGauge,
319 pub consensus_handler_scores: IntGaugeVec,
320 pub consensus_handler_deferred_transactions: IntCounter,
321 pub consensus_handler_congested_transactions: IntCounter,
322 pub consensus_handler_cancelled_transactions: IntCounter,
323 pub consensus_handler_max_object_costs: IntGaugeVec,
324 pub consensus_committed_subdags: IntCounterVec,
325 pub consensus_committed_messages: IntGaugeVec,
326 pub consensus_committed_user_transactions: IntGaugeVec,
327 pub consensus_finalized_user_transactions: IntGaugeVec,
328 pub consensus_rejected_user_transactions: IntGaugeVec,
329 pub consensus_calculated_throughput: IntGauge,
330 pub consensus_calculated_throughput_profile: IntGauge,
331 pub consensus_block_handler_block_processed: IntCounter,
332 pub consensus_block_handler_txn_processed: IntCounterVec,
333 pub consensus_block_handler_fastpath_executions: IntCounter,
334 pub consensus_timestamp_bias: Histogram,
335
336 pub limits_metrics: Arc<LimitsMetrics>,
337
338 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
340
341 pub zklogin_sig_count: IntCounter,
343 pub multisig_sig_count: IntCounter,
345
346 pub execution_queueing_latency: LatencyObserver,
349
350 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
357
358 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
361}
362
363const POSITIVE_INT_BUCKETS: &[f64] = &[
365 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
366 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
367 7000000., 10000000.,
368];
369
370const LATENCY_SEC_BUCKETS: &[f64] = &[
371 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
372 10., 20., 30., 60., 90.,
373];
374
375const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
377 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
378 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
379];
380
381const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
384 -2.0, 0.0, 0.2, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40, 0.45, 0.50, 0.80,
385 2.0, 10.0, 30.0,
386];
387
388const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
389 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
390 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
391];
392
393pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;
394
395pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_secs(2);
399
400impl AuthorityMetrics {
401 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
402 let execute_certificate_latency = register_histogram_vec_with_registry!(
403 "authority_state_execute_certificate_latency",
404 "Latency of executing certificates, including waiting for inputs",
405 &["tx_type"],
406 LATENCY_SEC_BUCKETS.to_vec(),
407 registry,
408 )
409 .unwrap();
410
411 let execute_certificate_latency_single_writer =
412 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
413 let execute_certificate_latency_shared_object =
414 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
415
416 Self {
417 tx_orders: register_int_counter_with_registry!(
418 "total_transaction_orders",
419 "Total number of transaction orders",
420 registry,
421 )
422 .unwrap(),
423 total_certs: register_int_counter_with_registry!(
424 "total_transaction_certificates",
425 "Total number of transaction certificates handled",
426 registry,
427 )
428 .unwrap(),
429 total_cert_attempts: register_int_counter_with_registry!(
430 "total_handle_certificate_attempts",
431 "Number of calls to handle_certificate",
432 registry,
433 )
434 .unwrap(),
435 total_effects: register_int_counter_with_registry!(
437 "total_transaction_effects",
438 "Total number of transaction effects produced",
439 registry,
440 )
441 .unwrap(),
442
443 shared_obj_tx: register_int_counter_with_registry!(
444 "num_shared_obj_tx",
445 "Number of transactions involving shared objects",
446 registry,
447 )
448 .unwrap(),
449
450 sponsored_tx: register_int_counter_with_registry!(
451 "num_sponsored_tx",
452 "Number of sponsored transactions",
453 registry,
454 )
455 .unwrap(),
456
457 tx_already_processed: register_int_counter_with_registry!(
458 "num_tx_already_processed",
459 "Number of transaction orders already processed previously",
460 registry,
461 )
462 .unwrap(),
463 num_input_objs: register_histogram_with_registry!(
464 "num_input_objects",
465 "Distribution of number of input TX objects per TX",
466 POSITIVE_INT_BUCKETS.to_vec(),
467 registry,
468 )
469 .unwrap(),
470 num_shared_objects: register_histogram_with_registry!(
471 "num_shared_objects",
472 "Number of shared input objects per TX",
473 POSITIVE_INT_BUCKETS.to_vec(),
474 registry,
475 )
476 .unwrap(),
477 batch_size: register_histogram_with_registry!(
478 "batch_size",
479 "Distribution of size of transaction batch",
480 POSITIVE_INT_BUCKETS.to_vec(),
481 registry,
482 )
483 .unwrap(),
484 authority_state_handle_transaction_latency: register_histogram_with_registry!(
485 "authority_state_handle_transaction_latency",
486 "Latency of handling transactions",
487 LATENCY_SEC_BUCKETS.to_vec(),
488 registry,
489 )
490 .unwrap(),
491 authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
492 "authority_state_handle_vote_transaction_latency",
493 "Latency of voting on transactions without signing",
494 LATENCY_SEC_BUCKETS.to_vec(),
495 registry,
496 )
497 .unwrap(),
498 execute_certificate_latency_single_writer,
499 execute_certificate_latency_shared_object,
500 await_transaction_latency: register_histogram_with_registry!(
501 "await_transaction_latency",
502 "Latency of awaiting user transaction execution, including waiting for inputs",
503 LATENCY_SEC_BUCKETS.to_vec(),
504 registry,
505 )
506 .unwrap(),
507 internal_execution_latency: register_histogram_with_registry!(
508 "authority_state_internal_execution_latency",
509 "Latency of actual certificate executions",
510 LATENCY_SEC_BUCKETS.to_vec(),
511 registry,
512 )
513 .unwrap(),
514 execution_load_input_objects_latency: register_histogram_with_registry!(
515 "authority_state_execution_load_input_objects_latency",
516 "Latency of loading input objects for execution",
517 LOW_LATENCY_SEC_BUCKETS.to_vec(),
518 registry,
519 )
520 .unwrap(),
521 prepare_certificate_latency: register_histogram_with_registry!(
522 "authority_state_prepare_certificate_latency",
523 "Latency of executing certificates, before committing the results",
524 LATENCY_SEC_BUCKETS.to_vec(),
525 registry,
526 )
527 .unwrap(),
528 commit_certificate_latency: register_histogram_with_registry!(
529 "authority_state_commit_certificate_latency",
530 "Latency of committing certificate execution results",
531 LATENCY_SEC_BUCKETS.to_vec(),
532 registry,
533 )
534 .unwrap(),
535 db_checkpoint_latency: register_histogram_with_registry!(
536 "db_checkpoint_latency",
537 "Latency of checkpointing dbs",
538 LATENCY_SEC_BUCKETS.to_vec(),
539 registry,
540 ).unwrap(),
541 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
542 "transaction_manager_num_enqueued_certificates",
543 "Current number of certificates enqueued to ExecutionScheduler",
544 &["result"],
545 registry,
546 )
547 .unwrap(),
548 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
549 "transaction_manager_num_pending_certificates",
550 "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
551 registry,
552 )
553 .unwrap(),
554 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
555 "transaction_manager_num_executing_certificates",
556 "Number of executing certificates, including queued and actually running certificates",
557 registry,
558 )
559 .unwrap(),
560 authority_overload_status: register_int_gauge_with_registry!(
561 "authority_overload_status",
562 "Whether authority is current experiencing overload and enters load shedding mode.",
563 registry)
564 .unwrap(),
565 authority_load_shedding_percentage: register_int_gauge_with_registry!(
566 "authority_load_shedding_percentage",
567 "The percentage of transactions is shed when the authority is in load shedding mode.",
568 registry)
569 .unwrap(),
570 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
571 "transaction_manager_transaction_queue_age_s",
572 "Time spent in waiting for transaction in the queue",
573 LATENCY_SEC_BUCKETS.to_vec(),
574 registry,
575 )
576 .unwrap(),
577 transaction_overload_sources: register_int_counter_vec_with_registry!(
578 "transaction_overload_sources",
579 "Number of times each source indicates transaction overload.",
580 &["source"],
581 registry)
582 .unwrap(),
583 execution_driver_executed_transactions: register_int_counter_with_registry!(
584 "execution_driver_executed_transactions",
585 "Cumulative number of transaction executed by execution driver",
586 registry,
587 )
588 .unwrap(),
589 execution_driver_paused_transactions: register_int_counter_with_registry!(
590 "execution_driver_paused_transactions",
591 "Cumulative number of transactions paused by execution driver",
592 registry,
593 )
594 .unwrap(),
595 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
596 "execution_driver_dispatch_queue",
597 "Number of transaction pending in execution driver dispatch queue",
598 registry,
599 )
600 .unwrap(),
601 execution_queueing_delay_s: register_histogram_with_registry!(
602 "execution_queueing_delay_s",
603 "Queueing delay between a transaction is ready for execution until it starts executing.",
604 LATENCY_SEC_BUCKETS.to_vec(),
605 registry
606 )
607 .unwrap(),
608 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
609 "prepare_cert_gas_latency_ratio",
610 "The ratio of computation gas divided by VM execution latency.",
611 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
612 registry
613 )
614 .unwrap(),
615 execution_gas_latency_ratio: register_histogram_with_registry!(
616 "execution_gas_latency_ratio",
617 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
618 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
619 registry
620 )
621 .unwrap(),
622 skipped_consensus_txns: register_int_counter_with_registry!(
623 "skipped_consensus_txns",
624 "Total number of consensus transactions skipped",
625 registry,
626 )
627 .unwrap(),
628 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
629 "skipped_consensus_txns_cache_hit",
630 "Total number of consensus transactions skipped because of local cache hit",
631 registry,
632 )
633 .unwrap(),
634 post_processing_total_events_emitted: register_int_counter_with_registry!(
635 "post_processing_total_events_emitted",
636 "Total number of events emitted in post processing",
637 registry,
638 )
639 .unwrap(),
640 post_processing_total_tx_indexed: register_int_counter_with_registry!(
641 "post_processing_total_tx_indexed",
642 "Total number of txes indexed in post processing",
643 registry,
644 )
645 .unwrap(),
646 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
647 "post_processing_total_tx_had_event_processed",
648 "Total number of txes finished event processing in post processing",
649 registry,
650 )
651 .unwrap(),
652 post_processing_total_failures: register_int_counter_with_registry!(
653 "post_processing_total_failures",
654 "Total number of failure in post processing",
655 registry,
656 )
657 .unwrap(),
658 consensus_handler_processed: register_int_counter_vec_with_registry!(
659 "consensus_handler_processed",
660 "Number of transactions processed by consensus handler",
661 &["class"],
662 registry
663 ).unwrap(),
664 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
665 "consensus_handler_transaction_sizes",
666 "Sizes of each type of transactions processed by consensus handler",
667 &["class"],
668 POSITIVE_INT_BUCKETS.to_vec(),
669 registry
670 ).unwrap(),
671 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
672 "consensus_handler_num_low_scoring_authorities",
673 "Number of low scoring authorities based on reputation scores from consensus",
674 registry
675 ).unwrap(),
676 consensus_handler_scores: register_int_gauge_vec_with_registry!(
677 "consensus_handler_scores",
678 "scores from consensus for each authority",
679 &["authority"],
680 registry,
681 ).unwrap(),
682 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
683 "consensus_handler_deferred_transactions",
684 "Number of transactions deferred by consensus handler",
685 registry,
686 ).unwrap(),
687 consensus_handler_congested_transactions: register_int_counter_with_registry!(
688 "consensus_handler_congested_transactions",
689 "Number of transactions deferred by consensus handler due to congestion",
690 registry,
691 ).unwrap(),
692 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
693 "consensus_handler_cancelled_transactions",
694 "Number of transactions cancelled by consensus handler",
695 registry,
696 ).unwrap(),
697 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
698 "consensus_handler_max_congestion_control_object_costs",
699 "Max object costs for congestion control in the current consensus commit",
700 &["commit_type"],
701 registry,
702 ).unwrap(),
703 consensus_committed_subdags: register_int_counter_vec_with_registry!(
704 "consensus_committed_subdags",
705 "Number of committed subdags, sliced by author",
706 &["authority"],
707 registry,
708 ).unwrap(),
709 consensus_committed_messages: register_int_gauge_vec_with_registry!(
710 "consensus_committed_messages",
711 "Total number of committed consensus messages, sliced by author",
712 &["authority"],
713 registry,
714 ).unwrap(),
715 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
716 "consensus_committed_user_transactions",
717 "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
718 &["authority"],
719 registry,
720 ).unwrap(),
721 consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
722 "consensus_finalized_user_transactions",
723 "Number of user transactions finalized, sliced by submitter",
724 &["authority"],
725 registry,
726 ).unwrap(),
727 consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
728 "consensus_rejected_user_transactions",
729 "Number of user transactions rejected, sliced by submitter",
730 &["authority"],
731 registry,
732 ).unwrap(),
733 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
734 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
735 zklogin_sig_count: register_int_counter_with_registry!(
736 "zklogin_sig_count",
737 "Count of zkLogin signatures",
738 registry,
739 )
740 .unwrap(),
741 multisig_sig_count: register_int_counter_with_registry!(
742 "multisig_sig_count",
743 "Count of zkLogin signatures",
744 registry,
745 )
746 .unwrap(),
747 consensus_calculated_throughput: register_int_gauge_with_registry!(
748 "consensus_calculated_throughput",
749 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
750 registry,
751 ).unwrap(),
752 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
753 "consensus_calculated_throughput_profile",
754 "The current active calculated throughput profile",
755 registry
756 ).unwrap(),
757 consensus_block_handler_block_processed: register_int_counter_with_registry!(
758 "consensus_block_handler_block_processed",
759 "Number of blocks processed by consensus block handler.",
760 registry
761 ).unwrap(),
762 consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
763 "consensus_block_handler_txn_processed",
764 "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
765 &["outcome"],
766 registry
767 ).unwrap(),
768 consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
769 "consensus_block_handler_fastpath_executions",
770 "Number of fastpath transactions sent for execution by consensus transaction handler",
771 registry,
772 ).unwrap(),
773 consensus_timestamp_bias: register_histogram_with_registry!(
774 "consensus_timestamp_bias",
775 "Bias/delay from consensus timestamp to system time",
776 TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
777 registry
778 ).unwrap(),
779 execution_queueing_latency: LatencyObserver::new(),
780 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
781 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
782 }
783 }
784}
785
786pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
793
794#[derive(Debug, Clone)]
797pub struct ExecutionEnv {
798 pub assigned_versions: AssignedVersions,
800 pub expected_effects_digest: Option<TransactionEffectsDigest>,
803 pub scheduling_source: SchedulingSource,
805 pub withdraw_status: BalanceWithdrawStatus,
807 pub barrier_dependencies: Vec<TransactionDigest>,
810}
811
812impl Default for ExecutionEnv {
813 fn default() -> Self {
814 Self {
815 assigned_versions: Default::default(),
816 expected_effects_digest: None,
817 scheduling_source: SchedulingSource::NonFastPath,
818 withdraw_status: BalanceWithdrawStatus::NoWithdraw,
819 barrier_dependencies: Default::default(),
820 }
821 }
822}
823
824impl ExecutionEnv {
825 pub fn new() -> Self {
826 Default::default()
827 }
828
829 pub fn with_scheduling_source(mut self, scheduling_source: SchedulingSource) -> Self {
830 self.scheduling_source = scheduling_source;
831 self
832 }
833
834 pub fn with_expected_effects_digest(
835 mut self,
836 expected_effects_digest: TransactionEffectsDigest,
837 ) -> Self {
838 self.expected_effects_digest = Some(expected_effects_digest);
839 self
840 }
841
842 pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
843 self.assigned_versions = assigned_versions;
844 self
845 }
846
847 pub fn with_sufficient_balance(mut self) -> Self {
848 self.withdraw_status = BalanceWithdrawStatus::SufficientBalance;
849 self
850 }
851
852 pub fn with_insufficient_balance(mut self) -> Self {
853 self.withdraw_status = BalanceWithdrawStatus::InsufficientBalance;
854 self
855 }
856
857 pub fn with_barrier_dependencies(
858 mut self,
859 barrier_dependencies: BTreeSet<TransactionDigest>,
860 ) -> Self {
861 self.barrier_dependencies = barrier_dependencies.into_iter().collect();
862 self
863 }
864}
865
866#[derive(Debug)]
867pub struct ForkRecoveryState {
868 transaction_overrides:
870 parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
871}
872
873impl Default for ForkRecoveryState {
874 fn default() -> Self {
875 Self {
876 transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
877 }
878 }
879}
880
881impl ForkRecoveryState {
882 pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
883 let Some(config) = config else {
884 return Ok(Self::default());
885 };
886
887 let mut transaction_overrides = HashMap::new();
888 for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
889 let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
890 SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
891 })?;
892 let effects_digest =
893 TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
894 SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
895 })?;
896 transaction_overrides.insert(tx_digest, effects_digest);
897 }
898
899 Ok(Self {
900 transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
901 })
902 }
903
904 pub fn get_transaction_override(
905 &self,
906 tx_digest: &TransactionDigest,
907 ) -> Option<TransactionEffectsDigest> {
908 self.transaction_overrides.read().get(tx_digest).copied()
909 }
910}
911
912pub struct AuthorityState {
913 pub name: AuthorityName,
916 pub secret: StableSyncAuthoritySigner,
918
919 input_loader: TransactionInputLoader,
921 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
922
923 epoch_store: ArcSwap<AuthorityPerEpochStore>,
924
925 execution_lock: RwLock<EpochId>,
930
931 pub indexes: Option<Arc<IndexStore>>,
932 pub rpc_index: Option<Arc<RpcIndexStore>>,
933
934 pub subscription_handler: Arc<SubscriptionHandler>,
935 pub checkpoint_store: Arc<CheckpointStore>,
936
937 committee_store: Arc<CommitteeStore>,
938
939 execution_scheduler: Arc<ExecutionScheduler>,
941
942 #[allow(unused)]
944 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
945
946 pub metrics: Arc<AuthorityMetrics>,
947 _pruner: AuthorityStorePruner,
948 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
949
950 db_checkpoint_config: DBCheckpointConfig,
952
953 pub config: NodeConfig,
954
955 pub overload_info: AuthorityOverloadInfo,
957
958 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
959
960 chain_identifier: ChainIdentifier,
962
963 pub(crate) congestion_tracker: Arc<CongestionTracker>,
964
965 pub traffic_controller: Option<Arc<TrafficController>>,
967
968 fork_recovery_state: Option<ForkRecoveryState>,
970}
971
972impl AuthorityState {
979 pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
980 epoch_store.committee().authority_exists(&self.name)
981 }
982
983 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
984 !self.is_validator(epoch_store)
985 }
986
987 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
988 &self.committee_store
989 }
990
991 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
992 self.committee_store.clone()
993 }
994
995 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
996 &self.config.authority_overload_config
997 }
998
999 pub fn get_epoch_state_commitments(
1000 &self,
1001 epoch: EpochId,
1002 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1003 self.checkpoint_store.get_epoch_state_commitments(epoch)
1004 }
1005
1006 fn handle_transaction_deny_checks(
1007 &self,
1008 transaction: &VerifiedTransaction,
1009 epoch_store: &Arc<AuthorityPerEpochStore>,
1010 ) -> SuiResult<CheckedInputObjects> {
1011 let tx_digest = transaction.digest();
1012 let tx_data = transaction.data().transaction_data();
1013
1014 let input_object_kinds = tx_data.input_objects()?;
1015 let receiving_objects_refs = tx_data.receiving_objects();
1016
1017 sui_transaction_checks::deny::check_transaction_for_signing(
1021 tx_data,
1022 transaction.tx_signatures(),
1023 &input_object_kinds,
1024 &receiving_objects_refs,
1025 &self.config.transaction_deny_config,
1026 self.get_backing_package_store().as_ref(),
1027 )?;
1028
1029 let withdraws = tx_data.process_funds_withdrawals_for_signing()?;
1030
1031 self.execution_cache_trait_pointers
1032 .child_object_resolver
1033 .check_balances_available(&withdraws)?;
1034
1035 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1036 Some(tx_digest),
1037 &input_object_kinds,
1038 &receiving_objects_refs,
1039 epoch_store.epoch(),
1040 )?;
1041
1042 let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1043 epoch_store.protocol_config(),
1044 epoch_store.reference_gas_price(),
1045 tx_data,
1046 input_objects,
1047 &receiving_objects,
1048 &self.metrics.bytecode_verifier_metrics,
1049 &self.config.verifier_signing_config,
1050 )?;
1051
1052 self.handle_coin_deny_list_checks(
1053 tx_data,
1054 &checked_input_objects,
1055 &receiving_objects,
1056 epoch_store,
1057 )?;
1058
1059 Ok(checked_input_objects)
1060 }
1061
1062 fn handle_coin_deny_list_checks(
1063 &self,
1064 tx_data: &TransactionData,
1065 checked_input_objects: &CheckedInputObjects,
1066 receiving_objects: &ReceivingObjects,
1067 epoch_store: &Arc<AuthorityPerEpochStore>,
1068 ) -> SuiResult<()> {
1069 let funds_withdraw_types = tx_data
1070 .get_funds_withdrawals()
1071 .into_iter()
1072 .filter_map(|withdraw| {
1073 withdraw
1074 .type_arg
1075 .get_balance_type_param()
1076 .unwrap()
1078 .map(|ty| ty.to_canonical_string(false))
1079 })
1080 .collect::<BTreeSet<_>>();
1081
1082 if epoch_store.coin_deny_list_v1_enabled() {
1083 check_coin_deny_list_v1(
1084 tx_data.sender(),
1085 checked_input_objects,
1086 receiving_objects,
1087 funds_withdraw_types.clone(),
1088 &self.get_object_store(),
1089 )?;
1090 }
1091
1092 if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1093 check_coin_deny_list_v2_during_signing(
1094 tx_data.sender(),
1095 checked_input_objects,
1096 receiving_objects,
1097 funds_withdraw_types.clone(),
1098 &self.get_object_store(),
1099 )?;
1100 }
1101
1102 Ok(())
1103 }
1104
1105 #[instrument(level = "trace", skip_all)]
1108 fn handle_transaction_impl(
1109 &self,
1110 transaction: VerifiedTransaction,
1111 sign: bool,
1112 epoch_store: &Arc<AuthorityPerEpochStore>,
1113 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
1114 let _execution_lock = self.execution_lock_for_signing()?;
1116
1117 let checked_input_objects =
1118 self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1119
1120 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1121
1122 let tx_digest = *transaction.digest();
1123 let signed_transaction = if sign {
1124 Some(VerifiedSignedTransaction::new(
1125 epoch_store.epoch(),
1126 transaction,
1127 self.name,
1128 &*self.secret,
1129 ))
1130 } else {
1131 None
1132 };
1133
1134 self.get_cache_writer().acquire_transaction_locks(
1139 epoch_store,
1140 &owned_objects,
1141 tx_digest,
1142 signed_transaction.clone(),
1143 )?;
1144
1145 Ok(signed_transaction)
1146 }
1147
1148 #[instrument(level = "trace", skip_all)]
1150 pub async fn handle_transaction(
1151 &self,
1152 epoch_store: &Arc<AuthorityPerEpochStore>,
1153 transaction: VerifiedTransaction,
1154 ) -> SuiResult<HandleTransactionResponse> {
1155 let tx_digest = *transaction.digest();
1156 debug!("handle_transaction");
1157
1158 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1160 return Ok(HandleTransactionResponse { status });
1161 }
1162
1163 let _metrics_guard = self
1164 .metrics
1165 .authority_state_handle_transaction_latency
1166 .start_timer();
1167 self.metrics.tx_orders.inc();
1168
1169 if epoch_store.protocol_config().mysticeti_fastpath()
1170 && !self
1171 .wait_for_fastpath_dependency_objects(&transaction, epoch_store.epoch())
1172 .await?
1173 {
1174 debug!("fastpath input objects are still unavailable after waiting");
1175 }
1177
1178 self.handle_sign_transaction(epoch_store, transaction).await
1179 }
1180
1181 pub async fn handle_sign_transaction(
1183 &self,
1184 epoch_store: &Arc<AuthorityPerEpochStore>,
1185 transaction: VerifiedTransaction,
1186 ) -> SuiResult<HandleTransactionResponse> {
1187 let tx_digest = *transaction.digest();
1188 let signed = self.handle_transaction_impl(transaction, true, epoch_store);
1189 match signed {
1190 Ok(Some(s)) => {
1191 if self.is_validator(epoch_store)
1192 && let Some(validator_tx_finalizer) = &self.validator_tx_finalizer
1193 {
1194 let tx = s.clone();
1195 let validator_tx_finalizer = validator_tx_finalizer.clone();
1196 let cache_reader = self.get_transaction_cache_reader().clone();
1197 let epoch_store = epoch_store.clone();
1198 spawn_monitored_task!(epoch_store.within_alive_epoch(
1199 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1200 ));
1201 }
1202 Ok(HandleTransactionResponse {
1203 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1204 })
1205 }
1206 Ok(None) => panic!("handle_transaction_impl should return a signed transaction"),
1207 Err(err) => Ok(HandleTransactionResponse {
1211 status: self
1212 .get_transaction_status(&tx_digest, epoch_store)?
1213 .ok_or(err)?
1214 .1,
1215 }),
1216 }
1217 }
1218
1219 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1229 pub(crate) fn handle_vote_transaction(
1230 &self,
1231 epoch_store: &Arc<AuthorityPerEpochStore>,
1232 transaction: VerifiedTransaction,
1233 ) -> SuiResult<()> {
1234 debug!("handle_vote_transaction");
1235
1236 let _metrics_guard = self
1237 .metrics
1238 .authority_state_handle_vote_transaction_latency
1239 .start_timer();
1240 self.metrics.tx_orders.inc();
1241
1242 if !epoch_store
1246 .get_reconfig_state_read_lock_guard()
1247 .should_accept_user_certs()
1248 {
1249 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1250 }
1251
1252 let tx_digest = *transaction.digest();
1257 if epoch_store.is_recently_finalized(&tx_digest)
1258 || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
1259 {
1260 return Ok(());
1261 }
1262
1263 let result =
1264 self.handle_transaction_impl(transaction, false , epoch_store)?;
1265 assert!(
1266 result.is_none(),
1267 "handle_transaction_impl should not return a signed transaction when sign is false"
1268 );
1269 Ok(())
1270 }
1271
1272 pub fn check_transaction_validity(
1281 &self,
1282 epoch_store: &Arc<AuthorityPerEpochStore>,
1283 transaction: &VerifiedTransaction,
1284 ) -> SuiResult<()> {
1285 if !epoch_store
1286 .get_reconfig_state_read_lock_guard()
1287 .should_accept_user_certs()
1288 {
1289 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1290 }
1291
1292 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1293
1294 let checked_input_objects =
1295 self.handle_transaction_deny_checks(transaction, epoch_store)?;
1296
1297 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1300 let cache_reader = self.get_object_cache_reader();
1301
1302 for obj_ref in &owned_objects {
1303 if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1304 if obj_ref.1 < live_object.version() {
1307 return Err(SuiErrorKind::UserInputError {
1308 error: UserInputError::ObjectVersionUnavailableForConsumption {
1309 provided_obj_ref: *obj_ref,
1310 current_version: live_object.version(),
1311 },
1312 }
1313 .into());
1314 }
1315
1316 if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1318 return Err(SuiErrorKind::UserInputError {
1319 error: UserInputError::InvalidObjectDigest {
1320 object_id: obj_ref.0,
1321 expected_digest: live_object.digest(),
1322 },
1323 }
1324 .into());
1325 }
1326 }
1327 }
1328
1329 Ok(())
1330 }
1331
1332 pub fn check_system_overload_at_signing(&self) -> bool {
1333 self.config
1334 .authority_overload_config
1335 .check_system_overload_at_signing
1336 }
1337
1338 pub fn check_system_overload_at_execution(&self) -> bool {
1339 self.config
1340 .authority_overload_config
1341 .check_system_overload_at_execution
1342 }
1343
1344 pub(crate) fn check_system_overload(
1345 &self,
1346 consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
1347 tx_data: &SenderSignedData,
1348 do_authority_overload_check: bool,
1349 ) -> SuiResult {
1350 if do_authority_overload_check {
1351 self.check_authority_overload(tx_data).tap_err(|_| {
1352 self.update_overload_metrics("execution_queue");
1353 })?;
1354 }
1355 self.execution_scheduler
1356 .check_execution_overload(self.overload_config(), tx_data)
1357 .tap_err(|_| {
1358 self.update_overload_metrics("execution_pending");
1359 })?;
1360 consensus_overload_checker
1361 .check_consensus_overload()
1362 .tap_err(|_| {
1363 self.update_overload_metrics("consensus");
1364 })?;
1365
1366 let pending_tx_count = self
1367 .get_cache_commit()
1368 .approximate_pending_transaction_count();
1369 if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1370 return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1371 retry_after_secs: 10,
1372 }
1373 .into());
1374 }
1375
1376 Ok(())
1377 }
1378
1379 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1380 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1381 return Ok(());
1382 }
1383
1384 let load_shedding_percentage = self
1385 .overload_info
1386 .load_shedding_percentage
1387 .load(Ordering::Relaxed);
1388 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1389 }
1390
1391 fn update_overload_metrics(&self, source: &str) {
1392 self.metrics
1393 .transaction_overload_sources
1394 .with_label_values(&[source])
1395 .inc();
1396 }
1397
1398 pub(crate) async fn wait_for_fastpath_dependency_objects(
1402 &self,
1403 transaction: &VerifiedTransaction,
1404 epoch: EpochId,
1405 ) -> SuiResult<bool> {
1406 let txn_data = transaction.data().transaction_data();
1407 let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1408
1409 let fastpath_dependency_objects: Vec<_> = move_objects
1411 .into_iter()
1412 .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1413 .chain(
1414 packages
1415 .into_iter()
1416 .map(|package_id| InputKey::Package { id: package_id }),
1417 )
1418 .collect();
1419 let receiving_keys: HashSet<_> = receiving_objects
1420 .into_iter()
1421 .filter_map(|receiving_obj_ref| {
1422 self.should_wait_for_dependency_object(receiving_obj_ref)
1423 })
1424 .collect();
1425 if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1426 return Ok(true);
1427 }
1428
1429 let max_wait = if cfg!(msim) {
1432 Duration::from_millis(200)
1433 } else {
1434 WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1435 };
1436
1437 match timeout(
1438 max_wait,
1439 self.get_object_cache_reader().notify_read_input_objects(
1440 &fastpath_dependency_objects,
1441 &receiving_keys,
1442 epoch,
1443 ),
1444 )
1445 .await
1446 {
1447 Ok(()) => Ok(true),
1448 Err(_) => Ok(false),
1451 }
1452 }
1453
1454 fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1461 let (obj_id, cur_version, _digest) = obj_ref;
1462 let Some(latest_obj_ref) = self
1463 .get_object_cache_reader()
1464 .get_latest_object_ref_or_tombstone(obj_id)
1465 else {
1466 return Some(InputKey::VersionedObject {
1468 id: FullObjectID::new(obj_id, None),
1469 version: cur_version,
1470 });
1471 };
1472 let latest_digest = latest_obj_ref.2;
1473 if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1474 return None;
1476 }
1477 let latest_version = latest_obj_ref.1;
1478 if cur_version <= latest_version {
1479 return None;
1482 }
1483 Some(InputKey::VersionedObject {
1485 id: FullObjectID::new(obj_id, None),
1486 version: cur_version,
1487 })
1488 }
1489
1490 #[instrument(level = "trace", skip_all)]
1495 pub async fn wait_for_certificate_execution(
1496 &self,
1497 certificate: &VerifiedCertificate,
1498 epoch_store: &Arc<AuthorityPerEpochStore>,
1499 ) -> SuiResult<TransactionEffects> {
1500 self.wait_for_transaction_execution(
1501 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1502 epoch_store,
1503 )
1504 .await
1505 }
1506
1507 #[instrument(level = "trace", skip_all)]
1511 pub async fn wait_for_transaction_execution(
1512 &self,
1513 transaction: &VerifiedExecutableTransaction,
1514 epoch_store: &Arc<AuthorityPerEpochStore>,
1515 ) -> SuiResult<TransactionEffects> {
1516 let _metrics_guard = if transaction.is_consensus_tx() {
1517 self.metrics
1518 .execute_certificate_latency_shared_object
1519 .start_timer()
1520 } else {
1521 self.metrics
1522 .execute_certificate_latency_single_writer
1523 .start_timer()
1524 };
1525 trace!("execute_transaction");
1526
1527 self.metrics.total_cert_attempts.inc();
1528
1529 if !transaction.is_consensus_tx() {
1530 self.execution_scheduler.enqueue(
1534 vec![(
1535 Schedulable::Transaction(transaction.clone()),
1536 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1537 )],
1538 epoch_store,
1539 );
1540 }
1541
1542 epoch_store
1545 .within_alive_epoch(self.notify_read_effects(
1546 "AuthorityState::wait_for_transaction_execution",
1547 *transaction.digest(),
1548 ))
1549 .await
1550 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1551 .and_then(|r| r)
1552 }
1553
1554 pub async fn await_transaction_effects(
1558 &self,
1559 digest: TransactionDigest,
1560 epoch_store: &Arc<AuthorityPerEpochStore>,
1561 ) -> SuiResult<TransactionEffects> {
1562 let _metrics_guard = self.metrics.await_transaction_latency.start_timer();
1563 debug!("await_transaction");
1564
1565 epoch_store
1566 .within_alive_epoch(
1567 self.notify_read_effects("AuthorityState::await_transaction_effects", digest),
1568 )
1569 .await
1570 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1571 .and_then(|r| r)
1572 }
1573
1574 #[instrument(level = "trace", skip_all)]
1586 pub async fn try_execute_immediately(
1587 &self,
1588 certificate: &VerifiedExecutableTransaction,
1589 mut execution_env: ExecutionEnv,
1590 epoch_store: &Arc<AuthorityPerEpochStore>,
1591 ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
1592 let _scope = monitored_scope("Execution::try_execute_immediately");
1593 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1594
1595 let tx_digest = certificate.digest();
1596
1597 if let Some(fork_recovery) = &self.fork_recovery_state
1598 && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
1599 {
1600 warn!(
1601 ?tx_digest,
1602 original_digest = ?execution_env.expected_effects_digest,
1603 override_digest = ?override_digest,
1604 "Applying fork recovery override for transaction effects digest"
1605 );
1606 execution_env.expected_effects_digest = Some(override_digest);
1607 }
1608
1609 let tx_guard = epoch_store.acquire_tx_guard(certificate);
1611
1612 let tx_cache_reader = self.get_transaction_cache_reader();
1613 if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
1614 if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
1615 assert_eq!(
1616 effects.digest(),
1617 expected_effects_digest,
1618 "Unexpected effects digest for transaction {:?}",
1619 tx_digest
1620 );
1621 }
1622 tx_guard.release();
1623 return ExecutionOutput::Success((effects, None));
1624 }
1625
1626 let execution_start_time = Instant::now();
1627
1628 let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
1633 else {
1634 tx_guard.release();
1635 return ExecutionOutput::EpochEnded;
1636 };
1637 if *execution_guard != epoch_store.epoch() {
1642 tx_guard.release();
1643 info!("The epoch of the execution_guard doesn't match the epoch store");
1644 return ExecutionOutput::EpochEnded;
1645 }
1646
1647 let scheduling_source = execution_env.scheduling_source;
1648 let mysticeti_fp_outputs = if epoch_store.protocol_config().mysticeti_fastpath() {
1649 tx_cache_reader.get_mysticeti_fastpath_outputs(tx_digest)
1650 } else {
1651 None
1652 };
1653
1654 let (transaction_outputs, timings, execution_error_opt) = if let Some(outputs) =
1655 mysticeti_fp_outputs
1656 {
1657 assert!(
1658 !certificate.is_consensus_tx(),
1659 "Mysticeti fastpath executed transactions cannot be consensus transactions"
1660 );
1661 debug!(
1666 ?tx_digest,
1667 "Mysticeti fastpath certified transaction outputs found in cache, skipping execution and committing"
1668 );
1669 (outputs, None, None)
1670 } else {
1671 let (transaction_outputs, timings, execution_error_opt) = match self
1672 .process_certificate(
1673 &tx_guard,
1674 &execution_guard,
1675 certificate,
1676 execution_env,
1677 epoch_store,
1678 ) {
1679 ExecutionOutput::Success(result) => result,
1680 output => return output.unwrap_err(),
1681 };
1682 (
1683 Arc::new(transaction_outputs),
1684 Some(timings),
1685 execution_error_opt,
1686 )
1687 };
1688
1689 fail_point!("crash");
1690
1691 let effects = transaction_outputs.effects.clone();
1692 debug!(
1693 ?tx_digest,
1694 fx_digest=?effects.digest(),
1695 "process_certificate succeeded in {:.3}ms",
1696 (execution_start_time.elapsed().as_micros() as f64) / 1000.0
1697 );
1698
1699 if scheduling_source == SchedulingSource::MysticetiFastPath {
1700 self.get_cache_writer()
1701 .write_fastpath_transaction_outputs(transaction_outputs);
1702 } else {
1703 let commit_result =
1704 self.commit_certificate(certificate, transaction_outputs, epoch_store);
1705 if let Err(err) = commit_result {
1706 error!(?tx_digest, "Error committing transaction: {err}");
1707 tx_guard.release();
1708 return ExecutionOutput::Fatal(err);
1709 }
1710 }
1711
1712 if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
1713 certificate.data().transaction_data().kind()
1714 {
1715 if let Some(err) = &execution_error_opt {
1716 debug_fatal!("Authenticator state update failed: {:?}", err);
1717 }
1718 epoch_store.update_authenticator_state(auth_state);
1719
1720 if cfg!(debug_assertions) {
1722 let authenticator_state = get_authenticator_state(self.get_object_store())
1723 .expect("Read cannot fail")
1724 .expect("Authenticator state must exist");
1725
1726 let mut sys_jwks: Vec<_> = authenticator_state
1727 .active_jwks
1728 .into_iter()
1729 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1730 .collect();
1731 let mut active_jwks: Vec<_> = epoch_store
1732 .signature_verifier
1733 .get_jwks()
1734 .into_iter()
1735 .collect();
1736 sys_jwks.sort();
1737 active_jwks.sort();
1738
1739 assert_eq!(sys_jwks, active_jwks);
1740 }
1741 }
1742
1743 tx_guard.commit_tx();
1744
1745 let elapsed = execution_start_time.elapsed();
1746 if let Some(timings) = timings {
1747 epoch_store.record_local_execution_time(
1748 certificate.data().transaction_data(),
1749 &effects,
1750 timings,
1751 elapsed,
1752 );
1753 }
1754
1755 let elapsed_us = elapsed.as_micros() as f64;
1756 if elapsed_us > 0.0 {
1757 self.metrics
1758 .execution_gas_latency_ratio
1759 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
1760 };
1761 ExecutionOutput::Success((effects, execution_error_opt))
1762 }
1763
1764 pub fn read_objects_for_execution(
1765 &self,
1766 tx_lock: &CertLockGuard,
1767 certificate: &VerifiedExecutableTransaction,
1768 assigned_shared_object_versions: AssignedVersions,
1769 epoch_store: &Arc<AuthorityPerEpochStore>,
1770 ) -> SuiResult<InputObjects> {
1771 let _scope = monitored_scope("Execution::load_input_objects");
1772 let _metrics_guard = self
1773 .metrics
1774 .execution_load_input_objects_latency
1775 .start_timer();
1776 let input_objects = &certificate.data().transaction_data().input_objects()?;
1777 self.input_loader.read_objects_for_execution(
1778 &certificate.key(),
1779 tx_lock,
1780 input_objects,
1781 &assigned_shared_object_versions,
1782 epoch_store.epoch(),
1783 )
1784 }
1785
1786 pub async fn try_execute_for_test(
1789 &self,
1790 certificate: &VerifiedCertificate,
1791 execution_env: ExecutionEnv,
1792 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1793 let epoch_store = self.epoch_store_for_testing();
1794 let (effects, execution_error_opt) = self
1795 .try_execute_immediately(
1796 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1797 execution_env,
1798 &epoch_store,
1799 )
1800 .await
1801 .unwrap();
1802 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1803 (signed_effects, execution_error_opt)
1804 }
1805
1806 pub async fn notify_read_effects(
1807 &self,
1808 task_name: &'static str,
1809 digest: TransactionDigest,
1810 ) -> SuiResult<TransactionEffects> {
1811 Ok(self
1812 .get_transaction_cache_reader()
1813 .notify_read_executed_effects(task_name, &[digest])
1814 .await
1815 .pop()
1816 .expect("must return correct number of effects"))
1817 }
1818
1819 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1820 self.get_object_cache_reader()
1821 .check_owned_objects_are_live(owned_object_refs)
1822 }
1823
1824 pub(crate) fn debug_dump_transaction_state(
1829 &self,
1830 tx_digest: &TransactionDigest,
1831 effects: &TransactionEffects,
1832 expected_effects_digest: TransactionEffectsDigest,
1833 inner_temporary_store: &InnerTemporaryStore,
1834 certificate: &VerifiedExecutableTransaction,
1835 debug_dump_config: &StateDebugDumpConfig,
1836 ) -> SuiResult<PathBuf> {
1837 let dump_dir = debug_dump_config
1838 .dump_file_directory
1839 .as_ref()
1840 .cloned()
1841 .unwrap_or(std::env::temp_dir());
1842 let epoch_store = self.load_epoch_store_one_call_per_task();
1843
1844 NodeStateDump::new(
1845 tx_digest,
1846 effects,
1847 expected_effects_digest,
1848 self.get_object_store().as_ref(),
1849 &epoch_store,
1850 inner_temporary_store,
1851 certificate,
1852 )?
1853 .write_to_file(&dump_dir)
1854 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1855 }
1856
1857 #[instrument(level = "trace", skip_all)]
1858 pub(crate) fn process_certificate(
1859 &self,
1860 tx_guard: &CertTxGuard,
1861 execution_guard: &ExecutionLockReadGuard<'_>,
1862 certificate: &VerifiedExecutableTransaction,
1863 execution_env: ExecutionEnv,
1864 epoch_store: &Arc<AuthorityPerEpochStore>,
1865 ) -> ExecutionOutput<(
1866 TransactionOutputs,
1867 Vec<ExecutionTiming>,
1868 Option<ExecutionError>,
1869 )> {
1870 let _scope = monitored_scope("Execution::process_certificate");
1871 let tx_digest = *certificate.digest();
1872
1873 let input_objects = match self.read_objects_for_execution(
1874 tx_guard.as_lock_guard(),
1875 certificate,
1876 execution_env.assigned_versions,
1877 epoch_store,
1878 ) {
1879 Ok(objects) => objects,
1880 Err(e) => return ExecutionOutput::Fatal(e),
1881 };
1882
1883 let expected_effects_digest = match execution_env.expected_effects_digest {
1884 Some(expected_effects_digest) => Some(expected_effects_digest),
1885 None => {
1886 match epoch_store.get_signed_effects_digest(&tx_digest) {
1891 Ok(digest) => digest,
1892 Err(e) => return ExecutionOutput::Fatal(e),
1893 }
1894 }
1895 };
1896
1897 fail_point_if!("correlated-crash-process-certificate", || {
1898 if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1899 sui_simulator::task::kill_current_node(None);
1900 }
1901 });
1902
1903 self.execute_certificate(
1908 execution_guard,
1909 certificate,
1910 input_objects,
1911 expected_effects_digest,
1912 execution_env.withdraw_status,
1913 epoch_store,
1914 )
1915 }
1916
1917 pub async fn reconfigure_traffic_control(
1918 &self,
1919 params: TrafficControlReconfigParams,
1920 ) -> Result<TrafficControlReconfigParams, SuiError> {
1921 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1922 traffic_controller.admin_reconfigure(params).await
1923 } else {
1924 Err(SuiErrorKind::InvalidAdminRequest(
1925 "Traffic controller is not configured on this node".to_string(),
1926 )
1927 .into())
1928 }
1929 }
1930
1931 #[instrument(level = "trace", skip_all)]
1932 fn commit_certificate(
1933 &self,
1934 certificate: &VerifiedExecutableTransaction,
1935 transaction_outputs: Arc<TransactionOutputs>,
1936 epoch_store: &Arc<AuthorityPerEpochStore>,
1937 ) -> SuiResult {
1938 let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
1939 monitored_scope("Execution::commit_certificate");
1940 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1941
1942 let tx_digest = certificate.digest();
1943
1944 epoch_store.insert_executed_in_epoch(tx_digest);
1947 let key = certificate.key();
1948 if !matches!(key, TransactionKey::Digest(_)) {
1949 epoch_store.insert_tx_key(key, *tx_digest)?;
1950 }
1951
1952 fail_point!("crash");
1954
1955 self.get_cache_writer()
1956 .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);
1957
1958 if certificate.transaction_data().is_end_of_epoch_tx() {
1959 self.get_object_cache_reader()
1962 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1963 }
1964
1965 Ok(())
1966 }
1967
1968 fn update_metrics(
1969 &self,
1970 certificate: &VerifiedExecutableTransaction,
1971 inner_temporary_store: &InnerTemporaryStore,
1972 effects: &TransactionEffects,
1973 ) {
1974 if certificate.has_zklogin_sig() {
1976 self.metrics.zklogin_sig_count.inc();
1977 } else if certificate.has_upgraded_multisig() {
1978 self.metrics.multisig_sig_count.inc();
1979 }
1980
1981 self.metrics.total_effects.inc();
1982 self.metrics.total_certs.inc();
1983
1984 let consensus_object_count = effects.input_consensus_objects().len();
1985 if consensus_object_count > 0 {
1986 self.metrics.shared_obj_tx.inc();
1987 }
1988
1989 if certificate.is_sponsored_tx() {
1990 self.metrics.sponsored_tx.inc();
1991 }
1992
1993 let input_object_count = inner_temporary_store.input_objects.len();
1994 self.metrics
1995 .num_input_objs
1996 .observe(input_object_count as f64);
1997 self.metrics
1998 .num_shared_objects
1999 .observe(consensus_object_count as f64);
2000 self.metrics.batch_size.observe(
2001 certificate
2002 .data()
2003 .intent_message()
2004 .value
2005 .kind()
2006 .num_commands() as f64,
2007 );
2008 }
2009
2010 #[instrument(level = "trace", skip_all)]
2019 fn execute_certificate(
2020 &self,
2021 _execution_guard: &ExecutionLockReadGuard<'_>,
2022 certificate: &VerifiedExecutableTransaction,
2023 input_objects: InputObjects,
2024 expected_effects_digest: Option<TransactionEffectsDigest>,
2025 withdraw_status: BalanceWithdrawStatus,
2026 epoch_store: &Arc<AuthorityPerEpochStore>,
2027 ) -> ExecutionOutput<(
2028 TransactionOutputs,
2029 Vec<ExecutionTiming>,
2030 Option<ExecutionError>,
2031 )> {
2032 let _scope = monitored_scope("Execution::prepare_certificate");
2033 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
2034 let prepare_certificate_start_time = tokio::time::Instant::now();
2035
2036 let tx_data = certificate.data().transaction_data();
2038 if let Err(e) = tx_data.validity_check(epoch_store.protocol_config()) {
2039 return ExecutionOutput::Fatal(e.into());
2040 }
2041
2042 let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
2046 certificate,
2047 input_objects,
2048 epoch_store.protocol_config(),
2049 epoch_store.reference_gas_price(),
2050 ) {
2051 Ok(result) => result,
2052 Err(e) => return ExecutionOutput::Fatal(e),
2053 };
2054
2055 let owned_object_refs = input_objects.inner().filter_owned_objects();
2056 if let Err(e) = self.check_owned_locks(&owned_object_refs) {
2057 return ExecutionOutput::Fatal(e);
2058 }
2059 let tx_digest = *certificate.digest();
2060 let protocol_config = epoch_store.protocol_config();
2061 let transaction_data = &certificate.data().intent_message().value;
2062 let (kind, signer, gas_data) = transaction_data.execution_parts();
2063 let early_execution_error = get_early_execution_error(
2064 &tx_digest,
2065 &input_objects,
2066 self.config.certificate_deny_config.certificate_deny_set(),
2067 &withdraw_status,
2068 );
2069 let execution_params = match early_execution_error {
2070 Some(error) => ExecutionOrEarlyError::Err(error),
2071 None => ExecutionOrEarlyError::Ok(()),
2072 };
2073
2074 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2075
2076 #[allow(unused_mut)]
2077 let (inner_temp_store, _, mut effects, timings, execution_error_opt) =
2078 epoch_store.executor().execute_transaction_to_effects(
2079 &tracking_store,
2080 protocol_config,
2081 self.metrics.limits_metrics.clone(),
2082 self.config
2085 .expensive_safety_check_config
2086 .enable_deep_per_tx_sui_conservation_check(),
2087 execution_params,
2088 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2089 epoch_store
2090 .epoch_start_config()
2091 .epoch_data()
2092 .epoch_start_timestamp(),
2093 input_objects,
2094 gas_data,
2095 gas_status,
2096 kind,
2097 signer,
2098 tx_digest,
2099 &mut None,
2100 );
2101
2102 if let Some(expected_effects_digest) = expected_effects_digest
2103 && effects.digest() != expected_effects_digest
2104 {
2105 match self.debug_dump_transaction_state(
2107 &tx_digest,
2108 &effects,
2109 expected_effects_digest,
2110 &inner_temp_store,
2111 certificate,
2112 &self.config.state_debug_dump_config,
2113 ) {
2114 Ok(out_path) => {
2115 info!(
2116 "Dumped node state for transaction {} to {}",
2117 tx_digest,
2118 out_path.as_path().display().to_string()
2119 );
2120 }
2121 Err(e) => {
2122 error!("Error dumping state for transaction {}: {e}", tx_digest);
2123 }
2124 }
2125 let expected_effects = self
2126 .get_transaction_cache_reader()
2127 .get_effects(&expected_effects_digest);
2128 error!(
2129 ?tx_digest,
2130 ?expected_effects_digest,
2131 actual_effects = ?effects,
2132 expected_effects = ?expected_effects,
2133 "fork detected!"
2134 );
2135 if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2136 tx_digest,
2137 expected_effects_digest,
2138 effects.digest(),
2139 ) {
2140 error!("Failed to record transaction fork: {e}");
2141 }
2142
2143 fail_point_if!("kill_transaction_fork_node", || {
2144 #[cfg(msim)]
2145 {
2146 tracing::error!(
2147 fatal = true,
2148 "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2149 tx_digest
2150 );
2151 sui_simulator::task::shutdown_current_node();
2152 }
2153 });
2154
2155 fatal!(
2156 "Transaction {} is expected to have effects digest {}, but got {}!",
2157 tx_digest,
2158 expected_effects_digest,
2159 effects.digest()
2160 );
2161 }
2162
2163 fail_point_arg!("simulate_fork_during_execution", |(
2164 forked_validators,
2165 full_halt,
2166 effects_overrides,
2167 fork_probability,
2168 ): (
2169 std::sync::Arc<
2170 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2171 >,
2172 bool,
2173 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2174 f32,
2175 )| {
2176 #[cfg(msim)]
2177 self.simulate_fork_during_execution(
2178 certificate,
2179 epoch_store,
2180 &mut effects,
2181 forked_validators,
2182 full_halt,
2183 effects_overrides,
2184 fork_probability,
2185 );
2186 });
2187
2188 let unchanged_loaded_runtime_objects =
2189 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2190 certificate.transaction_data(),
2191 &effects,
2192 &tracking_store.into_read_objects(),
2193 );
2194
2195 let _ = self
2197 .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2198 .tap_err(|e| {
2199 self.metrics.post_processing_total_failures.inc();
2200 error!(?tx_digest, "tx post processing failed: {e}");
2201 });
2202
2203 self.update_metrics(certificate, &inner_temp_store, &effects);
2204
2205 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2206 certificate.clone().into_unsigned(),
2207 effects,
2208 inner_temp_store,
2209 unchanged_loaded_runtime_objects,
2210 );
2211
2212 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2213 if elapsed > 0.0 {
2214 self.metrics.prepare_cert_gas_latency_ratio.observe(
2215 transaction_outputs
2216 .effects
2217 .gas_cost_summary()
2218 .computation_cost as f64
2219 / elapsed,
2220 );
2221 }
2222
2223 ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2224 }
2225
2226 pub fn prepare_certificate_for_benchmark(
2227 &self,
2228 certificate: &VerifiedExecutableTransaction,
2229 input_objects: InputObjects,
2230 epoch_store: &Arc<AuthorityPerEpochStore>,
2231 ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2232 let lock = RwLock::new(epoch_store.epoch());
2233 let execution_guard = lock.try_read().unwrap();
2234
2235 let (transaction_outputs, _timings, execution_error_opt) = self
2236 .execute_certificate(
2237 &execution_guard,
2238 certificate,
2239 input_objects,
2240 None,
2241 BalanceWithdrawStatus::NoWithdraw,
2242 epoch_store,
2243 )
2244 .unwrap();
2245 Ok((transaction_outputs, execution_error_opt))
2246 }
2247
2248 #[instrument(skip_all)]
2249 #[allow(clippy::type_complexity)]
2250 pub async fn dry_exec_transaction(
2251 &self,
2252 transaction: TransactionData,
2253 transaction_digest: TransactionDigest,
2254 ) -> SuiResult<(
2255 DryRunTransactionBlockResponse,
2256 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2257 TransactionEffects,
2258 Option<ObjectID>,
2259 )> {
2260 let epoch_store = self.load_epoch_store_one_call_per_task();
2261 if !self.is_fullnode(&epoch_store) {
2262 return Err(SuiErrorKind::UnsupportedFeatureError {
2263 error: "dry-exec is only supported on fullnodes".to_string(),
2264 }
2265 .into());
2266 }
2267
2268 if transaction.kind().is_system_tx() {
2269 return Err(SuiErrorKind::UnsupportedFeatureError {
2270 error: "dry-exec does not support system transactions".to_string(),
2271 }
2272 .into());
2273 }
2274
2275 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2276 }
2277
2278 #[allow(clippy::type_complexity)]
2279 pub fn dry_exec_transaction_for_benchmark(
2280 &self,
2281 transaction: TransactionData,
2282 transaction_digest: TransactionDigest,
2283 ) -> SuiResult<(
2284 DryRunTransactionBlockResponse,
2285 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2286 TransactionEffects,
2287 Option<ObjectID>,
2288 )> {
2289 let epoch_store = self.load_epoch_store_one_call_per_task();
2290 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2291 }
2292
2293 #[allow(clippy::type_complexity)]
2294 fn dry_exec_transaction_impl(
2295 &self,
2296 epoch_store: &AuthorityPerEpochStore,
2297 transaction: TransactionData,
2298 transaction_digest: TransactionDigest,
2299 ) -> SuiResult<(
2300 DryRunTransactionBlockResponse,
2301 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2302 TransactionEffects,
2303 Option<ObjectID>,
2304 )> {
2305 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2307
2308 let input_object_kinds = transaction.input_objects()?;
2309 let receiving_object_refs = transaction.receiving_objects();
2310
2311 sui_transaction_checks::deny::check_transaction_for_signing(
2312 &transaction,
2313 &[],
2314 &input_object_kinds,
2315 &receiving_object_refs,
2316 &self.config.transaction_deny_config,
2317 self.get_backing_package_store().as_ref(),
2318 )?;
2319
2320 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2321 None,
2323 &input_object_kinds,
2324 &receiving_object_refs,
2325 epoch_store.epoch(),
2326 )?;
2327
2328 let mut gas_data = transaction.gas_data().clone();
2330 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2331 let sender = transaction.sender();
2332 const MIST_TO_SUI: u64 = 1_000_000_000;
2334 const DRY_RUN_SUI: u64 = 1_000_000_000;
2335 let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
2336 let gas_object_id = ObjectID::random();
2337 let gas_object = Object::new_move(
2338 MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
2339 Owner::AddressOwner(sender),
2340 TransactionDigest::genesis_marker(),
2341 );
2342 let gas_object_ref = gas_object.compute_object_reference();
2343 gas_data.payment = vec![gas_object_ref];
2344 (
2345 sui_transaction_checks::check_transaction_input_with_given_gas(
2346 epoch_store.protocol_config(),
2347 epoch_store.reference_gas_price(),
2348 &transaction,
2349 input_objects,
2350 receiving_objects,
2351 gas_object,
2352 &self.metrics.bytecode_verifier_metrics,
2353 &self.config.verifier_signing_config,
2354 )?,
2355 Some(gas_object_id),
2356 )
2357 } else {
2358 (
2359 sui_transaction_checks::check_transaction_input(
2360 epoch_store.protocol_config(),
2361 epoch_store.reference_gas_price(),
2362 &transaction,
2363 input_objects,
2364 &receiving_objects,
2365 &self.metrics.bytecode_verifier_metrics,
2366 &self.config.verifier_signing_config,
2367 )?,
2368 None,
2369 )
2370 };
2371
2372 let protocol_config = epoch_store.protocol_config();
2373 let (kind, signer, _) = transaction.execution_parts();
2374
2375 let silent = true;
2376 let executor = sui_execution::executor(protocol_config, silent)
2377 .expect("Creating an executor should not fail here");
2378
2379 let expensive_checks = false;
2380 let early_execution_error = get_early_execution_error(
2381 &transaction_digest,
2382 &checked_input_objects,
2383 self.config.certificate_deny_config.certificate_deny_set(),
2384 &BalanceWithdrawStatus::NoWithdraw,
2386 );
2387 let execution_params = match early_execution_error {
2388 Some(error) => ExecutionOrEarlyError::Err(error),
2389 None => ExecutionOrEarlyError::Ok(()),
2390 };
2391 let (inner_temp_store, _, effects, _timings, execution_error) = executor
2392 .execute_transaction_to_effects(
2393 self.get_backing_store().as_ref(),
2394 protocol_config,
2395 self.metrics.limits_metrics.clone(),
2396 expensive_checks,
2397 execution_params,
2398 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2399 epoch_store
2400 .epoch_start_config()
2401 .epoch_data()
2402 .epoch_start_timestamp(),
2403 checked_input_objects,
2404 gas_data,
2405 gas_status,
2406 kind,
2407 signer,
2408 transaction_digest,
2409 &mut None,
2410 );
2411 let tx_digest = *effects.transaction_digest();
2412
2413 let module_cache =
2414 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2415
2416 let mut layout_resolver =
2417 epoch_store
2418 .executor()
2419 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2420 &inner_temp_store,
2421 self.get_backing_package_store(),
2422 )));
2423 let object_changes = Vec::new();
2425
2426 let balance_changes = Vec::new();
2428
2429 let written_with_kind = effects
2430 .created()
2431 .into_iter()
2432 .map(|(oref, _)| (oref, WriteKind::Create))
2433 .chain(
2434 effects
2435 .unwrapped()
2436 .into_iter()
2437 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2438 )
2439 .chain(
2440 effects
2441 .mutated()
2442 .into_iter()
2443 .map(|(oref, _)| (oref, WriteKind::Mutate)),
2444 )
2445 .map(|(oref, kind)| {
2446 let obj = inner_temp_store.written.get(&oref.0).unwrap();
2447 (oref.0, (oref, obj.clone(), kind))
2449 })
2450 .collect();
2451
2452 let execution_error_source = execution_error
2453 .as_ref()
2454 .err()
2455 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2456
2457 Ok((
2458 DryRunTransactionBlockResponse {
2459 suggested_gas_price: self
2460 .congestion_tracker
2461 .get_suggested_gas_prices(&transaction),
2462 input: SuiTransactionBlockData::try_from_with_module_cache(
2463 transaction,
2464 &module_cache,
2465 )
2466 .map_err(|e| SuiErrorKind::TransactionSerializationError {
2467 error: format!(
2468 "Failed to convert transaction to SuiTransactionBlockData: {}",
2469 e
2470 ),
2471 })?, effects: effects.clone().try_into()?,
2473 events: SuiTransactionBlockEvents::try_from(
2474 inner_temp_store.events.clone(),
2475 tx_digest,
2476 None,
2477 layout_resolver.as_mut(),
2478 )?,
2479 object_changes,
2480 balance_changes,
2481 execution_error_source,
2482 },
2483 written_with_kind,
2484 effects,
2485 mock_gas,
2486 ))
2487 }
2488
2489 pub fn simulate_transaction(
2490 &self,
2491 mut transaction: TransactionData,
2492 checks: TransactionChecks,
2493 ) -> SuiResult<SimulateTransactionResult> {
2494 if transaction.kind().is_system_tx() {
2495 return Err(SuiErrorKind::UnsupportedFeatureError {
2496 error: "simulate does not support system transactions".to_string(),
2497 }
2498 .into());
2499 }
2500
2501 let epoch_store = self.load_epoch_store_one_call_per_task();
2502 if !self.is_fullnode(&epoch_store) {
2503 return Err(SuiErrorKind::UnsupportedFeatureError {
2504 error: "simulate is only supported on fullnodes".to_string(),
2505 }
2506 .into());
2507 }
2508
2509 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2511
2512 let input_object_kinds = transaction.input_objects()?;
2513 let receiving_object_refs = transaction.receiving_objects();
2514
2515 sui_transaction_checks::deny::check_transaction_for_signing(
2516 &transaction,
2517 &[],
2518 &input_object_kinds,
2519 &receiving_object_refs,
2520 &self.config.transaction_deny_config,
2521 self.get_backing_package_store().as_ref(),
2522 )?;
2523
2524 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2525 None,
2527 &input_object_kinds,
2528 &receiving_object_refs,
2529 epoch_store.epoch(),
2530 )?;
2531
2532 let mock_gas_id = if transaction.gas().is_empty() {
2534 let mock_gas_object = Object::new_move(
2535 MoveObject::new_gas_coin(
2536 OBJECT_START_VERSION,
2537 ObjectID::MAX,
2538 DEV_INSPECT_GAS_COIN_VALUE,
2539 ),
2540 Owner::AddressOwner(transaction.gas_data().owner),
2541 TransactionDigest::genesis_marker(),
2542 );
2543 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2544 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2545 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2546 Some(mock_gas_object.id())
2547 } else {
2548 None
2549 };
2550
2551 let protocol_config = epoch_store.protocol_config();
2552
2553 let (gas_status, checked_input_objects) = if checks.enabled() {
2554 sui_transaction_checks::check_transaction_input(
2555 epoch_store.protocol_config(),
2556 epoch_store.reference_gas_price(),
2557 &transaction,
2558 input_objects,
2559 &receiving_objects,
2560 &self.metrics.bytecode_verifier_metrics,
2561 &self.config.verifier_signing_config,
2562 )?
2563 } else {
2564 let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2565 protocol_config,
2566 transaction.kind(),
2567 input_objects,
2568 receiving_objects,
2569 )?;
2570 let gas_status = SuiGasStatus::new(
2571 transaction.gas_budget(),
2572 transaction.gas_price(),
2573 epoch_store.reference_gas_price(),
2574 protocol_config,
2575 )?;
2576
2577 (gas_status, checked_input_objects)
2578 };
2579
2580 let executor = sui_execution::executor(
2582 protocol_config,
2583 true, )
2585 .expect("Creating an executor should not fail here");
2586
2587 let (kind, signer, gas_data) = transaction.execution_parts();
2588 let early_execution_error = get_early_execution_error(
2589 &transaction.digest(),
2590 &checked_input_objects,
2591 self.config.certificate_deny_config.certificate_deny_set(),
2592 &BalanceWithdrawStatus::NoWithdraw,
2594 );
2595 let execution_params = match early_execution_error {
2596 Some(error) => ExecutionOrEarlyError::Err(error),
2597 None => ExecutionOrEarlyError::Ok(()),
2598 };
2599
2600 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2601
2602 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2603 &tracking_store,
2604 protocol_config,
2605 self.metrics.limits_metrics.clone(),
2606 false, execution_params,
2608 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2609 epoch_store
2610 .epoch_start_config()
2611 .epoch_data()
2612 .epoch_start_timestamp(),
2613 checked_input_objects,
2614 gas_data,
2615 gas_status,
2616 kind,
2617 signer,
2618 transaction.digest(),
2619 checks.disabled(),
2620 );
2621
2622 let loaded_runtime_objects = tracking_store.into_read_objects();
2623 let unchanged_loaded_runtime_objects =
2624 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2625 &transaction,
2626 &effects,
2627 &loaded_runtime_objects,
2628 );
2629
2630 let object_set = {
2631 let objects = {
2632 let mut objects = loaded_runtime_objects;
2633
2634 for o in inner_temp_store
2635 .input_objects
2636 .into_values()
2637 .chain(inner_temp_store.written.into_values())
2638 {
2639 objects.insert(o);
2640 }
2641
2642 objects
2643 };
2644
2645 let object_keys = sui_types::storage::get_transaction_object_set(
2646 &transaction,
2647 &effects,
2648 &unchanged_loaded_runtime_objects,
2649 );
2650
2651 let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2652 for k in object_keys {
2653 if let Some(o) = objects.get(&k) {
2654 set.insert(o.clone());
2655 }
2656 }
2657
2658 set
2659 };
2660
2661 Ok(SimulateTransactionResult {
2662 objects: object_set,
2663 events: effects.events_digest().map(|_| inner_temp_store.events),
2664 effects,
2665 execution_result,
2666 mock_gas_id,
2667 unchanged_loaded_runtime_objects,
2668 })
2669 }
2670
2671 #[allow(clippy::collapsible_else_if)]
2673 #[instrument(skip_all)]
2674 pub async fn dev_inspect_transaction_block(
2675 &self,
2676 sender: SuiAddress,
2677 transaction_kind: TransactionKind,
2678 gas_price: Option<u64>,
2679 gas_budget: Option<u64>,
2680 gas_sponsor: Option<SuiAddress>,
2681 gas_objects: Option<Vec<ObjectRef>>,
2682 show_raw_txn_data_and_effects: Option<bool>,
2683 skip_checks: Option<bool>,
2684 ) -> SuiResult<DevInspectResults> {
2685 let epoch_store = self.load_epoch_store_one_call_per_task();
2686
2687 if !self.is_fullnode(&epoch_store) {
2688 return Err(SuiErrorKind::UnsupportedFeatureError {
2689 error: "dev-inspect is only supported on fullnodes".to_string(),
2690 }
2691 .into());
2692 }
2693
2694 if transaction_kind.is_system_tx() {
2695 return Err(SuiErrorKind::UnsupportedFeatureError {
2696 error: "system transactions are not supported".to_string(),
2697 }
2698 .into());
2699 }
2700
2701 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2702 let skip_checks = skip_checks.unwrap_or(true);
2703 let reference_gas_price = epoch_store.reference_gas_price();
2704 let protocol_config = epoch_store.protocol_config();
2705 let max_tx_gas = protocol_config.max_tx_gas();
2706
2707 let price = gas_price.unwrap_or(reference_gas_price);
2708 let budget = gas_budget.unwrap_or(max_tx_gas);
2709 let owner = gas_sponsor.unwrap_or(sender);
2710 let payment = gas_objects.unwrap_or_default();
2712 let mut transaction = TransactionData::V1(TransactionDataV1 {
2713 kind: transaction_kind.clone(),
2714 sender,
2715 gas_data: GasData {
2716 payment,
2717 owner,
2718 price,
2719 budget,
2720 },
2721 expiration: TransactionExpiration::None,
2722 });
2723
2724 let raw_txn_data = if show_raw_txn_data_and_effects {
2725 bcs::to_bytes(&transaction).map_err(|_| {
2726 SuiErrorKind::TransactionSerializationError {
2727 error: "Failed to serialize transaction during dev inspect".to_string(),
2728 }
2729 })?
2730 } else {
2731 vec![]
2732 };
2733
2734 transaction.validity_check_no_gas_check(protocol_config)?;
2735
2736 let input_object_kinds = transaction.input_objects()?;
2737 let receiving_object_refs = transaction.receiving_objects();
2738
2739 sui_transaction_checks::deny::check_transaction_for_signing(
2740 &transaction,
2741 &[],
2742 &input_object_kinds,
2743 &receiving_object_refs,
2744 &self.config.transaction_deny_config,
2745 self.get_backing_package_store().as_ref(),
2746 )?;
2747
2748 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2749 None,
2751 &input_object_kinds,
2752 &receiving_object_refs,
2753 epoch_store.epoch(),
2754 )?;
2755
2756 let (gas_status, checked_input_objects) = if skip_checks {
2757 if transaction.gas().is_empty() {
2761 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2763 DEV_INSPECT_GAS_COIN_VALUE,
2764 transaction.gas_owner(),
2765 );
2766 let gas_object_ref = dummy_gas_object.compute_object_reference();
2767 transaction.gas_data_mut().payment = vec![gas_object_ref];
2768 input_objects.push(ObjectReadResult::new(
2769 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2770 dummy_gas_object.into(),
2771 ));
2772 }
2773 let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2774 protocol_config,
2775 &transaction_kind,
2776 input_objects,
2777 receiving_objects,
2778 )?;
2779 let gas_status = SuiGasStatus::new(
2780 max_tx_gas,
2781 transaction.gas_price(),
2782 reference_gas_price,
2783 protocol_config,
2784 )?;
2785
2786 (gas_status, checked_input_objects)
2787 } else {
2788 if transaction.gas().is_empty() {
2791 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2793 DEV_INSPECT_GAS_COIN_VALUE,
2794 transaction.gas_owner(),
2795 );
2796 let gas_object_ref = dummy_gas_object.compute_object_reference();
2797 transaction.gas_data_mut().payment = vec![gas_object_ref];
2798 sui_transaction_checks::check_transaction_input_with_given_gas(
2799 epoch_store.protocol_config(),
2800 epoch_store.reference_gas_price(),
2801 &transaction,
2802 input_objects,
2803 receiving_objects,
2804 dummy_gas_object,
2805 &self.metrics.bytecode_verifier_metrics,
2806 &self.config.verifier_signing_config,
2807 )?
2808 } else {
2809 sui_transaction_checks::check_transaction_input(
2810 epoch_store.protocol_config(),
2811 epoch_store.reference_gas_price(),
2812 &transaction,
2813 input_objects,
2814 &receiving_objects,
2815 &self.metrics.bytecode_verifier_metrics,
2816 &self.config.verifier_signing_config,
2817 )?
2818 }
2819 };
2820
2821 let executor = sui_execution::executor(protocol_config, true)
2822 .expect("Creating an executor should not fail here");
2823 let gas_data = transaction.gas_data().clone();
2824 let intent_msg = IntentMessage::new(
2825 Intent {
2826 version: IntentVersion::V0,
2827 scope: IntentScope::TransactionData,
2828 app_id: AppId::Sui,
2829 },
2830 transaction,
2831 );
2832 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2833 let early_execution_error = get_early_execution_error(
2834 &transaction_digest,
2835 &checked_input_objects,
2836 self.config.certificate_deny_config.certificate_deny_set(),
2837 &BalanceWithdrawStatus::NoWithdraw,
2839 );
2840 let execution_params = match early_execution_error {
2841 Some(error) => ExecutionOrEarlyError::Err(error),
2842 None => ExecutionOrEarlyError::Ok(()),
2843 };
2844 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2845 self.get_backing_store().as_ref(),
2846 protocol_config,
2847 self.metrics.limits_metrics.clone(),
2848 false,
2849 execution_params,
2850 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2851 epoch_store
2852 .epoch_start_config()
2853 .epoch_data()
2854 .epoch_start_timestamp(),
2855 checked_input_objects,
2856 gas_data,
2857 gas_status,
2858 transaction_kind,
2859 sender,
2860 transaction_digest,
2861 skip_checks,
2862 );
2863
2864 let raw_effects = if show_raw_txn_data_and_effects {
2865 bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2866 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2867 })?
2868 } else {
2869 vec![]
2870 };
2871
2872 let mut layout_resolver =
2873 epoch_store
2874 .executor()
2875 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2876 &inner_temp_store,
2877 self.get_backing_package_store(),
2878 )));
2879
2880 DevInspectResults::new(
2881 effects,
2882 inner_temp_store.events.clone(),
2883 execution_result,
2884 raw_txn_data,
2885 raw_effects,
2886 layout_resolver.as_mut(),
2887 )
2888 }
2889
2890 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2892 let epoch_store = self.epoch_store_for_testing();
2893 Ok(epoch_store.reference_gas_price())
2894 }
2895
2896 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2897 self.get_transaction_cache_reader()
2898 .is_tx_already_executed(digest)
2899 }
2900
2901 #[instrument(level = "debug", skip_all, err(level = "debug"))]
2902 fn index_tx(
2903 &self,
2904 indexes: &IndexStore,
2905 digest: &TransactionDigest,
2906 cert: &VerifiedExecutableTransaction,
2908 effects: &TransactionEffects,
2909 events: &TransactionEvents,
2910 timestamp_ms: u64,
2911 tx_coins: Option<TxCoins>,
2912 written: &WrittenObjects,
2913 inner_temporary_store: &InnerTemporaryStore,
2914 epoch_store: &Arc<AuthorityPerEpochStore>,
2915 ) -> SuiResult<u64> {
2916 let changes = self
2917 .process_object_index(effects, written, inner_temporary_store, epoch_store)
2918 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2919
2920 indexes.index_tx(
2921 cert.data().intent_message().value.sender(),
2922 cert.data()
2923 .intent_message()
2924 .value
2925 .input_objects()?
2926 .iter()
2927 .map(|o| o.object_id()),
2928 effects
2929 .all_changed_objects()
2930 .into_iter()
2931 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2932 cert.data()
2933 .intent_message()
2934 .value
2935 .move_calls()
2936 .into_iter()
2937 .map(|(package, module, function)| {
2938 (*package, module.to_owned(), function.to_owned())
2939 }),
2940 events,
2941 changes,
2942 digest,
2943 timestamp_ms,
2944 tx_coins,
2945 )
2946 }
2947
2948 #[cfg(msim)]
2949 fn simulate_fork_during_execution(
2950 &self,
2951 certificate: &VerifiedExecutableTransaction,
2952 epoch_store: &Arc<AuthorityPerEpochStore>,
2953 effects: &mut TransactionEffects,
2954 forked_validators: std::sync::Arc<
2955 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2956 >,
2957 full_halt: bool,
2958 effects_overrides: std::sync::Arc<
2959 std::sync::Mutex<std::collections::BTreeMap<String, String>>,
2960 >,
2961 fork_probability: f32,
2962 ) {
2963 use std::cell::RefCell;
2964 thread_local! {
2965 static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
2966 }
2967 if !certificate.data().intent_message().value.is_system_tx() {
2968 let committee = epoch_store.committee();
2969 let cur_stake = (**committee).weight(&self.name);
2970 if cur_stake > 0 {
2971 TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
2972 let already_forked = forked_validators
2973 .lock()
2974 .ok()
2975 .map(|set| set.contains(&self.name))
2976 .unwrap_or(false);
2977
2978 if !already_forked {
2979 let should_fork = if full_halt {
2980 *total_stake <= committee.validity_threshold()
2982 } else {
2983 *total_stake + cur_stake < committee.validity_threshold()
2985 };
2986
2987 if should_fork {
2988 *total_stake += cur_stake;
2989
2990 if let Ok(mut external_set) = forked_validators.lock() {
2991 external_set.insert(self.name);
2992 info!("forked_validators: {:?}", external_set);
2993 }
2994 }
2995 }
2996
2997 if let Ok(external_set) = forked_validators.lock() {
2998 if external_set.contains(&self.name) {
2999 let tx_digest = certificate.digest().to_string();
3005 if let Ok(mut overrides) = effects_overrides.lock() {
3006 if overrides.contains_key(&tx_digest)
3007 || overrides.is_empty()
3008 && sui_simulator::random::deterministic_probability(
3009 &tx_digest,
3010 fork_probability,
3011 )
3012 {
3013 let original_effects_digest = effects.digest().to_string();
3014 overrides
3015 .insert(tx_digest.clone(), original_effects_digest.clone());
3016 info!(
3017 ?tx_digest,
3018 ?original_effects_digest,
3019 "Captured forked effects digest for transaction"
3020 );
3021 effects.gas_cost_summary_mut_for_testing().computation_cost +=
3022 1;
3023 }
3024 }
3025 }
3026 }
3027 });
3028 }
3029 }
3030 }
3031
3032 fn process_object_index(
3033 &self,
3034 effects: &TransactionEffects,
3035 written: &WrittenObjects,
3036 inner_temporary_store: &InnerTemporaryStore,
3037 epoch_store: &Arc<AuthorityPerEpochStore>,
3038 ) -> SuiResult<ObjectIndexChanges> {
3039 let mut layout_resolver =
3040 epoch_store
3041 .executor()
3042 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3043 inner_temporary_store,
3044 self.get_backing_package_store(),
3045 )));
3046
3047 let modified_at_version = effects
3048 .modified_at_versions()
3049 .into_iter()
3050 .collect::<HashMap<_, _>>();
3051
3052 let tx_digest = effects.transaction_digest();
3053 let mut deleted_owners = vec![];
3054 let mut deleted_dynamic_fields = vec![];
3055 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
3056 let old_version = modified_at_version.get(&id).unwrap();
3057 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
3060 |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
3061 ) {
3062 Owner::AddressOwner(addr)
3063 | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
3064 Owner::ObjectOwner(object_id) => {
3065 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
3066 }
3067 _ => {}
3068 }
3069 }
3070
3071 let mut new_owners = vec![];
3072 let mut new_dynamic_fields = vec![];
3073
3074 for (oref, owner, kind) in effects.all_changed_objects() {
3075 let id = &oref.0;
3076 if let WriteKind::Mutate = kind {
3078 let Some(old_version) = modified_at_version.get(id) else {
3079 panic!(
3080 "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
3081 tx_digest
3082 );
3083 };
3084 let Some(old_object) = self.get_object_store().get_object_by_key(id, *old_version)
3087 else {
3088 panic!(
3089 "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
3090 tx_digest, id, old_version
3091 );
3092 };
3093 if old_object.owner != owner {
3094 match old_object.owner {
3095 Owner::AddressOwner(addr)
3096 | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3097 deleted_owners.push((addr, *id));
3098 }
3099 Owner::ObjectOwner(object_id) => {
3100 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
3101 }
3102 _ => {}
3103 }
3104 }
3105 }
3106
3107 match owner {
3108 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3109 let new_object = written.get(id).unwrap_or_else(
3111 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3112 );
3113 assert_eq!(
3114 new_object.version(),
3115 oref.1,
3116 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3117 tx_digest,
3118 id,
3119 new_object.version(),
3120 oref.1
3121 );
3122
3123 let type_ = new_object
3124 .type_()
3125 .map(|type_| ObjectType::Struct(type_.clone()))
3126 .unwrap_or(ObjectType::Package);
3127
3128 new_owners.push((
3129 (addr, *id),
3130 ObjectInfo {
3131 object_id: *id,
3132 version: oref.1,
3133 digest: oref.2,
3134 type_,
3135 owner,
3136 previous_transaction: *effects.transaction_digest(),
3137 },
3138 ));
3139 }
3140 Owner::ObjectOwner(owner) => {
3141 let new_object = written.get(id).unwrap_or_else(
3142 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3143 );
3144 assert_eq!(
3145 new_object.version(),
3146 oref.1,
3147 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3148 tx_digest,
3149 id,
3150 new_object.version(),
3151 oref.1
3152 );
3153
3154 let Some(df_info) = self
3155 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
3156 .unwrap_or_else(|e| {
3157 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
3158 None
3159 }
3160 )
3161 else {
3162 continue;
3164 };
3165 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3166 }
3167 _ => {}
3168 }
3169 }
3170
3171 Ok(ObjectIndexChanges {
3172 deleted_owners,
3173 deleted_dynamic_fields,
3174 new_owners,
3175 new_dynamic_fields,
3176 })
3177 }
3178
3179 fn try_create_dynamic_field_info(
3180 &self,
3181 o: &Object,
3182 written: &WrittenObjects,
3183 resolver: &mut dyn LayoutResolver,
3184 ) -> SuiResult<Option<DynamicFieldInfo>> {
3185 let Some(move_object) = o.data.try_as_move().cloned() else {
3187 return Ok(None);
3188 };
3189
3190 if !move_object.type_().is_dynamic_field() {
3192 return Ok(None);
3193 }
3194
3195 let layout = resolver
3196 .get_annotated_layout(&move_object.type_().clone().into())?
3197 .into_layout();
3198
3199 let field =
3200 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3201 SuiErrorKind::ObjectDeserializationError {
3202 error: e.to_string(),
3203 }
3204 })?;
3205
3206 let type_ = field.kind;
3207 let name_type: TypeTag = field.name_layout.into();
3208 let bcs_name = field.name_bytes.to_owned();
3209
3210 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3211 .map_err(|e| {
3212 warn!("{e}");
3213 SuiErrorKind::ObjectDeserializationError {
3214 error: e.to_string(),
3215 }
3216 })?;
3217
3218 let name = DynamicFieldName {
3219 type_: name_type,
3220 value: SuiMoveValue::from(name_value).to_json_value(),
3221 };
3222
3223 let value_metadata = field.value_metadata().map_err(|e| {
3224 warn!("{e}");
3225 SuiErrorKind::ObjectDeserializationError {
3226 error: e.to_string(),
3227 }
3228 })?;
3229
3230 Ok(Some(match value_metadata {
3231 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3232 name,
3233 bcs_name,
3234 type_,
3235 object_type: object_type.to_canonical_string(true),
3236 object_id: o.id(),
3237 version: o.version(),
3238 digest: o.digest(),
3239 },
3240
3241 DFV::ValueMetadata::DynamicObjectField(object_id) => {
3242 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3246 let version = object.version();
3247 let digest = object.digest();
3248 let object_type = object.data.type_().unwrap().clone();
3249 (version, digest, object_type)
3250 } else {
3251 let object = self
3253 .get_object_store()
3254 .get_object_by_key(&object_id, o.version())
3255 .ok_or_else(|| UserInputError::ObjectNotFound {
3256 object_id,
3257 version: Some(o.version()),
3258 })?;
3259 let version = object.version();
3260 let digest = object.digest();
3261 let object_type = object.data.type_().unwrap().clone();
3262 (version, digest, object_type)
3263 };
3264
3265 DynamicFieldInfo {
3266 name,
3267 bcs_name,
3268 type_,
3269 object_type: object_type.to_string(),
3270 object_id,
3271 version,
3272 digest,
3273 }
3274 }
3275 }))
3276 }
3277
3278 #[instrument(level = "trace", skip_all, err(level = "debug"))]
3279 fn post_process_one_tx(
3280 &self,
3281 certificate: &VerifiedExecutableTransaction,
3282 effects: &TransactionEffects,
3283 inner_temporary_store: &InnerTemporaryStore,
3284 epoch_store: &Arc<AuthorityPerEpochStore>,
3285 ) -> SuiResult {
3286 if self.indexes.is_none() {
3287 return Ok(());
3288 }
3289
3290 let _scope = monitored_scope("Execution::post_process_one_tx");
3291
3292 let tx_digest = certificate.digest();
3293 let timestamp_ms = Self::unixtime_now_ms();
3294 let events = &inner_temporary_store.events;
3295 let written = &inner_temporary_store.written;
3296 let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
3297 effects,
3298 inner_temporary_store,
3299 epoch_store,
3300 );
3301
3302 if let Some(indexes) = &self.indexes {
3304 let _ = self
3305 .index_tx(
3306 indexes.as_ref(),
3307 tx_digest,
3308 certificate,
3309 effects,
3310 events,
3311 timestamp_ms,
3312 tx_coins,
3313 written,
3314 inner_temporary_store,
3315 epoch_store,
3316 )
3317 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
3318 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3319 .expect("Indexing tx should not fail");
3320
3321 let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3322 let events = self.make_transaction_block_events(
3323 events.clone(),
3324 *tx_digest,
3325 timestamp_ms,
3326 epoch_store,
3327 inner_temporary_store,
3328 )?;
3329 self.subscription_handler
3331 .process_tx(certificate.data().transaction_data(), &effects, &events)
3332 .tap_ok(|_| {
3333 self.metrics
3334 .post_processing_total_tx_had_event_processed
3335 .inc()
3336 })
3337 .tap_err(|e| {
3338 warn!(
3339 ?tx_digest,
3340 "Post processing - Couldn't process events for tx: {}", e
3341 )
3342 })?;
3343
3344 self.metrics
3345 .post_processing_total_events_emitted
3346 .inc_by(events.data.len() as u64);
3347 };
3348 Ok(())
3349 }
3350
3351 fn make_transaction_block_events(
3352 &self,
3353 transaction_events: TransactionEvents,
3354 digest: TransactionDigest,
3355 timestamp_ms: u64,
3356 epoch_store: &Arc<AuthorityPerEpochStore>,
3357 inner_temporary_store: &InnerTemporaryStore,
3358 ) -> SuiResult<SuiTransactionBlockEvents> {
3359 let mut layout_resolver =
3360 epoch_store
3361 .executor()
3362 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3363 inner_temporary_store,
3364 self.get_backing_package_store(),
3365 )));
3366 SuiTransactionBlockEvents::try_from(
3367 transaction_events,
3368 digest,
3369 Some(timestamp_ms),
3370 layout_resolver.as_mut(),
3371 )
3372 }
3373
3374 pub fn unixtime_now_ms() -> u64 {
3375 let now = SystemTime::now()
3376 .duration_since(UNIX_EPOCH)
3377 .expect("Time went backwards")
3378 .as_millis();
3379 u64::try_from(now).expect("Travelling in time machine")
3380 }
3381
3382 #[instrument(level = "trace", skip_all)]
3386 pub async fn handle_transaction_info_request(
3387 &self,
3388 request: TransactionInfoRequest,
3389 ) -> SuiResult<TransactionInfoResponse> {
3390 let epoch_store = self.load_epoch_store_one_call_per_task();
3391 let (transaction, status) = self
3392 .get_transaction_status(&request.transaction_digest, &epoch_store)?
3393 .ok_or(SuiErrorKind::TransactionNotFound {
3394 digest: request.transaction_digest,
3395 })?;
3396 Ok(TransactionInfoResponse {
3397 transaction,
3398 status,
3399 })
3400 }
3401
3402 #[instrument(level = "trace", skip_all)]
3403 pub async fn handle_object_info_request(
3404 &self,
3405 request: ObjectInfoRequest,
3406 ) -> SuiResult<ObjectInfoResponse> {
3407 let epoch_store = self.load_epoch_store_one_call_per_task();
3408
3409 let requested_object_seq = match request.request_kind {
3410 ObjectInfoRequestKind::LatestObjectInfo => {
3411 let (_, seq, _) = self
3412 .get_object_or_tombstone(request.object_id)
3413 .await
3414 .ok_or_else(|| {
3415 SuiError::from(UserInputError::ObjectNotFound {
3416 object_id: request.object_id,
3417 version: None,
3418 })
3419 })?;
3420 seq
3421 }
3422 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3423 };
3424
3425 let object = self
3426 .get_object_store()
3427 .get_object_by_key(&request.object_id, requested_object_seq)
3428 .ok_or_else(|| {
3429 SuiError::from(UserInputError::ObjectNotFound {
3430 object_id: request.object_id,
3431 version: Some(requested_object_seq),
3432 })
3433 })?;
3434
3435 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3436 (request.generate_layout, object.data.try_as_move())
3437 {
3438 Some(into_struct_layout(
3439 self.load_epoch_store_one_call_per_task()
3440 .executor()
3441 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3442 .get_annotated_layout(&move_obj.type_().clone().into())?,
3443 )?)
3444 } else {
3445 None
3446 };
3447
3448 let lock = if !object.is_address_owned() {
3449 None
3451 } else {
3452 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3453 .await?
3454 .map(|s| s.into_inner())
3455 };
3456
3457 Ok(ObjectInfoResponse {
3458 object,
3459 layout,
3460 lock_for_debugging: lock,
3461 })
3462 }
3463
3464 #[instrument(level = "trace", skip_all)]
3465 pub fn handle_checkpoint_request(
3466 &self,
3467 request: &CheckpointRequest,
3468 ) -> SuiResult<CheckpointResponse> {
3469 let summary = match request.sequence_number {
3470 Some(seq) => self
3471 .checkpoint_store
3472 .get_checkpoint_by_sequence_number(seq)?,
3473 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3474 }
3475 .map(|v| v.into_inner());
3476 let contents = match &summary {
3477 Some(s) => self
3478 .checkpoint_store
3479 .get_checkpoint_contents(&s.content_digest)?,
3480 None => None,
3481 };
3482 Ok(CheckpointResponse {
3483 checkpoint: summary,
3484 contents,
3485 })
3486 }
3487
3488 #[instrument(level = "trace", skip_all)]
3489 pub fn handle_checkpoint_request_v2(
3490 &self,
3491 request: &CheckpointRequestV2,
3492 ) -> SuiResult<CheckpointResponseV2> {
3493 let summary = if request.certified {
3494 let summary = match request.sequence_number {
3495 Some(seq) => self
3496 .checkpoint_store
3497 .get_checkpoint_by_sequence_number(seq)?,
3498 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3499 }
3500 .map(|v| v.into_inner());
3501 summary.map(CheckpointSummaryResponse::Certified)
3502 } else {
3503 let summary = match request.sequence_number {
3504 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3505 None => self
3506 .checkpoint_store
3507 .get_latest_locally_computed_checkpoint()?,
3508 };
3509 summary.map(CheckpointSummaryResponse::Pending)
3510 };
3511 let contents = match &summary {
3512 Some(s) => self
3513 .checkpoint_store
3514 .get_checkpoint_contents(&s.content_digest())?,
3515 None => None,
3516 };
3517 Ok(CheckpointResponseV2 {
3518 checkpoint: summary,
3519 contents,
3520 })
3521 }
3522
3523 fn check_protocol_version(
3524 supported_protocol_versions: SupportedProtocolVersions,
3525 current_version: ProtocolVersion,
3526 ) {
3527 info!("current protocol version is now {:?}", current_version);
3528 info!("supported versions are: {:?}", supported_protocol_versions);
3529 if !supported_protocol_versions.is_version_supported(current_version) {
3530 let msg = format!(
3531 "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3532 current_version, supported_protocol_versions,
3533 );
3534
3535 error!("{}", msg);
3536 eprintln!("{}", msg);
3537
3538 #[cfg(not(msim))]
3539 std::process::exit(1);
3540
3541 #[cfg(msim)]
3542 sui_simulator::task::shutdown_current_node();
3543 }
3544 }
3545
3546 #[allow(clippy::disallowed_methods)] #[allow(clippy::too_many_arguments)]
3548 pub async fn new(
3549 name: AuthorityName,
3550 secret: StableSyncAuthoritySigner,
3551 supported_protocol_versions: SupportedProtocolVersions,
3552 store: Arc<AuthorityStore>,
3553 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3554 epoch_store: Arc<AuthorityPerEpochStore>,
3555 committee_store: Arc<CommitteeStore>,
3556 indexes: Option<Arc<IndexStore>>,
3557 rpc_index: Option<Arc<RpcIndexStore>>,
3558 checkpoint_store: Arc<CheckpointStore>,
3559 prometheus_registry: &Registry,
3560 genesis_objects: &[Object],
3561 db_checkpoint_config: &DBCheckpointConfig,
3562 config: NodeConfig,
3563 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3564 chain_identifier: ChainIdentifier,
3565 pruner_db: Option<Arc<AuthorityPrunerTables>>,
3566 policy_config: Option<PolicyConfig>,
3567 firewall_config: Option<RemoteFirewallConfig>,
3568 pruner_watermarks: Arc<PrunerWatermarks>,
3569 ) -> Arc<Self> {
3570 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3571
3572 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3573 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3574 let execution_scheduler = Arc::new(ExecutionScheduler::new(
3575 execution_cache_trait_pointers.object_cache_reader.clone(),
3576 execution_cache_trait_pointers.child_object_resolver.clone(),
3577 execution_cache_trait_pointers
3578 .transaction_cache_reader
3579 .clone(),
3580 tx_ready_certificates,
3581 &epoch_store,
3582 metrics.clone(),
3583 ));
3584 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3585
3586 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3587 epoch_store.get_parent_path(),
3588 &config.authority_store_pruning_config,
3589 );
3590 let _pruner = AuthorityStorePruner::new(
3591 store.perpetual_tables.clone(),
3592 checkpoint_store.clone(),
3593 rpc_index.clone(),
3594 indexes.clone(),
3595 config.authority_store_pruning_config.clone(),
3596 epoch_store.committee().authority_exists(&name),
3597 epoch_store.epoch_start_state().epoch_duration_ms(),
3598 prometheus_registry,
3599 pruner_db,
3600 pruner_watermarks,
3601 );
3602 let input_loader =
3603 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3604 let epoch = epoch_store.epoch();
3605 let traffic_controller_metrics =
3606 Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3607 let traffic_controller = if let Some(policy_config) = policy_config {
3608 Some(Arc::new(
3609 TrafficController::init(
3610 policy_config,
3611 traffic_controller_metrics,
3612 firewall_config.clone(),
3613 )
3614 .await,
3615 ))
3616 } else {
3617 None
3618 };
3619
3620 let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3621 ForkRecoveryState::new(Some(fork_config))
3622 .expect("Failed to initialize fork recovery state")
3623 });
3624
3625 let state = Arc::new(AuthorityState {
3626 name,
3627 secret,
3628 execution_lock: RwLock::new(epoch),
3629 epoch_store: ArcSwap::new(epoch_store.clone()),
3630 input_loader,
3631 execution_cache_trait_pointers,
3632 indexes,
3633 rpc_index,
3634 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3635 checkpoint_store,
3636 committee_store,
3637 execution_scheduler,
3638 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3639 metrics,
3640 _pruner,
3641 _authority_per_epoch_pruner,
3642 db_checkpoint_config: db_checkpoint_config.clone(),
3643 config,
3644 overload_info: AuthorityOverloadInfo::default(),
3645 validator_tx_finalizer,
3646 chain_identifier,
3647 congestion_tracker: Arc::new(CongestionTracker::new()),
3648 traffic_controller,
3649 fork_recovery_state,
3650 });
3651
3652 let state_clone = Arc::downgrade(&state);
3653 spawn_monitored_task!(fix_indexes(state_clone));
3654 let authority_state = Arc::downgrade(&state);
3656 spawn_monitored_task!(execution_process(
3657 authority_state,
3658 rx_ready_certificates,
3659 rx_execution_shutdown,
3660 ));
3661 state
3663 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3664 .expect("Error indexing genesis objects.");
3665
3666 state
3667 }
3668
3669 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3671 &self.execution_cache_trait_pointers.object_cache_reader
3672 }
3673
3674 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3675 &self.execution_cache_trait_pointers.transaction_cache_reader
3676 }
3677
3678 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3679 &self.execution_cache_trait_pointers.cache_writer
3680 }
3681
3682 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3683 &self.execution_cache_trait_pointers.backing_store
3684 }
3685
3686 pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3687 &self.execution_cache_trait_pointers.child_object_resolver
3688 }
3689
3690 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3691 &self.execution_cache_trait_pointers.backing_package_store
3692 }
3693
3694 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3695 &self.execution_cache_trait_pointers.object_store
3696 }
3697
3698 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3699 &self.execution_cache_trait_pointers.reconfig_api
3700 }
3701
3702 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3703 &self.execution_cache_trait_pointers.global_state_hash_store
3704 }
3705
3706 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3707 &self.execution_cache_trait_pointers.checkpoint_cache
3708 }
3709
3710 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3711 &self.execution_cache_trait_pointers.state_sync_store
3712 }
3713
3714 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3715 &self.execution_cache_trait_pointers.cache_commit
3716 }
3717
3718 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3719 self.execution_cache_trait_pointers
3720 .testing_api
3721 .database_for_testing()
3722 }
3723
3724 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3725 &self,
3726 config: NodeConfig,
3727 metrics: Arc<AuthorityStorePruningMetrics>,
3728 ) -> anyhow::Result<()> {
3729 use crate::authority::authority_store_pruner::PrunerWatermarks;
3730 let watermarks = Arc::new(PrunerWatermarks::default());
3731 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3732 &self.database_for_testing().perpetual_tables,
3733 &self.checkpoint_store,
3734 self.rpc_index.as_deref(),
3735 None,
3736 config.authority_store_pruning_config,
3737 metrics,
3738 EPOCH_DURATION_MS_FOR_TESTING,
3739 &watermarks,
3740 )
3741 .await
3742 }
3743
3744 pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
3745 &self.execution_scheduler
3746 }
3747
3748 fn create_owner_index_if_empty(
3749 &self,
3750 genesis_objects: &[Object],
3751 epoch_store: &Arc<AuthorityPerEpochStore>,
3752 ) -> SuiResult {
3753 let Some(index_store) = &self.indexes else {
3754 return Ok(());
3755 };
3756 if !index_store.is_empty() {
3757 return Ok(());
3758 }
3759
3760 let mut new_owners = vec![];
3761 let mut new_dynamic_fields = vec![];
3762 let mut layout_resolver = epoch_store
3763 .executor()
3764 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3765 for o in genesis_objects.iter() {
3766 match o.owner {
3767 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3768 new_owners.push((
3769 (addr, o.id()),
3770 ObjectInfo::new(&o.compute_object_reference(), o),
3771 ))
3772 }
3773 Owner::ObjectOwner(object_id) => {
3774 let id = o.id();
3775 let Some(info) = self.try_create_dynamic_field_info(
3776 o,
3777 &BTreeMap::new(),
3778 layout_resolver.as_mut(),
3779 )?
3780 else {
3781 continue;
3782 };
3783 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3784 }
3785 _ => {}
3786 }
3787 }
3788
3789 index_store.insert_genesis_objects(ObjectIndexChanges {
3790 deleted_owners: vec![],
3791 deleted_dynamic_fields: vec![],
3792 new_owners,
3793 new_dynamic_fields,
3794 })
3795 }
3796
3797 pub fn execution_lock_for_executable_transaction(
3801 &self,
3802 transaction: &VerifiedExecutableTransaction,
3803 ) -> Option<ExecutionLockReadGuard<'_>> {
3804 let lock = self.execution_lock.try_read().ok()?;
3805 if *lock == transaction.auth_sig().epoch() {
3806 Some(lock)
3807 } else {
3808 None
3810 }
3811 }
3812
3813 pub fn execution_lock_for_signing(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
3818 self.execution_lock
3819 .try_read()
3820 .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
3821 }
3822
3823 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3824 self.execution_lock.write().await
3825 }
3826
3827 #[instrument(level = "error", skip_all)]
3828 pub async fn reconfigure(
3829 &self,
3830 cur_epoch_store: &AuthorityPerEpochStore,
3831 supported_protocol_versions: SupportedProtocolVersions,
3832 new_committee: Committee,
3833 epoch_start_configuration: EpochStartConfiguration,
3834 state_hasher: Arc<GlobalStateHasher>,
3835 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3836 epoch_last_checkpoint: CheckpointSequenceNumber,
3837 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
3838 Self::check_protocol_version(
3839 supported_protocol_versions,
3840 epoch_start_configuration
3841 .epoch_start_state()
3842 .protocol_version(),
3843 );
3844
3845 self.committee_store.insert_new_committee(&new_committee)?;
3846
3847 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3849
3850 cur_epoch_store.epoch_terminated().await;
3852
3853 {
3857 let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
3858 if state.should_accept_user_certs() {
3859 cur_epoch_store.close_user_certs(state);
3866 }
3867 }
3869
3870 self.get_reconfig_api()
3871 .clear_state_end_of_epoch(&execution_lock);
3872 self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
3873 self.maybe_reaccumulate_state_hash(
3874 cur_epoch_store,
3875 epoch_start_configuration
3876 .epoch_start_state()
3877 .protocol_version(),
3878 );
3879 self.get_reconfig_api()
3880 .set_epoch_start_configuration(&epoch_start_configuration);
3881 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
3882 && self
3883 .db_checkpoint_config
3884 .perform_db_checkpoints_at_epoch_end
3885 {
3886 let checkpoint_indexes = self
3887 .db_checkpoint_config
3888 .perform_index_db_checkpoints_at_epoch_end
3889 .unwrap_or(false);
3890 let current_epoch = cur_epoch_store.epoch();
3891 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
3892 self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
3893 }
3894
3895 self.get_reconfig_api()
3896 .reconfigure_cache(&epoch_start_configuration)
3897 .await;
3898
3899 let new_epoch = new_committee.epoch;
3900 let new_epoch_store = self
3901 .reopen_epoch_db(
3902 cur_epoch_store,
3903 new_committee,
3904 epoch_start_configuration,
3905 expensive_safety_check_config,
3906 epoch_last_checkpoint,
3907 )
3908 .await?;
3909 assert_eq!(new_epoch_store.epoch(), new_epoch);
3910 self.execution_scheduler
3911 .reconfigure(&new_epoch_store, self.get_child_object_resolver());
3912 *execution_lock = new_epoch;
3913 Ok(new_epoch_store)
3917 }
3918
3919 pub async fn settle_transactions_for_testing(
3920 &self,
3921 ckpt_seq: CheckpointSequenceNumber,
3922 effects: &[TransactionEffects],
3923 ) {
3924 let builder = AccumulatorSettlementTxBuilder::new(
3925 Some(self.get_transaction_cache_reader().as_ref()),
3926 effects,
3927 ckpt_seq,
3928 0,
3929 );
3930 let epoch_store = self.epoch_store_for_testing();
3931 let epoch = epoch_store.epoch();
3932 let (settlements, barrier) = builder.build_tx(
3933 epoch_store.protocol_config(),
3934 epoch,
3935 epoch_store
3936 .epoch_start_config()
3937 .accumulator_root_obj_initial_shared_version()
3938 .unwrap(),
3939 ckpt_seq,
3940 ckpt_seq,
3941 );
3942
3943 let mut settlements: Vec<_> = settlements
3944 .into_iter()
3945 .map(|tx| {
3946 VerifiedExecutableTransaction::new_system(
3947 VerifiedTransaction::new_system_transaction(tx),
3948 epoch,
3949 )
3950 })
3951 .collect();
3952 let barrier = VerifiedExecutableTransaction::new_system(
3953 VerifiedTransaction::new_system_transaction(barrier),
3954 epoch,
3955 );
3956
3957 settlements.push(barrier);
3958
3959 let assigned_versions = epoch_store
3960 .assign_shared_object_versions_for_tests(
3961 self.get_object_cache_reader().as_ref(),
3962 &settlements,
3963 )
3964 .unwrap();
3965
3966 let version_map = assigned_versions.into_map();
3967
3968 for tx in settlements {
3969 let env = ExecutionEnv::new()
3970 .with_assigned_versions(version_map.get(&tx.key()).unwrap().clone());
3971 let (effects, _) = self
3972 .try_execute_immediately(&tx, env, &epoch_store)
3973 .await
3974 .unwrap();
3975 assert!(effects.status().is_ok());
3976 }
3977 }
3978
3979 pub async fn reconfigure_for_testing(&self) {
3983 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3984 let epoch_store = self.epoch_store_for_testing().clone();
3985 let protocol_config = epoch_store.protocol_config().clone();
3986 let _guard =
3993 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3994 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3995 self.get_backing_package_store().clone(),
3996 self.get_object_store().clone(),
3997 &self.config.expensive_safety_check_config,
3998 self.checkpoint_store
3999 .get_epoch_last_checkpoint(epoch_store.epoch())
4000 .unwrap()
4001 .map(|c| *c.sequence_number())
4002 .unwrap_or_default(),
4003 );
4004 self.execution_scheduler
4005 .reconfigure(&new_epoch_store, self.get_child_object_resolver());
4006 let new_epoch = new_epoch_store.epoch();
4007 self.epoch_store.store(new_epoch_store);
4008 epoch_store.epoch_terminated().await;
4009
4010 *execution_lock = new_epoch;
4011 }
4012
4013 #[instrument(level = "error", skip_all)]
4016 fn maybe_reaccumulate_state_hash(
4017 &self,
4018 cur_epoch_store: &AuthorityPerEpochStore,
4019 new_protocol_version: ProtocolVersion,
4020 ) {
4021 self.get_reconfig_api()
4022 .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4023 }
4024
4025 #[instrument(level = "error", skip_all)]
4026 fn check_system_consistency(
4027 &self,
4028 cur_epoch_store: &AuthorityPerEpochStore,
4029 state_hasher: Arc<GlobalStateHasher>,
4030 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4031 ) {
4032 info!(
4033 "Performing sui conservation consistency check for epoch {}",
4034 cur_epoch_store.epoch()
4035 );
4036
4037 if let Err(err) = self
4038 .get_reconfig_api()
4039 .expensive_check_sui_conservation(cur_epoch_store)
4040 {
4041 if cfg!(debug_assertions) {
4042 panic!("{}", err);
4043 } else {
4044 warn!("Sui conservation consistency check failed: {}", err);
4047 }
4048 } else {
4049 info!("Sui conservation consistency check passed");
4050 }
4051
4052 if expensive_safety_check_config.enable_state_consistency_check() {
4054 info!(
4055 "Performing state consistency check for epoch {}",
4056 cur_epoch_store.epoch()
4057 );
4058 self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4059 }
4060
4061 if expensive_safety_check_config.enable_secondary_index_checks()
4062 && let Some(indexes) = self.indexes.clone()
4063 {
4064 verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
4065 .expect("secondary indexes are inconsistent");
4066 }
4067 }
4068
4069 fn expensive_check_is_consistent_state(
4070 &self,
4071 state_hasher: Arc<GlobalStateHasher>,
4072 cur_epoch_store: &AuthorityPerEpochStore,
4073 ) {
4074 let live_object_set_hash = state_hasher.digest_live_object_set(
4075 !cur_epoch_store
4076 .protocol_config()
4077 .simplified_unwrap_then_delete(),
4078 );
4079
4080 let root_state_hash: ECMHLiveObjectSetDigest = self
4081 .get_global_state_hash_store()
4082 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4083 .expect("Retrieving root state hash cannot fail")
4084 .expect("Root state hash for epoch must exist")
4085 .1
4086 .digest()
4087 .into();
4088
4089 let is_inconsistent = root_state_hash != live_object_set_hash;
4090 if is_inconsistent {
4091 debug_fatal!(
4092 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4093 root_state_hash,
4094 live_object_set_hash
4095 );
4096 } else {
4097 info!("State consistency check passed");
4098 }
4099
4100 state_hasher.set_inconsistent_state(is_inconsistent);
4101 }
4102
4103 pub fn current_epoch_for_testing(&self) -> EpochId {
4104 self.epoch_store_for_testing().epoch()
4105 }
4106
4107 #[instrument(level = "error", skip_all)]
4108 pub fn checkpoint_all_dbs(
4109 &self,
4110 checkpoint_path: &Path,
4111 cur_epoch_store: &AuthorityPerEpochStore,
4112 checkpoint_indexes: bool,
4113 ) -> SuiResult {
4114 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4115 let current_epoch = cur_epoch_store.epoch();
4116
4117 if checkpoint_path.exists() {
4118 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4119 return Ok(());
4120 }
4121
4122 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4123 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4124
4125 if checkpoint_path_tmp.exists() {
4126 fs::remove_dir_all(&checkpoint_path_tmp)
4127 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4128 }
4129
4130 fs::create_dir_all(&checkpoint_path_tmp)
4131 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4132 fs::create_dir(&store_checkpoint_path_tmp)
4133 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4134
4135 self.checkpoint_store
4138 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4139
4140 self.get_reconfig_api()
4141 .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4142
4143 self.committee_store
4144 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4145
4146 if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4147 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4148 }
4149
4150 fs::rename(checkpoint_path_tmp, checkpoint_path)
4151 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4152 Ok(())
4153 }
4154
4155 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4161 self.epoch_store.load()
4162 }
4163
4164 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4166 self.load_epoch_store_one_call_per_task()
4167 }
4168
4169 pub fn clone_committee_for_testing(&self) -> Committee {
4170 Committee::clone(self.epoch_store_for_testing().committee())
4171 }
4172
4173 #[instrument(level = "trace", skip_all)]
4174 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4175 self.get_object_store().get_object(object_id)
4176 }
4177
4178 pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4179 Ok(self
4180 .get_object(&SUI_SYSTEM_ADDRESS.into())
4181 .await
4182 .expect("framework object should always exist")
4183 .compute_object_reference())
4184 }
4185
4186 pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4188 self.get_object_cache_reader()
4189 .get_sui_system_state_object_unsafe()
4190 }
4191
4192 #[instrument(level = "trace", skip_all)]
4193 fn get_transaction_checkpoint_sequence(
4194 &self,
4195 digest: &TransactionDigest,
4196 epoch_store: &AuthorityPerEpochStore,
4197 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
4198 epoch_store.get_transaction_checkpoint(digest)
4199 }
4200
4201 #[instrument(level = "trace", skip_all)]
4202 pub fn get_checkpoint_by_sequence_number(
4203 &self,
4204 sequence_number: CheckpointSequenceNumber,
4205 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4206 Ok(self
4207 .checkpoint_store
4208 .get_checkpoint_by_sequence_number(sequence_number)?)
4209 }
4210
4211 #[instrument(level = "trace", skip_all)]
4212 pub fn get_transaction_checkpoint_for_tests(
4213 &self,
4214 digest: &TransactionDigest,
4215 epoch_store: &AuthorityPerEpochStore,
4216 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4217 let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4218 let Some(checkpoint) = checkpoint else {
4219 return Ok(None);
4220 };
4221 let checkpoint = self
4222 .checkpoint_store
4223 .get_checkpoint_by_sequence_number(checkpoint)?;
4224 Ok(checkpoint)
4225 }
4226
4227 #[instrument(level = "trace", skip_all)]
4228 pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4229 Ok(
4230 match self
4231 .get_object_cache_reader()
4232 .get_latest_object_or_tombstone(*object_id)
4233 {
4234 Some((_, ObjectOrTombstone::Object(object))) => {
4235 let layout = self.get_object_layout(&object)?;
4236 ObjectRead::Exists(object.compute_object_reference(), object, layout)
4237 }
4238 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4239 None => ObjectRead::NotExists(*object_id),
4240 },
4241 )
4242 }
4243
4244 pub fn get_chain_identifier(&self) -> ChainIdentifier {
4246 self.chain_identifier
4247 }
4248
4249 pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
4250 self.fork_recovery_state.as_ref()
4251 }
4252
4253 #[instrument(level = "trace", skip_all)]
4254 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4255 where
4256 T: DeserializeOwned,
4257 {
4258 let o = self.get_object_read(object_id)?.into_object()?;
4259 if let Some(move_object) = o.data.try_as_move() {
4260 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4261 SuiErrorKind::ObjectDeserializationError {
4262 error: format!("{e}"),
4263 }
4264 })?)
4265 } else {
4266 Err(SuiErrorKind::ObjectDeserializationError {
4267 error: format!("Provided object : [{object_id}] is not a Move object."),
4268 }
4269 .into())
4270 }
4271 }
4272
4273 #[instrument(level = "trace", skip_all)]
4279 pub fn get_past_object_read(
4280 &self,
4281 object_id: &ObjectID,
4282 version: SequenceNumber,
4283 ) -> SuiResult<PastObjectRead> {
4284 let Some(obj_ref) = self
4286 .get_object_cache_reader()
4287 .get_latest_object_ref_or_tombstone(*object_id)
4288 else {
4289 return Ok(PastObjectRead::ObjectNotExists(*object_id));
4290 };
4291
4292 if version > obj_ref.1 {
4293 return Ok(PastObjectRead::VersionTooHigh {
4294 object_id: *object_id,
4295 asked_version: version,
4296 latest_version: obj_ref.1,
4297 });
4298 }
4299
4300 if version < obj_ref.1 {
4301 return Ok(match self.read_object_at_version(object_id, version)? {
4303 Some((object, layout)) => {
4304 let obj_ref = object.compute_object_reference();
4305 PastObjectRead::VersionFound(obj_ref, object, layout)
4306 }
4307
4308 None => PastObjectRead::VersionNotFound(*object_id, version),
4309 });
4310 }
4311
4312 if !obj_ref.2.is_alive() {
4313 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4314 }
4315
4316 match self.read_object_at_version(object_id, obj_ref.1)? {
4317 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4318 None => {
4319 debug_fatal!(
4320 "Object with in parent_entry is missing from object store, datastore is \
4321 inconsistent"
4322 );
4323 Err(UserInputError::ObjectNotFound {
4324 object_id: *object_id,
4325 version: Some(obj_ref.1),
4326 }
4327 .into())
4328 }
4329 }
4330 }
4331
4332 #[instrument(level = "trace", skip_all)]
4333 fn read_object_at_version(
4334 &self,
4335 object_id: &ObjectID,
4336 version: SequenceNumber,
4337 ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4338 let Some(object) = self
4339 .get_object_cache_reader()
4340 .get_object_by_key(object_id, version)
4341 else {
4342 return Ok(None);
4343 };
4344
4345 let layout = self.get_object_layout(&object)?;
4346 Ok(Some((object, layout)))
4347 }
4348
4349 fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4350 let layout = object
4351 .data
4352 .try_as_move()
4353 .map(|object| {
4354 into_struct_layout(
4355 self.load_epoch_store_one_call_per_task()
4356 .executor()
4357 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
4359 .get_annotated_layout(&object.type_().clone().into())?,
4360 )
4361 })
4362 .transpose()?;
4363 Ok(layout)
4364 }
4365
4366 fn get_owner_at_version(
4367 &self,
4368 object_id: &ObjectID,
4369 version: SequenceNumber,
4370 ) -> SuiResult<Owner> {
4371 self.get_object_store()
4372 .get_object_by_key(object_id, version)
4373 .ok_or_else(|| {
4374 SuiError::from(UserInputError::ObjectNotFound {
4375 object_id: *object_id,
4376 version: Some(version),
4377 })
4378 })
4379 .map(|o| o.owner.clone())
4380 }
4381
4382 #[instrument(level = "trace", skip_all)]
4383 pub fn get_owner_objects(
4384 &self,
4385 owner: SuiAddress,
4386 cursor: Option<ObjectID>,
4388 limit: usize,
4389 filter: Option<SuiObjectDataFilter>,
4390 ) -> SuiResult<Vec<ObjectInfo>> {
4391 if let Some(indexes) = &self.indexes {
4392 indexes.get_owner_objects(owner, cursor, limit, filter)
4393 } else {
4394 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4395 }
4396 }
4397
4398 #[instrument(level = "trace", skip_all)]
4399 pub fn get_owned_coins_iterator_with_cursor(
4400 &self,
4401 owner: SuiAddress,
4402 cursor: (String, u64, ObjectID),
4404 limit: usize,
4405 one_coin_type_only: bool,
4406 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4407 if let Some(indexes) = &self.indexes {
4408 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4409 } else {
4410 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4411 }
4412 }
4413
4414 #[instrument(level = "trace", skip_all)]
4415 pub fn get_owner_objects_iterator(
4416 &self,
4417 owner: SuiAddress,
4418 cursor: Option<ObjectID>,
4420 filter: Option<SuiObjectDataFilter>,
4421 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4422 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4423 if let Some(indexes) = &self.indexes {
4424 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4425 } else {
4426 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4427 }
4428 }
4429
4430 #[instrument(level = "trace", skip_all)]
4431 pub async fn get_move_objects<T>(
4432 &self,
4433 owner: SuiAddress,
4434 type_: MoveObjectType,
4435 ) -> SuiResult<Vec<T>>
4436 where
4437 T: DeserializeOwned,
4438 {
4439 let object_ids = self
4440 .get_owner_objects_iterator(owner, None, None)?
4441 .filter(|o| match &o.type_ {
4442 ObjectType::Struct(s) => &type_ == s,
4443 ObjectType::Package => false,
4444 })
4445 .map(|info| ObjectKey(info.object_id, info.version))
4446 .collect::<Vec<_>>();
4447 let mut move_objects = vec![];
4448
4449 let objects = self
4450 .get_object_store()
4451 .multi_get_objects_by_key(&object_ids);
4452
4453 for (o, id) in objects.into_iter().zip(object_ids) {
4454 let object = o.ok_or_else(|| {
4455 SuiError::from(UserInputError::ObjectNotFound {
4456 object_id: id.0,
4457 version: Some(id.1),
4458 })
4459 })?;
4460 let move_object = object.data.try_as_move().ok_or_else(|| {
4461 SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4462 })?;
4463 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4464 SuiErrorKind::ObjectDeserializationError {
4465 error: format!("{e}"),
4466 }
4467 })?);
4468 }
4469 Ok(move_objects)
4470 }
4471
4472 #[instrument(level = "trace", skip_all)]
4473 pub fn get_dynamic_fields(
4474 &self,
4475 owner: ObjectID,
4476 cursor: Option<ObjectID>,
4478 limit: usize,
4479 ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4480 Ok(self
4481 .get_dynamic_fields_iterator(owner, cursor)?
4482 .take(limit)
4483 .collect::<Result<Vec<_>, _>>()?)
4484 }
4485
4486 fn get_dynamic_fields_iterator(
4487 &self,
4488 owner: ObjectID,
4489 cursor: Option<ObjectID>,
4491 ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4492 {
4493 if let Some(indexes) = &self.indexes {
4494 indexes.get_dynamic_fields_iterator(owner, cursor)
4495 } else {
4496 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4497 }
4498 }
4499
4500 #[instrument(level = "trace", skip_all)]
4501 pub fn get_dynamic_field_object_id(
4502 &self,
4503 owner: ObjectID,
4504 name_type: TypeTag,
4505 name_bcs_bytes: &[u8],
4506 ) -> SuiResult<Option<ObjectID>> {
4507 if let Some(indexes) = &self.indexes {
4508 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4509 } else {
4510 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4511 }
4512 }
4513
4514 #[instrument(level = "trace", skip_all)]
4515 pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
4516 Ok(self.get_indexes()?.next_sequence_number())
4517 }
4518
4519 #[instrument(level = "trace", skip_all)]
4520 pub async fn get_executed_transaction_and_effects(
4521 &self,
4522 digest: TransactionDigest,
4523 kv_store: Arc<TransactionKeyValueStore>,
4524 ) -> SuiResult<(Transaction, TransactionEffects)> {
4525 let transaction = kv_store.get_tx(digest).await?;
4526 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4527 Ok((transaction, effects))
4528 }
4529
4530 #[instrument(level = "trace", skip_all)]
4531 pub fn multi_get_checkpoint_by_sequence_number(
4532 &self,
4533 sequence_numbers: &[CheckpointSequenceNumber],
4534 ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
4535 Ok(self
4536 .checkpoint_store
4537 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4538 }
4539
4540 #[instrument(level = "trace", skip_all)]
4541 pub fn get_transaction_events(
4542 &self,
4543 digest: &TransactionDigest,
4544 ) -> SuiResult<TransactionEvents> {
4545 self.get_transaction_cache_reader()
4546 .get_events(digest)
4547 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4548 }
4549
4550 pub fn get_transaction_input_objects(
4551 &self,
4552 effects: &TransactionEffects,
4553 ) -> SuiResult<Vec<Object>> {
4554 sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4555 .map_err(Into::into)
4556 }
4557
4558 pub fn get_transaction_output_objects(
4559 &self,
4560 effects: &TransactionEffects,
4561 ) -> SuiResult<Vec<Object>> {
4562 sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4563 .map_err(Into::into)
4564 }
4565
4566 fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4567 match &self.indexes {
4568 Some(i) => Ok(i.clone()),
4569 None => Err(SuiErrorKind::UnsupportedFeatureError {
4570 error: "extended object indexing is not enabled on this server".into(),
4571 }
4572 .into()),
4573 }
4574 }
4575
4576 pub async fn get_transactions_for_tests(
4577 self: &Arc<Self>,
4578 filter: Option<TransactionFilter>,
4579 cursor: Option<TransactionDigest>,
4580 limit: Option<usize>,
4581 reverse: bool,
4582 ) -> SuiResult<Vec<TransactionDigest>> {
4583 let metrics = KeyValueStoreMetrics::new_for_tests();
4584 let kv_store = Arc::new(TransactionKeyValueStore::new(
4585 "rocksdb",
4586 metrics,
4587 self.clone(),
4588 ));
4589 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4590 .await
4591 }
4592
4593 #[instrument(level = "trace", skip_all)]
4594 pub async fn get_transactions(
4595 &self,
4596 kv_store: &Arc<TransactionKeyValueStore>,
4597 filter: Option<TransactionFilter>,
4598 cursor: Option<TransactionDigest>,
4600 limit: Option<usize>,
4601 reverse: bool,
4602 ) -> SuiResult<Vec<TransactionDigest>> {
4603 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4604 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4605 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4606 if reverse {
4607 let iter = iter
4608 .rev()
4609 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4610 .skip(usize::from(cursor.is_some()));
4611 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4612 } else {
4613 let iter = iter
4614 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4615 .skip(usize::from(cursor.is_some()));
4616 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4617 }
4618 }
4619 self.get_indexes()?
4620 .get_transactions(filter, cursor, limit, reverse)
4621 }
4622
4623 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4624 &self.checkpoint_store
4625 }
4626
4627 pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4628 self.get_checkpoint_store()
4629 .get_highest_executed_checkpoint_seq_number()?
4630 .ok_or(
4631 SuiErrorKind::UserInputError {
4632 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4633 }
4634 .into(),
4635 )
4636 }
4637
4638 #[cfg(msim)]
4639 pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
4640 self.database_for_testing()
4641 .perpetual_tables
4642 .get_highest_pruned_checkpoint()
4643 .map(|c| c.unwrap_or(0))
4644 .map_err(Into::into)
4645 }
4646
4647 #[instrument(level = "trace", skip_all)]
4648 pub fn get_checkpoint_summary_by_sequence_number(
4649 &self,
4650 sequence_number: CheckpointSequenceNumber,
4651 ) -> SuiResult<CheckpointSummary> {
4652 let verified_checkpoint = self
4653 .get_checkpoint_store()
4654 .get_checkpoint_by_sequence_number(sequence_number)?;
4655 match verified_checkpoint {
4656 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4657 None => Err(SuiErrorKind::UserInputError {
4658 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4659 }
4660 .into()),
4661 }
4662 }
4663
4664 #[instrument(level = "trace", skip_all)]
4665 pub fn get_checkpoint_summary_by_digest(
4666 &self,
4667 digest: CheckpointDigest,
4668 ) -> SuiResult<CheckpointSummary> {
4669 let verified_checkpoint = self
4670 .get_checkpoint_store()
4671 .get_checkpoint_by_digest(&digest)?;
4672 match verified_checkpoint {
4673 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4674 None => Err(SuiErrorKind::UserInputError {
4675 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4676 }
4677 .into()),
4678 }
4679 }
4680
4681 #[instrument(level = "trace", skip_all)]
4682 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
4683 if is_system_package(package_id) {
4684 return self.find_genesis_txn_digest();
4685 }
4686 Ok(self
4687 .get_object_read(&package_id)?
4688 .into_object()?
4689 .previous_transaction)
4690 }
4691
4692 #[instrument(level = "trace", skip_all)]
4693 pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
4694 let summary = self
4695 .get_verified_checkpoint_by_sequence_number(0)?
4696 .into_message();
4697 let content = self.get_checkpoint_contents(summary.content_digest)?;
4698 let genesis_transaction = content.enumerate_transactions(&summary).next();
4699 Ok(genesis_transaction
4700 .ok_or(SuiErrorKind::UserInputError {
4701 error: UserInputError::GenesisTransactionNotFound,
4702 })?
4703 .1
4704 .transaction)
4705 }
4706
4707 #[instrument(level = "trace", skip_all)]
4708 pub fn get_verified_checkpoint_by_sequence_number(
4709 &self,
4710 sequence_number: CheckpointSequenceNumber,
4711 ) -> SuiResult<VerifiedCheckpoint> {
4712 let verified_checkpoint = self
4713 .get_checkpoint_store()
4714 .get_checkpoint_by_sequence_number(sequence_number)?;
4715 match verified_checkpoint {
4716 Some(verified_checkpoint) => Ok(verified_checkpoint),
4717 None => Err(SuiErrorKind::UserInputError {
4718 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4719 }
4720 .into()),
4721 }
4722 }
4723
4724 #[instrument(level = "trace", skip_all)]
4725 pub fn get_verified_checkpoint_summary_by_digest(
4726 &self,
4727 digest: CheckpointDigest,
4728 ) -> SuiResult<VerifiedCheckpoint> {
4729 let verified_checkpoint = self
4730 .get_checkpoint_store()
4731 .get_checkpoint_by_digest(&digest)?;
4732 match verified_checkpoint {
4733 Some(verified_checkpoint) => Ok(verified_checkpoint),
4734 None => Err(SuiErrorKind::UserInputError {
4735 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4736 }
4737 .into()),
4738 }
4739 }
4740
4741 #[instrument(level = "trace", skip_all)]
4742 pub fn get_checkpoint_contents(
4743 &self,
4744 digest: CheckpointContentsDigest,
4745 ) -> SuiResult<CheckpointContents> {
4746 self.get_checkpoint_store()
4747 .get_checkpoint_contents(&digest)?
4748 .ok_or(
4749 SuiErrorKind::UserInputError {
4750 error: UserInputError::CheckpointContentsNotFound(digest),
4751 }
4752 .into(),
4753 )
4754 }
4755
4756 #[instrument(level = "trace", skip_all)]
4757 pub fn get_checkpoint_contents_by_sequence_number(
4758 &self,
4759 sequence_number: CheckpointSequenceNumber,
4760 ) -> SuiResult<CheckpointContents> {
4761 let verified_checkpoint = self
4762 .get_checkpoint_store()
4763 .get_checkpoint_by_sequence_number(sequence_number)?;
4764 match verified_checkpoint {
4765 Some(verified_checkpoint) => {
4766 let content_digest = verified_checkpoint.into_inner().content_digest;
4767 self.get_checkpoint_contents(content_digest)
4768 }
4769 None => Err(SuiErrorKind::UserInputError {
4770 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4771 }
4772 .into()),
4773 }
4774 }
4775
4776 #[instrument(level = "trace", skip_all)]
4777 pub async fn query_events(
4778 &self,
4779 kv_store: &Arc<TransactionKeyValueStore>,
4780 query: EventFilter,
4781 cursor: Option<EventID>,
4783 limit: usize,
4784 descending: bool,
4785 ) -> SuiResult<Vec<SuiEvent>> {
4786 let index_store = self.get_indexes()?;
4787
4788 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4790 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4791 SuiErrorKind::TransactionNotFound {
4792 digest: cursor.tx_digest,
4793 },
4794 )?;
4795 (tx_seq, cursor.event_seq as usize)
4796 } else if descending {
4797 (u64::MAX, usize::MAX)
4798 } else {
4799 (0, 0)
4800 };
4801
4802 let limit = limit + 1;
4803 let mut event_keys = match query {
4804 EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
4805 EventFilter::Transaction(digest) => {
4806 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4807 }
4808 EventFilter::MoveModule { package, module } => {
4809 let module_id = ModuleId::new(package.into(), module);
4810 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4811 }
4812 EventFilter::MoveEventType(struct_name) => index_store
4813 .events_by_move_event_struct_name(
4814 &struct_name,
4815 tx_num,
4816 event_num,
4817 limit,
4818 descending,
4819 )?,
4820 EventFilter::Sender(sender) => {
4821 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4822 }
4823 EventFilter::TimeRange {
4824 start_time,
4825 end_time,
4826 } => index_store
4827 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4828 EventFilter::MoveEventModule { package, module } => index_store
4829 .events_by_move_event_module(
4830 &ModuleId::new(package.into(), module),
4831 tx_num,
4832 event_num,
4833 limit,
4834 descending,
4835 )?,
4836 EventFilter::Any(_) => {
4838 return Err(SuiErrorKind::UserInputError {
4839 error: UserInputError::Unsupported(
4840 "'Any' queries are not supported by the fullnode.".to_string(),
4841 ),
4842 }
4843 .into());
4844 }
4845 };
4846
4847 if cursor.is_some() {
4850 if !event_keys.is_empty() {
4851 event_keys.remove(0);
4852 }
4853 } else {
4854 event_keys.truncate(limit - 1);
4855 }
4856
4857 let transaction_digests = event_keys
4859 .iter()
4860 .map(|(_, digest, _, _)| *digest)
4861 .collect::<HashSet<_>>()
4862 .into_iter()
4863 .collect::<Vec<_>>();
4864
4865 let events = kv_store
4866 .multi_get_events_by_tx_digests(&transaction_digests)
4867 .await?;
4868
4869 let events_map: HashMap<_, _> =
4870 transaction_digests.iter().zip(events.into_iter()).collect();
4871
4872 let stored_events = event_keys
4873 .into_iter()
4874 .map(|k| {
4875 (
4876 k,
4877 events_map
4878 .get(&k.1)
4879 .expect("fetched digest is missing")
4880 .clone()
4881 .and_then(|e| e.data.get(k.2).cloned()),
4882 )
4883 })
4884 .map(
4885 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4886 event
4887 .map(|e| (e, tx_digest, event_seq, timestamp))
4888 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
4889 },
4890 )
4891 .collect::<Result<Vec<_>, _>>()?;
4892
4893 let epoch_store = self.load_epoch_store_one_call_per_task();
4894 let backing_store = self.get_backing_package_store().as_ref();
4895 let mut layout_resolver = epoch_store
4896 .executor()
4897 .type_layout_resolver(Box::new(backing_store));
4898 let mut events = vec![];
4899 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4900 events.push(SuiEvent::try_from(
4901 e.clone(),
4902 tx_digest,
4903 event_seq as u64,
4904 Some(timestamp),
4905 layout_resolver.get_annotated_layout(&e.type_)?,
4906 )?)
4907 }
4908 Ok(events)
4909 }
4910
4911 pub async fn insert_genesis_object(&self, object: Object) {
4912 self.get_reconfig_api().insert_genesis_object(object);
4913 }
4914
4915 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4916 futures::future::join_all(
4917 objects
4918 .iter()
4919 .map(|o| self.insert_genesis_object(o.clone())),
4920 )
4921 .await;
4922 }
4923
4924 #[instrument(level = "trace", skip_all)]
4926 pub fn get_transaction_status(
4927 &self,
4928 transaction_digest: &TransactionDigest,
4929 epoch_store: &Arc<AuthorityPerEpochStore>,
4930 ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
4931 if let Some(effects) =
4933 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4934 {
4935 if let Some(transaction) = self
4936 .get_transaction_cache_reader()
4937 .get_transaction_block(transaction_digest)
4938 {
4939 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4940 let events = if effects.events_digest().is_some() {
4941 self.get_transaction_events(effects.transaction_digest())?
4942 } else {
4943 TransactionEvents::default()
4944 };
4945 return Ok(Some((
4946 (*transaction).clone().into_message(),
4947 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4948 )));
4949 } else {
4950 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4954 }
4955 }
4956 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4957 self.metrics.tx_already_processed.inc();
4958 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4959 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4960 } else {
4961 Ok(None)
4962 }
4963 }
4964
4965 #[instrument(level = "trace", skip_all)]
4969 pub fn get_signed_effects_and_maybe_resign(
4970 &self,
4971 transaction_digest: &TransactionDigest,
4972 epoch_store: &Arc<AuthorityPerEpochStore>,
4973 ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
4974 let effects = self
4975 .get_transaction_cache_reader()
4976 .get_executed_effects(transaction_digest);
4977 match effects {
4978 Some(effects) => {
4979 if effects.executed_epoch() != epoch_store.epoch() {
4999 debug!(
5000 tx_digest=?transaction_digest,
5001 effects_epoch=?effects.executed_epoch(),
5002 epoch=?epoch_store.epoch(),
5003 "Re-signing the effects with the current epoch"
5004 );
5005 }
5006 Ok(Some(self.sign_effects(effects, epoch_store)?))
5007 }
5008 None => Ok(None),
5009 }
5010 }
5011
5012 #[instrument(level = "trace", skip_all)]
5013 pub(crate) fn sign_effects(
5014 &self,
5015 effects: TransactionEffects,
5016 epoch_store: &Arc<AuthorityPerEpochStore>,
5017 ) -> SuiResult<VerifiedSignedTransactionEffects> {
5018 let tx_digest = *effects.transaction_digest();
5019 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5020 Some(sig) => {
5021 debug_assert!(sig.epoch == epoch_store.epoch());
5022 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5023 }
5024 _ => {
5025 let sig = AuthoritySignInfo::new(
5026 epoch_store.epoch(),
5027 &effects,
5028 Intent::sui_app(IntentScope::TransactionEffects),
5029 self.name,
5030 &*self.secret,
5031 );
5032
5033 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5034
5035 epoch_store.insert_effects_digest_and_signature(
5036 &tx_digest,
5037 effects.digest(),
5038 &sig,
5039 )?;
5040
5041 effects
5042 }
5043 };
5044
5045 Ok(VerifiedSignedTransactionEffects::new_unchecked(
5046 signed_effects,
5047 ))
5048 }
5049
5050 #[instrument(level = "trace", skip_all)]
5052 fn fullnode_only_get_tx_coins_for_indexing(
5053 &self,
5054 effects: &TransactionEffects,
5055 inner_temporary_store: &InnerTemporaryStore,
5056 epoch_store: &Arc<AuthorityPerEpochStore>,
5057 ) -> Option<TxCoins> {
5058 if self.indexes.is_none() || self.is_validator(epoch_store) {
5059 return None;
5060 }
5061 let written_coin_objects = inner_temporary_store
5062 .written
5063 .iter()
5064 .filter_map(|(k, v)| {
5065 if v.is_coin() {
5066 Some((*k, v.clone()))
5067 } else {
5068 None
5069 }
5070 })
5071 .collect();
5072 let mut input_coin_objects = inner_temporary_store
5073 .input_objects
5074 .iter()
5075 .filter_map(|(k, v)| {
5076 if v.is_coin() {
5077 Some((*k, v.clone()))
5078 } else {
5079 None
5080 }
5081 })
5082 .collect::<ObjectMap>();
5083
5084 for (object_id, version) in effects.modified_at_versions() {
5088 if inner_temporary_store
5089 .loaded_runtime_objects
5090 .contains_key(&object_id)
5091 && let Some(object) = self
5092 .get_object_store()
5093 .get_object_by_key(&object_id, version)
5094 && object.is_coin()
5095 {
5096 input_coin_objects.insert(object_id, object);
5097 }
5098 }
5099
5100 Some((input_coin_objects, written_coin_objects))
5101 }
5102
5103 #[instrument(level = "trace", skip_all)]
5111 pub async fn get_transaction_lock(
5112 &self,
5113 object_ref: &ObjectRef,
5114 epoch_store: &AuthorityPerEpochStore,
5115 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5116 let lock_info = self
5117 .get_object_cache_reader()
5118 .get_lock(*object_ref, epoch_store)?;
5119 let lock_info = match lock_info {
5120 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5121 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5122 provided_obj_ref: *object_ref,
5123 current_version: locked_ref.1,
5124 }
5125 .into());
5126 }
5127 ObjectLockStatus::Initialized => {
5128 return Ok(None);
5129 }
5130 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5131 };
5132
5133 epoch_store.get_signed_transaction(&lock_info.tx_digest)
5134 }
5135
5136 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5137 self.get_object_cache_reader().get_objects(objects)
5138 }
5139
5140 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5141 self.get_object_cache_reader()
5142 .get_latest_object_ref_or_tombstone(object_id)
5143 }
5144
5145 pub fn set_override_protocol_upgrade_buffer_stake(
5154 &self,
5155 expected_epoch: EpochId,
5156 buffer_stake_bps: u64,
5157 ) -> SuiResult {
5158 let epoch_store = self.load_epoch_store_one_call_per_task();
5159 let actual_epoch = epoch_store.epoch();
5160 if actual_epoch != expected_epoch {
5161 return Err(SuiErrorKind::WrongEpoch {
5162 expected_epoch,
5163 actual_epoch,
5164 }
5165 .into());
5166 }
5167
5168 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5169 }
5170
5171 pub fn clear_override_protocol_upgrade_buffer_stake(
5172 &self,
5173 expected_epoch: EpochId,
5174 ) -> SuiResult {
5175 let epoch_store = self.load_epoch_store_one_call_per_task();
5176 let actual_epoch = epoch_store.epoch();
5177 if actual_epoch != expected_epoch {
5178 return Err(SuiErrorKind::WrongEpoch {
5179 expected_epoch,
5180 actual_epoch,
5181 }
5182 .into());
5183 }
5184
5185 epoch_store.clear_override_protocol_upgrade_buffer_stake()
5186 }
5187
5188 pub async fn get_available_system_packages(
5191 &self,
5192 binary_config: &BinaryConfig,
5193 ) -> Vec<ObjectRef> {
5194 let mut results = vec![];
5195
5196 let system_packages = BuiltInFramework::iter_system_packages();
5197
5198 #[cfg(msim)]
5200 let extra_packages = framework_injection::get_extra_packages(self.name);
5201 #[cfg(msim)]
5202 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
5203
5204 for system_package in system_packages {
5205 let modules = system_package.modules().to_vec();
5206 #[cfg(msim)]
5208 let modules =
5209 match framework_injection::get_override_modules(&system_package.id, self.name) {
5210 Some(overrides) if overrides.is_empty() => continue,
5211 Some(overrides) => overrides,
5212 None => modules,
5213 };
5214
5215 let Some(obj_ref) = sui_framework::compare_system_package(
5216 &self.get_object_store(),
5217 &system_package.id,
5218 &modules,
5219 system_package.dependencies.to_vec(),
5220 binary_config,
5221 )
5222 .await
5223 else {
5224 return vec![];
5225 };
5226 results.push(obj_ref);
5227 }
5228
5229 results
5230 }
5231
5232 async fn get_system_package_bytes(
5246 &self,
5247 system_packages: Vec<ObjectRef>,
5248 binary_config: &BinaryConfig,
5249 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5250 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5251 let objects = self.get_objects(&ids).await;
5252
5253 let mut res = Vec::with_capacity(system_packages.len());
5254 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
5255 let prev_transaction = match object {
5256 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5257 info!("Framework {} does not need updating", system_package_ref.0);
5259 continue;
5260 }
5261
5262 Some(cur_object) => cur_object.previous_transaction,
5263 None => TransactionDigest::genesis_marker(),
5264 };
5265
5266 #[cfg(msim)]
5267 let SystemPackage {
5268 id: _,
5269 bytes,
5270 dependencies,
5271 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5272 .unwrap_or_else(|| {
5273 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5274 });
5275
5276 #[cfg(not(msim))]
5277 let SystemPackage {
5278 id: _,
5279 bytes,
5280 dependencies,
5281 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5282
5283 let modules: Vec<_> = bytes
5284 .iter()
5285 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5286 .collect();
5287
5288 let new_object = Object::new_system_package(
5289 &modules,
5290 system_package_ref.1,
5291 dependencies.clone(),
5292 prev_transaction,
5293 );
5294
5295 let new_ref = new_object.compute_object_reference();
5296 if new_ref != system_package_ref {
5297 if cfg!(msim) {
5298 debug_fatal!(
5300 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5301 );
5302 } else {
5303 error!(
5304 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5305 );
5306 }
5307 return None;
5308 }
5309
5310 res.push((system_package_ref.1, bytes, dependencies));
5311 }
5312
5313 Some(res)
5314 }
5315
5316 fn is_protocol_version_supported_v1(
5318 current_protocol_version: ProtocolVersion,
5319 proposed_protocol_version: ProtocolVersion,
5320 protocol_config: &ProtocolConfig,
5321 committee: &Committee,
5322 capabilities: Vec<AuthorityCapabilitiesV1>,
5323 mut buffer_stake_bps: u64,
5324 ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5325 if proposed_protocol_version > current_protocol_version + 1
5326 && !protocol_config.advance_to_highest_supported_protocol_version()
5327 {
5328 return None;
5329 }
5330
5331 if buffer_stake_bps > 10000 {
5332 warn!("clamping buffer_stake_bps to 10000");
5333 buffer_stake_bps = 10000;
5334 }
5335
5336 let mut desired_upgrades: Vec<_> = capabilities
5339 .into_iter()
5340 .filter_map(|mut cap| {
5341 if cap.available_system_packages.is_empty() {
5343 return None;
5344 }
5345
5346 cap.available_system_packages.sort();
5347
5348 info!(
5349 "validator {:?} supports {:?} with system packages: {:?}",
5350 cap.authority.concise(),
5351 cap.supported_protocol_versions,
5352 cap.available_system_packages,
5353 );
5354
5355 cap.supported_protocol_versions
5359 .is_version_supported(proposed_protocol_version)
5360 .then_some((cap.available_system_packages, cap.authority))
5361 })
5362 .collect();
5363
5364 desired_upgrades.sort();
5366 desired_upgrades
5367 .into_iter()
5368 .chunk_by(|(packages, _authority)| packages.clone())
5369 .into_iter()
5370 .find_map(|(packages, group)| {
5371 assert!(!packages.is_empty());
5373
5374 let mut stake_aggregator: StakeAggregator<(), true> =
5375 StakeAggregator::new(Arc::new(committee.clone()));
5376
5377 for (_, authority) in group {
5378 stake_aggregator.insert_generic(authority, ());
5379 }
5380
5381 let total_votes = stake_aggregator.total_votes();
5382 let quorum_threshold = committee.quorum_threshold();
5383 let f = committee.total_votes() - committee.quorum_threshold();
5384
5385 let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5387 let effective_threshold = quorum_threshold + buffer_stake;
5388
5389 info!(
5390 ?total_votes,
5391 ?quorum_threshold,
5392 ?buffer_stake_bps,
5393 ?effective_threshold,
5394 ?proposed_protocol_version,
5395 ?packages,
5396 "support for upgrade"
5397 );
5398
5399 let has_support = total_votes >= effective_threshold;
5400 has_support.then_some((proposed_protocol_version, packages))
5401 })
5402 }
5403
5404 fn is_protocol_version_supported_v2(
5405 current_protocol_version: ProtocolVersion,
5406 proposed_protocol_version: ProtocolVersion,
5407 protocol_config: &ProtocolConfig,
5408 committee: &Committee,
5409 capabilities: Vec<AuthorityCapabilitiesV2>,
5410 mut buffer_stake_bps: u64,
5411 ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5412 if proposed_protocol_version > current_protocol_version + 1
5413 && !protocol_config.advance_to_highest_supported_protocol_version()
5414 {
5415 return None;
5416 }
5417
5418 if buffer_stake_bps > 10000 {
5419 warn!("clamping buffer_stake_bps to 10000");
5420 buffer_stake_bps = 10000;
5421 }
5422
5423 let mut desired_upgrades: Vec<_> = capabilities
5426 .into_iter()
5427 .filter_map(|mut cap| {
5428 if cap.available_system_packages.is_empty() {
5430 return None;
5431 }
5432
5433 cap.available_system_packages.sort();
5434
5435 info!(
5436 "validator {:?} supports {:?} with system packages: {:?}",
5437 cap.authority.concise(),
5438 cap.supported_protocol_versions,
5439 cap.available_system_packages,
5440 );
5441
5442 cap.supported_protocol_versions
5446 .get_version_digest(proposed_protocol_version)
5447 .map(|digest| (digest, cap.available_system_packages, cap.authority))
5448 })
5449 .collect();
5450
5451 desired_upgrades.sort();
5453 desired_upgrades
5454 .into_iter()
5455 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5456 .into_iter()
5457 .find_map(|((digest, packages), group)| {
5458 assert!(!packages.is_empty());
5460
5461 let mut stake_aggregator: StakeAggregator<(), true> =
5462 StakeAggregator::new(Arc::new(committee.clone()));
5463
5464 for (_, _, authority) in group {
5465 stake_aggregator.insert_generic(authority, ());
5466 }
5467
5468 let total_votes = stake_aggregator.total_votes();
5469 let quorum_threshold = committee.quorum_threshold();
5470 let f = committee.total_votes() - committee.quorum_threshold();
5471
5472 let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5474 let effective_threshold = quorum_threshold + buffer_stake;
5475
5476 info!(
5477 protocol_config_digest = ?digest,
5478 ?total_votes,
5479 ?quorum_threshold,
5480 ?buffer_stake_bps,
5481 ?effective_threshold,
5482 ?proposed_protocol_version,
5483 ?packages,
5484 "support for upgrade"
5485 );
5486
5487 let has_support = total_votes >= effective_threshold;
5488 has_support.then_some((proposed_protocol_version, packages))
5489 })
5490 }
5491
5492 fn choose_protocol_version_and_system_packages_v1(
5494 current_protocol_version: ProtocolVersion,
5495 protocol_config: &ProtocolConfig,
5496 committee: &Committee,
5497 capabilities: Vec<AuthorityCapabilitiesV1>,
5498 buffer_stake_bps: u64,
5499 ) -> (ProtocolVersion, Vec<ObjectRef>) {
5500 let mut next_protocol_version = current_protocol_version;
5501 let mut system_packages = vec![];
5502
5503 while let Some((version, packages)) = Self::is_protocol_version_supported_v1(
5504 current_protocol_version,
5505 next_protocol_version + 1,
5506 protocol_config,
5507 committee,
5508 capabilities.clone(),
5509 buffer_stake_bps,
5510 ) {
5511 next_protocol_version = version;
5512 system_packages = packages;
5513 }
5514
5515 (next_protocol_version, system_packages)
5516 }
5517
5518 fn choose_protocol_version_and_system_packages_v2(
5519 current_protocol_version: ProtocolVersion,
5520 protocol_config: &ProtocolConfig,
5521 committee: &Committee,
5522 capabilities: Vec<AuthorityCapabilitiesV2>,
5523 buffer_stake_bps: u64,
5524 ) -> (ProtocolVersion, Vec<ObjectRef>) {
5525 let mut next_protocol_version = current_protocol_version;
5526 let mut system_packages = vec![];
5527
5528 while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5529 current_protocol_version,
5530 next_protocol_version + 1,
5531 protocol_config,
5532 committee,
5533 capabilities.clone(),
5534 buffer_stake_bps,
5535 ) {
5536 next_protocol_version = version;
5537 system_packages = packages;
5538 }
5539
5540 (next_protocol_version, system_packages)
5541 }
5542
5543 #[instrument(level = "debug", skip_all)]
5544 fn create_authenticator_state_tx(
5545 &self,
5546 epoch_store: &Arc<AuthorityPerEpochStore>,
5547 ) -> Option<EndOfEpochTransactionKind> {
5548 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5549 info!("authenticator state transactions not enabled");
5550 return None;
5551 }
5552
5553 let authenticator_state_exists = epoch_store.authenticator_state_exists();
5554 let tx = if authenticator_state_exists {
5555 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5556 let min_epoch =
5557 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5558 let authenticator_obj_initial_shared_version = epoch_store
5559 .epoch_start_config()
5560 .authenticator_obj_initial_shared_version()
5561 .expect("initial version must exist");
5562
5563 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5564 min_epoch,
5565 authenticator_obj_initial_shared_version,
5566 );
5567
5568 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5569
5570 tx
5571 } else {
5572 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5573 info!("Creating AuthenticatorStateCreate tx");
5574 tx
5575 };
5576 Some(tx)
5577 }
5578
5579 #[instrument(level = "debug", skip_all)]
5580 fn create_randomness_state_tx(
5581 &self,
5582 epoch_store: &Arc<AuthorityPerEpochStore>,
5583 ) -> Option<EndOfEpochTransactionKind> {
5584 if !epoch_store.protocol_config().random_beacon() {
5585 info!("randomness state transactions not enabled");
5586 return None;
5587 }
5588
5589 if epoch_store.randomness_state_exists() {
5590 return None;
5591 }
5592
5593 let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5594 info!("Creating RandomnessStateCreate tx");
5595 Some(tx)
5596 }
5597
5598 #[instrument(level = "debug", skip_all)]
5599 fn create_accumulator_root_tx(
5600 &self,
5601 epoch_store: &Arc<AuthorityPerEpochStore>,
5602 ) -> Option<EndOfEpochTransactionKind> {
5603 if !epoch_store
5604 .protocol_config()
5605 .create_root_accumulator_object()
5606 {
5607 info!("accumulator root creation not enabled");
5608 return None;
5609 }
5610
5611 if epoch_store.accumulator_root_exists() {
5612 return None;
5613 }
5614
5615 let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5616 info!("Creating AccumulatorRootCreate tx");
5617 Some(tx)
5618 }
5619
5620 #[instrument(level = "debug", skip_all)]
5621 fn create_coin_registry_tx(
5622 &self,
5623 epoch_store: &Arc<AuthorityPerEpochStore>,
5624 ) -> Option<EndOfEpochTransactionKind> {
5625 if !epoch_store.protocol_config().enable_coin_registry() {
5626 info!("coin registry not enabled");
5627 return None;
5628 }
5629
5630 if epoch_store.coin_registry_exists() {
5631 return None;
5632 }
5633
5634 let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5635 info!("Creating CoinRegistryCreate tx");
5636 Some(tx)
5637 }
5638
5639 #[instrument(level = "debug", skip_all)]
5640 fn create_display_registry_tx(
5641 &self,
5642 epoch_store: &Arc<AuthorityPerEpochStore>,
5643 ) -> Option<EndOfEpochTransactionKind> {
5644 if !epoch_store.protocol_config().enable_display_registry() {
5645 info!("display registry not enabled");
5646 return None;
5647 }
5648
5649 if epoch_store.display_registry_exists() {
5650 return None;
5651 }
5652
5653 let tx = EndOfEpochTransactionKind::new_display_registry_create();
5654 info!("Creating DisplayRegistryCreate tx");
5655 Some(tx)
5656 }
5657
5658 #[instrument(level = "debug", skip_all)]
5659 fn create_bridge_tx(
5660 &self,
5661 epoch_store: &Arc<AuthorityPerEpochStore>,
5662 ) -> Option<EndOfEpochTransactionKind> {
5663 if !epoch_store.protocol_config().enable_bridge() {
5664 info!("bridge not enabled");
5665 return None;
5666 }
5667 if epoch_store.bridge_exists() {
5668 return None;
5669 }
5670 let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5671 info!("Creating Bridge Create tx");
5672 Some(tx)
5673 }
5674
5675 #[instrument(level = "debug", skip_all)]
5676 fn init_bridge_committee_tx(
5677 &self,
5678 epoch_store: &Arc<AuthorityPerEpochStore>,
5679 ) -> Option<EndOfEpochTransactionKind> {
5680 if !epoch_store.protocol_config().enable_bridge() {
5681 info!("bridge not enabled");
5682 return None;
5683 }
5684 if !epoch_store
5685 .protocol_config()
5686 .should_try_to_finalize_bridge_committee()
5687 {
5688 info!("should not try to finalize bridge committee yet");
5689 return None;
5690 }
5691 if !epoch_store.bridge_exists() {
5693 return None;
5694 }
5695
5696 if epoch_store.bridge_committee_initiated() {
5697 return None;
5698 }
5699
5700 let bridge_initial_shared_version = epoch_store
5701 .epoch_start_config()
5702 .bridge_obj_initial_shared_version()
5703 .expect("initial version must exist");
5704 let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5705 info!("Init Bridge committee tx");
5706 Some(tx)
5707 }
5708
5709 #[instrument(level = "debug", skip_all)]
5710 fn create_deny_list_state_tx(
5711 &self,
5712 epoch_store: &Arc<AuthorityPerEpochStore>,
5713 ) -> Option<EndOfEpochTransactionKind> {
5714 if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5715 return None;
5716 }
5717
5718 if epoch_store.coin_deny_list_state_exists() {
5719 return None;
5720 }
5721
5722 let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5723 info!("Creating DenyListStateCreate tx");
5724 Some(tx)
5725 }
5726
5727 #[instrument(level = "debug", skip_all)]
5728 fn create_address_alias_state_tx(
5729 &self,
5730 epoch_store: &Arc<AuthorityPerEpochStore>,
5731 ) -> Option<EndOfEpochTransactionKind> {
5732 if !epoch_store.protocol_config().address_aliases() {
5733 info!("address aliases not enabled");
5734 return None;
5735 }
5736
5737 if epoch_store.address_alias_state_exists() {
5738 return None;
5739 }
5740
5741 let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
5742 info!("Creating AddressAliasStateCreate tx");
5743 Some(tx)
5744 }
5745
5746 #[instrument(level = "debug", skip_all)]
5747 fn create_execution_time_observations_tx(
5748 &self,
5749 epoch_store: &Arc<AuthorityPerEpochStore>,
5750 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5751 last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
5752 ) -> Option<EndOfEpochTransactionKind> {
5753 let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
5754 .protocol_config()
5755 .per_object_congestion_control_mode()
5756 else {
5757 return None;
5758 };
5759
5760 let start_checkpoint = std::cmp::max(
5763 last_checkpoint_before_end_of_epoch
5764 .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
5765 epoch_store
5767 .epoch()
5768 .checked_sub(1)
5769 .map(|prev_epoch| {
5770 self.checkpoint_store
5771 .get_epoch_last_checkpoint_seq_number(prev_epoch)
5772 .expect("typed store must not fail")
5773 .expect(
5774 "sequence number of last checkpoint of preceding epoch must be saved",
5775 )
5776 + 1
5777 })
5778 .unwrap_or(1),
5779 );
5780 info!(
5781 "reading checkpoint range {:?}..={:?}",
5782 start_checkpoint, last_checkpoint_before_end_of_epoch
5783 );
5784 let sequence_numbers =
5785 (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
5786 let contents_digests: Vec<_> = self
5787 .checkpoint_store
5788 .multi_get_locally_computed_checkpoints(&sequence_numbers)
5789 .expect("typed store must not fail")
5790 .into_iter()
5791 .zip(sequence_numbers)
5792 .map(|(maybe_checkpoint, sequence_number)| {
5793 if let Some(checkpoint) = maybe_checkpoint {
5794 checkpoint.content_digest
5795 } else {
5796 self.checkpoint_store
5799 .get_checkpoint_by_sequence_number(sequence_number)
5800 .expect("typed store must not fail")
5801 .unwrap_or_else(|| {
5802 fatal!("preceding checkpoints must exist by end of epoch")
5803 })
5804 .data()
5805 .content_digest
5806 }
5807 })
5808 .collect();
5809 let tx_digests: Vec<_> = self
5810 .checkpoint_store
5811 .multi_get_checkpoint_content(&contents_digests)
5812 .expect("typed store must not fail")
5813 .into_iter()
5814 .flat_map(|maybe_contents| {
5815 maybe_contents
5816 .expect("preceding checkpoint contents must exist by end of epoch")
5817 .into_inner()
5818 .into_iter()
5819 .map(|digests| digests.transaction)
5820 })
5821 .collect();
5822 let included_execution_time_observations: HashSet<_> = self
5823 .get_transaction_cache_reader()
5824 .multi_get_transaction_blocks(&tx_digests)
5825 .into_iter()
5826 .flat_map(|maybe_tx| {
5827 if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
5828 .expect("preceding transaction must exist by end of epoch")
5829 .transaction_data()
5830 .kind()
5831 {
5832 #[allow(clippy::unnecessary_to_owned)]
5833 itertools::Either::Left(
5834 ptb.commands
5835 .to_owned()
5836 .into_iter()
5837 .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
5838 )
5839 } else {
5840 itertools::Either::Right(std::iter::empty())
5841 }
5842 })
5843 .chain(end_of_epoch_observation_keys.into_iter())
5844 .collect();
5845
5846 let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
5847 epoch_store
5848 .get_end_of_epoch_execution_time_observations()
5849 .filter_and_sort_v1(
5850 |(key, _)| included_execution_time_observations.contains(key),
5851 params.stored_observations_limit.try_into().unwrap(),
5852 ),
5853 );
5854 info!("Creating StoreExecutionTimeObservations tx");
5855 Some(tx)
5856 }
5857
5858 #[instrument(level = "error", skip_all)]
5869 pub async fn create_and_execute_advance_epoch_tx(
5870 &self,
5871 epoch_store: &Arc<AuthorityPerEpochStore>,
5872 gas_cost_summary: &GasCostSummary,
5873 checkpoint: CheckpointSequenceNumber,
5874 epoch_start_timestamp_ms: CheckpointTimestamp,
5875 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5876 last_checkpoint: CheckpointSequenceNumber,
5879 ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
5880 let mut txns = Vec::new();
5881
5882 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
5883 txns.push(tx);
5884 }
5885 if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
5886 txns.push(tx);
5887 }
5888 if let Some(tx) = self.create_bridge_tx(epoch_store) {
5889 txns.push(tx);
5890 }
5891 if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
5892 txns.push(tx);
5893 }
5894 if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
5895 txns.push(tx);
5896 }
5897 if let Some(tx) = self.create_execution_time_observations_tx(
5898 epoch_store,
5899 end_of_epoch_observation_keys,
5900 last_checkpoint,
5901 ) {
5902 txns.push(tx);
5903 }
5904 if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
5905 txns.push(tx);
5906 }
5907
5908 if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
5909 txns.push(tx);
5910 }
5911
5912 if let Some(tx) = self.create_display_registry_tx(epoch_store) {
5913 txns.push(tx);
5914 }
5915
5916 if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
5917 txns.push(tx);
5918 }
5919
5920 let next_epoch = epoch_store.epoch() + 1;
5921
5922 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5923
5924 let (next_epoch_protocol_version, next_epoch_system_packages) =
5925 if epoch_store.protocol_config().authority_capabilities_v2() {
5926 Self::choose_protocol_version_and_system_packages_v2(
5927 epoch_store.protocol_version(),
5928 epoch_store.protocol_config(),
5929 epoch_store.committee(),
5930 epoch_store
5931 .get_capabilities_v2()
5932 .expect("read capabilities from db cannot fail"),
5933 buffer_stake_bps,
5934 )
5935 } else {
5936 Self::choose_protocol_version_and_system_packages_v1(
5937 epoch_store.protocol_version(),
5938 epoch_store.protocol_config(),
5939 epoch_store.committee(),
5940 epoch_store
5941 .get_capabilities_v1()
5942 .expect("read capabilities from db cannot fail"),
5943 buffer_stake_bps,
5944 )
5945 };
5946
5947 let config = epoch_store.protocol_config();
5950 let binary_config = config.binary_config(None);
5951 let Some(next_epoch_system_package_bytes) = self
5952 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5953 .await
5954 else {
5955 if next_epoch_protocol_version <= ProtocolVersion::MAX {
5956 debug_fatal!(
5960 "upgraded system packages {:?} are not locally available, cannot create \
5961 ChangeEpochTx. validator binary must be upgraded to the correct version!",
5962 next_epoch_system_packages
5963 );
5964 } else {
5965 error!(
5966 "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
5967 next_epoch_protocol_version
5968 );
5969 }
5970 return Err(CheckpointBuilderError::SystemPackagesMissing);
5979 };
5980
5981 let tx = if epoch_store
5982 .protocol_config()
5983 .end_of_epoch_transaction_supported()
5984 {
5985 txns.push(EndOfEpochTransactionKind::new_change_epoch(
5986 next_epoch,
5987 next_epoch_protocol_version,
5988 gas_cost_summary.storage_cost,
5989 gas_cost_summary.computation_cost,
5990 gas_cost_summary.storage_rebate,
5991 gas_cost_summary.non_refundable_storage_fee,
5992 epoch_start_timestamp_ms,
5993 next_epoch_system_package_bytes,
5994 ));
5995
5996 VerifiedTransaction::new_end_of_epoch_transaction(txns)
5997 } else {
5998 VerifiedTransaction::new_change_epoch(
5999 next_epoch,
6000 next_epoch_protocol_version,
6001 gas_cost_summary.storage_cost,
6002 gas_cost_summary.computation_cost,
6003 gas_cost_summary.storage_rebate,
6004 gas_cost_summary.non_refundable_storage_fee,
6005 epoch_start_timestamp_ms,
6006 next_epoch_system_package_bytes,
6007 )
6008 };
6009
6010 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
6011 tx.clone(),
6012 epoch_store.epoch(),
6013 checkpoint,
6014 );
6015
6016 let tx_digest = executable_tx.digest();
6017
6018 info!(
6019 ?next_epoch,
6020 ?next_epoch_protocol_version,
6021 ?next_epoch_system_packages,
6022 computation_cost=?gas_cost_summary.computation_cost,
6023 storage_cost=?gas_cost_summary.storage_cost,
6024 storage_rebate=?gas_cost_summary.storage_rebate,
6025 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
6026 ?tx_digest,
6027 "Creating advance epoch transaction"
6028 );
6029
6030 fail_point_async!("change_epoch_tx_delay");
6031 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
6032
6033 if self
6036 .get_transaction_cache_reader()
6037 .is_tx_already_executed(tx_digest)
6038 {
6039 warn!("change epoch tx has already been executed via state sync");
6040 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6041 }
6042
6043 let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
6044 else {
6045 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6046 };
6047
6048 let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
6051 self.get_object_cache_reader().as_ref(),
6052 std::iter::once(&Schedulable::Transaction(&executable_tx)),
6053 )?;
6054
6055 assert_eq!(assigned_versions.0.len(), 1);
6056 let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;
6057
6058 let input_objects = self.read_objects_for_execution(
6059 &tx_lock,
6060 &executable_tx,
6061 assigned_versions,
6062 epoch_store,
6063 )?;
6064
6065 let (transaction_outputs, _timings, _execution_error_opt) = self
6066 .execute_certificate(
6067 &execution_guard,
6068 &executable_tx,
6069 input_objects,
6070 None,
6071 BalanceWithdrawStatus::NoWithdraw,
6072 epoch_store,
6073 )
6074 .unwrap();
6075 let system_obj = get_sui_system_state(&transaction_outputs.written)
6076 .expect("change epoch tx must write to system object");
6077
6078 let effects = transaction_outputs.effects;
6079 self.get_state_sync_store()
6083 .insert_transaction_and_effects(&tx, &effects);
6084
6085 info!(
6086 "Effects summary of the change epoch transaction: {:?}",
6087 effects.summary_for_debug()
6088 );
6089 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
6090 assert!(effects.status().is_ok());
6092 Ok((system_obj, effects))
6093 }
6094
6095 #[instrument(level = "error", skip_all)]
6096 async fn reopen_epoch_db(
6097 &self,
6098 cur_epoch_store: &AuthorityPerEpochStore,
6099 new_committee: Committee,
6100 epoch_start_configuration: EpochStartConfiguration,
6101 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6102 epoch_last_checkpoint: CheckpointSequenceNumber,
6103 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6104 let new_epoch = new_committee.epoch;
6105 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6106 assert_eq!(
6107 epoch_start_configuration.epoch_start_state().epoch(),
6108 new_committee.epoch
6109 );
6110 fail_point!("before-open-new-epoch-store");
6111 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6112 self.name,
6113 new_committee,
6114 epoch_start_configuration,
6115 self.get_backing_package_store().clone(),
6116 self.get_object_store().clone(),
6117 expensive_safety_check_config,
6118 epoch_last_checkpoint,
6119 )?;
6120 self.epoch_store.store(new_epoch_store.clone());
6121 Ok(new_epoch_store)
6122 }
6123
6124 #[cfg(test)]
6125 pub(crate) fn iter_live_object_set_for_testing(
6126 &self,
6127 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6128 let include_wrapped_object = !self
6129 .epoch_store_for_testing()
6130 .protocol_config()
6131 .simplified_unwrap_then_delete();
6132 self.get_global_state_hash_store()
6133 .iter_cached_live_object_set_for_testing(include_wrapped_object)
6134 }
6135
6136 #[cfg(test)]
6137 pub(crate) fn shutdown_execution_for_test(&self) {
6138 self.tx_execution_shutdown
6139 .lock()
6140 .take()
6141 .unwrap()
6142 .send(())
6143 .unwrap();
6144 }
6145
6146 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6148 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6149 self.get_object_cache_reader()
6150 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6151 self.get_reconfig_api()
6152 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6153 Ok(())
6154 }
6155}
6156
6157pub struct RandomnessRoundReceiver {
6158 authority_state: Arc<AuthorityState>,
6159 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6160}
6161
6162impl RandomnessRoundReceiver {
6163 pub fn spawn(
6164 authority_state: Arc<AuthorityState>,
6165 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6166 ) -> JoinHandle<()> {
6167 let rrr = RandomnessRoundReceiver {
6168 authority_state,
6169 randomness_rx,
6170 };
6171 spawn_monitored_task!(rrr.run())
6172 }
6173
6174 async fn run(mut self) {
6175 info!("RandomnessRoundReceiver event loop started");
6176
6177 loop {
6178 tokio::select! {
6179 maybe_recv = self.randomness_rx.recv() => {
6180 if let Some((epoch, round, bytes)) = maybe_recv {
6181 self.handle_new_randomness(epoch, round, bytes).await;
6182 } else {
6183 break;
6184 }
6185 },
6186 }
6187 }
6188
6189 info!("RandomnessRoundReceiver event loop ended");
6190 }
6191
6192 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
6193 async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
6194 fail_point_async!("randomness-delay");
6195
6196 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
6197 if epoch_store.epoch() != epoch {
6198 warn!(
6199 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
6200 epoch_store.epoch()
6201 );
6202 return;
6203 }
6204 let key = TransactionKey::RandomnessRound(epoch, round);
6205 let transaction = VerifiedTransaction::new_randomness_state_update(
6206 epoch,
6207 round,
6208 bytes,
6209 epoch_store
6210 .epoch_start_config()
6211 .randomness_obj_initial_shared_version()
6212 .expect("randomness state obj must exist"),
6213 );
6214 debug!(
6215 "created randomness state update transaction with digest: {:?}",
6216 transaction.digest()
6217 );
6218 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
6219 let digest = *transaction.digest();
6220
6221 self.authority_state
6226 .get_cache_commit()
6227 .persist_transaction(&transaction);
6228
6229 if epoch_store.insert_tx_key(key, digest).is_err() {
6231 warn!("epoch ended while handling new randomness");
6232 }
6233
6234 let authority_state = self.authority_state.clone();
6235 spawn_monitored_task!(async move {
6236 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
6243 let result = tokio::time::timeout(
6244 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
6245 authority_state
6246 .get_transaction_cache_reader()
6247 .notify_read_executed_effects(
6248 "RandomnessRoundReceiver::notify_read_executed_effects_first",
6249 &[digest],
6250 ),
6251 )
6252 .await;
6253 let mut effects = match result {
6254 Ok(result) => result,
6255 Err(_) => {
6256 if cfg!(debug_assertions) {
6257 panic!(
6259 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6260 );
6261 }
6262 warn!(
6263 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6264 );
6265 authority_state
6267 .get_transaction_cache_reader()
6268 .notify_read_executed_effects(
6269 "RandomnessRoundReceiver::notify_read_executed_effects_second",
6270 &[digest],
6271 )
6272 .await
6273 }
6274 };
6275
6276 let effects = effects.pop().expect("should return effects");
6277 if *effects.status() != ExecutionStatus::Success {
6278 fatal!(
6279 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
6280 );
6281 }
6282 debug!(
6283 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
6284 );
6285 });
6286 }
6287}
6288
6289#[async_trait]
6290impl TransactionKeyValueStoreTrait for AuthorityState {
6291 #[instrument(skip(self))]
6292 async fn multi_get(
6293 &self,
6294 transactions: &[TransactionDigest],
6295 effects: &[TransactionDigest],
6296 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6297 let txns = if !transactions.is_empty() {
6298 self.get_transaction_cache_reader()
6299 .multi_get_transaction_blocks(transactions)
6300 .into_iter()
6301 .map(|t| t.map(|t| (*t).clone().into_inner()))
6302 .collect()
6303 } else {
6304 vec![]
6305 };
6306
6307 let fx = if !effects.is_empty() {
6308 self.get_transaction_cache_reader()
6309 .multi_get_executed_effects(effects)
6310 } else {
6311 vec![]
6312 };
6313
6314 Ok((txns, fx))
6315 }
6316
6317 #[instrument(skip(self))]
6318 async fn multi_get_checkpoints(
6319 &self,
6320 checkpoint_summaries: &[CheckpointSequenceNumber],
6321 checkpoint_contents: &[CheckpointSequenceNumber],
6322 checkpoint_summaries_by_digest: &[CheckpointDigest],
6323 ) -> SuiResult<(
6324 Vec<Option<CertifiedCheckpointSummary>>,
6325 Vec<Option<CheckpointContents>>,
6326 Vec<Option<CertifiedCheckpointSummary>>,
6327 )> {
6328 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6330 let store = self.get_checkpoint_store();
6331 for seq in checkpoint_summaries {
6332 let checkpoint = store
6333 .get_checkpoint_by_sequence_number(*seq)?
6334 .map(|c| c.into_inner());
6335
6336 summaries.push(checkpoint);
6337 }
6338
6339 let mut contents = Vec::with_capacity(checkpoint_contents.len());
6340 for seq in checkpoint_contents {
6341 let checkpoint = store
6342 .get_checkpoint_by_sequence_number(*seq)?
6343 .and_then(|summary| {
6344 store
6345 .get_checkpoint_contents(&summary.content_digest)
6346 .expect("db read cannot fail")
6347 });
6348 contents.push(checkpoint);
6349 }
6350
6351 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6352 for digest in checkpoint_summaries_by_digest {
6353 let checkpoint = store
6354 .get_checkpoint_by_digest(digest)?
6355 .map(|c| c.into_inner());
6356 summaries_by_digest.push(checkpoint);
6357 }
6358 Ok((summaries, contents, summaries_by_digest))
6359 }
6360
6361 #[instrument(skip(self))]
6362 async fn deprecated_get_transaction_checkpoint(
6363 &self,
6364 digest: TransactionDigest,
6365 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6366 Ok(self
6367 .get_checkpoint_cache()
6368 .deprecated_get_transaction_checkpoint(&digest)
6369 .map(|(_epoch, checkpoint)| checkpoint))
6370 }
6371
6372 #[instrument(skip(self))]
6373 async fn get_object(
6374 &self,
6375 object_id: ObjectID,
6376 version: VersionNumber,
6377 ) -> SuiResult<Option<Object>> {
6378 Ok(self
6379 .get_object_cache_reader()
6380 .get_object_by_key(&object_id, version))
6381 }
6382
6383 #[instrument(skip_all)]
6384 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6385 Ok(self
6386 .get_object_cache_reader()
6387 .multi_get_objects_by_key(object_keys))
6388 }
6389
6390 #[instrument(skip(self))]
6391 async fn multi_get_transaction_checkpoint(
6392 &self,
6393 digests: &[TransactionDigest],
6394 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6395 let res = self
6396 .get_checkpoint_cache()
6397 .deprecated_multi_get_transaction_checkpoint(digests);
6398
6399 Ok(res
6400 .into_iter()
6401 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6402 .collect())
6403 }
6404
6405 #[instrument(skip(self))]
6406 async fn multi_get_events_by_tx_digests(
6407 &self,
6408 digests: &[TransactionDigest],
6409 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6410 if digests.is_empty() {
6411 return Ok(vec![]);
6412 }
6413
6414 Ok(self
6415 .get_transaction_cache_reader()
6416 .multi_get_events(digests))
6417 }
6418}
6419
6420#[cfg(msim)]
6421pub mod framework_injection {
6422 use move_binary_format::CompiledModule;
6423 use std::collections::BTreeMap;
6424 use std::{cell::RefCell, collections::BTreeSet};
6425 use sui_framework::{BuiltInFramework, SystemPackage};
6426 use sui_types::base_types::{AuthorityName, ObjectID};
6427 use sui_types::is_system_package;
6428
6429 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
6430
6431 thread_local! {
6433 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
6434 }
6435
6436 type Framework = Vec<CompiledModule>;
6437
6438 pub type PackageUpgradeCallback =
6439 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
6440
6441 enum PackageOverrideConfig {
6442 Global(Framework),
6443 PerValidator(PackageUpgradeCallback),
6444 }
6445
6446 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
6447 modules
6448 .iter()
6449 .map(|m| {
6450 let mut buf = Vec::new();
6451 m.serialize_with_version(m.version, &mut buf).unwrap();
6452 buf
6453 })
6454 .collect()
6455 }
6456
6457 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
6458 OVERRIDE.with(|bs| {
6459 bs.borrow_mut()
6460 .insert(package_id, PackageOverrideConfig::Global(modules))
6461 });
6462 }
6463
6464 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
6465 OVERRIDE.with(|bs| {
6466 bs.borrow_mut()
6467 .insert(package_id, PackageOverrideConfig::PerValidator(func))
6468 });
6469 }
6470
6471 pub fn set_system_packages(packages: Vec<SystemPackage>) {
6472 OVERRIDE.with(|bs| {
6473 let mut new_packages_not_to_include: BTreeSet<_> =
6474 BuiltInFramework::all_package_ids().into_iter().collect();
6475 for pkg in &packages {
6476 new_packages_not_to_include.remove(&pkg.id);
6477 }
6478 for pkg in packages {
6479 bs.borrow_mut()
6480 .insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
6481 }
6482 for empty_pkg in new_packages_not_to_include {
6483 bs.borrow_mut()
6484 .insert(empty_pkg, PackageOverrideConfig::Global(vec![]));
6485 }
6486 });
6487 }
6488
6489 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
6490 OVERRIDE.with(|cfg| {
6491 cfg.borrow().get(package_id).and_then(|entry| match entry {
6492 PackageOverrideConfig::Global(framework) => {
6493 Some(compiled_modules_to_bytes(framework))
6494 }
6495 PackageOverrideConfig::PerValidator(func) => {
6496 func(name).map(|fw| compiled_modules_to_bytes(&fw))
6497 }
6498 })
6499 })
6500 }
6501
6502 pub fn get_override_modules(
6503 package_id: &ObjectID,
6504 name: AuthorityName,
6505 ) -> Option<Vec<CompiledModule>> {
6506 OVERRIDE.with(|cfg| {
6507 cfg.borrow().get(package_id).and_then(|entry| match entry {
6508 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
6509 PackageOverrideConfig::PerValidator(func) => func(name),
6510 })
6511 })
6512 }
6513
6514 pub fn get_override_system_package(
6515 package_id: &ObjectID,
6516 name: AuthorityName,
6517 ) -> Option<SystemPackage> {
6518 let bytes = get_override_bytes(package_id, name)?;
6519 let dependencies = if is_system_package(*package_id) {
6520 BuiltInFramework::get_package_by_id(package_id)
6521 .dependencies
6522 .to_vec()
6523 } else {
6524 BuiltInFramework::all_package_ids()
6526 };
6527 Some(SystemPackage {
6528 id: *package_id,
6529 bytes,
6530 dependencies,
6531 })
6532 }
6533
6534 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6535 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids().into_iter());
6536 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
6537 cfg.borrow()
6538 .keys()
6539 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6540 .collect()
6541 });
6542
6543 extra
6544 .into_iter()
6545 .map(|package| SystemPackage {
6546 id: package,
6547 bytes: get_override_bytes(&package, name).unwrap(),
6548 dependencies: BuiltInFramework::all_package_ids(),
6549 })
6550 .collect()
6551 }
6552}
6553
6554#[derive(Debug, Serialize, Deserialize, Clone)]
6555pub struct ObjDumpFormat {
6556 pub id: ObjectID,
6557 pub version: VersionNumber,
6558 pub digest: ObjectDigest,
6559 pub object: Object,
6560}
6561
6562impl ObjDumpFormat {
6563 fn new(object: Object) -> Self {
6564 let oref = object.compute_object_reference();
6565 Self {
6566 id: oref.0,
6567 version: oref.1,
6568 digest: oref.2,
6569 object,
6570 }
6571 }
6572}
6573
6574#[derive(Debug, Serialize, Deserialize, Clone)]
6575pub struct NodeStateDump {
6576 pub tx_digest: TransactionDigest,
6577 pub sender_signed_data: SenderSignedData,
6578 pub executed_epoch: u64,
6579 pub reference_gas_price: u64,
6580 pub protocol_version: u64,
6581 pub epoch_start_timestamp_ms: u64,
6582 pub computed_effects: TransactionEffects,
6583 pub expected_effects_digest: TransactionEffectsDigest,
6584 pub relevant_system_packages: Vec<ObjDumpFormat>,
6585 pub shared_objects: Vec<ObjDumpFormat>,
6586 pub loaded_child_objects: Vec<ObjDumpFormat>,
6587 pub modified_at_versions: Vec<ObjDumpFormat>,
6588 pub runtime_reads: Vec<ObjDumpFormat>,
6589 pub input_objects: Vec<ObjDumpFormat>,
6590}
6591
6592impl NodeStateDump {
6593 pub fn new(
6594 tx_digest: &TransactionDigest,
6595 effects: &TransactionEffects,
6596 expected_effects_digest: TransactionEffectsDigest,
6597 object_store: &dyn ObjectStore,
6598 epoch_store: &Arc<AuthorityPerEpochStore>,
6599 inner_temporary_store: &InnerTemporaryStore,
6600 certificate: &VerifiedExecutableTransaction,
6601 ) -> SuiResult<Self> {
6602 let executed_epoch = epoch_store.epoch();
6604 let reference_gas_price = epoch_store.reference_gas_price();
6605 let epoch_start_config = epoch_store.epoch_start_config();
6606 let protocol_version = epoch_store.protocol_version().as_u64();
6607 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6608
6609 let mut relevant_system_packages = Vec::new();
6611 for sys_package_id in BuiltInFramework::all_package_ids() {
6612 if let Some(w) = object_store.get_object(&sys_package_id) {
6613 relevant_system_packages.push(ObjDumpFormat::new(w))
6614 }
6615 }
6616
6617 let mut shared_objects = Vec::new();
6619 for kind in effects.input_consensus_objects() {
6620 match kind {
6621 InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6622 if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6623 shared_objects.push(ObjDumpFormat::new(w))
6624 }
6625 }
6626 InputConsensusObject::ReadConsensusStreamEnded(..)
6627 | InputConsensusObject::MutateConsensusStreamEnded(..)
6628 | InputConsensusObject::Cancelled(..) => (), }
6630 }
6631
6632 let mut loaded_child_objects = Vec::new();
6635 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6636 if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6637 loaded_child_objects.push(ObjDumpFormat::new(w))
6638 }
6639 }
6640
6641 let mut modified_at_versions = Vec::new();
6643 for (id, ver) in effects.modified_at_versions() {
6644 if let Some(w) = object_store.get_object_by_key(&id, ver) {
6645 modified_at_versions.push(ObjDumpFormat::new(w))
6646 }
6647 }
6648
6649 let mut runtime_reads = Vec::new();
6652 for obj in inner_temporary_store
6653 .runtime_packages_loaded_from_db
6654 .values()
6655 {
6656 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6657 }
6658
6659 Ok(Self {
6662 tx_digest: *tx_digest,
6663 executed_epoch,
6664 reference_gas_price,
6665 epoch_start_timestamp_ms,
6666 protocol_version,
6667 relevant_system_packages,
6668 shared_objects,
6669 loaded_child_objects,
6670 modified_at_versions,
6671 runtime_reads,
6672 sender_signed_data: certificate.clone().into_message(),
6673 input_objects: inner_temporary_store
6674 .input_objects
6675 .values()
6676 .map(|o| ObjDumpFormat::new(o.clone()))
6677 .collect(),
6678 computed_effects: effects.clone(),
6679 expected_effects_digest,
6680 })
6681 }
6682
6683 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6684 let mut objects = Vec::new();
6685 objects.extend(self.relevant_system_packages.clone());
6686 objects.extend(self.shared_objects.clone());
6687 objects.extend(self.loaded_child_objects.clone());
6688 objects.extend(self.modified_at_versions.clone());
6689 objects.extend(self.runtime_reads.clone());
6690 objects.extend(self.input_objects.clone());
6691 objects
6692 }
6693
6694 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6695 let file_name = format!(
6696 "{}_{}_NODE_DUMP.json",
6697 self.tx_digest,
6698 AuthorityState::unixtime_now_ms()
6699 );
6700 let mut path = path.to_path_buf();
6701 path.push(&file_name);
6702 let mut file = File::create(path.clone())?;
6703 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6704 Ok(path)
6705 }
6706
6707 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6708 let file = File::open(path)?;
6709 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6710 }
6711}