1use crate::checkpoints::CheckpointBuilderError;
6use crate::checkpoints::CheckpointBuilderResult;
7use crate::congestion_tracker::CongestionTracker;
8use crate::consensus_adapter::ConsensusOverloadChecker;
9use crate::execution_cache::ExecutionCacheTraitPointers;
10use crate::execution_cache::TransactionCacheRead;
11use crate::execution_scheduler::ExecutionScheduler;
12use crate::execution_scheduler::SchedulingSource;
13use crate::jsonrpc_index::CoinIndexKey2;
14use crate::rpc_index::RpcIndexStore;
15use crate::traffic_controller::TrafficController;
16use crate::traffic_controller::metrics::TrafficControllerMetrics;
17use crate::transaction_outputs::TransactionOutputs;
18use crate::verify_indexes::{fix_indexes, verify_indexes};
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22use fastcrypto::encoding::Base58;
23use fastcrypto::encoding::Encoding;
24use fastcrypto::hash::MultisetHash;
25use itertools::Itertools;
26use move_binary_format::CompiledModule;
27use move_binary_format::binary_config::BinaryConfig;
28use move_core_types::annotated_value::MoveStructLayout;
29use move_core_types::language_storage::ModuleId;
30use mysten_common::fatal;
31use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
32use parking_lot::Mutex;
33use prometheus::{
34 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
35 register_histogram_vec_with_registry, register_histogram_with_registry,
36 register_int_counter_vec_with_registry, register_int_counter_with_registry,
37 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
38};
39use serde::de::DeserializeOwned;
40use serde::{Deserialize, Serialize};
41use shared_object_version_manager::AssignedVersions;
42use shared_object_version_manager::Schedulable;
43use std::collections::BTreeMap;
44use std::collections::BTreeSet;
45use std::fs::File;
46use std::io::Write;
47use std::path::{Path, PathBuf};
48use std::sync::atomic::Ordering;
49use std::time::Duration;
50use std::time::Instant;
51use std::time::SystemTime;
52use std::time::UNIX_EPOCH;
53use std::{
54 collections::{HashMap, HashSet},
55 fs,
56 pin::Pin,
57 str::FromStr,
58 sync::Arc,
59 vec,
60};
61use sui_config::NodeConfig;
62use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
63use sui_protocol_config::PerObjectCongestionControlMode;
64use sui_types::crypto::RandomnessRound;
65use sui_types::dynamic_field::visitor as DFV;
66use sui_types::execution::ExecutionOutput;
67use sui_types::execution::ExecutionTimeObservationKey;
68use sui_types::execution::ExecutionTiming;
69use sui_types::execution_params::BalanceWithdrawStatus;
70use sui_types::execution_params::ExecutionOrEarlyError;
71use sui_types::execution_params::get_early_execution_error;
72use sui_types::execution_status::ExecutionStatus;
73use sui_types::inner_temporary_store::PackageStoreWithFallback;
74use sui_types::layout_resolver::LayoutResolver;
75use sui_types::layout_resolver::into_struct_layout;
76use sui_types::messages_consensus::{AuthorityCapabilitiesV1, AuthorityCapabilitiesV2};
77use sui_types::object::bounded_visitor::BoundedVisitor;
78use sui_types::storage::ChildObjectResolver;
79use sui_types::storage::InputKey;
80use sui_types::storage::TrackingBackingStore;
81use sui_types::traffic_control::{
82 PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
83};
84use sui_types::transaction_executor::SimulateTransactionResult;
85use sui_types::transaction_executor::TransactionChecks;
86use tap::TapFallible;
87use tokio::sync::RwLock;
88use tokio::sync::mpsc::unbounded_channel;
89use tokio::sync::{mpsc, oneshot};
90use tokio::task::JoinHandle;
91use tokio::time::timeout;
92use tracing::trace;
93use tracing::{debug, error, info, instrument, warn};
94
95use self::authority_store::ExecutionLockWriteGuard;
96use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
97pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
98use mysten_metrics::{monitored_scope, spawn_monitored_task};
99
100use crate::jsonrpc_index::IndexStore;
101use crate::jsonrpc_index::{CoinInfo, ObjectIndexChanges};
102use mysten_common::debug_fatal;
103use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
104use sui_config::genesis::Genesis;
105use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
106use sui_framework::{BuiltInFramework, SystemPackage};
107use sui_json_rpc_types::{
108 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
109 SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
110 SuiTransactionBlockEvents, TransactionFilter,
111};
112use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
113use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
114use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
115use sui_types::authenticator_state::get_authenticator_state;
116use sui_types::committee::{EpochId, ProtocolVersion};
117use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
118use sui_types::deny_list_v1::check_coin_deny_list_v1;
119use sui_types::digests::ChainIdentifier;
120use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
121use sui_types::effects::{
122 InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
123 TransactionEvents, VerifiedSignedTransactionEffects,
124};
125use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
126use sui_types::event::{Event, EventID};
127use sui_types::executable_transaction::VerifiedExecutableTransaction;
128use sui_types::gas::{GasCostSummary, SuiGasStatus};
129use sui_types::inner_temporary_store::{
130 InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
131};
132use sui_types::message_envelope::Message;
133use sui_types::messages_checkpoint::{
134 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
135 CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
136 CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
137 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
138};
139use sui_types::messages_grpc::{
140 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind,
141 ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
142};
143use sui_types::metrics::{BytecodeVerifierMetrics, LimitsMetrics};
144use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
145use sui_types::storage::{
146 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
147};
148use sui_types::sui_system_state::SuiSystemStateTrait;
149use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
150use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
151use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
152use sui_types::{
153 SUI_SYSTEM_ADDRESS,
154 base_types::*,
155 committee::Committee,
156 crypto::AuthoritySignature,
157 error::{SuiError, SuiResult},
158 object::{Object, ObjectRead},
159 transaction::*,
160};
161use sui_types::{TypeTag, is_system_package};
162use typed_store::TypedStoreError;
163
164use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
165use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
166use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
167use crate::authority::authority_store_pruner::{
168 AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
169};
170use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
171use crate::authority::epoch_start_configuration::EpochStartConfiguration;
172use crate::checkpoints::CheckpointStore;
173use crate::epoch::committee_store::CommitteeStore;
174use crate::execution_cache::{
175 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
176 ObjectCacheRead, StateSyncAPI,
177};
178use crate::execution_driver::execution_process;
179use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
180use crate::metrics::LatencyObserver;
181use crate::metrics::RateTracker;
182use crate::module_cache_metrics::ResolverMetrics;
183use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
184use crate::stake_aggregator::StakeAggregator;
185use crate::subscription_handler::SubscriptionHandler;
186use crate::transaction_input_loader::TransactionInputLoader;
187
188#[cfg(msim)]
189pub use crate::checkpoints::checkpoint_executor::utils::{
190 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
191};
192
193use crate::authority::authority_store_tables::AuthorityPrunerTables;
194use crate::authority_client::NetworkAuthorityClient;
195use crate::validator_tx_finalizer::ValidatorTxFinalizer;
196#[cfg(msim)]
197use sui_types::committee::CommitteeTrait;
198use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
199
// Unit-test modules. Each one is compiled only under `cfg(test)` and is loaded
// from the explicit `#[path]` given on its declaration.
#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;

#[cfg(test)]
#[path = "unit_tests/transaction_tests.rs"]
pub mod transaction_tests;

#[cfg(test)]
#[path = "unit_tests/batch_transaction_tests.rs"]
mod batch_transaction_tests;

#[cfg(test)]
#[path = "unit_tests/move_integration_tests.rs"]
pub mod move_integration_tests;

#[cfg(test)]
#[path = "unit_tests/gas_tests.rs"]
mod gas_tests;

#[cfg(test)]
#[path = "unit_tests/batch_verification_tests.rs"]
mod batch_verification_tests;

#[cfg(test)]
#[path = "unit_tests/coin_deny_list_tests.rs"]
mod coin_deny_list_tests;

#[cfg(test)]
#[path = "unit_tests/mysticeti_fastpath_execution_tests.rs"]
mod mysticeti_fastpath_execution_tests;

#[cfg(test)]
#[path = "unit_tests/auth_unit_test_utils.rs"]
pub mod auth_unit_test_utils;

// NOTE: unlike the modules above, this one is not gated on `cfg(test)`, so it
// is part of every build.
pub mod authority_test_utils;

// Submodules of `authority`, resolved from their default file locations.
pub mod authority_per_epoch_store;
pub mod authority_per_epoch_store_pruner;

pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod consensus_tx_status_cache;
pub mod epoch_start_configuration;
pub mod execution_time_estimator;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod submitted_transaction_cache;
pub mod test_authority_builder;
pub mod transaction_deferral;
pub mod transaction_reject_reason_cache;
// Private to this module.
mod weighted_moving_average;

// Visible within the crate only; selected items are re-exported above via
// `pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType}`.
pub(crate) mod authority_store;
pub mod backpressure;
257
/// Prometheus metrics, plus a few in-process trackers, recorded by the authority.
///
/// Every field is populated by `AuthorityMetrics::new`. The per-field notes
/// below restate the help text each metric is registered with there.
pub struct AuthorityMetrics {
    /// Total number of transaction orders.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Number of calls to `handle_certificate`.
    total_cert_attempts: IntCounter,
    /// Total number of transaction effects produced.
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders already processed previously.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    /// Number of shared input objects per transaction.
    num_shared_objects: Histogram,
    /// Distribution of the size of a transaction batch.
    batch_size: Histogram,

    /// Latency of handling transactions.
    authority_state_handle_transaction_latency: Histogram,
    /// Latency of voting on transactions without signing.
    authority_state_handle_vote_transaction_latency: Histogram,

    /// `authority_state_execute_certificate_latency` series whose `tx_type`
    /// label is `TX_TYPE_SINGLE_WRITER_TX`.
    execute_certificate_latency_single_writer: Histogram,
    /// `authority_state_execute_certificate_latency` series whose `tx_type`
    /// label is `TX_TYPE_SHARED_OBJ_TX`.
    execute_certificate_latency_shared_object: Histogram,
    /// Latency of awaiting user transaction execution, including waiting for inputs.
    await_transaction_latency: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution (low-latency buckets).
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing dbs.
    db_checkpoint_latency: Histogram,

    /// Certificates enqueued to `ExecutionScheduler`, labelled by `result`.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Certificates pending in `ExecutionScheduler` with at least one missing
    /// input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Executing certificates, including queued and actually running ones.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Time a transaction spent waiting in the queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Cumulative number of transactions paused by the execution driver.
    pub(crate) execution_driver_paused_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Queueing delay from a transaction being ready until it starts executing.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas to VM execution latency.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas to certificate execution latency, including
    /// committing the certificate.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Total number of consensus transactions skipped because of a local cache hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    /// Whether the authority is experiencing overload and is in load shedding mode.
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while in load shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each `source` indicated transaction overload.
    pub(crate) transaction_overload_sources: IntCounterVec,

    /// Total number of events emitted in post processing.
    post_processing_total_events_emitted: IntCounter,
    /// Total number of transactions indexed in post processing.
    post_processing_total_tx_indexed: IntCounter,
    /// Total number of transactions that finished event processing in post processing.
    post_processing_total_tx_had_event_processed: IntCounter,
    /// Total number of failures in post processing.
    post_processing_total_failures: IntCounter,

    /// Transactions processed by the consensus handler, labelled by `class`.
    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of each type of transaction processed by the consensus handler,
    /// labelled by `class`.
    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Number of low scoring authorities based on reputation scores from consensus.
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    /// Scores from consensus, labelled by `authority`.
    pub consensus_handler_scores: IntGaugeVec,
    /// Number of transactions deferred by the consensus handler.
    pub consensus_handler_deferred_transactions: IntCounter,
    /// Number of transactions deferred by the consensus handler due to congestion.
    pub consensus_handler_congested_transactions: IntCounter,
    /// Number of transactions cancelled by the consensus handler.
    pub consensus_handler_cancelled_transactions: IntCounter,
    /// Max object costs for congestion control in the current consensus commit,
    /// labelled by `commit_type`. Registered under the name
    /// `consensus_handler_max_congestion_control_object_costs`.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Number of committed subdags, labelled by `authority`.
    pub consensus_committed_subdags: IntCounterVec,
    /// Total number of committed consensus messages, labelled by `authority`.
    pub consensus_committed_messages: IntGaugeVec,
    /// Number of certified and user transactions committed, labelled by
    /// `authority` (the submitter).
    pub consensus_committed_user_transactions: IntGaugeVec,
    /// Number of user transactions finalized, labelled by `authority` (the submitter).
    pub consensus_finalized_user_transactions: IntGaugeVec,
    /// Number of user transactions rejected, labelled by `authority` (the submitter).
    pub consensus_rejected_user_transactions: IntGaugeVec,
    /// Throughput calculated from consensus output, based on unique transactions.
    pub consensus_calculated_throughput: IntGauge,
    /// The currently active calculated throughput profile.
    pub consensus_calculated_throughput_profile: IntGauge,
    /// Number of blocks processed by the consensus block handler.
    pub consensus_block_handler_block_processed: IntCounter,
    /// Transactions processed by the consensus block handler, labelled by `outcome`.
    pub consensus_block_handler_txn_processed: IntCounterVec,
    /// Number of fastpath transactions sent for execution by the consensus
    /// transaction handler.
    pub consensus_block_handler_fastpath_executions: IntCounter,
    /// Bias/delay from consensus timestamp to system time.
    pub consensus_timestamp_bias: Histogram,

    /// Shared limits metrics, built from the same registry.
    pub limits_metrics: Arc<LimitsMetrics>,

    /// Shared bytecode verifier metrics, built from the same registry.
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zkLogin signatures.
    pub zklogin_sig_count: IntCounter,
    /// Counter registered as `multisig_sig_count`.
    pub multisig_sig_count: IntCounter,

    /// In-process latency observer; it is not registered with the Prometheus
    /// registry in `new`. NOTE(review): presumably tracks execution queueing
    /// latency, per the field name — confirm against `LatencyObserver`.
    pub execution_queueing_latency: LatencyObserver,

    /// Rate tracker constructed with a 10 second duration.
    /// NOTE(review): presumably the rate at which transactions become ready — confirm.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    /// Rate tracker constructed with a 10 second duration.
    /// NOTE(review): presumably the rate of transaction execution — confirm.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
360
/// Histogram buckets for positive integer quantities: 1, 2, 5 and 7 times each
/// power of ten from 10^0 through 10^6, closed off by 10^7.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0,
    2.0,
    5.0,
    7.0,
    10.0,
    20.0,
    50.0,
    70.0,
    100.0,
    200.0,
    500.0,
    700.0,
    1_000.0,
    2_000.0,
    5_000.0,
    7_000.0,
    10_000.0,
    20_000.0,
    50_000.0,
    70_000.0,
    100_000.0,
    200_000.0,
    500_000.0,
    700_000.0,
    1_000_000.0,
    2_000_000.0,
    5_000_000.0,
    7_000_000.0,
    10_000_000.0,
];
367
/// Histogram buckets, in seconds, for general latency metrics: from half a
/// millisecond up to 90 seconds.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second.
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
    // One-second steps.
    1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
    // Coarse tail.
    10.0, 20.0, 30.0, 60.0, 90.0,
];
372
/// Histogram buckets, in seconds, for fast operations: a 1-2-5 progression
/// from 10 microseconds up to 100 seconds.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    1e-5, 2e-5, 5e-5, // tens of microseconds
    1e-4, 2e-4, 5e-4, // hundreds of microseconds
    1e-3, 2e-3, 5e-3, // milliseconds
    1e-2, 2e-2, 5e-2, // tens of milliseconds
    1e-1, 2e-1, 5e-1, // hundreds of milliseconds
    1.0, 2.0, 5.0, // seconds
    10.0, 20.0, 50.0, // tens of seconds
    100.0,
];
378
/// Histogram buckets, in seconds, for the `consensus_timestamp_bias` metric.
/// Resolution is finest between 0.2s and 0.4s; a negative bucket is included
/// as well.
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    -2.0, 0.0,
    // 20ms steps.
    0.2, 0.22, 0.24, 0.26, 0.28, 0.3, 0.32, 0.34, 0.36, 0.38, 0.4,
    // Coarse tail.
    0.45, 0.5, 0.8, 2.0, 10.0, 30.0,
];
385
/// Histogram buckets for the gas-to-latency ratio metrics.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0,
    // Hundreds.
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0,
    // Thousands.
    1_000.0, 2_000.0, 3_000.0, 4_000.0, 5_000.0, 6_000.0, 7_000.0, 8_000.0, 9_000.0,
    // Coarse tail.
    10_000.0, 50_000.0, 100_000.0, 1_000_000.0,
];
390
/// Gas coin value associated with dev-inspect: 10^15.
/// NOTE(review): the unit and the exact use site are not visible here — confirm.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 10_u64.pow(15);
392
/// Two-second timeout.
/// NOTE(review): presumably bounds how long to wait for the inputs of a
/// fastpath transaction, per the name — confirm at the use sites.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_millis(2_000);
397
398impl AuthorityMetrics {
399 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
400 let execute_certificate_latency = register_histogram_vec_with_registry!(
401 "authority_state_execute_certificate_latency",
402 "Latency of executing certificates, including waiting for inputs",
403 &["tx_type"],
404 LATENCY_SEC_BUCKETS.to_vec(),
405 registry,
406 )
407 .unwrap();
408
409 let execute_certificate_latency_single_writer =
410 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
411 let execute_certificate_latency_shared_object =
412 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
413
414 Self {
415 tx_orders: register_int_counter_with_registry!(
416 "total_transaction_orders",
417 "Total number of transaction orders",
418 registry,
419 )
420 .unwrap(),
421 total_certs: register_int_counter_with_registry!(
422 "total_transaction_certificates",
423 "Total number of transaction certificates handled",
424 registry,
425 )
426 .unwrap(),
427 total_cert_attempts: register_int_counter_with_registry!(
428 "total_handle_certificate_attempts",
429 "Number of calls to handle_certificate",
430 registry,
431 )
432 .unwrap(),
433 total_effects: register_int_counter_with_registry!(
435 "total_transaction_effects",
436 "Total number of transaction effects produced",
437 registry,
438 )
439 .unwrap(),
440
441 shared_obj_tx: register_int_counter_with_registry!(
442 "num_shared_obj_tx",
443 "Number of transactions involving shared objects",
444 registry,
445 )
446 .unwrap(),
447
448 sponsored_tx: register_int_counter_with_registry!(
449 "num_sponsored_tx",
450 "Number of sponsored transactions",
451 registry,
452 )
453 .unwrap(),
454
455 tx_already_processed: register_int_counter_with_registry!(
456 "num_tx_already_processed",
457 "Number of transaction orders already processed previously",
458 registry,
459 )
460 .unwrap(),
461 num_input_objs: register_histogram_with_registry!(
462 "num_input_objects",
463 "Distribution of number of input TX objects per TX",
464 POSITIVE_INT_BUCKETS.to_vec(),
465 registry,
466 )
467 .unwrap(),
468 num_shared_objects: register_histogram_with_registry!(
469 "num_shared_objects",
470 "Number of shared input objects per TX",
471 POSITIVE_INT_BUCKETS.to_vec(),
472 registry,
473 )
474 .unwrap(),
475 batch_size: register_histogram_with_registry!(
476 "batch_size",
477 "Distribution of size of transaction batch",
478 POSITIVE_INT_BUCKETS.to_vec(),
479 registry,
480 )
481 .unwrap(),
482 authority_state_handle_transaction_latency: register_histogram_with_registry!(
483 "authority_state_handle_transaction_latency",
484 "Latency of handling transactions",
485 LATENCY_SEC_BUCKETS.to_vec(),
486 registry,
487 )
488 .unwrap(),
489 authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
490 "authority_state_handle_vote_transaction_latency",
491 "Latency of voting on transactions without signing",
492 LATENCY_SEC_BUCKETS.to_vec(),
493 registry,
494 )
495 .unwrap(),
496 execute_certificate_latency_single_writer,
497 execute_certificate_latency_shared_object,
498 await_transaction_latency: register_histogram_with_registry!(
499 "await_transaction_latency",
500 "Latency of awaiting user transaction execution, including waiting for inputs",
501 LATENCY_SEC_BUCKETS.to_vec(),
502 registry,
503 )
504 .unwrap(),
505 internal_execution_latency: register_histogram_with_registry!(
506 "authority_state_internal_execution_latency",
507 "Latency of actual certificate executions",
508 LATENCY_SEC_BUCKETS.to_vec(),
509 registry,
510 )
511 .unwrap(),
512 execution_load_input_objects_latency: register_histogram_with_registry!(
513 "authority_state_execution_load_input_objects_latency",
514 "Latency of loading input objects for execution",
515 LOW_LATENCY_SEC_BUCKETS.to_vec(),
516 registry,
517 )
518 .unwrap(),
519 prepare_certificate_latency: register_histogram_with_registry!(
520 "authority_state_prepare_certificate_latency",
521 "Latency of executing certificates, before committing the results",
522 LATENCY_SEC_BUCKETS.to_vec(),
523 registry,
524 )
525 .unwrap(),
526 commit_certificate_latency: register_histogram_with_registry!(
527 "authority_state_commit_certificate_latency",
528 "Latency of committing certificate execution results",
529 LATENCY_SEC_BUCKETS.to_vec(),
530 registry,
531 )
532 .unwrap(),
533 db_checkpoint_latency: register_histogram_with_registry!(
534 "db_checkpoint_latency",
535 "Latency of checkpointing dbs",
536 LATENCY_SEC_BUCKETS.to_vec(),
537 registry,
538 ).unwrap(),
539 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
540 "transaction_manager_num_enqueued_certificates",
541 "Current number of certificates enqueued to ExecutionScheduler",
542 &["result"],
543 registry,
544 )
545 .unwrap(),
546 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
547 "transaction_manager_num_pending_certificates",
548 "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
549 registry,
550 )
551 .unwrap(),
552 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
553 "transaction_manager_num_executing_certificates",
554 "Number of executing certificates, including queued and actually running certificates",
555 registry,
556 )
557 .unwrap(),
558 authority_overload_status: register_int_gauge_with_registry!(
559 "authority_overload_status",
560 "Whether authority is current experiencing overload and enters load shedding mode.",
561 registry)
562 .unwrap(),
563 authority_load_shedding_percentage: register_int_gauge_with_registry!(
564 "authority_load_shedding_percentage",
565 "The percentage of transactions is shed when the authority is in load shedding mode.",
566 registry)
567 .unwrap(),
568 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
569 "transaction_manager_transaction_queue_age_s",
570 "Time spent in waiting for transaction in the queue",
571 LATENCY_SEC_BUCKETS.to_vec(),
572 registry,
573 )
574 .unwrap(),
575 transaction_overload_sources: register_int_counter_vec_with_registry!(
576 "transaction_overload_sources",
577 "Number of times each source indicates transaction overload.",
578 &["source"],
579 registry)
580 .unwrap(),
581 execution_driver_executed_transactions: register_int_counter_with_registry!(
582 "execution_driver_executed_transactions",
583 "Cumulative number of transaction executed by execution driver",
584 registry,
585 )
586 .unwrap(),
587 execution_driver_paused_transactions: register_int_counter_with_registry!(
588 "execution_driver_paused_transactions",
589 "Cumulative number of transactions paused by execution driver",
590 registry,
591 )
592 .unwrap(),
593 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
594 "execution_driver_dispatch_queue",
595 "Number of transaction pending in execution driver dispatch queue",
596 registry,
597 )
598 .unwrap(),
599 execution_queueing_delay_s: register_histogram_with_registry!(
600 "execution_queueing_delay_s",
601 "Queueing delay between a transaction is ready for execution until it starts executing.",
602 LATENCY_SEC_BUCKETS.to_vec(),
603 registry
604 )
605 .unwrap(),
606 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
607 "prepare_cert_gas_latency_ratio",
608 "The ratio of computation gas divided by VM execution latency.",
609 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
610 registry
611 )
612 .unwrap(),
613 execution_gas_latency_ratio: register_histogram_with_registry!(
614 "execution_gas_latency_ratio",
615 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
616 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
617 registry
618 )
619 .unwrap(),
620 skipped_consensus_txns: register_int_counter_with_registry!(
621 "skipped_consensus_txns",
622 "Total number of consensus transactions skipped",
623 registry,
624 )
625 .unwrap(),
626 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
627 "skipped_consensus_txns_cache_hit",
628 "Total number of consensus transactions skipped because of local cache hit",
629 registry,
630 )
631 .unwrap(),
632 post_processing_total_events_emitted: register_int_counter_with_registry!(
633 "post_processing_total_events_emitted",
634 "Total number of events emitted in post processing",
635 registry,
636 )
637 .unwrap(),
638 post_processing_total_tx_indexed: register_int_counter_with_registry!(
639 "post_processing_total_tx_indexed",
640 "Total number of txes indexed in post processing",
641 registry,
642 )
643 .unwrap(),
644 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
645 "post_processing_total_tx_had_event_processed",
646 "Total number of txes finished event processing in post processing",
647 registry,
648 )
649 .unwrap(),
650 post_processing_total_failures: register_int_counter_with_registry!(
651 "post_processing_total_failures",
652 "Total number of failure in post processing",
653 registry,
654 )
655 .unwrap(),
656 consensus_handler_processed: register_int_counter_vec_with_registry!(
657 "consensus_handler_processed",
658 "Number of transactions processed by consensus handler",
659 &["class"],
660 registry
661 ).unwrap(),
662 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
663 "consensus_handler_transaction_sizes",
664 "Sizes of each type of transactions processed by consensus handler",
665 &["class"],
666 POSITIVE_INT_BUCKETS.to_vec(),
667 registry
668 ).unwrap(),
669 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
670 "consensus_handler_num_low_scoring_authorities",
671 "Number of low scoring authorities based on reputation scores from consensus",
672 registry
673 ).unwrap(),
674 consensus_handler_scores: register_int_gauge_vec_with_registry!(
675 "consensus_handler_scores",
676 "scores from consensus for each authority",
677 &["authority"],
678 registry,
679 ).unwrap(),
680 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
681 "consensus_handler_deferred_transactions",
682 "Number of transactions deferred by consensus handler",
683 registry,
684 ).unwrap(),
685 consensus_handler_congested_transactions: register_int_counter_with_registry!(
686 "consensus_handler_congested_transactions",
687 "Number of transactions deferred by consensus handler due to congestion",
688 registry,
689 ).unwrap(),
690 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
691 "consensus_handler_cancelled_transactions",
692 "Number of transactions cancelled by consensus handler",
693 registry,
694 ).unwrap(),
695 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
696 "consensus_handler_max_congestion_control_object_costs",
697 "Max object costs for congestion control in the current consensus commit",
698 &["commit_type"],
699 registry,
700 ).unwrap(),
701 consensus_committed_subdags: register_int_counter_vec_with_registry!(
702 "consensus_committed_subdags",
703 "Number of committed subdags, sliced by author",
704 &["authority"],
705 registry,
706 ).unwrap(),
707 consensus_committed_messages: register_int_gauge_vec_with_registry!(
708 "consensus_committed_messages",
709 "Total number of committed consensus messages, sliced by author",
710 &["authority"],
711 registry,
712 ).unwrap(),
713 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
714 "consensus_committed_user_transactions",
715 "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
716 &["authority"],
717 registry,
718 ).unwrap(),
719 consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
720 "consensus_finalized_user_transactions",
721 "Number of user transactions finalized, sliced by submitter",
722 &["authority"],
723 registry,
724 ).unwrap(),
725 consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
726 "consensus_rejected_user_transactions",
727 "Number of user transactions rejected, sliced by submitter",
728 &["authority"],
729 registry,
730 ).unwrap(),
731 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
732 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
733 zklogin_sig_count: register_int_counter_with_registry!(
734 "zklogin_sig_count",
735 "Count of zkLogin signatures",
736 registry,
737 )
738 .unwrap(),
739 multisig_sig_count: register_int_counter_with_registry!(
740 "multisig_sig_count",
741 "Count of zkLogin signatures",
742 registry,
743 )
744 .unwrap(),
745 consensus_calculated_throughput: register_int_gauge_with_registry!(
746 "consensus_calculated_throughput",
747 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
748 registry,
749 ).unwrap(),
750 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
751 "consensus_calculated_throughput_profile",
752 "The current active calculated throughput profile",
753 registry
754 ).unwrap(),
755 consensus_block_handler_block_processed: register_int_counter_with_registry!(
756 "consensus_block_handler_block_processed",
757 "Number of blocks processed by consensus block handler.",
758 registry
759 ).unwrap(),
760 consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
761 "consensus_block_handler_txn_processed",
762 "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
763 &["outcome"],
764 registry
765 ).unwrap(),
766 consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
767 "consensus_block_handler_fastpath_executions",
768 "Number of fastpath transactions sent for execution by consensus transaction handler",
769 registry,
770 ).unwrap(),
771 consensus_timestamp_bias: register_histogram_with_registry!(
772 "consensus_timestamp_bias",
773 "Bias/delay from consensus timestamp to system time",
774 TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
775 registry
776 ).unwrap(),
777 execution_queueing_latency: LatencyObserver::new(),
778 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
779 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
780 }
781 }
782}
783
/// A pinned, reference-counted trait object implementing
/// `Signer<AuthoritySignature>` that is also `Send + Sync`, so the same signer
/// can be shared across threads. Used as the type of `AuthorityState::secret`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
791
/// Per-transaction execution parameters, assembled with the `with_*` builder
/// methods on top of `ExecutionEnv::default()`.
#[derive(Debug, Clone)]
pub struct ExecutionEnv {
    /// Versions assigned for this transaction; defaults to
    /// `AssignedVersions::default()`.
    /// NOTE(review): presumably the versions assigned to shared objects — confirm.
    pub assigned_versions: AssignedVersions,
    /// Effects digest the execution is expected to produce, if one is known.
    /// `None` by default.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    /// Where the transaction was scheduled from; defaults to
    /// `SchedulingSource::NonFastPath`.
    pub scheduling_source: SchedulingSource,
    /// Balance withdraw status; defaults to `BalanceWithdrawStatus::NoWithdraw`.
    pub withdraw_status: BalanceWithdrawStatus,
    /// Digests of the transactions this one has barrier dependencies on; empty
    /// by default.
    /// NOTE(review): presumably these must finish before this transaction
    /// executes — confirm against the scheduler.
    pub barrier_dependencies: Vec<TransactionDigest>,
}
809
810impl Default for ExecutionEnv {
811 fn default() -> Self {
812 Self {
813 assigned_versions: Default::default(),
814 expected_effects_digest: None,
815 scheduling_source: SchedulingSource::NonFastPath,
816 withdraw_status: BalanceWithdrawStatus::NoWithdraw,
817 barrier_dependencies: Default::default(),
818 }
819 }
820}
821
822impl ExecutionEnv {
823 pub fn new() -> Self {
824 Default::default()
825 }
826
827 pub fn with_scheduling_source(mut self, scheduling_source: SchedulingSource) -> Self {
828 self.scheduling_source = scheduling_source;
829 self
830 }
831
832 pub fn with_expected_effects_digest(
833 mut self,
834 expected_effects_digest: TransactionEffectsDigest,
835 ) -> Self {
836 self.expected_effects_digest = Some(expected_effects_digest);
837 self
838 }
839
840 pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
841 self.assigned_versions = assigned_versions;
842 self
843 }
844
845 pub fn with_sufficient_balance(mut self) -> Self {
846 self.withdraw_status = BalanceWithdrawStatus::SufficientBalance;
847 self
848 }
849
850 pub fn with_insufficient_balance(mut self) -> Self {
851 self.withdraw_status = BalanceWithdrawStatus::InsufficientBalance;
852 self
853 }
854
855 pub fn with_barrier_dependencies(
856 mut self,
857 barrier_dependencies: BTreeSet<TransactionDigest>,
858 ) -> Self {
859 self.barrier_dependencies = barrier_dependencies.into_iter().collect();
860 self
861 }
862}
863
/// Holds per-transaction effects-digest overrides, loaded from the node's
/// `ForkRecoveryConfig` when one is supplied.
#[derive(Debug)]
pub struct ForkRecoveryState {
    /// Maps a transaction digest to the effects digest configured as its
    /// override. Wrapped in a `parking_lot::RwLock`; the code visible here only
    /// ever takes the read lock.
    transaction_overrides:
        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
}
870
871impl Default for ForkRecoveryState {
872 fn default() -> Self {
873 Self {
874 transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
875 }
876 }
877}
878
879impl ForkRecoveryState {
880 pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
881 let Some(config) = config else {
882 return Ok(Self::default());
883 };
884
885 let mut transaction_overrides = HashMap::new();
886 for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
887 let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
888 SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
889 })?;
890 let effects_digest =
891 TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
892 SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
893 })?;
894 transaction_overrides.insert(tx_digest, effects_digest);
895 }
896
897 Ok(Self {
898 transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
899 })
900 }
901
902 pub fn get_transaction_override(
903 &self,
904 tx_digest: &TransactionDigest,
905 ) -> Option<TransactionEffectsDigest> {
906 self.transaction_overrides.read().get(tx_digest).copied()
907 }
908}
909
/// State held by an authority node: identity and signing key, stores, caches, the execution
/// scheduler, metrics and node configuration.
910pub struct AuthorityState {
    /// Name of this authority; checked against the committee in `is_validator` and supplied
    /// when signing transactions.
911 pub name: AuthorityName,
    /// Signer used together with `name` to sign transactions.
914 pub secret: StableSyncAuthoritySigner,
916
    /// Loads input and receiving objects for signing and for execution.
917 input_loader: TransactionInputLoader,
    // NOTE(review): presumably backs the `get_*` cache accessors used throughout the impl —
    // confirm.
919 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
920
    /// Per-epoch store held behind an `ArcSwap`.
    // NOTE(review): presumably swapped on reconfiguration — confirm.
921 epoch_store: ArcSwap<AuthorityPerEpochStore>,
922
    /// Lock wrapping an `EpochId`.
    // NOTE(review): the `execution_lock_for_*` helpers presumably hand out guards on this
    // lock — confirm.
923 execution_lock: RwLock<EpochId>,
928
    /// Optional index store.
929 pub indexes: Option<Arc<IndexStore>>,
    /// Optional RPC index store.
930 pub rpc_index: Option<Arc<RpcIndexStore>>,
931
932 pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; queried for epoch state commitments and used to record detected
    /// transaction forks.
933 pub checkpoint_store: Arc<CheckpointStore>,
934
    /// Committee store, exposed through `committee_store` and `clone_committee_store`.
935 committee_store: Arc<CommitteeStore>,
936
    /// Scheduler that transactions are enqueued on; also consulted for execution overload.
937 execution_scheduler: Arc<ExecutionScheduler>,
939
    /// Optional one-shot sender; marked `#[allow(unused)]`.
940 #[allow(unused)]
942 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
943
    /// Metrics recorded by the methods of this type.
944 pub metrics: Arc<AuthorityMetrics>,
    // NOTE(review): underscore-prefixed; presumably held only to keep the pruners alive —
    // confirm.
945 _pruner: AuthorityStorePruner,
946 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
947
948 db_checkpoint_config: DBCheckpointConfig,
950
    /// Node configuration; read for the deny configs, overload config, execution-cache
    /// thresholds and state debug-dump settings.
951 pub config: NodeConfig,
952
    /// Overload flag and load-shedding percentage read by `check_authority_overload`.
953 pub overload_info: AuthorityOverloadInfo,
955
    /// When present on a validator, tracks transactions this authority has signed.
956 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
957
958 chain_identifier: ChainIdentifier,
960
961 pub(crate) congestion_tracker: Arc<CongestionTracker>,
962
    /// Optional traffic controller; `reconfigure_traffic_control` fails when it is absent.
963 pub traffic_controller: Option<Arc<TrafficController>>,
965
    /// Optional effects-digest overrides consulted by `try_execute_immediately`.
966 fork_recovery_state: Option<ForkRecoveryState>,
968}
969
970impl AuthorityState {
977 pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
978 epoch_store.committee().authority_exists(&self.name)
979 }
980
981 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
982 !self.is_validator(epoch_store)
983 }
984
985 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
986 &self.committee_store
987 }
988
989 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
990 self.committee_store.clone()
991 }
992
993 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
994 &self.config.authority_overload_config
995 }
996
997 pub fn get_epoch_state_commitments(
998 &self,
999 epoch: EpochId,
1000 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1001 self.checkpoint_store.get_epoch_state_commitments(epoch)
1002 }
1003
1004 fn handle_transaction_deny_checks(
1005 &self,
1006 transaction: &VerifiedTransaction,
1007 epoch_store: &Arc<AuthorityPerEpochStore>,
1008 ) -> SuiResult<CheckedInputObjects> {
1009 let tx_digest = transaction.digest();
1010 let tx_data = transaction.data().transaction_data();
1011
1012 let input_object_kinds = tx_data.input_objects()?;
1013 let receiving_objects_refs = tx_data.receiving_objects();
1014
1015 sui_transaction_checks::deny::check_transaction_for_signing(
1019 tx_data,
1020 transaction.tx_signatures(),
1021 &input_object_kinds,
1022 &receiving_objects_refs,
1023 &self.config.transaction_deny_config,
1024 self.get_backing_package_store().as_ref(),
1025 )?;
1026
1027 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1028 Some(tx_digest),
1029 &input_object_kinds,
1030 &receiving_objects_refs,
1031 epoch_store.epoch(),
1032 )?;
1033
1034 let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1035 epoch_store.protocol_config(),
1036 epoch_store.reference_gas_price(),
1037 tx_data,
1038 input_objects,
1039 &receiving_objects,
1040 &self.metrics.bytecode_verifier_metrics,
1041 &self.config.verifier_signing_config,
1042 )?;
1043
1044 self.handle_coin_deny_list_checks(
1045 tx_data,
1046 &checked_input_objects,
1047 &receiving_objects,
1048 epoch_store,
1049 )?;
1050
1051 Ok(checked_input_objects)
1052 }
1053
1054 fn handle_coin_deny_list_checks(
1055 &self,
1056 tx_data: &TransactionData,
1057 checked_input_objects: &CheckedInputObjects,
1058 receiving_objects: &ReceivingObjects,
1059 epoch_store: &Arc<AuthorityPerEpochStore>,
1060 ) -> SuiResult<()> {
1061 let funds_withdraw_types = tx_data
1062 .get_funds_withdrawals()
1063 .into_iter()
1064 .filter_map(|withdraw| {
1065 withdraw
1066 .type_arg
1067 .get_balance_type_param()
1068 .unwrap()
1070 .map(|ty| ty.to_canonical_string(false))
1071 })
1072 .collect::<BTreeSet<_>>();
1073
1074 if epoch_store.coin_deny_list_v1_enabled() {
1075 check_coin_deny_list_v1(
1076 tx_data.sender(),
1077 checked_input_objects,
1078 receiving_objects,
1079 funds_withdraw_types.clone(),
1080 &self.get_object_store(),
1081 )?;
1082 }
1083
1084 if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1085 check_coin_deny_list_v2_during_signing(
1086 tx_data.sender(),
1087 checked_input_objects,
1088 receiving_objects,
1089 funds_withdraw_types.clone(),
1090 &self.get_object_store(),
1091 )?;
1092 }
1093
1094 Ok(())
1095 }
1096
1097 #[instrument(level = "trace", skip_all)]
1100 fn handle_transaction_impl(
1101 &self,
1102 transaction: VerifiedTransaction,
1103 sign: bool,
1104 epoch_store: &Arc<AuthorityPerEpochStore>,
1105 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
1106 let _execution_lock = self.execution_lock_for_signing()?;
1108
1109 let checked_input_objects =
1110 self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1111
1112 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1113
1114 let tx_digest = *transaction.digest();
1115 let signed_transaction = if sign {
1116 Some(VerifiedSignedTransaction::new(
1117 epoch_store.epoch(),
1118 transaction,
1119 self.name,
1120 &*self.secret,
1121 ))
1122 } else {
1123 None
1124 };
1125
1126 self.get_cache_writer().acquire_transaction_locks(
1131 epoch_store,
1132 &owned_objects,
1133 tx_digest,
1134 signed_transaction.clone(),
1135 )?;
1136
1137 Ok(signed_transaction)
1138 }
1139
1140 #[instrument(level = "trace", skip_all)]
1142 pub async fn handle_transaction(
1143 &self,
1144 epoch_store: &Arc<AuthorityPerEpochStore>,
1145 transaction: VerifiedTransaction,
1146 ) -> SuiResult<HandleTransactionResponse> {
1147 let tx_digest = *transaction.digest();
1148 debug!("handle_transaction");
1149
1150 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1152 return Ok(HandleTransactionResponse { status });
1153 }
1154
1155 let _metrics_guard = self
1156 .metrics
1157 .authority_state_handle_transaction_latency
1158 .start_timer();
1159 self.metrics.tx_orders.inc();
1160
1161 if epoch_store.protocol_config().mysticeti_fastpath()
1162 && !self
1163 .wait_for_fastpath_dependency_objects(&transaction, epoch_store.epoch())
1164 .await?
1165 {
1166 debug!("fastpath input objects are still unavailable after waiting");
1167 }
1169
1170 self.handle_sign_transaction(epoch_store, transaction).await
1171 }
1172
1173 pub async fn handle_sign_transaction(
1175 &self,
1176 epoch_store: &Arc<AuthorityPerEpochStore>,
1177 transaction: VerifiedTransaction,
1178 ) -> SuiResult<HandleTransactionResponse> {
1179 let tx_digest = *transaction.digest();
1180 let signed = self.handle_transaction_impl(transaction, true, epoch_store);
1181 match signed {
1182 Ok(Some(s)) => {
1183 if self.is_validator(epoch_store)
1184 && let Some(validator_tx_finalizer) = &self.validator_tx_finalizer
1185 {
1186 let tx = s.clone();
1187 let validator_tx_finalizer = validator_tx_finalizer.clone();
1188 let cache_reader = self.get_transaction_cache_reader().clone();
1189 let epoch_store = epoch_store.clone();
1190 spawn_monitored_task!(epoch_store.within_alive_epoch(
1191 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1192 ));
1193 }
1194 Ok(HandleTransactionResponse {
1195 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1196 })
1197 }
1198 Ok(None) => panic!("handle_transaction_impl should return a signed transaction"),
1199 Err(err) => Ok(HandleTransactionResponse {
1203 status: self
1204 .get_transaction_status(&tx_digest, epoch_store)?
1205 .ok_or(err)?
1206 .1,
1207 }),
1208 }
1209 }
1210
1211 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1221 pub(crate) fn handle_vote_transaction(
1222 &self,
1223 epoch_store: &Arc<AuthorityPerEpochStore>,
1224 transaction: VerifiedTransaction,
1225 ) -> SuiResult<()> {
1226 debug!("handle_vote_transaction");
1227
1228 let _metrics_guard = self
1229 .metrics
1230 .authority_state_handle_vote_transaction_latency
1231 .start_timer();
1232 self.metrics.tx_orders.inc();
1233
1234 if !epoch_store
1238 .get_reconfig_state_read_lock_guard()
1239 .should_accept_user_certs()
1240 {
1241 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1242 }
1243
1244 let result =
1245 self.handle_transaction_impl(transaction, false , epoch_store)?;
1246 assert!(
1247 result.is_none(),
1248 "handle_transaction_impl should not return a signed transaction when sign is false"
1249 );
1250 Ok(())
1251 }
1252
1253 pub fn check_transaction_validity(
1262 &self,
1263 epoch_store: &Arc<AuthorityPerEpochStore>,
1264 transaction: &VerifiedTransaction,
1265 ) -> SuiResult<()> {
1266 if !epoch_store
1267 .get_reconfig_state_read_lock_guard()
1268 .should_accept_user_certs()
1269 {
1270 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1271 }
1272
1273 let checked_input_objects =
1274 self.handle_transaction_deny_checks(transaction, epoch_store)?;
1275
1276 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1279 let cache_reader = self.get_object_cache_reader();
1280
1281 for obj_ref in &owned_objects {
1282 if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1283 if obj_ref.1 < live_object.version() {
1286 return Err(SuiErrorKind::UserInputError {
1287 error: UserInputError::ObjectVersionUnavailableForConsumption {
1288 provided_obj_ref: *obj_ref,
1289 current_version: live_object.version(),
1290 },
1291 }
1292 .into());
1293 }
1294
1295 if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1297 return Err(SuiErrorKind::UserInputError {
1298 error: UserInputError::InvalidObjectDigest {
1299 object_id: obj_ref.0,
1300 expected_digest: live_object.digest(),
1301 },
1302 }
1303 .into());
1304 }
1305 }
1306 }
1307
1308 Ok(())
1309 }
1310
1311 pub fn check_system_overload_at_signing(&self) -> bool {
1312 self.config
1313 .authority_overload_config
1314 .check_system_overload_at_signing
1315 }
1316
1317 pub fn check_system_overload_at_execution(&self) -> bool {
1318 self.config
1319 .authority_overload_config
1320 .check_system_overload_at_execution
1321 }
1322
1323 pub(crate) fn check_system_overload(
1324 &self,
1325 consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
1326 tx_data: &SenderSignedData,
1327 do_authority_overload_check: bool,
1328 ) -> SuiResult {
1329 if do_authority_overload_check {
1330 self.check_authority_overload(tx_data).tap_err(|_| {
1331 self.update_overload_metrics("execution_queue");
1332 })?;
1333 }
1334 self.execution_scheduler
1335 .check_execution_overload(self.overload_config(), tx_data)
1336 .tap_err(|_| {
1337 self.update_overload_metrics("execution_pending");
1338 })?;
1339 consensus_overload_checker
1340 .check_consensus_overload()
1341 .tap_err(|_| {
1342 self.update_overload_metrics("consensus");
1343 })?;
1344
1345 let pending_tx_count = self
1346 .get_cache_commit()
1347 .approximate_pending_transaction_count();
1348 if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1349 return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1350 retry_after_secs: 10,
1351 }
1352 .into());
1353 }
1354
1355 Ok(())
1356 }
1357
1358 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1359 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1360 return Ok(());
1361 }
1362
1363 let load_shedding_percentage = self
1364 .overload_info
1365 .load_shedding_percentage
1366 .load(Ordering::Relaxed);
1367 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1368 }
1369
1370 fn update_overload_metrics(&self, source: &str) {
1371 self.metrics
1372 .transaction_overload_sources
1373 .with_label_values(&[source])
1374 .inc();
1375 }
1376
1377 pub(crate) async fn wait_for_fastpath_dependency_objects(
1381 &self,
1382 transaction: &VerifiedTransaction,
1383 epoch: EpochId,
1384 ) -> SuiResult<bool> {
1385 let txn_data = transaction.data().transaction_data();
1386 let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1387
1388 let fastpath_dependency_objects: Vec<_> = move_objects
1390 .into_iter()
1391 .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1392 .chain(
1393 packages
1394 .into_iter()
1395 .map(|package_id| InputKey::Package { id: package_id }),
1396 )
1397 .collect();
1398 let receiving_keys: HashSet<_> = receiving_objects
1399 .into_iter()
1400 .filter_map(|receiving_obj_ref| {
1401 self.should_wait_for_dependency_object(receiving_obj_ref)
1402 })
1403 .collect();
1404 if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1405 return Ok(true);
1406 }
1407
1408 let max_wait = if cfg!(msim) {
1411 Duration::from_millis(200)
1412 } else {
1413 WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1414 };
1415
1416 match timeout(
1417 max_wait,
1418 self.get_object_cache_reader().notify_read_input_objects(
1419 &fastpath_dependency_objects,
1420 &receiving_keys,
1421 epoch,
1422 ),
1423 )
1424 .await
1425 {
1426 Ok(()) => Ok(true),
1427 Err(_) => Ok(false),
1430 }
1431 }
1432
1433 fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1440 let (obj_id, cur_version, _digest) = obj_ref;
1441 let Some(latest_obj_ref) = self
1442 .get_object_cache_reader()
1443 .get_latest_object_ref_or_tombstone(obj_id)
1444 else {
1445 return Some(InputKey::VersionedObject {
1447 id: FullObjectID::new(obj_id, None),
1448 version: cur_version,
1449 });
1450 };
1451 let latest_digest = latest_obj_ref.2;
1452 if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1453 return None;
1455 }
1456 let latest_version = latest_obj_ref.1;
1457 if cur_version <= latest_version {
1458 return None;
1461 }
1462 Some(InputKey::VersionedObject {
1464 id: FullObjectID::new(obj_id, None),
1465 version: cur_version,
1466 })
1467 }
1468
1469 #[instrument(level = "trace", skip_all)]
1474 pub async fn wait_for_certificate_execution(
1475 &self,
1476 certificate: &VerifiedCertificate,
1477 epoch_store: &Arc<AuthorityPerEpochStore>,
1478 ) -> SuiResult<TransactionEffects> {
1479 self.wait_for_transaction_execution(
1480 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1481 epoch_store,
1482 )
1483 .await
1484 }
1485
1486 #[instrument(level = "trace", skip_all)]
1490 pub async fn wait_for_transaction_execution(
1491 &self,
1492 transaction: &VerifiedExecutableTransaction,
1493 epoch_store: &Arc<AuthorityPerEpochStore>,
1494 ) -> SuiResult<TransactionEffects> {
1495 let _metrics_guard = if transaction.is_consensus_tx() {
1496 self.metrics
1497 .execute_certificate_latency_shared_object
1498 .start_timer()
1499 } else {
1500 self.metrics
1501 .execute_certificate_latency_single_writer
1502 .start_timer()
1503 };
1504 trace!("execute_transaction");
1505
1506 self.metrics.total_cert_attempts.inc();
1507
1508 if !transaction.is_consensus_tx() {
1509 self.execution_scheduler.enqueue(
1513 vec![(
1514 Schedulable::Transaction(transaction.clone()),
1515 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1516 )],
1517 epoch_store,
1518 );
1519 }
1520
1521 epoch_store
1524 .within_alive_epoch(self.notify_read_effects(
1525 "AuthorityState::wait_for_transaction_execution",
1526 *transaction.digest(),
1527 ))
1528 .await
1529 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1530 .and_then(|r| r)
1531 }
1532
1533 pub async fn await_transaction_effects(
1537 &self,
1538 digest: TransactionDigest,
1539 epoch_store: &Arc<AuthorityPerEpochStore>,
1540 ) -> SuiResult<TransactionEffects> {
1541 let _metrics_guard = self.metrics.await_transaction_latency.start_timer();
1542 debug!("await_transaction");
1543
1544 epoch_store
1545 .within_alive_epoch(
1546 self.notify_read_effects("AuthorityState::await_transaction_effects", digest),
1547 )
1548 .await
1549 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1550 .and_then(|r| r)
1551 }
1552
/// Executes `certificate` now and returns its effects together with the execution error, if
/// the transaction failed.
///
/// Steps, in order:
/// 1. Apply a fork-recovery override to the expected effects digest, when one is configured
///    for this transaction.
/// 2. Acquire the per-transaction guard and return the cached effects if the transaction was
///    already executed.
/// 3. Acquire the execution lock; report `EpochEnded` if it cannot be acquired or its epoch
///    differs from `epoch_store`.
/// 4. Reuse cached Mysticeti fast-path outputs when present, otherwise run
///    `process_certificate`.
/// 5. Write the outputs as fast-path outputs or commit them, depending on the scheduling
///    source.
/// 6. For authenticator state updates, push the new state into the epoch store.
/// 7. Commit the transaction guard and record timing metrics.
///
/// # Panics
///
/// Panics when already-executed effects do not match the expected effects digest, and when
/// fast-path outputs are found for a consensus transaction.
1553 #[instrument(level = "trace", skip_all)]
1565 pub async fn try_execute_immediately(
1566 &self,
1567 certificate: &VerifiedExecutableTransaction,
1568 mut execution_env: ExecutionEnv,
1569 epoch_store: &Arc<AuthorityPerEpochStore>,
1570 ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
1571 let _scope = monitored_scope("Execution::try_execute_immediately");
1572 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1573
1574 let tx_digest = certificate.digest();
1575
    // A configured fork-recovery override replaces whatever digest the caller expected.
1576 if let Some(fork_recovery) = &self.fork_recovery_state
1577 && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
1578 {
1579 warn!(
1580 ?tx_digest,
1581 original_digest = ?execution_env.expected_effects_digest,
1582 override_digest = ?override_digest,
1583 "Applying fork recovery override for transaction effects digest"
1584 );
1585 execution_env.expected_effects_digest = Some(override_digest);
1586 }
1587
1588 let tx_guard = epoch_store.acquire_tx_guard(certificate);
1590
    // Already executed: verify the digest (if one is expected) and return the cached effects.
1591 let tx_cache_reader = self.get_transaction_cache_reader();
1592 if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
1593 if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
1594 assert_eq!(
1595 effects.digest(),
1596 expected_effects_digest,
1597 "Unexpected effects digest for transaction {:?}",
1598 tx_digest
1599 );
1600 }
1601 tx_guard.release();
1602 return ExecutionOutput::Success((effects, None));
1603 }
1604
1605 let execution_start_time = Instant::now();
1606
    // No execution lock available for this transaction: treat as epoch ended.
1607 let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
1612 else {
1613 tx_guard.release();
1614 return ExecutionOutput::EpochEnded;
1615 };
    // The lock's epoch must match the epoch store the caller passed in.
1616 if *execution_guard != epoch_store.epoch() {
1621 tx_guard.release();
1622 info!("The epoch of the execution_guard doesn't match the epoch store");
1623 return ExecutionOutput::EpochEnded;
1624 }
1625
    // Copied out here because `execution_env` is moved into `process_certificate` below.
1626 let scheduling_source = execution_env.scheduling_source;
1627 let mysticeti_fp_outputs = if epoch_store.protocol_config().mysticeti_fastpath() {
1628 tx_cache_reader.get_mysticeti_fastpath_outputs(tx_digest)
1629 } else {
1630 None
1631 };
1632
    // Cached fast-path outputs are reused as-is (no timings, no execution error); otherwise
    // the certificate is executed via `process_certificate`.
1633 let (transaction_outputs, timings, execution_error_opt) = if let Some(outputs) =
1634 mysticeti_fp_outputs
1635 {
1636 assert!(
1637 !certificate.is_consensus_tx(),
1638 "Mysticeti fastpath executed transactions cannot be consensus transactions"
1639 );
1640 debug!(
1645 ?tx_digest,
1646 "Mysticeti fastpath certified transaction outputs found in cache, skipping execution and committing"
1647 );
1648 (outputs, None, None)
1649 } else {
1650 let (transaction_outputs, timings, execution_error_opt) = match self
1651 .process_certificate(
1652 &tx_guard,
1653 &execution_guard,
1654 certificate,
1655 execution_env,
1656 epoch_store,
1657 ) {
1658 ExecutionOutput::Success(result) => result,
        // Any non-success output is returned to the caller unchanged in kind.
1659 output => return output.unwrap_err(),
1660 };
1661 (
1662 Arc::new(transaction_outputs),
1663 Some(timings),
1664 execution_error_opt,
1665 )
1666 };
1667
1668 fail_point!("crash");
1669
1670 let effects = transaction_outputs.effects.clone();
1671 debug!(
1672 ?tx_digest,
1673 fx_digest=?effects.digest(),
1674 "process_certificate succeeded in {:.3}ms",
1675 (execution_start_time.elapsed().as_micros() as f64) / 1000.0
1676 );
1677
    // Fast-path scheduling only writes the outputs as fast-path outputs; every other source
    // goes through `commit_certificate`.
1678 if scheduling_source == SchedulingSource::MysticetiFastPath {
1679 self.get_cache_writer()
1680 .write_fastpath_transaction_outputs(transaction_outputs);
1681 } else {
1682 let commit_result =
1683 self.commit_certificate(certificate, transaction_outputs, epoch_store);
1684 if let Err(err) = commit_result {
1685 error!(?tx_digest, "Error committing transaction: {err}");
1686 tx_guard.release();
1687 return ExecutionOutput::Fatal(err);
1688 }
1689 }
1690
1691 if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
1692 certificate.data().transaction_data().kind()
1693 {
1694 if let Some(err) = &execution_error_opt {
1695 debug_fatal!("Authenticator state update failed: {:?}", err);
1696 }
1697 epoch_store.update_authenticator_state(auth_state);
1698
        // Debug builds cross-check the JWKs in the stored authenticator state against the
        // signature verifier's JWKs.
1699 if cfg!(debug_assertions) {
1701 let authenticator_state = get_authenticator_state(self.get_object_store())
1702 .expect("Read cannot fail")
1703 .expect("Authenticator state must exist");
1704
1705 let mut sys_jwks: Vec<_> = authenticator_state
1706 .active_jwks
1707 .into_iter()
1708 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1709 .collect();
1710 let mut active_jwks: Vec<_> = epoch_store
1711 .signature_verifier
1712 .get_jwks()
1713 .into_iter()
1714 .collect();
1715 sys_jwks.sort();
1716 active_jwks.sort();
1717
1718 assert_eq!(sys_jwks, active_jwks);
1719 }
1720 }
1721
1722 tx_guard.commit_tx();
1723
1724 let elapsed = execution_start_time.elapsed();
    // Timings exist only when the transaction was executed here rather than reused from
    // cached fast-path outputs.
1725 if let Some(timings) = timings {
1726 epoch_store.record_local_execution_time(
1727 certificate.data().transaction_data(),
1728 &effects,
1729 timings,
1730 elapsed,
1731 );
1732 }
1733
    // Skip the ratio when no time elapsed, avoiding a division by zero.
1734 let elapsed_us = elapsed.as_micros() as f64;
1735 if elapsed_us > 0.0 {
1736 self.metrics
1737 .execution_gas_latency_ratio
1738 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
1739 };
1740 ExecutionOutput::Success((effects, execution_error_opt))
1741 }
1742
1743 pub fn read_objects_for_execution(
1744 &self,
1745 tx_lock: &CertLockGuard,
1746 certificate: &VerifiedExecutableTransaction,
1747 assigned_shared_object_versions: AssignedVersions,
1748 epoch_store: &Arc<AuthorityPerEpochStore>,
1749 ) -> SuiResult<InputObjects> {
1750 let _scope = monitored_scope("Execution::load_input_objects");
1751 let _metrics_guard = self
1752 .metrics
1753 .execution_load_input_objects_latency
1754 .start_timer();
1755 let input_objects = &certificate.data().transaction_data().input_objects()?;
1756 self.input_loader.read_objects_for_execution(
1757 &certificate.key(),
1758 tx_lock,
1759 input_objects,
1760 &assigned_shared_object_versions,
1761 epoch_store.epoch(),
1762 )
1763 }
1764
1765 pub async fn try_execute_for_test(
1768 &self,
1769 certificate: &VerifiedCertificate,
1770 execution_env: ExecutionEnv,
1771 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1772 let epoch_store = self.epoch_store_for_testing();
1773 let (effects, execution_error_opt) = self
1774 .try_execute_immediately(
1775 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1776 execution_env,
1777 &epoch_store,
1778 )
1779 .await
1780 .unwrap();
1781 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1782 (signed_effects, execution_error_opt)
1783 }
1784
1785 pub async fn notify_read_effects(
1786 &self,
1787 task_name: &'static str,
1788 digest: TransactionDigest,
1789 ) -> SuiResult<TransactionEffects> {
1790 Ok(self
1791 .get_transaction_cache_reader()
1792 .notify_read_executed_effects(task_name, &[digest])
1793 .await
1794 .pop()
1795 .expect("must return correct number of effects"))
1796 }
1797
1798 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1799 self.get_object_cache_reader()
1800 .check_owned_objects_are_live(owned_object_refs)
1801 }
1802
1803 pub(crate) fn debug_dump_transaction_state(
1808 &self,
1809 tx_digest: &TransactionDigest,
1810 effects: &TransactionEffects,
1811 expected_effects_digest: TransactionEffectsDigest,
1812 inner_temporary_store: &InnerTemporaryStore,
1813 certificate: &VerifiedExecutableTransaction,
1814 debug_dump_config: &StateDebugDumpConfig,
1815 ) -> SuiResult<PathBuf> {
1816 let dump_dir = debug_dump_config
1817 .dump_file_directory
1818 .as_ref()
1819 .cloned()
1820 .unwrap_or(std::env::temp_dir());
1821 let epoch_store = self.load_epoch_store_one_call_per_task();
1822
1823 NodeStateDump::new(
1824 tx_digest,
1825 effects,
1826 expected_effects_digest,
1827 self.get_object_store().as_ref(),
1828 &epoch_store,
1829 inner_temporary_store,
1830 certificate,
1831 )?
1832 .write_to_file(&dump_dir)
1833 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1834 }
1835
1836 #[instrument(level = "trace", skip_all)]
1837 pub(crate) fn process_certificate(
1838 &self,
1839 tx_guard: &CertTxGuard,
1840 execution_guard: &ExecutionLockReadGuard<'_>,
1841 certificate: &VerifiedExecutableTransaction,
1842 execution_env: ExecutionEnv,
1843 epoch_store: &Arc<AuthorityPerEpochStore>,
1844 ) -> ExecutionOutput<(
1845 TransactionOutputs,
1846 Vec<ExecutionTiming>,
1847 Option<ExecutionError>,
1848 )> {
1849 let _scope = monitored_scope("Execution::process_certificate");
1850 let tx_digest = *certificate.digest();
1851
1852 let input_objects = match self.read_objects_for_execution(
1853 tx_guard.as_lock_guard(),
1854 certificate,
1855 execution_env.assigned_versions,
1856 epoch_store,
1857 ) {
1858 Ok(objects) => objects,
1859 Err(e) => return ExecutionOutput::Fatal(e),
1860 };
1861
1862 let expected_effects_digest = match execution_env.expected_effects_digest {
1863 Some(expected_effects_digest) => Some(expected_effects_digest),
1864 None => {
1865 match epoch_store.get_signed_effects_digest(&tx_digest) {
1870 Ok(digest) => digest,
1871 Err(e) => return ExecutionOutput::Fatal(e),
1872 }
1873 }
1874 };
1875
1876 fail_point_if!("correlated-crash-process-certificate", || {
1877 if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1878 sui_simulator::task::kill_current_node(None);
1879 }
1880 });
1881
1882 self.execute_certificate(
1887 execution_guard,
1888 certificate,
1889 input_objects,
1890 expected_effects_digest,
1891 execution_env.withdraw_status,
1892 epoch_store,
1893 )
1894 }
1895
1896 pub async fn reconfigure_traffic_control(
1897 &self,
1898 params: TrafficControlReconfigParams,
1899 ) -> Result<TrafficControlReconfigParams, SuiError> {
1900 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1901 traffic_controller.admin_reconfigure(params).await
1902 } else {
1903 Err(SuiErrorKind::InvalidAdminRequest(
1904 "Traffic controller is not configured on this node".to_string(),
1905 )
1906 .into())
1907 }
1908 }
1909
1910 #[instrument(level = "trace", skip_all)]
1911 fn commit_certificate(
1912 &self,
1913 certificate: &VerifiedExecutableTransaction,
1914 transaction_outputs: Arc<TransactionOutputs>,
1915 epoch_store: &Arc<AuthorityPerEpochStore>,
1916 ) -> SuiResult {
1917 let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
1918 monitored_scope("Execution::commit_certificate");
1919 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1920
1921 let tx_digest = certificate.digest();
1922
1923 epoch_store.insert_executed_in_epoch(tx_digest);
1926 let key = certificate.key();
1927 if !matches!(key, TransactionKey::Digest(_)) {
1928 epoch_store.insert_tx_key(key, *tx_digest)?;
1929 }
1930
1931 fail_point!("crash");
1933
1934 self.get_cache_writer()
1935 .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);
1936
1937 if certificate.transaction_data().is_end_of_epoch_tx() {
1938 self.get_object_cache_reader()
1941 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1942 }
1943
1944 Ok(())
1945 }
1946
1947 fn update_metrics(
1948 &self,
1949 certificate: &VerifiedExecutableTransaction,
1950 inner_temporary_store: &InnerTemporaryStore,
1951 effects: &TransactionEffects,
1952 ) {
1953 if certificate.has_zklogin_sig() {
1955 self.metrics.zklogin_sig_count.inc();
1956 } else if certificate.has_upgraded_multisig() {
1957 self.metrics.multisig_sig_count.inc();
1958 }
1959
1960 self.metrics.total_effects.inc();
1961 self.metrics.total_certs.inc();
1962
1963 let consensus_object_count = effects.input_consensus_objects().len();
1964 if consensus_object_count > 0 {
1965 self.metrics.shared_obj_tx.inc();
1966 }
1967
1968 if certificate.is_sponsored_tx() {
1969 self.metrics.sponsored_tx.inc();
1970 }
1971
1972 let input_object_count = inner_temporary_store.input_objects.len();
1973 self.metrics
1974 .num_input_objs
1975 .observe(input_object_count as f64);
1976 self.metrics
1977 .num_shared_objects
1978 .observe(consensus_object_count as f64);
1979 self.metrics.batch_size.observe(
1980 certificate
1981 .data()
1982 .intent_message()
1983 .value
1984 .kind()
1985 .num_commands() as f64,
1986 );
1987 }
1988
1989 #[instrument(level = "trace", skip_all)]
1998 fn execute_certificate(
1999 &self,
2000 _execution_guard: &ExecutionLockReadGuard<'_>,
2001 certificate: &VerifiedExecutableTransaction,
2002 input_objects: InputObjects,
2003 expected_effects_digest: Option<TransactionEffectsDigest>,
2004 withdraw_status: BalanceWithdrawStatus,
2005 epoch_store: &Arc<AuthorityPerEpochStore>,
2006 ) -> ExecutionOutput<(
2007 TransactionOutputs,
2008 Vec<ExecutionTiming>,
2009 Option<ExecutionError>,
2010 )> {
2011 let _scope = monitored_scope("Execution::prepare_certificate");
2012 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
2013 let prepare_certificate_start_time = tokio::time::Instant::now();
2014
2015 let tx_data = certificate.data().transaction_data();
2017 if let Err(e) = tx_data.validity_check(epoch_store.protocol_config()) {
2018 return ExecutionOutput::Fatal(e.into());
2019 }
2020
2021 let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
2025 certificate,
2026 input_objects,
2027 epoch_store.protocol_config(),
2028 epoch_store.reference_gas_price(),
2029 ) {
2030 Ok(result) => result,
2031 Err(e) => return ExecutionOutput::Fatal(e),
2032 };
2033
2034 let owned_object_refs = input_objects.inner().filter_owned_objects();
2035 if let Err(e) = self.check_owned_locks(&owned_object_refs) {
2036 return ExecutionOutput::Fatal(e);
2037 }
2038 let tx_digest = *certificate.digest();
2039 let protocol_config = epoch_store.protocol_config();
2040 let transaction_data = &certificate.data().intent_message().value;
2041 let (kind, signer, gas_data) = transaction_data.execution_parts();
2042 let early_execution_error = get_early_execution_error(
2043 &tx_digest,
2044 &input_objects,
2045 self.config.certificate_deny_config.certificate_deny_set(),
2046 &withdraw_status,
2047 );
2048 let execution_params = match early_execution_error {
2049 Some(error) => ExecutionOrEarlyError::Err(error),
2050 None => ExecutionOrEarlyError::Ok(()),
2051 };
2052
2053 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2054
2055 #[allow(unused_mut)]
2056 let (inner_temp_store, _, mut effects, timings, execution_error_opt) =
2057 epoch_store.executor().execute_transaction_to_effects(
2058 &tracking_store,
2059 protocol_config,
2060 self.metrics.limits_metrics.clone(),
2061 self.config
2064 .expensive_safety_check_config
2065 .enable_deep_per_tx_sui_conservation_check(),
2066 execution_params,
2067 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2068 epoch_store
2069 .epoch_start_config()
2070 .epoch_data()
2071 .epoch_start_timestamp(),
2072 input_objects,
2073 gas_data,
2074 gas_status,
2075 kind,
2076 signer,
2077 tx_digest,
2078 &mut None,
2079 );
2080
2081 if let Some(expected_effects_digest) = expected_effects_digest
2082 && effects.digest() != expected_effects_digest
2083 {
2084 match self.debug_dump_transaction_state(
2086 &tx_digest,
2087 &effects,
2088 expected_effects_digest,
2089 &inner_temp_store,
2090 certificate,
2091 &self.config.state_debug_dump_config,
2092 ) {
2093 Ok(out_path) => {
2094 info!(
2095 "Dumped node state for transaction {} to {}",
2096 tx_digest,
2097 out_path.as_path().display().to_string()
2098 );
2099 }
2100 Err(e) => {
2101 error!("Error dumping state for transaction {}: {e}", tx_digest);
2102 }
2103 }
2104 error!(
2105 ?tx_digest,
2106 ?expected_effects_digest,
2107 actual_effects = ?effects,
2108 "fork detected!"
2109 );
2110 if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2111 tx_digest,
2112 expected_effects_digest,
2113 effects.digest(),
2114 ) {
2115 error!("Failed to record transaction fork: {e}");
2116 }
2117
2118 fail_point_if!("kill_transaction_fork_node", || {
2119 #[cfg(msim)]
2120 {
2121 tracing::error!(
2122 fatal = true,
2123 "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2124 tx_digest
2125 );
2126 sui_simulator::task::shutdown_current_node();
2127 }
2128 });
2129
2130 fatal!(
2131 "Transaction {} is expected to have effects digest {}, but got {}!",
2132 tx_digest,
2133 expected_effects_digest,
2134 effects.digest()
2135 );
2136 }
2137
2138 fail_point_arg!("simulate_fork_during_execution", |(
2139 forked_validators,
2140 full_halt,
2141 effects_overrides,
2142 fork_probability,
2143 ): (
2144 std::sync::Arc<
2145 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2146 >,
2147 bool,
2148 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2149 f32,
2150 )| {
2151 #[cfg(msim)]
2152 self.simulate_fork_during_execution(
2153 certificate,
2154 epoch_store,
2155 &mut effects,
2156 forked_validators,
2157 full_halt,
2158 effects_overrides,
2159 fork_probability,
2160 );
2161 });
2162
2163 let unchanged_loaded_runtime_objects =
2164 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2165 certificate.transaction_data(),
2166 &effects,
2167 &tracking_store.into_read_objects(),
2168 );
2169
2170 let _ = self
2172 .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2173 .tap_err(|e| {
2174 self.metrics.post_processing_total_failures.inc();
2175 error!(?tx_digest, "tx post processing failed: {e}");
2176 });
2177
2178 self.update_metrics(certificate, &inner_temp_store, &effects);
2179
2180 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2181 certificate.clone().into_unsigned(),
2182 effects,
2183 inner_temp_store,
2184 unchanged_loaded_runtime_objects,
2185 );
2186
2187 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2188 if elapsed > 0.0 {
2189 self.metrics.prepare_cert_gas_latency_ratio.observe(
2190 transaction_outputs
2191 .effects
2192 .gas_cost_summary()
2193 .computation_cost as f64
2194 / elapsed,
2195 );
2196 }
2197
2198 ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2199 }
2200
2201 pub fn prepare_certificate_for_benchmark(
2202 &self,
2203 certificate: &VerifiedExecutableTransaction,
2204 input_objects: InputObjects,
2205 epoch_store: &Arc<AuthorityPerEpochStore>,
2206 ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2207 let lock = RwLock::new(epoch_store.epoch());
2208 let execution_guard = lock.try_read().unwrap();
2209
2210 let (transaction_outputs, _timings, execution_error_opt) = self
2211 .execute_certificate(
2212 &execution_guard,
2213 certificate,
2214 input_objects,
2215 None,
2216 BalanceWithdrawStatus::NoWithdraw,
2217 epoch_store,
2218 )
2219 .unwrap();
2220 Ok((transaction_outputs, execution_error_opt))
2221 }
2222
2223 #[instrument(skip_all)]
2224 #[allow(clippy::type_complexity)]
2225 pub async fn dry_exec_transaction(
2226 &self,
2227 transaction: TransactionData,
2228 transaction_digest: TransactionDigest,
2229 ) -> SuiResult<(
2230 DryRunTransactionBlockResponse,
2231 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2232 TransactionEffects,
2233 Option<ObjectID>,
2234 )> {
2235 let epoch_store = self.load_epoch_store_one_call_per_task();
2236 if !self.is_fullnode(&epoch_store) {
2237 return Err(SuiErrorKind::UnsupportedFeatureError {
2238 error: "dry-exec is only supported on fullnodes".to_string(),
2239 }
2240 .into());
2241 }
2242
2243 if transaction.kind().is_system_tx() {
2244 return Err(SuiErrorKind::UnsupportedFeatureError {
2245 error: "dry-exec does not support system transactions".to_string(),
2246 }
2247 .into());
2248 }
2249
2250 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2251 }
2252
2253 #[allow(clippy::type_complexity)]
2254 pub fn dry_exec_transaction_for_benchmark(
2255 &self,
2256 transaction: TransactionData,
2257 transaction_digest: TransactionDigest,
2258 ) -> SuiResult<(
2259 DryRunTransactionBlockResponse,
2260 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2261 TransactionEffects,
2262 Option<ObjectID>,
2263 )> {
2264 let epoch_store = self.load_epoch_store_one_call_per_task();
2265 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2266 }
2267
2268 #[allow(clippy::type_complexity)]
2269 fn dry_exec_transaction_impl(
2270 &self,
2271 epoch_store: &AuthorityPerEpochStore,
2272 transaction: TransactionData,
2273 transaction_digest: TransactionDigest,
2274 ) -> SuiResult<(
2275 DryRunTransactionBlockResponse,
2276 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2277 TransactionEffects,
2278 Option<ObjectID>,
2279 )> {
2280 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2282
2283 let input_object_kinds = transaction.input_objects()?;
2284 let receiving_object_refs = transaction.receiving_objects();
2285
2286 sui_transaction_checks::deny::check_transaction_for_signing(
2287 &transaction,
2288 &[],
2289 &input_object_kinds,
2290 &receiving_object_refs,
2291 &self.config.transaction_deny_config,
2292 self.get_backing_package_store().as_ref(),
2293 )?;
2294
2295 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2296 None,
2298 &input_object_kinds,
2299 &receiving_object_refs,
2300 epoch_store.epoch(),
2301 )?;
2302
2303 let mut gas_data = transaction.gas_data().clone();
2305 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2306 let sender = transaction.sender();
2307 const MIST_TO_SUI: u64 = 1_000_000_000;
2309 const DRY_RUN_SUI: u64 = 1_000_000_000;
2310 let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
2311 let gas_object_id = ObjectID::random();
2312 let gas_object = Object::new_move(
2313 MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
2314 Owner::AddressOwner(sender),
2315 TransactionDigest::genesis_marker(),
2316 );
2317 let gas_object_ref = gas_object.compute_object_reference();
2318 gas_data.payment = vec![gas_object_ref];
2319 (
2320 sui_transaction_checks::check_transaction_input_with_given_gas(
2321 epoch_store.protocol_config(),
2322 epoch_store.reference_gas_price(),
2323 &transaction,
2324 input_objects,
2325 receiving_objects,
2326 gas_object,
2327 &self.metrics.bytecode_verifier_metrics,
2328 &self.config.verifier_signing_config,
2329 )?,
2330 Some(gas_object_id),
2331 )
2332 } else {
2333 (
2334 sui_transaction_checks::check_transaction_input(
2335 epoch_store.protocol_config(),
2336 epoch_store.reference_gas_price(),
2337 &transaction,
2338 input_objects,
2339 &receiving_objects,
2340 &self.metrics.bytecode_verifier_metrics,
2341 &self.config.verifier_signing_config,
2342 )?,
2343 None,
2344 )
2345 };
2346
2347 let protocol_config = epoch_store.protocol_config();
2348 let (kind, signer, _) = transaction.execution_parts();
2349
2350 let silent = true;
2351 let executor = sui_execution::executor(protocol_config, silent)
2352 .expect("Creating an executor should not fail here");
2353
2354 let expensive_checks = false;
2355 let early_execution_error = get_early_execution_error(
2356 &transaction_digest,
2357 &checked_input_objects,
2358 self.config.certificate_deny_config.certificate_deny_set(),
2359 &BalanceWithdrawStatus::NoWithdraw,
2361 );
2362 let execution_params = match early_execution_error {
2363 Some(error) => ExecutionOrEarlyError::Err(error),
2364 None => ExecutionOrEarlyError::Ok(()),
2365 };
2366 let (inner_temp_store, _, effects, _timings, execution_error) = executor
2367 .execute_transaction_to_effects(
2368 self.get_backing_store().as_ref(),
2369 protocol_config,
2370 self.metrics.limits_metrics.clone(),
2371 expensive_checks,
2372 execution_params,
2373 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2374 epoch_store
2375 .epoch_start_config()
2376 .epoch_data()
2377 .epoch_start_timestamp(),
2378 checked_input_objects,
2379 gas_data,
2380 gas_status,
2381 kind,
2382 signer,
2383 transaction_digest,
2384 &mut None,
2385 );
2386 let tx_digest = *effects.transaction_digest();
2387
2388 let module_cache =
2389 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2390
2391 let mut layout_resolver =
2392 epoch_store
2393 .executor()
2394 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2395 &inner_temp_store,
2396 self.get_backing_package_store(),
2397 )));
2398 let object_changes = Vec::new();
2400
2401 let balance_changes = Vec::new();
2403
2404 let written_with_kind = effects
2405 .created()
2406 .into_iter()
2407 .map(|(oref, _)| (oref, WriteKind::Create))
2408 .chain(
2409 effects
2410 .unwrapped()
2411 .into_iter()
2412 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2413 )
2414 .chain(
2415 effects
2416 .mutated()
2417 .into_iter()
2418 .map(|(oref, _)| (oref, WriteKind::Mutate)),
2419 )
2420 .map(|(oref, kind)| {
2421 let obj = inner_temp_store.written.get(&oref.0).unwrap();
2422 (oref.0, (oref, obj.clone(), kind))
2424 })
2425 .collect();
2426
2427 let execution_error_source = execution_error
2428 .as_ref()
2429 .err()
2430 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2431
2432 Ok((
2433 DryRunTransactionBlockResponse {
2434 suggested_gas_price: self
2435 .congestion_tracker
2436 .get_suggested_gas_prices(&transaction),
2437 input: SuiTransactionBlockData::try_from_with_module_cache(
2438 transaction,
2439 &module_cache,
2440 )
2441 .map_err(|e| SuiErrorKind::TransactionSerializationError {
2442 error: format!(
2443 "Failed to convert transaction to SuiTransactionBlockData: {}",
2444 e
2445 ),
2446 })?, effects: effects.clone().try_into()?,
2448 events: SuiTransactionBlockEvents::try_from(
2449 inner_temp_store.events.clone(),
2450 tx_digest,
2451 None,
2452 layout_resolver.as_mut(),
2453 )?,
2454 object_changes,
2455 balance_changes,
2456 execution_error_source,
2457 },
2458 written_with_kind,
2459 effects,
2460 mock_gas,
2461 ))
2462 }
2463
2464 pub fn simulate_transaction(
2465 &self,
2466 mut transaction: TransactionData,
2467 checks: TransactionChecks,
2468 ) -> SuiResult<SimulateTransactionResult> {
2469 if transaction.kind().is_system_tx() {
2470 return Err(SuiErrorKind::UnsupportedFeatureError {
2471 error: "simulate does not support system transactions".to_string(),
2472 }
2473 .into());
2474 }
2475
2476 let epoch_store = self.load_epoch_store_one_call_per_task();
2477 if !self.is_fullnode(&epoch_store) {
2478 return Err(SuiErrorKind::UnsupportedFeatureError {
2479 error: "simulate is only supported on fullnodes".to_string(),
2480 }
2481 .into());
2482 }
2483
2484 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2486
2487 let input_object_kinds = transaction.input_objects()?;
2488 let receiving_object_refs = transaction.receiving_objects();
2489
2490 sui_transaction_checks::deny::check_transaction_for_signing(
2491 &transaction,
2492 &[],
2493 &input_object_kinds,
2494 &receiving_object_refs,
2495 &self.config.transaction_deny_config,
2496 self.get_backing_package_store().as_ref(),
2497 )?;
2498
2499 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2500 None,
2502 &input_object_kinds,
2503 &receiving_object_refs,
2504 epoch_store.epoch(),
2505 )?;
2506
2507 let mock_gas_id = if transaction.gas().is_empty() {
2509 let mock_gas_object = Object::new_move(
2510 MoveObject::new_gas_coin(
2511 OBJECT_START_VERSION,
2512 ObjectID::MAX,
2513 DEV_INSPECT_GAS_COIN_VALUE,
2514 ),
2515 Owner::AddressOwner(transaction.gas_data().owner),
2516 TransactionDigest::genesis_marker(),
2517 );
2518 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2519 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2520 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2521 Some(mock_gas_object.id())
2522 } else {
2523 None
2524 };
2525
2526 let protocol_config = epoch_store.protocol_config();
2527
2528 let (gas_status, checked_input_objects) = if checks.enabled() {
2529 sui_transaction_checks::check_transaction_input(
2530 epoch_store.protocol_config(),
2531 epoch_store.reference_gas_price(),
2532 &transaction,
2533 input_objects,
2534 &receiving_objects,
2535 &self.metrics.bytecode_verifier_metrics,
2536 &self.config.verifier_signing_config,
2537 )?
2538 } else {
2539 let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2540 protocol_config,
2541 transaction.kind(),
2542 input_objects,
2543 receiving_objects,
2544 )?;
2545 let gas_status = SuiGasStatus::new(
2546 transaction.gas_budget(),
2547 transaction.gas_price(),
2548 epoch_store.reference_gas_price(),
2549 protocol_config,
2550 )?;
2551
2552 (gas_status, checked_input_objects)
2553 };
2554
2555 let executor = sui_execution::executor(
2557 protocol_config,
2558 true, )
2560 .expect("Creating an executor should not fail here");
2561
2562 let (kind, signer, gas_data) = transaction.execution_parts();
2563 let early_execution_error = get_early_execution_error(
2564 &transaction.digest(),
2565 &checked_input_objects,
2566 self.config.certificate_deny_config.certificate_deny_set(),
2567 &BalanceWithdrawStatus::NoWithdraw,
2569 );
2570 let execution_params = match early_execution_error {
2571 Some(error) => ExecutionOrEarlyError::Err(error),
2572 None => ExecutionOrEarlyError::Ok(()),
2573 };
2574
2575 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2576
2577 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2578 &tracking_store,
2579 protocol_config,
2580 self.metrics.limits_metrics.clone(),
2581 false, execution_params,
2583 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2584 epoch_store
2585 .epoch_start_config()
2586 .epoch_data()
2587 .epoch_start_timestamp(),
2588 checked_input_objects,
2589 gas_data,
2590 gas_status,
2591 kind,
2592 signer,
2593 transaction.digest(),
2594 checks.disabled(),
2595 );
2596
2597 let loaded_runtime_objects = tracking_store.into_read_objects();
2598 let unchanged_loaded_runtime_objects =
2599 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2600 &transaction,
2601 &effects,
2602 &loaded_runtime_objects,
2603 );
2604
2605 let object_set = {
2606 let objects = {
2607 let mut objects = loaded_runtime_objects;
2608
2609 for o in inner_temp_store
2610 .input_objects
2611 .into_values()
2612 .chain(inner_temp_store.written.into_values())
2613 {
2614 objects.insert(o);
2615 }
2616
2617 objects
2618 };
2619
2620 let object_keys = sui_types::storage::get_transaction_object_set(
2621 &transaction,
2622 &effects,
2623 &unchanged_loaded_runtime_objects,
2624 );
2625
2626 let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2627 for k in object_keys {
2628 if let Some(o) = objects.get(&k) {
2629 set.insert(o.clone());
2630 }
2631 }
2632
2633 set
2634 };
2635
2636 Ok(SimulateTransactionResult {
2637 objects: object_set,
2638 events: effects.events_digest().map(|_| inner_temp_store.events),
2639 effects,
2640 execution_result,
2641 mock_gas_id,
2642 unchanged_loaded_runtime_objects,
2643 })
2644 }
2645
2646 #[allow(clippy::collapsible_else_if)]
2648 #[instrument(skip_all)]
2649 pub async fn dev_inspect_transaction_block(
2650 &self,
2651 sender: SuiAddress,
2652 transaction_kind: TransactionKind,
2653 gas_price: Option<u64>,
2654 gas_budget: Option<u64>,
2655 gas_sponsor: Option<SuiAddress>,
2656 gas_objects: Option<Vec<ObjectRef>>,
2657 show_raw_txn_data_and_effects: Option<bool>,
2658 skip_checks: Option<bool>,
2659 ) -> SuiResult<DevInspectResults> {
2660 let epoch_store = self.load_epoch_store_one_call_per_task();
2661
2662 if !self.is_fullnode(&epoch_store) {
2663 return Err(SuiErrorKind::UnsupportedFeatureError {
2664 error: "dev-inspect is only supported on fullnodes".to_string(),
2665 }
2666 .into());
2667 }
2668
2669 if transaction_kind.is_system_tx() {
2670 return Err(SuiErrorKind::UnsupportedFeatureError {
2671 error: "system transactions are not supported".to_string(),
2672 }
2673 .into());
2674 }
2675
2676 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2677 let skip_checks = skip_checks.unwrap_or(true);
2678 let reference_gas_price = epoch_store.reference_gas_price();
2679 let protocol_config = epoch_store.protocol_config();
2680 let max_tx_gas = protocol_config.max_tx_gas();
2681
2682 let price = gas_price.unwrap_or(reference_gas_price);
2683 let budget = gas_budget.unwrap_or(max_tx_gas);
2684 let owner = gas_sponsor.unwrap_or(sender);
2685 let payment = gas_objects.unwrap_or_default();
2687 let mut transaction = TransactionData::V1(TransactionDataV1 {
2688 kind: transaction_kind.clone(),
2689 sender,
2690 gas_data: GasData {
2691 payment,
2692 owner,
2693 price,
2694 budget,
2695 },
2696 expiration: TransactionExpiration::None,
2697 });
2698
2699 let raw_txn_data = if show_raw_txn_data_and_effects {
2700 bcs::to_bytes(&transaction).map_err(|_| {
2701 SuiErrorKind::TransactionSerializationError {
2702 error: "Failed to serialize transaction during dev inspect".to_string(),
2703 }
2704 })?
2705 } else {
2706 vec![]
2707 };
2708
2709 transaction.validity_check_no_gas_check(protocol_config)?;
2710
2711 let input_object_kinds = transaction.input_objects()?;
2712 let receiving_object_refs = transaction.receiving_objects();
2713
2714 sui_transaction_checks::deny::check_transaction_for_signing(
2715 &transaction,
2716 &[],
2717 &input_object_kinds,
2718 &receiving_object_refs,
2719 &self.config.transaction_deny_config,
2720 self.get_backing_package_store().as_ref(),
2721 )?;
2722
2723 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2724 None,
2726 &input_object_kinds,
2727 &receiving_object_refs,
2728 epoch_store.epoch(),
2729 )?;
2730
2731 let (gas_status, checked_input_objects) = if skip_checks {
2732 if transaction.gas().is_empty() {
2736 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2738 DEV_INSPECT_GAS_COIN_VALUE,
2739 transaction.gas_owner(),
2740 );
2741 let gas_object_ref = dummy_gas_object.compute_object_reference();
2742 transaction.gas_data_mut().payment = vec![gas_object_ref];
2743 input_objects.push(ObjectReadResult::new(
2744 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2745 dummy_gas_object.into(),
2746 ));
2747 }
2748 let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2749 protocol_config,
2750 &transaction_kind,
2751 input_objects,
2752 receiving_objects,
2753 )?;
2754 let gas_status = SuiGasStatus::new(
2755 max_tx_gas,
2756 transaction.gas_price(),
2757 reference_gas_price,
2758 protocol_config,
2759 )?;
2760
2761 (gas_status, checked_input_objects)
2762 } else {
2763 if transaction.gas().is_empty() {
2766 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2768 DEV_INSPECT_GAS_COIN_VALUE,
2769 transaction.gas_owner(),
2770 );
2771 let gas_object_ref = dummy_gas_object.compute_object_reference();
2772 transaction.gas_data_mut().payment = vec![gas_object_ref];
2773 sui_transaction_checks::check_transaction_input_with_given_gas(
2774 epoch_store.protocol_config(),
2775 epoch_store.reference_gas_price(),
2776 &transaction,
2777 input_objects,
2778 receiving_objects,
2779 dummy_gas_object,
2780 &self.metrics.bytecode_verifier_metrics,
2781 &self.config.verifier_signing_config,
2782 )?
2783 } else {
2784 sui_transaction_checks::check_transaction_input(
2785 epoch_store.protocol_config(),
2786 epoch_store.reference_gas_price(),
2787 &transaction,
2788 input_objects,
2789 &receiving_objects,
2790 &self.metrics.bytecode_verifier_metrics,
2791 &self.config.verifier_signing_config,
2792 )?
2793 }
2794 };
2795
2796 let executor = sui_execution::executor(protocol_config, true)
2797 .expect("Creating an executor should not fail here");
2798 let gas_data = transaction.gas_data().clone();
2799 let intent_msg = IntentMessage::new(
2800 Intent {
2801 version: IntentVersion::V0,
2802 scope: IntentScope::TransactionData,
2803 app_id: AppId::Sui,
2804 },
2805 transaction,
2806 );
2807 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2808 let early_execution_error = get_early_execution_error(
2809 &transaction_digest,
2810 &checked_input_objects,
2811 self.config.certificate_deny_config.certificate_deny_set(),
2812 &BalanceWithdrawStatus::NoWithdraw,
2814 );
2815 let execution_params = match early_execution_error {
2816 Some(error) => ExecutionOrEarlyError::Err(error),
2817 None => ExecutionOrEarlyError::Ok(()),
2818 };
2819 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2820 self.get_backing_store().as_ref(),
2821 protocol_config,
2822 self.metrics.limits_metrics.clone(),
2823 false,
2824 execution_params,
2825 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2826 epoch_store
2827 .epoch_start_config()
2828 .epoch_data()
2829 .epoch_start_timestamp(),
2830 checked_input_objects,
2831 gas_data,
2832 gas_status,
2833 transaction_kind,
2834 sender,
2835 transaction_digest,
2836 skip_checks,
2837 );
2838
2839 let raw_effects = if show_raw_txn_data_and_effects {
2840 bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2841 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2842 })?
2843 } else {
2844 vec![]
2845 };
2846
2847 let mut layout_resolver =
2848 epoch_store
2849 .executor()
2850 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2851 &inner_temp_store,
2852 self.get_backing_package_store(),
2853 )));
2854
2855 DevInspectResults::new(
2856 effects,
2857 inner_temp_store.events.clone(),
2858 execution_result,
2859 raw_txn_data,
2860 raw_effects,
2861 layout_resolver.as_mut(),
2862 )
2863 }
2864
2865 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2867 let epoch_store = self.epoch_store_for_testing();
2868 Ok(epoch_store.reference_gas_price())
2869 }
2870
2871 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2872 self.get_transaction_cache_reader()
2873 .is_tx_already_executed(digest)
2874 }
2875
2876 #[instrument(level = "debug", skip_all, err(level = "debug"))]
2877 fn index_tx(
2878 &self,
2879 indexes: &IndexStore,
2880 digest: &TransactionDigest,
2881 cert: &VerifiedExecutableTransaction,
2883 effects: &TransactionEffects,
2884 events: &TransactionEvents,
2885 timestamp_ms: u64,
2886 tx_coins: Option<TxCoins>,
2887 written: &WrittenObjects,
2888 inner_temporary_store: &InnerTemporaryStore,
2889 epoch_store: &Arc<AuthorityPerEpochStore>,
2890 ) -> SuiResult<u64> {
2891 let changes = self
2892 .process_object_index(effects, written, inner_temporary_store, epoch_store)
2893 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2894
2895 indexes.index_tx(
2896 cert.data().intent_message().value.sender(),
2897 cert.data()
2898 .intent_message()
2899 .value
2900 .input_objects()?
2901 .iter()
2902 .map(|o| o.object_id()),
2903 effects
2904 .all_changed_objects()
2905 .into_iter()
2906 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2907 cert.data()
2908 .intent_message()
2909 .value
2910 .move_calls()
2911 .into_iter()
2912 .map(|(package, module, function)| {
2913 (*package, module.to_owned(), function.to_owned())
2914 }),
2915 events,
2916 changes,
2917 digest,
2918 timestamp_ms,
2919 tx_coins,
2920 )
2921 }
2922
/// Simulator-only (`msim`) hook that may deliberately corrupt `effects` to provoke a fork.
///
/// Nothing happens for system transactions or when this validator's stake in the
/// committee is zero. Otherwise:
///
/// 1. If this validator is not yet in `forked_validators`, decide whether to add it,
///    using the stake accumulated so far in a thread-local counter:
///    * `full_halt`: join while the accumulated stake is `<=` the validity threshold;
///    * otherwise: join only if the accumulated stake plus this validator's stake stays
///      strictly below the validity threshold.
///    Joining adds this validator's stake to the counter.
/// 2. If this validator is in `forked_validators`, the transaction is forked when its
///    digest is already a key of `effects_overrides`, or when `effects_overrides` is
///    empty and `deterministic_probability` selects the digest with `fork_probability`.
///    Forking records the original effects digest under the transaction digest in
///    `effects_overrides` and then bumps the effects' computation cost by one.
///
/// A mutex that fails to lock is treated as "do nothing" at that step.
#[cfg(msim)]
fn simulate_fork_during_execution(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
    forked_validators: std::sync::Arc<
        std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
    >,
    full_halt: bool,
    effects_overrides: std::sync::Arc<
        std::sync::Mutex<std::collections::BTreeMap<String, String>>,
    >,
    fork_probability: f32,
) {
    use std::cell::RefCell;
    thread_local! {
        static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
    }

    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake == 0 {
        return;
    }

    TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
        // The guard is released at the end of this statement, before any later `lock()`.
        let already_forked = forked_validators
            .lock()
            .map(|set| set.contains(&self.name))
            .unwrap_or(false);

        if !already_forked {
            let threshold = committee.validity_threshold();
            let should_fork = if full_halt {
                *total_stake <= threshold
            } else {
                *total_stake + cur_stake < threshold
            };

            if should_fork {
                *total_stake += cur_stake;

                if let Ok(mut external_set) = forked_validators.lock() {
                    external_set.insert(self.name);
                    info!("forked_validators: {:?}", external_set);
                }
            }
        }

        let Ok(external_set) = forked_validators.lock() else {
            return;
        };
        if !external_set.contains(&self.name) {
            return;
        }

        let tx_digest = certificate.digest().to_string();
        let Ok(mut overrides) = effects_overrides.lock() else {
            return;
        };

        // Either a digest that was already forked elsewhere, or a new pick made only
        // while no override has been recorded yet.
        let fork_this_tx = overrides.contains_key(&tx_digest)
            || (overrides.is_empty()
                && sui_simulator::random::deterministic_probability(
                    &tx_digest,
                    fork_probability,
                ));
        if !fork_this_tx {
            return;
        }

        let original_effects_digest = effects.digest().to_string();
        overrides.insert(tx_digest.clone(), original_effects_digest.clone());
        info!(
            ?tx_digest,
            ?original_effects_digest,
            "Captured forked effects digest for transaction"
        );
        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
    });
}
3006
3007 fn process_object_index(
3008 &self,
3009 effects: &TransactionEffects,
3010 written: &WrittenObjects,
3011 inner_temporary_store: &InnerTemporaryStore,
3012 epoch_store: &Arc<AuthorityPerEpochStore>,
3013 ) -> SuiResult<ObjectIndexChanges> {
3014 let mut layout_resolver =
3015 epoch_store
3016 .executor()
3017 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3018 inner_temporary_store,
3019 self.get_backing_package_store(),
3020 )));
3021
3022 let modified_at_version = effects
3023 .modified_at_versions()
3024 .into_iter()
3025 .collect::<HashMap<_, _>>();
3026
3027 let tx_digest = effects.transaction_digest();
3028 let mut deleted_owners = vec![];
3029 let mut deleted_dynamic_fields = vec![];
3030 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
3031 let old_version = modified_at_version.get(&id).unwrap();
3032 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
3035 |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
3036 ) {
3037 Owner::AddressOwner(addr)
3038 | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
3039 Owner::ObjectOwner(object_id) => {
3040 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
3041 }
3042 _ => {}
3043 }
3044 }
3045
3046 let mut new_owners = vec![];
3047 let mut new_dynamic_fields = vec![];
3048
3049 for (oref, owner, kind) in effects.all_changed_objects() {
3050 let id = &oref.0;
3051 if let WriteKind::Mutate = kind {
3053 let Some(old_version) = modified_at_version.get(id) else {
3054 panic!(
3055 "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
3056 tx_digest
3057 );
3058 };
3059 let Some(old_object) = self.get_object_store().get_object_by_key(id, *old_version)
3062 else {
3063 panic!(
3064 "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
3065 tx_digest, id, old_version
3066 );
3067 };
3068 if old_object.owner != owner {
3069 match old_object.owner {
3070 Owner::AddressOwner(addr)
3071 | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3072 deleted_owners.push((addr, *id));
3073 }
3074 Owner::ObjectOwner(object_id) => {
3075 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
3076 }
3077 _ => {}
3078 }
3079 }
3080 }
3081
3082 match owner {
3083 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3084 let new_object = written.get(id).unwrap_or_else(
3086 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3087 );
3088 assert_eq!(
3089 new_object.version(),
3090 oref.1,
3091 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3092 tx_digest,
3093 id,
3094 new_object.version(),
3095 oref.1
3096 );
3097
3098 let type_ = new_object
3099 .type_()
3100 .map(|type_| ObjectType::Struct(type_.clone()))
3101 .unwrap_or(ObjectType::Package);
3102
3103 new_owners.push((
3104 (addr, *id),
3105 ObjectInfo {
3106 object_id: *id,
3107 version: oref.1,
3108 digest: oref.2,
3109 type_,
3110 owner,
3111 previous_transaction: *effects.transaction_digest(),
3112 },
3113 ));
3114 }
3115 Owner::ObjectOwner(owner) => {
3116 let new_object = written.get(id).unwrap_or_else(
3117 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3118 );
3119 assert_eq!(
3120 new_object.version(),
3121 oref.1,
3122 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3123 tx_digest,
3124 id,
3125 new_object.version(),
3126 oref.1
3127 );
3128
3129 let Some(df_info) = self
3130 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
3131 .unwrap_or_else(|e| {
3132 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
3133 None
3134 }
3135 )
3136 else {
3137 continue;
3139 };
3140 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3141 }
3142 _ => {}
3143 }
3144 }
3145
3146 Ok(ObjectIndexChanges {
3147 deleted_owners,
3148 deleted_dynamic_fields,
3149 new_owners,
3150 new_dynamic_fields,
3151 })
3152 }
3153
3154 fn try_create_dynamic_field_info(
3155 &self,
3156 o: &Object,
3157 written: &WrittenObjects,
3158 resolver: &mut dyn LayoutResolver,
3159 ) -> SuiResult<Option<DynamicFieldInfo>> {
3160 let Some(move_object) = o.data.try_as_move().cloned() else {
3162 return Ok(None);
3163 };
3164
3165 if !move_object.type_().is_dynamic_field() {
3167 return Ok(None);
3168 }
3169
3170 let layout = resolver
3171 .get_annotated_layout(&move_object.type_().clone().into())?
3172 .into_layout();
3173
3174 let field =
3175 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3176 SuiErrorKind::ObjectDeserializationError {
3177 error: e.to_string(),
3178 }
3179 })?;
3180
3181 let type_ = field.kind;
3182 let name_type: TypeTag = field.name_layout.into();
3183 let bcs_name = field.name_bytes.to_owned();
3184
3185 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3186 .map_err(|e| {
3187 warn!("{e}");
3188 SuiErrorKind::ObjectDeserializationError {
3189 error: e.to_string(),
3190 }
3191 })?;
3192
3193 let name = DynamicFieldName {
3194 type_: name_type,
3195 value: SuiMoveValue::from(name_value).to_json_value(),
3196 };
3197
3198 let value_metadata = field.value_metadata().map_err(|e| {
3199 warn!("{e}");
3200 SuiErrorKind::ObjectDeserializationError {
3201 error: e.to_string(),
3202 }
3203 })?;
3204
3205 Ok(Some(match value_metadata {
3206 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3207 name,
3208 bcs_name,
3209 type_,
3210 object_type: object_type.to_canonical_string(true),
3211 object_id: o.id(),
3212 version: o.version(),
3213 digest: o.digest(),
3214 },
3215
3216 DFV::ValueMetadata::DynamicObjectField(object_id) => {
3217 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3221 let version = object.version();
3222 let digest = object.digest();
3223 let object_type = object.data.type_().unwrap().clone();
3224 (version, digest, object_type)
3225 } else {
3226 let object = self
3228 .get_object_store()
3229 .get_object_by_key(&object_id, o.version())
3230 .ok_or_else(|| UserInputError::ObjectNotFound {
3231 object_id,
3232 version: Some(o.version()),
3233 })?;
3234 let version = object.version();
3235 let digest = object.digest();
3236 let object_type = object.data.type_().unwrap().clone();
3237 (version, digest, object_type)
3238 };
3239
3240 DynamicFieldInfo {
3241 name,
3242 bcs_name,
3243 type_,
3244 object_type: object_type.to_string(),
3245 object_id,
3246 version,
3247 digest,
3248 }
3249 }
3250 }))
3251 }
3252
3253 #[instrument(level = "trace", skip_all, err(level = "debug"))]
3254 fn post_process_one_tx(
3255 &self,
3256 certificate: &VerifiedExecutableTransaction,
3257 effects: &TransactionEffects,
3258 inner_temporary_store: &InnerTemporaryStore,
3259 epoch_store: &Arc<AuthorityPerEpochStore>,
3260 ) -> SuiResult {
3261 if self.indexes.is_none() {
3262 return Ok(());
3263 }
3264
3265 let _scope = monitored_scope("Execution::post_process_one_tx");
3266
3267 let tx_digest = certificate.digest();
3268 let timestamp_ms = Self::unixtime_now_ms();
3269 let events = &inner_temporary_store.events;
3270 let written = &inner_temporary_store.written;
3271 let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
3272 effects,
3273 inner_temporary_store,
3274 epoch_store,
3275 );
3276
3277 if let Some(indexes) = &self.indexes {
3279 let _ = self
3280 .index_tx(
3281 indexes.as_ref(),
3282 tx_digest,
3283 certificate,
3284 effects,
3285 events,
3286 timestamp_ms,
3287 tx_coins,
3288 written,
3289 inner_temporary_store,
3290 epoch_store,
3291 )
3292 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
3293 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3294 .expect("Indexing tx should not fail");
3295
3296 let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3297 let events = self.make_transaction_block_events(
3298 events.clone(),
3299 *tx_digest,
3300 timestamp_ms,
3301 epoch_store,
3302 inner_temporary_store,
3303 )?;
3304 self.subscription_handler
3306 .process_tx(certificate.data().transaction_data(), &effects, &events)
3307 .tap_ok(|_| {
3308 self.metrics
3309 .post_processing_total_tx_had_event_processed
3310 .inc()
3311 })
3312 .tap_err(|e| {
3313 warn!(
3314 ?tx_digest,
3315 "Post processing - Couldn't process events for tx: {}", e
3316 )
3317 })?;
3318
3319 self.metrics
3320 .post_processing_total_events_emitted
3321 .inc_by(events.data.len() as u64);
3322 };
3323 Ok(())
3324 }
3325
3326 fn make_transaction_block_events(
3327 &self,
3328 transaction_events: TransactionEvents,
3329 digest: TransactionDigest,
3330 timestamp_ms: u64,
3331 epoch_store: &Arc<AuthorityPerEpochStore>,
3332 inner_temporary_store: &InnerTemporaryStore,
3333 ) -> SuiResult<SuiTransactionBlockEvents> {
3334 let mut layout_resolver =
3335 epoch_store
3336 .executor()
3337 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3338 inner_temporary_store,
3339 self.get_backing_package_store(),
3340 )));
3341 SuiTransactionBlockEvents::try_from(
3342 transaction_events,
3343 digest,
3344 Some(timestamp_ms),
3345 layout_resolver.as_mut(),
3346 )
3347 }
3348
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock is before the Unix epoch, or if the millisecond
/// count does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis: u128 = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
3356
3357 #[instrument(level = "trace", skip_all)]
3361 pub async fn handle_transaction_info_request(
3362 &self,
3363 request: TransactionInfoRequest,
3364 ) -> SuiResult<TransactionInfoResponse> {
3365 let epoch_store = self.load_epoch_store_one_call_per_task();
3366 let (transaction, status) = self
3367 .get_transaction_status(&request.transaction_digest, &epoch_store)?
3368 .ok_or(SuiErrorKind::TransactionNotFound {
3369 digest: request.transaction_digest,
3370 })?;
3371 Ok(TransactionInfoResponse {
3372 transaction,
3373 status,
3374 })
3375 }
3376
3377 #[instrument(level = "trace", skip_all)]
3378 pub async fn handle_object_info_request(
3379 &self,
3380 request: ObjectInfoRequest,
3381 ) -> SuiResult<ObjectInfoResponse> {
3382 let epoch_store = self.load_epoch_store_one_call_per_task();
3383
3384 let requested_object_seq = match request.request_kind {
3385 ObjectInfoRequestKind::LatestObjectInfo => {
3386 let (_, seq, _) = self
3387 .get_object_or_tombstone(request.object_id)
3388 .await
3389 .ok_or_else(|| {
3390 SuiError::from(UserInputError::ObjectNotFound {
3391 object_id: request.object_id,
3392 version: None,
3393 })
3394 })?;
3395 seq
3396 }
3397 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3398 };
3399
3400 let object = self
3401 .get_object_store()
3402 .get_object_by_key(&request.object_id, requested_object_seq)
3403 .ok_or_else(|| {
3404 SuiError::from(UserInputError::ObjectNotFound {
3405 object_id: request.object_id,
3406 version: Some(requested_object_seq),
3407 })
3408 })?;
3409
3410 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3411 (request.generate_layout, object.data.try_as_move())
3412 {
3413 Some(into_struct_layout(
3414 self.load_epoch_store_one_call_per_task()
3415 .executor()
3416 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3417 .get_annotated_layout(&move_obj.type_().clone().into())?,
3418 )?)
3419 } else {
3420 None
3421 };
3422
3423 let lock = if !object.is_address_owned() {
3424 None
3426 } else {
3427 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3428 .await?
3429 .map(|s| s.into_inner())
3430 };
3431
3432 Ok(ObjectInfoResponse {
3433 object,
3434 layout,
3435 lock_for_debugging: lock,
3436 })
3437 }
3438
3439 #[instrument(level = "trace", skip_all)]
3440 pub fn handle_checkpoint_request(
3441 &self,
3442 request: &CheckpointRequest,
3443 ) -> SuiResult<CheckpointResponse> {
3444 let summary = match request.sequence_number {
3445 Some(seq) => self
3446 .checkpoint_store
3447 .get_checkpoint_by_sequence_number(seq)?,
3448 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3449 }
3450 .map(|v| v.into_inner());
3451 let contents = match &summary {
3452 Some(s) => self
3453 .checkpoint_store
3454 .get_checkpoint_contents(&s.content_digest)?,
3455 None => None,
3456 };
3457 Ok(CheckpointResponse {
3458 checkpoint: summary,
3459 contents,
3460 })
3461 }
3462
3463 #[instrument(level = "trace", skip_all)]
3464 pub fn handle_checkpoint_request_v2(
3465 &self,
3466 request: &CheckpointRequestV2,
3467 ) -> SuiResult<CheckpointResponseV2> {
3468 let summary = if request.certified {
3469 let summary = match request.sequence_number {
3470 Some(seq) => self
3471 .checkpoint_store
3472 .get_checkpoint_by_sequence_number(seq)?,
3473 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3474 }
3475 .map(|v| v.into_inner());
3476 summary.map(CheckpointSummaryResponse::Certified)
3477 } else {
3478 let summary = match request.sequence_number {
3479 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3480 None => self
3481 .checkpoint_store
3482 .get_latest_locally_computed_checkpoint()?,
3483 };
3484 summary.map(CheckpointSummaryResponse::Pending)
3485 };
3486 let contents = match &summary {
3487 Some(s) => self
3488 .checkpoint_store
3489 .get_checkpoint_contents(&s.content_digest())?,
3490 None => None,
3491 };
3492 Ok(CheckpointResponseV2 {
3493 checkpoint: summary,
3494 contents,
3495 })
3496 }
3497
3498 fn check_protocol_version(
3499 supported_protocol_versions: SupportedProtocolVersions,
3500 current_version: ProtocolVersion,
3501 ) {
3502 info!("current protocol version is now {:?}", current_version);
3503 info!("supported versions are: {:?}", supported_protocol_versions);
3504 if !supported_protocol_versions.is_version_supported(current_version) {
3505 let msg = format!(
3506 "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3507 current_version, supported_protocol_versions,
3508 );
3509
3510 error!("{}", msg);
3511 eprintln!("{}", msg);
3512
3513 #[cfg(not(msim))]
3514 std::process::exit(1);
3515
3516 #[cfg(msim)]
3517 sui_simulator::task::shutdown_current_node();
3518 }
3519 }
3520
3521 #[allow(clippy::disallowed_methods)] #[allow(clippy::too_many_arguments)]
3523 pub async fn new(
3524 name: AuthorityName,
3525 secret: StableSyncAuthoritySigner,
3526 supported_protocol_versions: SupportedProtocolVersions,
3527 store: Arc<AuthorityStore>,
3528 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3529 epoch_store: Arc<AuthorityPerEpochStore>,
3530 committee_store: Arc<CommitteeStore>,
3531 indexes: Option<Arc<IndexStore>>,
3532 rpc_index: Option<Arc<RpcIndexStore>>,
3533 checkpoint_store: Arc<CheckpointStore>,
3534 prometheus_registry: &Registry,
3535 genesis_objects: &[Object],
3536 db_checkpoint_config: &DBCheckpointConfig,
3537 config: NodeConfig,
3538 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3539 chain_identifier: ChainIdentifier,
3540 pruner_db: Option<Arc<AuthorityPrunerTables>>,
3541 policy_config: Option<PolicyConfig>,
3542 firewall_config: Option<RemoteFirewallConfig>,
3543 pruner_watermarks: Arc<PrunerWatermarks>,
3544 ) -> Arc<Self> {
3545 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3546
3547 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3548 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3549 let execution_scheduler = Arc::new(ExecutionScheduler::new(
3550 execution_cache_trait_pointers.object_cache_reader.clone(),
3551 execution_cache_trait_pointers.object_store.clone(),
3552 execution_cache_trait_pointers
3553 .transaction_cache_reader
3554 .clone(),
3555 tx_ready_certificates,
3556 &epoch_store,
3557 metrics.clone(),
3558 ));
3559 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3560
3561 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3562 epoch_store.get_parent_path(),
3563 &config.authority_store_pruning_config,
3564 );
3565 let _pruner = AuthorityStorePruner::new(
3566 store.perpetual_tables.clone(),
3567 checkpoint_store.clone(),
3568 rpc_index.clone(),
3569 indexes.clone(),
3570 config.authority_store_pruning_config.clone(),
3571 epoch_store.committee().authority_exists(&name),
3572 epoch_store.epoch_start_state().epoch_duration_ms(),
3573 prometheus_registry,
3574 pruner_db,
3575 pruner_watermarks,
3576 );
3577 let input_loader =
3578 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3579 let epoch = epoch_store.epoch();
3580 let traffic_controller_metrics =
3581 Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3582 let traffic_controller = if let Some(policy_config) = policy_config {
3583 Some(Arc::new(
3584 TrafficController::init(
3585 policy_config,
3586 traffic_controller_metrics,
3587 firewall_config.clone(),
3588 )
3589 .await,
3590 ))
3591 } else {
3592 None
3593 };
3594
3595 let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3596 ForkRecoveryState::new(Some(fork_config))
3597 .expect("Failed to initialize fork recovery state")
3598 });
3599
3600 let state = Arc::new(AuthorityState {
3601 name,
3602 secret,
3603 execution_lock: RwLock::new(epoch),
3604 epoch_store: ArcSwap::new(epoch_store.clone()),
3605 input_loader,
3606 execution_cache_trait_pointers,
3607 indexes,
3608 rpc_index,
3609 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3610 checkpoint_store,
3611 committee_store,
3612 execution_scheduler,
3613 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3614 metrics,
3615 _pruner,
3616 _authority_per_epoch_pruner,
3617 db_checkpoint_config: db_checkpoint_config.clone(),
3618 config,
3619 overload_info: AuthorityOverloadInfo::default(),
3620 validator_tx_finalizer,
3621 chain_identifier,
3622 congestion_tracker: Arc::new(CongestionTracker::new()),
3623 traffic_controller,
3624 fork_recovery_state,
3625 });
3626
3627 let state_clone = Arc::downgrade(&state);
3628 spawn_monitored_task!(fix_indexes(state_clone));
3629 let authority_state = Arc::downgrade(&state);
3631 spawn_monitored_task!(execution_process(
3632 authority_state,
3633 rx_ready_certificates,
3634 rx_execution_shutdown,
3635 ));
3636 state
3638 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3639 .expect("Error indexing genesis objects.");
3640
3641 state
3642 }
3643
3644 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3646 &self.execution_cache_trait_pointers.object_cache_reader
3647 }
3648
3649 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3650 &self.execution_cache_trait_pointers.transaction_cache_reader
3651 }
3652
3653 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3654 &self.execution_cache_trait_pointers.cache_writer
3655 }
3656
3657 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3658 &self.execution_cache_trait_pointers.backing_store
3659 }
3660
3661 pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3662 &self.execution_cache_trait_pointers.child_object_resolver
3663 }
3664
3665 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3666 &self.execution_cache_trait_pointers.backing_package_store
3667 }
3668
3669 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3670 &self.execution_cache_trait_pointers.object_store
3671 }
3672
3673 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3674 &self.execution_cache_trait_pointers.reconfig_api
3675 }
3676
3677 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3678 &self.execution_cache_trait_pointers.global_state_hash_store
3679 }
3680
3681 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3682 &self.execution_cache_trait_pointers.checkpoint_cache
3683 }
3684
3685 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3686 &self.execution_cache_trait_pointers.state_sync_store
3687 }
3688
3689 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3690 &self.execution_cache_trait_pointers.cache_commit
3691 }
3692
3693 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3694 self.execution_cache_trait_pointers
3695 .testing_api
3696 .database_for_testing()
3697 }
3698
3699 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3700 &self,
3701 config: NodeConfig,
3702 metrics: Arc<AuthorityStorePruningMetrics>,
3703 ) -> anyhow::Result<()> {
3704 use crate::authority::authority_store_pruner::PrunerWatermarks;
3705 let watermarks = Arc::new(PrunerWatermarks::default());
3706 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3707 &self.database_for_testing().perpetual_tables,
3708 &self.checkpoint_store,
3709 self.rpc_index.as_deref(),
3710 None,
3711 config.authority_store_pruning_config,
3712 metrics,
3713 EPOCH_DURATION_MS_FOR_TESTING,
3714 &watermarks,
3715 )
3716 .await
3717 }
3718
3719 pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
3720 &self.execution_scheduler
3721 }
3722
3723 fn create_owner_index_if_empty(
3724 &self,
3725 genesis_objects: &[Object],
3726 epoch_store: &Arc<AuthorityPerEpochStore>,
3727 ) -> SuiResult {
3728 let Some(index_store) = &self.indexes else {
3729 return Ok(());
3730 };
3731 if !index_store.is_empty() {
3732 return Ok(());
3733 }
3734
3735 let mut new_owners = vec![];
3736 let mut new_dynamic_fields = vec![];
3737 let mut layout_resolver = epoch_store
3738 .executor()
3739 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3740 for o in genesis_objects.iter() {
3741 match o.owner {
3742 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3743 new_owners.push((
3744 (addr, o.id()),
3745 ObjectInfo::new(&o.compute_object_reference(), o),
3746 ))
3747 }
3748 Owner::ObjectOwner(object_id) => {
3749 let id = o.id();
3750 let Some(info) = self.try_create_dynamic_field_info(
3751 o,
3752 &BTreeMap::new(),
3753 layout_resolver.as_mut(),
3754 )?
3755 else {
3756 continue;
3757 };
3758 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3759 }
3760 _ => {}
3761 }
3762 }
3763
3764 index_store.insert_genesis_objects(ObjectIndexChanges {
3765 deleted_owners: vec![],
3766 deleted_dynamic_fields: vec![],
3767 new_owners,
3768 new_dynamic_fields,
3769 })
3770 }
3771
3772 pub fn execution_lock_for_executable_transaction(
3776 &self,
3777 transaction: &VerifiedExecutableTransaction,
3778 ) -> Option<ExecutionLockReadGuard<'_>> {
3779 let lock = self.execution_lock.try_read().ok()?;
3780 if *lock == transaction.auth_sig().epoch() {
3781 Some(lock)
3782 } else {
3783 None
3785 }
3786 }
3787
3788 pub fn execution_lock_for_signing(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
3793 self.execution_lock
3794 .try_read()
3795 .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
3796 }
3797
3798 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3799 self.execution_lock.write().await
3800 }
3801
3802 #[instrument(level = "error", skip_all)]
3803 pub async fn reconfigure(
3804 &self,
3805 cur_epoch_store: &AuthorityPerEpochStore,
3806 supported_protocol_versions: SupportedProtocolVersions,
3807 new_committee: Committee,
3808 epoch_start_configuration: EpochStartConfiguration,
3809 state_hasher: Arc<GlobalStateHasher>,
3810 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3811 epoch_last_checkpoint: CheckpointSequenceNumber,
3812 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
3813 Self::check_protocol_version(
3814 supported_protocol_versions,
3815 epoch_start_configuration
3816 .epoch_start_state()
3817 .protocol_version(),
3818 );
3819
3820 self.committee_store.insert_new_committee(&new_committee)?;
3821
3822 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3824
3825 cur_epoch_store.epoch_terminated().await;
3827
3828 {
3832 let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
3833 if state.should_accept_user_certs() {
3834 cur_epoch_store.close_user_certs(state);
3841 }
3842 }
3844
3845 self.get_reconfig_api()
3846 .clear_state_end_of_epoch(&execution_lock);
3847 self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
3848 self.maybe_reaccumulate_state_hash(
3849 cur_epoch_store,
3850 epoch_start_configuration
3851 .epoch_start_state()
3852 .protocol_version(),
3853 );
3854 self.get_reconfig_api()
3855 .set_epoch_start_configuration(&epoch_start_configuration);
3856 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
3857 && self
3858 .db_checkpoint_config
3859 .perform_db_checkpoints_at_epoch_end
3860 {
3861 let checkpoint_indexes = self
3862 .db_checkpoint_config
3863 .perform_index_db_checkpoints_at_epoch_end
3864 .unwrap_or(false);
3865 let current_epoch = cur_epoch_store.epoch();
3866 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
3867 self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
3868 }
3869
3870 self.get_reconfig_api()
3871 .reconfigure_cache(&epoch_start_configuration)
3872 .await;
3873
3874 let new_epoch = new_committee.epoch;
3875 let new_epoch_store = self
3876 .reopen_epoch_db(
3877 cur_epoch_store,
3878 new_committee,
3879 epoch_start_configuration,
3880 expensive_safety_check_config,
3881 epoch_last_checkpoint,
3882 )
3883 .await?;
3884 assert_eq!(new_epoch_store.epoch(), new_epoch);
3885 self.execution_scheduler.reconfigure(&new_epoch_store);
3886 *execution_lock = new_epoch;
3887 Ok(new_epoch_store)
3891 }
3892
3893 pub async fn reconfigure_for_testing(&self) {
3897 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3898 let epoch_store = self.epoch_store_for_testing().clone();
3899 let protocol_config = epoch_store.protocol_config().clone();
3900 let _guard =
3907 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3908 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3909 self.get_backing_package_store().clone(),
3910 self.get_object_store().clone(),
3911 &self.config.expensive_safety_check_config,
3912 self.checkpoint_store
3913 .get_epoch_last_checkpoint(epoch_store.epoch())
3914 .unwrap()
3915 .map(|c| *c.sequence_number())
3916 .unwrap_or_default(),
3917 );
3918 self.execution_scheduler.reconfigure(&new_epoch_store);
3919 let new_epoch = new_epoch_store.epoch();
3920 self.epoch_store.store(new_epoch_store);
3921 epoch_store.epoch_terminated().await;
3922
3923 *execution_lock = new_epoch;
3924 }
3925
3926 #[instrument(level = "error", skip_all)]
3929 fn maybe_reaccumulate_state_hash(
3930 &self,
3931 cur_epoch_store: &AuthorityPerEpochStore,
3932 new_protocol_version: ProtocolVersion,
3933 ) {
3934 self.get_reconfig_api()
3935 .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
3936 }
3937
3938 #[instrument(level = "error", skip_all)]
3939 fn check_system_consistency(
3940 &self,
3941 cur_epoch_store: &AuthorityPerEpochStore,
3942 state_hasher: Arc<GlobalStateHasher>,
3943 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3944 ) {
3945 info!(
3946 "Performing sui conservation consistency check for epoch {}",
3947 cur_epoch_store.epoch()
3948 );
3949
3950 if let Err(err) = self
3951 .get_reconfig_api()
3952 .expensive_check_sui_conservation(cur_epoch_store)
3953 {
3954 if cfg!(debug_assertions) {
3955 panic!("{}", err);
3956 } else {
3957 warn!("Sui conservation consistency check failed: {}", err);
3960 }
3961 } else {
3962 info!("Sui conservation consistency check passed");
3963 }
3964
3965 if expensive_safety_check_config.enable_state_consistency_check() {
3967 info!(
3968 "Performing state consistency check for epoch {}",
3969 cur_epoch_store.epoch()
3970 );
3971 self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
3972 }
3973
3974 if expensive_safety_check_config.enable_secondary_index_checks()
3975 && let Some(indexes) = self.indexes.clone()
3976 {
3977 verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3978 .expect("secondary indexes are inconsistent");
3979 }
3980 }
3981
3982 fn expensive_check_is_consistent_state(
3983 &self,
3984 state_hasher: Arc<GlobalStateHasher>,
3985 cur_epoch_store: &AuthorityPerEpochStore,
3986 ) {
3987 let live_object_set_hash = state_hasher.digest_live_object_set(
3988 !cur_epoch_store
3989 .protocol_config()
3990 .simplified_unwrap_then_delete(),
3991 );
3992
3993 let root_state_hash: ECMHLiveObjectSetDigest = self
3994 .get_global_state_hash_store()
3995 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3996 .expect("Retrieving root state hash cannot fail")
3997 .expect("Root state hash for epoch must exist")
3998 .1
3999 .digest()
4000 .into();
4001
4002 let is_inconsistent = root_state_hash != live_object_set_hash;
4003 if is_inconsistent {
4004 debug_fatal!(
4005 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4006 root_state_hash,
4007 live_object_set_hash
4008 );
4009 } else {
4010 info!("State consistency check passed");
4011 }
4012
4013 state_hasher.set_inconsistent_state(is_inconsistent);
4014 }
4015
4016 pub fn current_epoch_for_testing(&self) -> EpochId {
4017 self.epoch_store_for_testing().epoch()
4018 }
4019
4020 #[instrument(level = "error", skip_all)]
4021 pub fn checkpoint_all_dbs(
4022 &self,
4023 checkpoint_path: &Path,
4024 cur_epoch_store: &AuthorityPerEpochStore,
4025 checkpoint_indexes: bool,
4026 ) -> SuiResult {
4027 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4028 let current_epoch = cur_epoch_store.epoch();
4029
4030 if checkpoint_path.exists() {
4031 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4032 return Ok(());
4033 }
4034
4035 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4036 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4037
4038 if checkpoint_path_tmp.exists() {
4039 fs::remove_dir_all(&checkpoint_path_tmp)
4040 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4041 }
4042
4043 fs::create_dir_all(&checkpoint_path_tmp)
4044 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4045 fs::create_dir(&store_checkpoint_path_tmp)
4046 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4047
4048 self.checkpoint_store
4051 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4052
4053 self.get_reconfig_api()
4054 .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4055
4056 self.committee_store
4057 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4058
4059 if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4060 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4061 }
4062
4063 fs::rename(checkpoint_path_tmp, checkpoint_path)
4064 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4065 Ok(())
4066 }
4067
4068 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4074 self.epoch_store.load()
4075 }
4076
4077 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4079 self.load_epoch_store_one_call_per_task()
4080 }
4081
4082 pub fn clone_committee_for_testing(&self) -> Committee {
4083 Committee::clone(self.epoch_store_for_testing().committee())
4084 }
4085
4086 #[instrument(level = "trace", skip_all)]
4087 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4088 self.get_object_store().get_object(object_id)
4089 }
4090
4091 pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4092 Ok(self
4093 .get_object(&SUI_SYSTEM_ADDRESS.into())
4094 .await
4095 .expect("framework object should always exist")
4096 .compute_object_reference())
4097 }
4098
4099 pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4101 self.get_object_cache_reader()
4102 .get_sui_system_state_object_unsafe()
4103 }
4104
4105 #[instrument(level = "trace", skip_all)]
4106 fn get_transaction_checkpoint_sequence(
4107 &self,
4108 digest: &TransactionDigest,
4109 epoch_store: &AuthorityPerEpochStore,
4110 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
4111 epoch_store.get_transaction_checkpoint(digest)
4112 }
4113
4114 #[instrument(level = "trace", skip_all)]
4115 pub fn get_checkpoint_by_sequence_number(
4116 &self,
4117 sequence_number: CheckpointSequenceNumber,
4118 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4119 Ok(self
4120 .checkpoint_store
4121 .get_checkpoint_by_sequence_number(sequence_number)?)
4122 }
4123
4124 #[instrument(level = "trace", skip_all)]
4125 pub fn get_transaction_checkpoint_for_tests(
4126 &self,
4127 digest: &TransactionDigest,
4128 epoch_store: &AuthorityPerEpochStore,
4129 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4130 let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4131 let Some(checkpoint) = checkpoint else {
4132 return Ok(None);
4133 };
4134 let checkpoint = self
4135 .checkpoint_store
4136 .get_checkpoint_by_sequence_number(checkpoint)?;
4137 Ok(checkpoint)
4138 }
4139
4140 #[instrument(level = "trace", skip_all)]
4141 pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4142 Ok(
4143 match self
4144 .get_object_cache_reader()
4145 .get_latest_object_or_tombstone(*object_id)
4146 {
4147 Some((_, ObjectOrTombstone::Object(object))) => {
4148 let layout = self.get_object_layout(&object)?;
4149 ObjectRead::Exists(object.compute_object_reference(), object, layout)
4150 }
4151 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4152 None => ObjectRead::NotExists(*object_id),
4153 },
4154 )
4155 }
4156
4157 pub fn get_chain_identifier(&self) -> ChainIdentifier {
4159 self.chain_identifier
4160 }
4161
4162 pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
4163 self.fork_recovery_state.as_ref()
4164 }
4165
4166 #[instrument(level = "trace", skip_all)]
4167 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4168 where
4169 T: DeserializeOwned,
4170 {
4171 let o = self.get_object_read(object_id)?.into_object()?;
4172 if let Some(move_object) = o.data.try_as_move() {
4173 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4174 SuiErrorKind::ObjectDeserializationError {
4175 error: format!("{e}"),
4176 }
4177 })?)
4178 } else {
4179 Err(SuiErrorKind::ObjectDeserializationError {
4180 error: format!("Provided object : [{object_id}] is not a Move object."),
4181 }
4182 .into())
4183 }
4184 }
4185
4186 #[instrument(level = "trace", skip_all)]
4192 pub fn get_past_object_read(
4193 &self,
4194 object_id: &ObjectID,
4195 version: SequenceNumber,
4196 ) -> SuiResult<PastObjectRead> {
4197 let Some(obj_ref) = self
4199 .get_object_cache_reader()
4200 .get_latest_object_ref_or_tombstone(*object_id)
4201 else {
4202 return Ok(PastObjectRead::ObjectNotExists(*object_id));
4203 };
4204
4205 if version > obj_ref.1 {
4206 return Ok(PastObjectRead::VersionTooHigh {
4207 object_id: *object_id,
4208 asked_version: version,
4209 latest_version: obj_ref.1,
4210 });
4211 }
4212
4213 if version < obj_ref.1 {
4214 return Ok(match self.read_object_at_version(object_id, version)? {
4216 Some((object, layout)) => {
4217 let obj_ref = object.compute_object_reference();
4218 PastObjectRead::VersionFound(obj_ref, object, layout)
4219 }
4220
4221 None => PastObjectRead::VersionNotFound(*object_id, version),
4222 });
4223 }
4224
4225 if !obj_ref.2.is_alive() {
4226 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4227 }
4228
4229 match self.read_object_at_version(object_id, obj_ref.1)? {
4230 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4231 None => {
4232 debug_fatal!(
4233 "Object with in parent_entry is missing from object store, datastore is \
4234 inconsistent"
4235 );
4236 Err(UserInputError::ObjectNotFound {
4237 object_id: *object_id,
4238 version: Some(obj_ref.1),
4239 }
4240 .into())
4241 }
4242 }
4243 }
4244
4245 #[instrument(level = "trace", skip_all)]
4246 fn read_object_at_version(
4247 &self,
4248 object_id: &ObjectID,
4249 version: SequenceNumber,
4250 ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4251 let Some(object) = self
4252 .get_object_cache_reader()
4253 .get_object_by_key(object_id, version)
4254 else {
4255 return Ok(None);
4256 };
4257
4258 let layout = self.get_object_layout(&object)?;
4259 Ok(Some((object, layout)))
4260 }
4261
4262 fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4263 let layout = object
4264 .data
4265 .try_as_move()
4266 .map(|object| {
4267 into_struct_layout(
4268 self.load_epoch_store_one_call_per_task()
4269 .executor()
4270 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
4272 .get_annotated_layout(&object.type_().clone().into())?,
4273 )
4274 })
4275 .transpose()?;
4276 Ok(layout)
4277 }
4278
4279 fn get_owner_at_version(
4280 &self,
4281 object_id: &ObjectID,
4282 version: SequenceNumber,
4283 ) -> SuiResult<Owner> {
4284 self.get_object_store()
4285 .get_object_by_key(object_id, version)
4286 .ok_or_else(|| {
4287 SuiError::from(UserInputError::ObjectNotFound {
4288 object_id: *object_id,
4289 version: Some(version),
4290 })
4291 })
4292 .map(|o| o.owner.clone())
4293 }
4294
4295 #[instrument(level = "trace", skip_all)]
4296 pub fn get_owner_objects(
4297 &self,
4298 owner: SuiAddress,
4299 cursor: Option<ObjectID>,
4301 limit: usize,
4302 filter: Option<SuiObjectDataFilter>,
4303 ) -> SuiResult<Vec<ObjectInfo>> {
4304 if let Some(indexes) = &self.indexes {
4305 indexes.get_owner_objects(owner, cursor, limit, filter)
4306 } else {
4307 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4308 }
4309 }
4310
4311 #[instrument(level = "trace", skip_all)]
4312 pub fn get_owned_coins_iterator_with_cursor(
4313 &self,
4314 owner: SuiAddress,
4315 cursor: (String, u64, ObjectID),
4317 limit: usize,
4318 one_coin_type_only: bool,
4319 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4320 if let Some(indexes) = &self.indexes {
4321 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4322 } else {
4323 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4324 }
4325 }
4326
4327 #[instrument(level = "trace", skip_all)]
4328 pub fn get_owner_objects_iterator(
4329 &self,
4330 owner: SuiAddress,
4331 cursor: Option<ObjectID>,
4333 filter: Option<SuiObjectDataFilter>,
4334 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4335 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4336 if let Some(indexes) = &self.indexes {
4337 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4338 } else {
4339 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4340 }
4341 }
4342
4343 #[instrument(level = "trace", skip_all)]
4344 pub async fn get_move_objects<T>(
4345 &self,
4346 owner: SuiAddress,
4347 type_: MoveObjectType,
4348 ) -> SuiResult<Vec<T>>
4349 where
4350 T: DeserializeOwned,
4351 {
4352 let object_ids = self
4353 .get_owner_objects_iterator(owner, None, None)?
4354 .filter(|o| match &o.type_ {
4355 ObjectType::Struct(s) => &type_ == s,
4356 ObjectType::Package => false,
4357 })
4358 .map(|info| ObjectKey(info.object_id, info.version))
4359 .collect::<Vec<_>>();
4360 let mut move_objects = vec![];
4361
4362 let objects = self
4363 .get_object_store()
4364 .multi_get_objects_by_key(&object_ids);
4365
4366 for (o, id) in objects.into_iter().zip(object_ids) {
4367 let object = o.ok_or_else(|| {
4368 SuiError::from(UserInputError::ObjectNotFound {
4369 object_id: id.0,
4370 version: Some(id.1),
4371 })
4372 })?;
4373 let move_object = object.data.try_as_move().ok_or_else(|| {
4374 SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4375 })?;
4376 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4377 SuiErrorKind::ObjectDeserializationError {
4378 error: format!("{e}"),
4379 }
4380 })?);
4381 }
4382 Ok(move_objects)
4383 }
4384
4385 #[instrument(level = "trace", skip_all)]
4386 pub fn get_dynamic_fields(
4387 &self,
4388 owner: ObjectID,
4389 cursor: Option<ObjectID>,
4391 limit: usize,
4392 ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4393 Ok(self
4394 .get_dynamic_fields_iterator(owner, cursor)?
4395 .take(limit)
4396 .collect::<Result<Vec<_>, _>>()?)
4397 }
4398
4399 fn get_dynamic_fields_iterator(
4400 &self,
4401 owner: ObjectID,
4402 cursor: Option<ObjectID>,
4404 ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4405 {
4406 if let Some(indexes) = &self.indexes {
4407 indexes.get_dynamic_fields_iterator(owner, cursor)
4408 } else {
4409 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4410 }
4411 }
4412
4413 #[instrument(level = "trace", skip_all)]
4414 pub fn get_dynamic_field_object_id(
4415 &self,
4416 owner: ObjectID,
4417 name_type: TypeTag,
4418 name_bcs_bytes: &[u8],
4419 ) -> SuiResult<Option<ObjectID>> {
4420 if let Some(indexes) = &self.indexes {
4421 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4422 } else {
4423 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4424 }
4425 }
4426
4427 #[instrument(level = "trace", skip_all)]
4428 pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
4429 Ok(self.get_indexes()?.next_sequence_number())
4430 }
4431
4432 #[instrument(level = "trace", skip_all)]
4433 pub async fn get_executed_transaction_and_effects(
4434 &self,
4435 digest: TransactionDigest,
4436 kv_store: Arc<TransactionKeyValueStore>,
4437 ) -> SuiResult<(Transaction, TransactionEffects)> {
4438 let transaction = kv_store.get_tx(digest).await?;
4439 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4440 Ok((transaction, effects))
4441 }
4442
4443 #[instrument(level = "trace", skip_all)]
4444 pub fn multi_get_checkpoint_by_sequence_number(
4445 &self,
4446 sequence_numbers: &[CheckpointSequenceNumber],
4447 ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
4448 Ok(self
4449 .checkpoint_store
4450 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4451 }
4452
4453 #[instrument(level = "trace", skip_all)]
4454 pub fn get_transaction_events(
4455 &self,
4456 digest: &TransactionDigest,
4457 ) -> SuiResult<TransactionEvents> {
4458 self.get_transaction_cache_reader()
4459 .get_events(digest)
4460 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4461 }
4462
4463 pub fn get_transaction_input_objects(
4464 &self,
4465 effects: &TransactionEffects,
4466 ) -> SuiResult<Vec<Object>> {
4467 sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4468 .map_err(Into::into)
4469 }
4470
4471 pub fn get_transaction_output_objects(
4472 &self,
4473 effects: &TransactionEffects,
4474 ) -> SuiResult<Vec<Object>> {
4475 sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4476 .map_err(Into::into)
4477 }
4478
4479 fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4480 match &self.indexes {
4481 Some(i) => Ok(i.clone()),
4482 None => Err(SuiErrorKind::UnsupportedFeatureError {
4483 error: "extended object indexing is not enabled on this server".into(),
4484 }
4485 .into()),
4486 }
4487 }
4488
4489 pub async fn get_transactions_for_tests(
4490 self: &Arc<Self>,
4491 filter: Option<TransactionFilter>,
4492 cursor: Option<TransactionDigest>,
4493 limit: Option<usize>,
4494 reverse: bool,
4495 ) -> SuiResult<Vec<TransactionDigest>> {
4496 let metrics = KeyValueStoreMetrics::new_for_tests();
4497 let kv_store = Arc::new(TransactionKeyValueStore::new(
4498 "rocksdb",
4499 metrics,
4500 self.clone(),
4501 ));
4502 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4503 .await
4504 }
4505
4506 #[instrument(level = "trace", skip_all)]
4507 pub async fn get_transactions(
4508 &self,
4509 kv_store: &Arc<TransactionKeyValueStore>,
4510 filter: Option<TransactionFilter>,
4511 cursor: Option<TransactionDigest>,
4513 limit: Option<usize>,
4514 reverse: bool,
4515 ) -> SuiResult<Vec<TransactionDigest>> {
4516 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4517 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4518 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4519 if reverse {
4520 let iter = iter
4521 .rev()
4522 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4523 .skip(usize::from(cursor.is_some()));
4524 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4525 } else {
4526 let iter = iter
4527 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4528 .skip(usize::from(cursor.is_some()));
4529 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4530 }
4531 }
4532 self.get_indexes()?
4533 .get_transactions(filter, cursor, limit, reverse)
4534 }
4535
4536 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4537 &self.checkpoint_store
4538 }
4539
4540 pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4541 self.get_checkpoint_store()
4542 .get_highest_executed_checkpoint_seq_number()?
4543 .ok_or(
4544 SuiErrorKind::UserInputError {
4545 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4546 }
4547 .into(),
4548 )
4549 }
4550
/// Simulator-only helper: reads the highest pruned checkpoint from the
/// perpetual tables, reporting `0` when none is recorded.
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
    let highest_pruned = self
        .database_for_testing()
        .perpetual_tables
        .get_highest_pruned_checkpoint()?;
    Ok(highest_pruned.unwrap_or(0))
}
4559
4560 #[instrument(level = "trace", skip_all)]
4561 pub fn get_checkpoint_summary_by_sequence_number(
4562 &self,
4563 sequence_number: CheckpointSequenceNumber,
4564 ) -> SuiResult<CheckpointSummary> {
4565 let verified_checkpoint = self
4566 .get_checkpoint_store()
4567 .get_checkpoint_by_sequence_number(sequence_number)?;
4568 match verified_checkpoint {
4569 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4570 None => Err(SuiErrorKind::UserInputError {
4571 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4572 }
4573 .into()),
4574 }
4575 }
4576
4577 #[instrument(level = "trace", skip_all)]
4578 pub fn get_checkpoint_summary_by_digest(
4579 &self,
4580 digest: CheckpointDigest,
4581 ) -> SuiResult<CheckpointSummary> {
4582 let verified_checkpoint = self
4583 .get_checkpoint_store()
4584 .get_checkpoint_by_digest(&digest)?;
4585 match verified_checkpoint {
4586 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4587 None => Err(SuiErrorKind::UserInputError {
4588 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4589 }
4590 .into()),
4591 }
4592 }
4593
4594 #[instrument(level = "trace", skip_all)]
4595 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
4596 if is_system_package(package_id) {
4597 return self.find_genesis_txn_digest();
4598 }
4599 Ok(self
4600 .get_object_read(&package_id)?
4601 .into_object()?
4602 .previous_transaction)
4603 }
4604
4605 #[instrument(level = "trace", skip_all)]
4606 pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
4607 let summary = self
4608 .get_verified_checkpoint_by_sequence_number(0)?
4609 .into_message();
4610 let content = self.get_checkpoint_contents(summary.content_digest)?;
4611 let genesis_transaction = content.enumerate_transactions(&summary).next();
4612 Ok(genesis_transaction
4613 .ok_or(SuiErrorKind::UserInputError {
4614 error: UserInputError::GenesisTransactionNotFound,
4615 })?
4616 .1
4617 .transaction)
4618 }
4619
4620 #[instrument(level = "trace", skip_all)]
4621 pub fn get_verified_checkpoint_by_sequence_number(
4622 &self,
4623 sequence_number: CheckpointSequenceNumber,
4624 ) -> SuiResult<VerifiedCheckpoint> {
4625 let verified_checkpoint = self
4626 .get_checkpoint_store()
4627 .get_checkpoint_by_sequence_number(sequence_number)?;
4628 match verified_checkpoint {
4629 Some(verified_checkpoint) => Ok(verified_checkpoint),
4630 None => Err(SuiErrorKind::UserInputError {
4631 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4632 }
4633 .into()),
4634 }
4635 }
4636
4637 #[instrument(level = "trace", skip_all)]
4638 pub fn get_verified_checkpoint_summary_by_digest(
4639 &self,
4640 digest: CheckpointDigest,
4641 ) -> SuiResult<VerifiedCheckpoint> {
4642 let verified_checkpoint = self
4643 .get_checkpoint_store()
4644 .get_checkpoint_by_digest(&digest)?;
4645 match verified_checkpoint {
4646 Some(verified_checkpoint) => Ok(verified_checkpoint),
4647 None => Err(SuiErrorKind::UserInputError {
4648 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4649 }
4650 .into()),
4651 }
4652 }
4653
4654 #[instrument(level = "trace", skip_all)]
4655 pub fn get_checkpoint_contents(
4656 &self,
4657 digest: CheckpointContentsDigest,
4658 ) -> SuiResult<CheckpointContents> {
4659 self.get_checkpoint_store()
4660 .get_checkpoint_contents(&digest)?
4661 .ok_or(
4662 SuiErrorKind::UserInputError {
4663 error: UserInputError::CheckpointContentsNotFound(digest),
4664 }
4665 .into(),
4666 )
4667 }
4668
4669 #[instrument(level = "trace", skip_all)]
4670 pub fn get_checkpoint_contents_by_sequence_number(
4671 &self,
4672 sequence_number: CheckpointSequenceNumber,
4673 ) -> SuiResult<CheckpointContents> {
4674 let verified_checkpoint = self
4675 .get_checkpoint_store()
4676 .get_checkpoint_by_sequence_number(sequence_number)?;
4677 match verified_checkpoint {
4678 Some(verified_checkpoint) => {
4679 let content_digest = verified_checkpoint.into_inner().content_digest;
4680 self.get_checkpoint_contents(content_digest)
4681 }
4682 None => Err(SuiErrorKind::UserInputError {
4683 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4684 }
4685 .into()),
4686 }
4687 }
4688
4689 #[instrument(level = "trace", skip_all)]
4690 pub async fn query_events(
4691 &self,
4692 kv_store: &Arc<TransactionKeyValueStore>,
4693 query: EventFilter,
4694 cursor: Option<EventID>,
4696 limit: usize,
4697 descending: bool,
4698 ) -> SuiResult<Vec<SuiEvent>> {
4699 let index_store = self.get_indexes()?;
4700
4701 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4703 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4704 SuiErrorKind::TransactionNotFound {
4705 digest: cursor.tx_digest,
4706 },
4707 )?;
4708 (tx_seq, cursor.event_seq as usize)
4709 } else if descending {
4710 (u64::MAX, usize::MAX)
4711 } else {
4712 (0, 0)
4713 };
4714
4715 let limit = limit + 1;
4716 let mut event_keys = match query {
4717 EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
4718 EventFilter::Transaction(digest) => {
4719 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4720 }
4721 EventFilter::MoveModule { package, module } => {
4722 let module_id = ModuleId::new(package.into(), module);
4723 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4724 }
4725 EventFilter::MoveEventType(struct_name) => index_store
4726 .events_by_move_event_struct_name(
4727 &struct_name,
4728 tx_num,
4729 event_num,
4730 limit,
4731 descending,
4732 )?,
4733 EventFilter::Sender(sender) => {
4734 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4735 }
4736 EventFilter::TimeRange {
4737 start_time,
4738 end_time,
4739 } => index_store
4740 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4741 EventFilter::MoveEventModule { package, module } => index_store
4742 .events_by_move_event_module(
4743 &ModuleId::new(package.into(), module),
4744 tx_num,
4745 event_num,
4746 limit,
4747 descending,
4748 )?,
4749 EventFilter::Any(_) => {
4751 return Err(SuiErrorKind::UserInputError {
4752 error: UserInputError::Unsupported(
4753 "'Any' queries are not supported by the fullnode.".to_string(),
4754 ),
4755 }
4756 .into());
4757 }
4758 };
4759
4760 if cursor.is_some() {
4763 if !event_keys.is_empty() {
4764 event_keys.remove(0);
4765 }
4766 } else {
4767 event_keys.truncate(limit - 1);
4768 }
4769
4770 let transaction_digests = event_keys
4772 .iter()
4773 .map(|(_, digest, _, _)| *digest)
4774 .collect::<HashSet<_>>()
4775 .into_iter()
4776 .collect::<Vec<_>>();
4777
4778 let events = kv_store
4779 .multi_get_events_by_tx_digests(&transaction_digests)
4780 .await?;
4781
4782 let events_map: HashMap<_, _> =
4783 transaction_digests.iter().zip(events.into_iter()).collect();
4784
4785 let stored_events = event_keys
4786 .into_iter()
4787 .map(|k| {
4788 (
4789 k,
4790 events_map
4791 .get(&k.1)
4792 .expect("fetched digest is missing")
4793 .clone()
4794 .and_then(|e| e.data.get(k.2).cloned()),
4795 )
4796 })
4797 .map(
4798 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4799 event
4800 .map(|e| (e, tx_digest, event_seq, timestamp))
4801 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
4802 },
4803 )
4804 .collect::<Result<Vec<_>, _>>()?;
4805
4806 let epoch_store = self.load_epoch_store_one_call_per_task();
4807 let backing_store = self.get_backing_package_store().as_ref();
4808 let mut layout_resolver = epoch_store
4809 .executor()
4810 .type_layout_resolver(Box::new(backing_store));
4811 let mut events = vec![];
4812 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4813 events.push(SuiEvent::try_from(
4814 e.clone(),
4815 tx_digest,
4816 event_seq as u64,
4817 Some(timestamp),
4818 layout_resolver.get_annotated_layout(&e.type_)?,
4819 )?)
4820 }
4821 Ok(events)
4822 }
4823
4824 pub async fn insert_genesis_object(&self, object: Object) {
4825 self.get_reconfig_api().insert_genesis_object(object);
4826 }
4827
4828 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4829 futures::future::join_all(
4830 objects
4831 .iter()
4832 .map(|o| self.insert_genesis_object(o.clone())),
4833 )
4834 .await;
4835 }
4836
4837 #[instrument(level = "trace", skip_all)]
4839 pub fn get_transaction_status(
4840 &self,
4841 transaction_digest: &TransactionDigest,
4842 epoch_store: &Arc<AuthorityPerEpochStore>,
4843 ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
4844 if let Some(effects) =
4846 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4847 {
4848 if let Some(transaction) = self
4849 .get_transaction_cache_reader()
4850 .get_transaction_block(transaction_digest)
4851 {
4852 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4853 let events = if effects.events_digest().is_some() {
4854 self.get_transaction_events(effects.transaction_digest())?
4855 } else {
4856 TransactionEvents::default()
4857 };
4858 return Ok(Some((
4859 (*transaction).clone().into_message(),
4860 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4861 )));
4862 } else {
4863 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4867 }
4868 }
4869 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4870 self.metrics.tx_already_processed.inc();
4871 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4872 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4873 } else {
4874 Ok(None)
4875 }
4876 }
4877
4878 #[instrument(level = "trace", skip_all)]
4882 pub fn get_signed_effects_and_maybe_resign(
4883 &self,
4884 transaction_digest: &TransactionDigest,
4885 epoch_store: &Arc<AuthorityPerEpochStore>,
4886 ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
4887 let effects = self
4888 .get_transaction_cache_reader()
4889 .get_executed_effects(transaction_digest);
4890 match effects {
4891 Some(effects) => {
4892 if effects.executed_epoch() != epoch_store.epoch() {
4912 debug!(
4913 tx_digest=?transaction_digest,
4914 effects_epoch=?effects.executed_epoch(),
4915 epoch=?epoch_store.epoch(),
4916 "Re-signing the effects with the current epoch"
4917 );
4918 }
4919 Ok(Some(self.sign_effects(effects, epoch_store)?))
4920 }
4921 None => Ok(None),
4922 }
4923 }
4924
4925 #[instrument(level = "trace", skip_all)]
4926 pub(crate) fn sign_effects(
4927 &self,
4928 effects: TransactionEffects,
4929 epoch_store: &Arc<AuthorityPerEpochStore>,
4930 ) -> SuiResult<VerifiedSignedTransactionEffects> {
4931 let tx_digest = *effects.transaction_digest();
4932 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4933 Some(sig) => {
4934 debug_assert!(sig.epoch == epoch_store.epoch());
4935 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4936 }
4937 _ => {
4938 let sig = AuthoritySignInfo::new(
4939 epoch_store.epoch(),
4940 &effects,
4941 Intent::sui_app(IntentScope::TransactionEffects),
4942 self.name,
4943 &*self.secret,
4944 );
4945
4946 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4947
4948 epoch_store.insert_effects_digest_and_signature(
4949 &tx_digest,
4950 effects.digest(),
4951 &sig,
4952 )?;
4953
4954 effects
4955 }
4956 };
4957
4958 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4959 signed_effects,
4960 ))
4961 }
4962
4963 #[instrument(level = "trace", skip_all)]
4965 fn fullnode_only_get_tx_coins_for_indexing(
4966 &self,
4967 effects: &TransactionEffects,
4968 inner_temporary_store: &InnerTemporaryStore,
4969 epoch_store: &Arc<AuthorityPerEpochStore>,
4970 ) -> Option<TxCoins> {
4971 if self.indexes.is_none() || self.is_validator(epoch_store) {
4972 return None;
4973 }
4974 let written_coin_objects = inner_temporary_store
4975 .written
4976 .iter()
4977 .filter_map(|(k, v)| {
4978 if v.is_coin() {
4979 Some((*k, v.clone()))
4980 } else {
4981 None
4982 }
4983 })
4984 .collect();
4985 let mut input_coin_objects = inner_temporary_store
4986 .input_objects
4987 .iter()
4988 .filter_map(|(k, v)| {
4989 if v.is_coin() {
4990 Some((*k, v.clone()))
4991 } else {
4992 None
4993 }
4994 })
4995 .collect::<ObjectMap>();
4996
4997 for (object_id, version) in effects.modified_at_versions() {
5001 if inner_temporary_store
5002 .loaded_runtime_objects
5003 .contains_key(&object_id)
5004 && let Some(object) = self
5005 .get_object_store()
5006 .get_object_by_key(&object_id, version)
5007 && object.is_coin()
5008 {
5009 input_coin_objects.insert(object_id, object);
5010 }
5011 }
5012
5013 Some((input_coin_objects, written_coin_objects))
5014 }
5015
5016 #[instrument(level = "trace", skip_all)]
5024 pub async fn get_transaction_lock(
5025 &self,
5026 object_ref: &ObjectRef,
5027 epoch_store: &AuthorityPerEpochStore,
5028 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5029 let lock_info = self
5030 .get_object_cache_reader()
5031 .get_lock(*object_ref, epoch_store)?;
5032 let lock_info = match lock_info {
5033 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5034 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5035 provided_obj_ref: *object_ref,
5036 current_version: locked_ref.1,
5037 }
5038 .into());
5039 }
5040 ObjectLockStatus::Initialized => {
5041 return Ok(None);
5042 }
5043 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5044 };
5045
5046 epoch_store.get_signed_transaction(&lock_info.tx_digest)
5047 }
5048
5049 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5050 self.get_object_cache_reader().get_objects(objects)
5051 }
5052
5053 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5054 self.get_object_cache_reader()
5055 .get_latest_object_ref_or_tombstone(object_id)
5056 }
5057
5058 pub fn set_override_protocol_upgrade_buffer_stake(
5067 &self,
5068 expected_epoch: EpochId,
5069 buffer_stake_bps: u64,
5070 ) -> SuiResult {
5071 let epoch_store = self.load_epoch_store_one_call_per_task();
5072 let actual_epoch = epoch_store.epoch();
5073 if actual_epoch != expected_epoch {
5074 return Err(SuiErrorKind::WrongEpoch {
5075 expected_epoch,
5076 actual_epoch,
5077 }
5078 .into());
5079 }
5080
5081 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5082 }
5083
5084 pub fn clear_override_protocol_upgrade_buffer_stake(
5085 &self,
5086 expected_epoch: EpochId,
5087 ) -> SuiResult {
5088 let epoch_store = self.load_epoch_store_one_call_per_task();
5089 let actual_epoch = epoch_store.epoch();
5090 if actual_epoch != expected_epoch {
5091 return Err(SuiErrorKind::WrongEpoch {
5092 expected_epoch,
5093 actual_epoch,
5094 }
5095 .into());
5096 }
5097
5098 epoch_store.clear_override_protocol_upgrade_buffer_stake()
5099 }
5100
5101 pub async fn get_available_system_packages(
5104 &self,
5105 binary_config: &BinaryConfig,
5106 ) -> Vec<ObjectRef> {
5107 let mut results = vec![];
5108
5109 let system_packages = BuiltInFramework::iter_system_packages();
5110
5111 #[cfg(msim)]
5113 let extra_packages = framework_injection::get_extra_packages(self.name);
5114 #[cfg(msim)]
5115 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
5116
5117 for system_package in system_packages {
5118 let modules = system_package.modules().to_vec();
5119 #[cfg(msim)]
5121 let modules =
5122 match framework_injection::get_override_modules(&system_package.id, self.name) {
5123 Some(overrides) if overrides.is_empty() => continue,
5124 Some(overrides) => overrides,
5125 None => modules,
5126 };
5127
5128 let Some(obj_ref) = sui_framework::compare_system_package(
5129 &self.get_object_store(),
5130 &system_package.id,
5131 &modules,
5132 system_package.dependencies.to_vec(),
5133 binary_config,
5134 )
5135 .await
5136 else {
5137 return vec![];
5138 };
5139 results.push(obj_ref);
5140 }
5141
5142 results
5143 }
5144
5145 async fn get_system_package_bytes(
5159 &self,
5160 system_packages: Vec<ObjectRef>,
5161 binary_config: &BinaryConfig,
5162 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5163 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5164 let objects = self.get_objects(&ids).await;
5165
5166 let mut res = Vec::with_capacity(system_packages.len());
5167 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
5168 let prev_transaction = match object {
5169 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5170 info!("Framework {} does not need updating", system_package_ref.0);
5172 continue;
5173 }
5174
5175 Some(cur_object) => cur_object.previous_transaction,
5176 None => TransactionDigest::genesis_marker(),
5177 };
5178
5179 #[cfg(msim)]
5180 let SystemPackage {
5181 id: _,
5182 bytes,
5183 dependencies,
5184 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5185 .unwrap_or_else(|| {
5186 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5187 });
5188
5189 #[cfg(not(msim))]
5190 let SystemPackage {
5191 id: _,
5192 bytes,
5193 dependencies,
5194 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5195
5196 let modules: Vec<_> = bytes
5197 .iter()
5198 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5199 .collect();
5200
5201 let new_object = Object::new_system_package(
5202 &modules,
5203 system_package_ref.1,
5204 dependencies.clone(),
5205 prev_transaction,
5206 );
5207
5208 let new_ref = new_object.compute_object_reference();
5209 if new_ref != system_package_ref {
5210 if cfg!(msim) {
5211 debug_fatal!(
5213 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5214 );
5215 } else {
5216 error!(
5217 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5218 );
5219 }
5220 return None;
5221 }
5222
5223 res.push((system_package_ref.1, bytes, dependencies));
5224 }
5225
5226 Some(res)
5227 }
5228
5229 fn is_protocol_version_supported_v1(
5231 current_protocol_version: ProtocolVersion,
5232 proposed_protocol_version: ProtocolVersion,
5233 protocol_config: &ProtocolConfig,
5234 committee: &Committee,
5235 capabilities: Vec<AuthorityCapabilitiesV1>,
5236 mut buffer_stake_bps: u64,
5237 ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5238 if proposed_protocol_version > current_protocol_version + 1
5239 && !protocol_config.advance_to_highest_supported_protocol_version()
5240 {
5241 return None;
5242 }
5243
5244 if buffer_stake_bps > 10000 {
5245 warn!("clamping buffer_stake_bps to 10000");
5246 buffer_stake_bps = 10000;
5247 }
5248
5249 let mut desired_upgrades: Vec<_> = capabilities
5252 .into_iter()
5253 .filter_map(|mut cap| {
5254 if cap.available_system_packages.is_empty() {
5256 return None;
5257 }
5258
5259 cap.available_system_packages.sort();
5260
5261 info!(
5262 "validator {:?} supports {:?} with system packages: {:?}",
5263 cap.authority.concise(),
5264 cap.supported_protocol_versions,
5265 cap.available_system_packages,
5266 );
5267
5268 cap.supported_protocol_versions
5272 .is_version_supported(proposed_protocol_version)
5273 .then_some((cap.available_system_packages, cap.authority))
5274 })
5275 .collect();
5276
5277 desired_upgrades.sort();
5279 desired_upgrades
5280 .into_iter()
5281 .chunk_by(|(packages, _authority)| packages.clone())
5282 .into_iter()
5283 .find_map(|(packages, group)| {
5284 assert!(!packages.is_empty());
5286
5287 let mut stake_aggregator: StakeAggregator<(), true> =
5288 StakeAggregator::new(Arc::new(committee.clone()));
5289
5290 for (_, authority) in group {
5291 stake_aggregator.insert_generic(authority, ());
5292 }
5293
5294 let total_votes = stake_aggregator.total_votes();
5295 let quorum_threshold = committee.quorum_threshold();
5296 let f = committee.total_votes() - committee.quorum_threshold();
5297
5298 let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5300 let effective_threshold = quorum_threshold + buffer_stake;
5301
5302 info!(
5303 ?total_votes,
5304 ?quorum_threshold,
5305 ?buffer_stake_bps,
5306 ?effective_threshold,
5307 ?proposed_protocol_version,
5308 ?packages,
5309 "support for upgrade"
5310 );
5311
5312 let has_support = total_votes >= effective_threshold;
5313 has_support.then_some((proposed_protocol_version, packages))
5314 })
5315 }
5316
5317 fn is_protocol_version_supported_v2(
5318 current_protocol_version: ProtocolVersion,
5319 proposed_protocol_version: ProtocolVersion,
5320 protocol_config: &ProtocolConfig,
5321 committee: &Committee,
5322 capabilities: Vec<AuthorityCapabilitiesV2>,
5323 mut buffer_stake_bps: u64,
5324 ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5325 if proposed_protocol_version > current_protocol_version + 1
5326 && !protocol_config.advance_to_highest_supported_protocol_version()
5327 {
5328 return None;
5329 }
5330
5331 if buffer_stake_bps > 10000 {
5332 warn!("clamping buffer_stake_bps to 10000");
5333 buffer_stake_bps = 10000;
5334 }
5335
5336 let mut desired_upgrades: Vec<_> = capabilities
5339 .into_iter()
5340 .filter_map(|mut cap| {
5341 if cap.available_system_packages.is_empty() {
5343 return None;
5344 }
5345
5346 cap.available_system_packages.sort();
5347
5348 info!(
5349 "validator {:?} supports {:?} with system packages: {:?}",
5350 cap.authority.concise(),
5351 cap.supported_protocol_versions,
5352 cap.available_system_packages,
5353 );
5354
5355 cap.supported_protocol_versions
5359 .get_version_digest(proposed_protocol_version)
5360 .map(|digest| (digest, cap.available_system_packages, cap.authority))
5361 })
5362 .collect();
5363
5364 desired_upgrades.sort();
5366 desired_upgrades
5367 .into_iter()
5368 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5369 .into_iter()
5370 .find_map(|((digest, packages), group)| {
5371 assert!(!packages.is_empty());
5373
5374 let mut stake_aggregator: StakeAggregator<(), true> =
5375 StakeAggregator::new(Arc::new(committee.clone()));
5376
5377 for (_, _, authority) in group {
5378 stake_aggregator.insert_generic(authority, ());
5379 }
5380
5381 let total_votes = stake_aggregator.total_votes();
5382 let quorum_threshold = committee.quorum_threshold();
5383 let f = committee.total_votes() - committee.quorum_threshold();
5384
5385 let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5387 let effective_threshold = quorum_threshold + buffer_stake;
5388
5389 info!(
5390 protocol_config_digest = ?digest,
5391 ?total_votes,
5392 ?quorum_threshold,
5393 ?buffer_stake_bps,
5394 ?effective_threshold,
5395 ?proposed_protocol_version,
5396 ?packages,
5397 "support for upgrade"
5398 );
5399
5400 let has_support = total_votes >= effective_threshold;
5401 has_support.then_some((proposed_protocol_version, packages))
5402 })
5403 }
5404
5405 fn choose_protocol_version_and_system_packages_v1(
5407 current_protocol_version: ProtocolVersion,
5408 protocol_config: &ProtocolConfig,
5409 committee: &Committee,
5410 capabilities: Vec<AuthorityCapabilitiesV1>,
5411 buffer_stake_bps: u64,
5412 ) -> (ProtocolVersion, Vec<ObjectRef>) {
5413 let mut next_protocol_version = current_protocol_version;
5414 let mut system_packages = vec![];
5415
5416 while let Some((version, packages)) = Self::is_protocol_version_supported_v1(
5417 current_protocol_version,
5418 next_protocol_version + 1,
5419 protocol_config,
5420 committee,
5421 capabilities.clone(),
5422 buffer_stake_bps,
5423 ) {
5424 next_protocol_version = version;
5425 system_packages = packages;
5426 }
5427
5428 (next_protocol_version, system_packages)
5429 }
5430
5431 fn choose_protocol_version_and_system_packages_v2(
5432 current_protocol_version: ProtocolVersion,
5433 protocol_config: &ProtocolConfig,
5434 committee: &Committee,
5435 capabilities: Vec<AuthorityCapabilitiesV2>,
5436 buffer_stake_bps: u64,
5437 ) -> (ProtocolVersion, Vec<ObjectRef>) {
5438 let mut next_protocol_version = current_protocol_version;
5439 let mut system_packages = vec![];
5440
5441 while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5442 current_protocol_version,
5443 next_protocol_version + 1,
5444 protocol_config,
5445 committee,
5446 capabilities.clone(),
5447 buffer_stake_bps,
5448 ) {
5449 next_protocol_version = version;
5450 system_packages = packages;
5451 }
5452
5453 (next_protocol_version, system_packages)
5454 }
5455
5456 #[instrument(level = "debug", skip_all)]
5457 fn create_authenticator_state_tx(
5458 &self,
5459 epoch_store: &Arc<AuthorityPerEpochStore>,
5460 ) -> Option<EndOfEpochTransactionKind> {
5461 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5462 info!("authenticator state transactions not enabled");
5463 return None;
5464 }
5465
5466 let authenticator_state_exists = epoch_store.authenticator_state_exists();
5467 let tx = if authenticator_state_exists {
5468 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5469 let min_epoch =
5470 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5471 let authenticator_obj_initial_shared_version = epoch_store
5472 .epoch_start_config()
5473 .authenticator_obj_initial_shared_version()
5474 .expect("initial version must exist");
5475
5476 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5477 min_epoch,
5478 authenticator_obj_initial_shared_version,
5479 );
5480
5481 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5482
5483 tx
5484 } else {
5485 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5486 info!("Creating AuthenticatorStateCreate tx");
5487 tx
5488 };
5489 Some(tx)
5490 }
5491
5492 #[instrument(level = "debug", skip_all)]
5493 fn create_randomness_state_tx(
5494 &self,
5495 epoch_store: &Arc<AuthorityPerEpochStore>,
5496 ) -> Option<EndOfEpochTransactionKind> {
5497 if !epoch_store.protocol_config().random_beacon() {
5498 info!("randomness state transactions not enabled");
5499 return None;
5500 }
5501
5502 if epoch_store.randomness_state_exists() {
5503 return None;
5504 }
5505
5506 let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5507 info!("Creating RandomnessStateCreate tx");
5508 Some(tx)
5509 }
5510
5511 #[instrument(level = "debug", skip_all)]
5512 fn create_accumulator_root_tx(
5513 &self,
5514 epoch_store: &Arc<AuthorityPerEpochStore>,
5515 ) -> Option<EndOfEpochTransactionKind> {
5516 if !epoch_store
5517 .protocol_config()
5518 .create_root_accumulator_object()
5519 {
5520 info!("accumulator root creation not enabled");
5521 return None;
5522 }
5523
5524 if epoch_store.accumulator_root_exists() {
5525 return None;
5526 }
5527
5528 let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5529 info!("Creating AccumulatorRootCreate tx");
5530 Some(tx)
5531 }
5532
5533 #[instrument(level = "debug", skip_all)]
5534 fn create_coin_registry_tx(
5535 &self,
5536 epoch_store: &Arc<AuthorityPerEpochStore>,
5537 ) -> Option<EndOfEpochTransactionKind> {
5538 if !epoch_store.protocol_config().enable_coin_registry() {
5539 info!("coin registry not enabled");
5540 return None;
5541 }
5542
5543 if epoch_store.coin_registry_exists() {
5544 return None;
5545 }
5546
5547 let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5548 info!("Creating CoinRegistryCreate tx");
5549 Some(tx)
5550 }
5551
5552 #[instrument(level = "debug", skip_all)]
5553 fn create_display_registry_tx(
5554 &self,
5555 epoch_store: &Arc<AuthorityPerEpochStore>,
5556 ) -> Option<EndOfEpochTransactionKind> {
5557 if !epoch_store.protocol_config().enable_display_registry() {
5558 info!("display registry not enabled");
5559 return None;
5560 }
5561
5562 if epoch_store.display_registry_exists() {
5563 return None;
5564 }
5565
5566 let tx = EndOfEpochTransactionKind::new_display_registry_create();
5567 info!("Creating DisplayRegistryCreate tx");
5568 Some(tx)
5569 }
5570
5571 #[instrument(level = "debug", skip_all)]
5572 fn create_bridge_tx(
5573 &self,
5574 epoch_store: &Arc<AuthorityPerEpochStore>,
5575 ) -> Option<EndOfEpochTransactionKind> {
5576 if !epoch_store.protocol_config().enable_bridge() {
5577 info!("bridge not enabled");
5578 return None;
5579 }
5580 if epoch_store.bridge_exists() {
5581 return None;
5582 }
5583 let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5584 info!("Creating Bridge Create tx");
5585 Some(tx)
5586 }
5587
5588 #[instrument(level = "debug", skip_all)]
5589 fn init_bridge_committee_tx(
5590 &self,
5591 epoch_store: &Arc<AuthorityPerEpochStore>,
5592 ) -> Option<EndOfEpochTransactionKind> {
5593 if !epoch_store.protocol_config().enable_bridge() {
5594 info!("bridge not enabled");
5595 return None;
5596 }
5597 if !epoch_store
5598 .protocol_config()
5599 .should_try_to_finalize_bridge_committee()
5600 {
5601 info!("should not try to finalize bridge committee yet");
5602 return None;
5603 }
5604 if !epoch_store.bridge_exists() {
5606 return None;
5607 }
5608
5609 if epoch_store.bridge_committee_initiated() {
5610 return None;
5611 }
5612
5613 let bridge_initial_shared_version = epoch_store
5614 .epoch_start_config()
5615 .bridge_obj_initial_shared_version()
5616 .expect("initial version must exist");
5617 let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5618 info!("Init Bridge committee tx");
5619 Some(tx)
5620 }
5621
5622 #[instrument(level = "debug", skip_all)]
5623 fn create_deny_list_state_tx(
5624 &self,
5625 epoch_store: &Arc<AuthorityPerEpochStore>,
5626 ) -> Option<EndOfEpochTransactionKind> {
5627 if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5628 return None;
5629 }
5630
5631 if epoch_store.coin_deny_list_state_exists() {
5632 return None;
5633 }
5634
5635 let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5636 info!("Creating DenyListStateCreate tx");
5637 Some(tx)
5638 }
5639
5640 #[instrument(level = "debug", skip_all)]
5641 fn create_execution_time_observations_tx(
5642 &self,
5643 epoch_store: &Arc<AuthorityPerEpochStore>,
5644 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5645 last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
5646 ) -> Option<EndOfEpochTransactionKind> {
5647 let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
5648 .protocol_config()
5649 .per_object_congestion_control_mode()
5650 else {
5651 return None;
5652 };
5653
5654 let start_checkpoint = std::cmp::max(
5657 last_checkpoint_before_end_of_epoch
5658 .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
5659 epoch_store
5661 .epoch()
5662 .checked_sub(1)
5663 .map(|prev_epoch| {
5664 self.checkpoint_store
5665 .get_epoch_last_checkpoint_seq_number(prev_epoch)
5666 .expect("typed store must not fail")
5667 .expect(
5668 "sequence number of last checkpoint of preceding epoch must be saved",
5669 )
5670 + 1
5671 })
5672 .unwrap_or(1),
5673 );
5674 info!(
5675 "reading checkpoint range {:?}..={:?}",
5676 start_checkpoint, last_checkpoint_before_end_of_epoch
5677 );
5678 let sequence_numbers =
5679 (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
5680 let contents_digests: Vec<_> = self
5681 .checkpoint_store
5682 .multi_get_locally_computed_checkpoints(&sequence_numbers)
5683 .expect("typed store must not fail")
5684 .into_iter()
5685 .zip(sequence_numbers)
5686 .map(|(maybe_checkpoint, sequence_number)| {
5687 if let Some(checkpoint) = maybe_checkpoint {
5688 checkpoint.content_digest
5689 } else {
5690 self.checkpoint_store
5693 .get_checkpoint_by_sequence_number(sequence_number)
5694 .expect("typed store must not fail")
5695 .unwrap_or_else(|| {
5696 fatal!("preceding checkpoints must exist by end of epoch")
5697 })
5698 .data()
5699 .content_digest
5700 }
5701 })
5702 .collect();
5703 let tx_digests: Vec<_> = self
5704 .checkpoint_store
5705 .multi_get_checkpoint_content(&contents_digests)
5706 .expect("typed store must not fail")
5707 .into_iter()
5708 .flat_map(|maybe_contents| {
5709 maybe_contents
5710 .expect("preceding checkpoint contents must exist by end of epoch")
5711 .into_inner()
5712 .into_iter()
5713 .map(|digests| digests.transaction)
5714 })
5715 .collect();
5716 let included_execution_time_observations: HashSet<_> = self
5717 .get_transaction_cache_reader()
5718 .multi_get_transaction_blocks(&tx_digests)
5719 .into_iter()
5720 .flat_map(|maybe_tx| {
5721 if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
5722 .expect("preceding transaction must exist by end of epoch")
5723 .transaction_data()
5724 .kind()
5725 {
5726 #[allow(clippy::unnecessary_to_owned)]
5727 itertools::Either::Left(
5728 ptb.commands
5729 .to_owned()
5730 .into_iter()
5731 .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
5732 )
5733 } else {
5734 itertools::Either::Right(std::iter::empty())
5735 }
5736 })
5737 .chain(end_of_epoch_observation_keys.into_iter())
5738 .collect();
5739
5740 let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
5741 epoch_store
5742 .get_end_of_epoch_execution_time_observations()
5743 .filter_and_sort_v1(
5744 |(key, _)| included_execution_time_observations.contains(key),
5745 params.stored_observations_limit.try_into().unwrap(),
5746 ),
5747 );
5748 info!("Creating StoreExecutionTimeObservations tx");
5749 Some(tx)
5750 }
5751
5752 #[instrument(level = "error", skip_all)]
5763 pub async fn create_and_execute_advance_epoch_tx(
5764 &self,
5765 epoch_store: &Arc<AuthorityPerEpochStore>,
5766 gas_cost_summary: &GasCostSummary,
5767 checkpoint: CheckpointSequenceNumber,
5768 epoch_start_timestamp_ms: CheckpointTimestamp,
5769 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5770 last_checkpoint: CheckpointSequenceNumber,
5773 ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
5774 let mut txns = Vec::new();
5775
5776 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
5777 txns.push(tx);
5778 }
5779 if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
5780 txns.push(tx);
5781 }
5782 if let Some(tx) = self.create_bridge_tx(epoch_store) {
5783 txns.push(tx);
5784 }
5785 if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
5786 txns.push(tx);
5787 }
5788 if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
5789 txns.push(tx);
5790 }
5791 if let Some(tx) = self.create_execution_time_observations_tx(
5792 epoch_store,
5793 end_of_epoch_observation_keys,
5794 last_checkpoint,
5795 ) {
5796 txns.push(tx);
5797 }
5798 if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
5799 txns.push(tx);
5800 }
5801
5802 if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
5803 txns.push(tx);
5804 }
5805
5806 if let Some(tx) = self.create_display_registry_tx(epoch_store) {
5807 txns.push(tx);
5808 }
5809
5810 let next_epoch = epoch_store.epoch() + 1;
5811
5812 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5813
5814 let (next_epoch_protocol_version, next_epoch_system_packages) =
5815 if epoch_store.protocol_config().authority_capabilities_v2() {
5816 Self::choose_protocol_version_and_system_packages_v2(
5817 epoch_store.protocol_version(),
5818 epoch_store.protocol_config(),
5819 epoch_store.committee(),
5820 epoch_store
5821 .get_capabilities_v2()
5822 .expect("read capabilities from db cannot fail"),
5823 buffer_stake_bps,
5824 )
5825 } else {
5826 Self::choose_protocol_version_and_system_packages_v1(
5827 epoch_store.protocol_version(),
5828 epoch_store.protocol_config(),
5829 epoch_store.committee(),
5830 epoch_store
5831 .get_capabilities_v1()
5832 .expect("read capabilities from db cannot fail"),
5833 buffer_stake_bps,
5834 )
5835 };
5836
5837 let config = epoch_store.protocol_config();
5840 let binary_config = config.binary_config(None);
5841 let Some(next_epoch_system_package_bytes) = self
5842 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5843 .await
5844 else {
5845 if next_epoch_protocol_version <= ProtocolVersion::MAX {
5846 debug_fatal!(
5850 "upgraded system packages {:?} are not locally available, cannot create \
5851 ChangeEpochTx. validator binary must be upgraded to the correct version!",
5852 next_epoch_system_packages
5853 );
5854 } else {
5855 error!(
5856 "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
5857 next_epoch_protocol_version
5858 );
5859 }
5860 return Err(CheckpointBuilderError::SystemPackagesMissing);
5869 };
5870
5871 let tx = if epoch_store
5872 .protocol_config()
5873 .end_of_epoch_transaction_supported()
5874 {
5875 txns.push(EndOfEpochTransactionKind::new_change_epoch(
5876 next_epoch,
5877 next_epoch_protocol_version,
5878 gas_cost_summary.storage_cost,
5879 gas_cost_summary.computation_cost,
5880 gas_cost_summary.storage_rebate,
5881 gas_cost_summary.non_refundable_storage_fee,
5882 epoch_start_timestamp_ms,
5883 next_epoch_system_package_bytes,
5884 ));
5885
5886 VerifiedTransaction::new_end_of_epoch_transaction(txns)
5887 } else {
5888 VerifiedTransaction::new_change_epoch(
5889 next_epoch,
5890 next_epoch_protocol_version,
5891 gas_cost_summary.storage_cost,
5892 gas_cost_summary.computation_cost,
5893 gas_cost_summary.storage_rebate,
5894 gas_cost_summary.non_refundable_storage_fee,
5895 epoch_start_timestamp_ms,
5896 next_epoch_system_package_bytes,
5897 )
5898 };
5899
5900 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5901 tx.clone(),
5902 epoch_store.epoch(),
5903 checkpoint,
5904 );
5905
5906 let tx_digest = executable_tx.digest();
5907
5908 info!(
5909 ?next_epoch,
5910 ?next_epoch_protocol_version,
5911 ?next_epoch_system_packages,
5912 computation_cost=?gas_cost_summary.computation_cost,
5913 storage_cost=?gas_cost_summary.storage_cost,
5914 storage_rebate=?gas_cost_summary.storage_rebate,
5915 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5916 ?tx_digest,
5917 "Creating advance epoch transaction"
5918 );
5919
5920 fail_point_async!("change_epoch_tx_delay");
5921 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5922
5923 if self
5926 .get_transaction_cache_reader()
5927 .is_tx_already_executed(tx_digest)
5928 {
5929 warn!("change epoch tx has already been executed via state sync");
5930 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
5931 }
5932
5933 let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
5934 else {
5935 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
5936 };
5937
5938 let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
5941 self.get_object_cache_reader().as_ref(),
5942 std::iter::once(&Schedulable::Transaction(&executable_tx)),
5943 )?;
5944
5945 assert_eq!(assigned_versions.0.len(), 1);
5946 let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;
5947
5948 let input_objects = self.read_objects_for_execution(
5949 &tx_lock,
5950 &executable_tx,
5951 assigned_versions,
5952 epoch_store,
5953 )?;
5954
5955 let (transaction_outputs, _timings, _execution_error_opt) = self
5956 .execute_certificate(
5957 &execution_guard,
5958 &executable_tx,
5959 input_objects,
5960 None,
5961 BalanceWithdrawStatus::NoWithdraw,
5962 epoch_store,
5963 )
5964 .unwrap();
5965 let system_obj = get_sui_system_state(&transaction_outputs.written)
5966 .expect("change epoch tx must write to system object");
5967
5968 let effects = transaction_outputs.effects;
5969 self.get_state_sync_store()
5973 .insert_transaction_and_effects(&tx, &effects);
5974
5975 info!(
5976 "Effects summary of the change epoch transaction: {:?}",
5977 effects.summary_for_debug()
5978 );
5979 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5980 assert!(effects.status().is_ok());
5982 Ok((system_obj, effects))
5983 }
5984
5985 #[instrument(level = "error", skip_all)]
5986 async fn reopen_epoch_db(
5987 &self,
5988 cur_epoch_store: &AuthorityPerEpochStore,
5989 new_committee: Committee,
5990 epoch_start_configuration: EpochStartConfiguration,
5991 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5992 epoch_last_checkpoint: CheckpointSequenceNumber,
5993 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
5994 let new_epoch = new_committee.epoch;
5995 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5996 assert_eq!(
5997 epoch_start_configuration.epoch_start_state().epoch(),
5998 new_committee.epoch
5999 );
6000 fail_point!("before-open-new-epoch-store");
6001 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6002 self.name,
6003 new_committee,
6004 epoch_start_configuration,
6005 self.get_backing_package_store().clone(),
6006 self.get_object_store().clone(),
6007 expensive_safety_check_config,
6008 epoch_last_checkpoint,
6009 )?;
6010 self.epoch_store.store(new_epoch_store.clone());
6011 Ok(new_epoch_store)
6012 }
6013
6014 #[cfg(test)]
6015 pub(crate) fn iter_live_object_set_for_testing(
6016 &self,
6017 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6018 let include_wrapped_object = !self
6019 .epoch_store_for_testing()
6020 .protocol_config()
6021 .simplified_unwrap_then_delete();
6022 self.get_global_state_hash_store()
6023 .iter_cached_live_object_set_for_testing(include_wrapped_object)
6024 }
6025
6026 #[cfg(test)]
6027 pub(crate) fn shutdown_execution_for_test(&self) {
6028 self.tx_execution_shutdown
6029 .lock()
6030 .take()
6031 .unwrap()
6032 .send(())
6033 .unwrap();
6034 }
6035
6036 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6038 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6039 self.get_object_cache_reader()
6040 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6041 self.get_reconfig_api()
6042 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6043 Ok(())
6044 }
6045}
6046
6047pub struct RandomnessRoundReceiver {
6048 authority_state: Arc<AuthorityState>,
6049 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6050}
6051
6052impl RandomnessRoundReceiver {
6053 pub fn spawn(
6054 authority_state: Arc<AuthorityState>,
6055 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6056 ) -> JoinHandle<()> {
6057 let rrr = RandomnessRoundReceiver {
6058 authority_state,
6059 randomness_rx,
6060 };
6061 spawn_monitored_task!(rrr.run())
6062 }
6063
6064 async fn run(mut self) {
6065 info!("RandomnessRoundReceiver event loop started");
6066
6067 loop {
6068 tokio::select! {
6069 maybe_recv = self.randomness_rx.recv() => {
6070 if let Some((epoch, round, bytes)) = maybe_recv {
6071 self.handle_new_randomness(epoch, round, bytes).await;
6072 } else {
6073 break;
6074 }
6075 },
6076 }
6077 }
6078
6079 info!("RandomnessRoundReceiver event loop ended");
6080 }
6081
6082 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
6083 async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
6084 fail_point_async!("randomness-delay");
6085
6086 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
6087 if epoch_store.epoch() != epoch {
6088 warn!(
6089 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
6090 epoch_store.epoch()
6091 );
6092 return;
6093 }
6094 let key = TransactionKey::RandomnessRound(epoch, round);
6095 let transaction = VerifiedTransaction::new_randomness_state_update(
6096 epoch,
6097 round,
6098 bytes,
6099 epoch_store
6100 .epoch_start_config()
6101 .randomness_obj_initial_shared_version()
6102 .expect("randomness state obj must exist"),
6103 );
6104 debug!(
6105 "created randomness state update transaction with digest: {:?}",
6106 transaction.digest()
6107 );
6108 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
6109 let digest = *transaction.digest();
6110
6111 self.authority_state
6116 .get_cache_commit()
6117 .persist_transaction(&transaction);
6118
6119 if epoch_store.insert_tx_key(key, digest).is_err() {
6121 warn!("epoch ended while handling new randomness");
6122 }
6123
6124 let authority_state = self.authority_state.clone();
6125 spawn_monitored_task!(async move {
6126 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
6133 let result = tokio::time::timeout(
6134 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
6135 authority_state
6136 .get_transaction_cache_reader()
6137 .notify_read_executed_effects(
6138 "RandomnessRoundReceiver::notify_read_executed_effects_first",
6139 &[digest],
6140 ),
6141 )
6142 .await;
6143 let mut effects = match result {
6144 Ok(result) => result,
6145 Err(_) => {
6146 if cfg!(debug_assertions) {
6147 panic!(
6149 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6150 );
6151 }
6152 warn!(
6153 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6154 );
6155 authority_state
6157 .get_transaction_cache_reader()
6158 .notify_read_executed_effects(
6159 "RandomnessRoundReceiver::notify_read_executed_effects_second",
6160 &[digest],
6161 )
6162 .await
6163 }
6164 };
6165
6166 let effects = effects.pop().expect("should return effects");
6167 if *effects.status() != ExecutionStatus::Success {
6168 fatal!(
6169 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
6170 );
6171 }
6172 debug!(
6173 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
6174 );
6175 });
6176 }
6177}
6178
6179#[async_trait]
6180impl TransactionKeyValueStoreTrait for AuthorityState {
6181 #[instrument(skip(self))]
6182 async fn multi_get(
6183 &self,
6184 transactions: &[TransactionDigest],
6185 effects: &[TransactionDigest],
6186 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6187 let txns = if !transactions.is_empty() {
6188 self.get_transaction_cache_reader()
6189 .multi_get_transaction_blocks(transactions)
6190 .into_iter()
6191 .map(|t| t.map(|t| (*t).clone().into_inner()))
6192 .collect()
6193 } else {
6194 vec![]
6195 };
6196
6197 let fx = if !effects.is_empty() {
6198 self.get_transaction_cache_reader()
6199 .multi_get_executed_effects(effects)
6200 } else {
6201 vec![]
6202 };
6203
6204 Ok((txns, fx))
6205 }
6206
6207 #[instrument(skip(self))]
6208 async fn multi_get_checkpoints(
6209 &self,
6210 checkpoint_summaries: &[CheckpointSequenceNumber],
6211 checkpoint_contents: &[CheckpointSequenceNumber],
6212 checkpoint_summaries_by_digest: &[CheckpointDigest],
6213 ) -> SuiResult<(
6214 Vec<Option<CertifiedCheckpointSummary>>,
6215 Vec<Option<CheckpointContents>>,
6216 Vec<Option<CertifiedCheckpointSummary>>,
6217 )> {
6218 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6220 let store = self.get_checkpoint_store();
6221 for seq in checkpoint_summaries {
6222 let checkpoint = store
6223 .get_checkpoint_by_sequence_number(*seq)?
6224 .map(|c| c.into_inner());
6225
6226 summaries.push(checkpoint);
6227 }
6228
6229 let mut contents = Vec::with_capacity(checkpoint_contents.len());
6230 for seq in checkpoint_contents {
6231 let checkpoint = store
6232 .get_checkpoint_by_sequence_number(*seq)?
6233 .and_then(|summary| {
6234 store
6235 .get_checkpoint_contents(&summary.content_digest)
6236 .expect("db read cannot fail")
6237 });
6238 contents.push(checkpoint);
6239 }
6240
6241 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6242 for digest in checkpoint_summaries_by_digest {
6243 let checkpoint = store
6244 .get_checkpoint_by_digest(digest)?
6245 .map(|c| c.into_inner());
6246 summaries_by_digest.push(checkpoint);
6247 }
6248 Ok((summaries, contents, summaries_by_digest))
6249 }
6250
6251 #[instrument(skip(self))]
6252 async fn deprecated_get_transaction_checkpoint(
6253 &self,
6254 digest: TransactionDigest,
6255 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6256 Ok(self
6257 .get_checkpoint_cache()
6258 .deprecated_get_transaction_checkpoint(&digest)
6259 .map(|(_epoch, checkpoint)| checkpoint))
6260 }
6261
6262 #[instrument(skip(self))]
6263 async fn get_object(
6264 &self,
6265 object_id: ObjectID,
6266 version: VersionNumber,
6267 ) -> SuiResult<Option<Object>> {
6268 Ok(self
6269 .get_object_cache_reader()
6270 .get_object_by_key(&object_id, version))
6271 }
6272
6273 #[instrument(skip_all)]
6274 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6275 Ok(self
6276 .get_object_cache_reader()
6277 .multi_get_objects_by_key(object_keys))
6278 }
6279
6280 #[instrument(skip(self))]
6281 async fn multi_get_transaction_checkpoint(
6282 &self,
6283 digests: &[TransactionDigest],
6284 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6285 let res = self
6286 .get_checkpoint_cache()
6287 .deprecated_multi_get_transaction_checkpoint(digests);
6288
6289 Ok(res
6290 .into_iter()
6291 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6292 .collect())
6293 }
6294
6295 #[instrument(skip(self))]
6296 async fn multi_get_events_by_tx_digests(
6297 &self,
6298 digests: &[TransactionDigest],
6299 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6300 if digests.is_empty() {
6301 return Ok(vec![]);
6302 }
6303
6304 Ok(self
6305 .get_transaction_cache_reader()
6306 .multi_get_events(digests))
6307 }
6308}
6309
/// Simulator-only hooks for overriding the framework (system) packages a validator
/// sees. Overrides are stored per thread.
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::collections::BTreeMap;
    use std::{cell::RefCell, collections::BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    /// Override table keyed by package id.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    /// The modules making up one package.
    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) replace a package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package is overridden: the same modules for everyone, or a per-validator
    /// decision.
    enum PackageOverrideConfig {
        Global(Framework),
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module with its own bytecode version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Overrides `package_id` with `modules` for every validator.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        let entry = PackageOverrideConfig::Global(modules);
        OVERRIDE.with(|overrides| {
            overrides.borrow_mut().insert(package_id, entry);
        });
    }

    /// Overrides `package_id` with whatever `func` returns for each validator.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        let entry = PackageOverrideConfig::PerValidator(func);
        OVERRIDE.with(|overrides| {
            overrides.borrow_mut().insert(package_id, entry);
        });
    }

    /// Overrides the full set of system packages.
    ///
    /// Every package in `packages` is registered as a global override. Every built-in
    /// package id that is *not* in `packages` is registered as a global override with
    /// an empty module list.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        let provided_ids: BTreeSet<ObjectID> = packages.iter().map(|pkg| pkg.id).collect();

        OVERRIDE.with(|overrides| {
            let mut overrides = overrides.borrow_mut();

            for pkg in packages {
                overrides.insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
            }

            for built_in_id in BuiltInFramework::all_package_ids() {
                if !provided_ids.contains(&built_in_id) {
                    overrides.insert(built_in_id, PackageOverrideConfig::Global(vec![]));
                }
            }
        });
    }

    /// Returns the serialized override modules for `package_id` as seen by validator
    /// `name`, or `None` when there is no override (or the per-validator callback
    /// declines).
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|overrides| {
            let overrides = overrides.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|framework| compiled_modules_to_bytes(&framework))
                }
            }
        })
    }

    /// Returns the override modules for `package_id` as seen by validator `name`, or
    /// `None` when there is no override (or the per-validator callback declines).
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|overrides| {
            let overrides = overrides.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            }
        })
    }

    /// Returns the override for `package_id` as a [`SystemPackage`].
    ///
    /// For a system package the dependencies are copied from the built-in package
    /// with the same id; for any other package all built-in package ids are used as
    /// dependencies.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;

        let dependencies = match is_system_package(*package_id) {
            true => BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec(),
            false => BuiltInFramework::all_package_ids(),
        };

        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns every overridden package whose id is not a built-in package id, each
    /// depending on all built-in packages.
    ///
    /// # Panics
    ///
    /// Panics if such a package has a per-validator callback that returns `None` for
    /// `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();

        // Collect the ids first so the override table is no longer borrowed while the
        // bytes are looked up below.
        let extra_ids: Vec<ObjectID> = OVERRIDE.with(|overrides| {
            overrides
                .borrow()
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        let mut extra_packages = Vec::with_capacity(extra_ids.len());
        for id in extra_ids {
            extra_packages.push(SystemPackage {
                id,
                bytes: get_override_bytes(&id, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        extra_packages
    }
}
6443
6444#[derive(Debug, Serialize, Deserialize, Clone)]
6445pub struct ObjDumpFormat {
6446 pub id: ObjectID,
6447 pub version: VersionNumber,
6448 pub digest: ObjectDigest,
6449 pub object: Object,
6450}
6451
6452impl ObjDumpFormat {
6453 fn new(object: Object) -> Self {
6454 let oref = object.compute_object_reference();
6455 Self {
6456 id: oref.0,
6457 version: oref.1,
6458 digest: oref.2,
6459 object,
6460 }
6461 }
6462}
6463
/// Serializable snapshot of the state surrounding the execution of one transaction.
///
/// NOTE(review): the field notes below describe how `NodeStateDump::new` appears to
/// gather each value; the part of `new` that assembles the struct is not visible
/// here, so confirm against the full constructor.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the transaction.
    pub tx_digest: TransactionDigest,
    /// The signed transaction data.
    pub sender_signed_data: SenderSignedData,
    /// Epoch reported by the epoch store.
    pub executed_epoch: u64,
    /// Reference gas price reported by the epoch store.
    pub reference_gas_price: u64,
    /// Protocol version of the epoch store, as a `u64`.
    pub protocol_version: u64,
    /// Epoch start timestamp taken from the epoch start config.
    pub epoch_start_timestamp_ms: u64,
    /// Effects computed locally for the transaction.
    pub computed_effects: TransactionEffects,
    /// Digest of the effects that were expected.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Consensus (shared) input objects listed in the effects as mutated or read-only.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Objects from the temporary store's `loaded_runtime_objects`.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions listed in the effects' `modified_at_versions`.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages from the temporary store's `runtime_packages_loaded_from_db`.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// Input objects of the transaction.
    pub input_objects: Vec<ObjDumpFormat>,
}
6481
6482impl NodeStateDump {
6483 pub fn new(
6484 tx_digest: &TransactionDigest,
6485 effects: &TransactionEffects,
6486 expected_effects_digest: TransactionEffectsDigest,
6487 object_store: &dyn ObjectStore,
6488 epoch_store: &Arc<AuthorityPerEpochStore>,
6489 inner_temporary_store: &InnerTemporaryStore,
6490 certificate: &VerifiedExecutableTransaction,
6491 ) -> SuiResult<Self> {
6492 let executed_epoch = epoch_store.epoch();
6494 let reference_gas_price = epoch_store.reference_gas_price();
6495 let epoch_start_config = epoch_store.epoch_start_config();
6496 let protocol_version = epoch_store.protocol_version().as_u64();
6497 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6498
6499 let mut relevant_system_packages = Vec::new();
6501 for sys_package_id in BuiltInFramework::all_package_ids() {
6502 if let Some(w) = object_store.get_object(&sys_package_id) {
6503 relevant_system_packages.push(ObjDumpFormat::new(w))
6504 }
6505 }
6506
6507 let mut shared_objects = Vec::new();
6509 for kind in effects.input_consensus_objects() {
6510 match kind {
6511 InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6512 if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6513 shared_objects.push(ObjDumpFormat::new(w))
6514 }
6515 }
6516 InputConsensusObject::ReadConsensusStreamEnded(..)
6517 | InputConsensusObject::MutateConsensusStreamEnded(..)
6518 | InputConsensusObject::Cancelled(..) => (), }
6520 }
6521
6522 let mut loaded_child_objects = Vec::new();
6525 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6526 if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6527 loaded_child_objects.push(ObjDumpFormat::new(w))
6528 }
6529 }
6530
6531 let mut modified_at_versions = Vec::new();
6533 for (id, ver) in effects.modified_at_versions() {
6534 if let Some(w) = object_store.get_object_by_key(&id, ver) {
6535 modified_at_versions.push(ObjDumpFormat::new(w))
6536 }
6537 }
6538
6539 let mut runtime_reads = Vec::new();
6542 for obj in inner_temporary_store
6543 .runtime_packages_loaded_from_db
6544 .values()
6545 {
6546 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6547 }
6548
6549 Ok(Self {
6552 tx_digest: *tx_digest,
6553 executed_epoch,
6554 reference_gas_price,
6555 epoch_start_timestamp_ms,
6556 protocol_version,
6557 relevant_system_packages,
6558 shared_objects,
6559 loaded_child_objects,
6560 modified_at_versions,
6561 runtime_reads,
6562 sender_signed_data: certificate.clone().into_message(),
6563 input_objects: inner_temporary_store
6564 .input_objects
6565 .values()
6566 .map(|o| ObjDumpFormat::new(o.clone()))
6567 .collect(),
6568 computed_effects: effects.clone(),
6569 expected_effects_digest,
6570 })
6571 }
6572
6573 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6574 let mut objects = Vec::new();
6575 objects.extend(self.relevant_system_packages.clone());
6576 objects.extend(self.shared_objects.clone());
6577 objects.extend(self.loaded_child_objects.clone());
6578 objects.extend(self.modified_at_versions.clone());
6579 objects.extend(self.runtime_reads.clone());
6580 objects.extend(self.input_objects.clone());
6581 objects
6582 }
6583
6584 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6585 let file_name = format!(
6586 "{}_{}_NODE_DUMP.json",
6587 self.tx_digest,
6588 AuthorityState::unixtime_now_ms()
6589 );
6590 let mut path = path.to_path_buf();
6591 path.push(&file_name);
6592 let mut file = File::create(path.clone())?;
6593 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6594 Ok(path)
6595 }
6596
6597 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6598 let file = File::open(path)?;
6599 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6600 }
6601}