1use crate::accumulators::coin_reservations::CoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
8use crate::checkpoints::CheckpointBuilderError;
9use crate::checkpoints::CheckpointBuilderResult;
10use crate::congestion_tracker::CongestionTracker;
11use crate::consensus_adapter::ConsensusOverloadChecker;
12use crate::execution_cache::ExecutionCacheTraitPointers;
13use crate::execution_cache::TransactionCacheRead;
14use crate::execution_cache::writeback_cache::WritebackCache;
15use crate::execution_scheduler::ExecutionScheduler;
16use crate::execution_scheduler::SchedulingSource;
17use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
18use crate::jsonrpc_index::CoinIndexKey2;
19use crate::rpc_index::RpcIndexStore;
20use crate::traffic_controller::TrafficController;
21use crate::traffic_controller::metrics::TrafficControllerMetrics;
22use crate::transaction_outputs::TransactionOutputs;
23use crate::verify_indexes::{fix_indexes, verify_indexes};
24use arc_swap::{ArcSwap, Guard};
25use async_trait::async_trait;
26use authority_per_epoch_store::CertLockGuard;
27use fastcrypto::encoding::Base58;
28use fastcrypto::encoding::Encoding;
29use fastcrypto::hash::MultisetHash;
30use itertools::Itertools;
31use move_binary_format::CompiledModule;
32use move_binary_format::binary_config::BinaryConfig;
33use move_core_types::annotated_value::MoveStructLayout;
34use move_core_types::language_storage::ModuleId;
35use mysten_common::fatal;
36use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX};
37use parking_lot::Mutex;
38use prometheus::{
39 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
40 register_histogram_vec_with_registry, register_histogram_with_registry,
41 register_int_counter_vec_with_registry, register_int_counter_with_registry,
42 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
43};
44use serde::de::DeserializeOwned;
45use serde::{Deserialize, Serialize};
46use shared_object_version_manager::AssignedVersions;
47use shared_object_version_manager::Schedulable;
48use std::collections::BTreeMap;
49use std::collections::BTreeSet;
50use std::fs::File;
51use std::io::Write;
52use std::path::{Path, PathBuf};
53use std::sync::atomic::Ordering;
54use std::time::Duration;
55use std::time::Instant;
56use std::time::SystemTime;
57use std::time::UNIX_EPOCH;
58use std::{
59 collections::{HashMap, HashSet},
60 fs,
61 pin::Pin,
62 str::FromStr,
63 sync::Arc,
64 vec,
65};
66use sui_config::NodeConfig;
67use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
68use sui_protocol_config::PerObjectCongestionControlMode;
69use sui_types::crypto::RandomnessRound;
70use sui_types::dynamic_field::visitor as DFV;
71use sui_types::execution::ExecutionOutput;
72use sui_types::execution::ExecutionTimeObservationKey;
73use sui_types::execution::ExecutionTiming;
74use sui_types::execution_params::ExecutionOrEarlyError;
75use sui_types::execution_params::FundsWithdrawStatus;
76use sui_types::execution_params::get_early_execution_error;
77use sui_types::execution_status::ExecutionStatus;
78use sui_types::inner_temporary_store::PackageStoreWithFallback;
79use sui_types::layout_resolver::LayoutResolver;
80use sui_types::layout_resolver::into_struct_layout;
81use sui_types::messages_consensus::AuthorityCapabilitiesV2;
82use sui_types::object::bounded_visitor::BoundedVisitor;
83use sui_types::storage::ChildObjectResolver;
84use sui_types::storage::InputKey;
85use sui_types::storage::TrackingBackingStore;
86use sui_types::traffic_control::{
87 PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
88};
89use sui_types::transaction_executor::SimulateTransactionResult;
90use sui_types::transaction_executor::TransactionChecks;
91use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
92use tap::TapFallible;
93use tokio::sync::RwLock;
94use tokio::sync::mpsc::unbounded_channel;
95use tokio::sync::watch::error::RecvError;
96use tokio::sync::{mpsc, oneshot};
97use tokio::task::JoinHandle;
98use tokio::time::timeout;
99use tracing::trace;
100use tracing::{debug, error, info, instrument, warn};
101
102use self::authority_store::ExecutionLockWriteGuard;
103use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
104pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
105use mysten_metrics::{monitored_scope, spawn_monitored_task};
106
107use crate::jsonrpc_index::IndexStore;
108use crate::jsonrpc_index::{CoinInfo, ObjectIndexChanges};
109use mysten_common::debug_fatal;
110use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
111use sui_config::genesis::Genesis;
112use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
113use sui_framework::{BuiltInFramework, SystemPackage};
114use sui_json_rpc_types::{
115 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
116 SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
117 SuiTransactionBlockEvents, TransactionFilter,
118};
119use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
120use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
121use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
122use sui_types::authenticator_state::get_authenticator_state;
123use sui_types::committee::{EpochId, ProtocolVersion};
124use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
125use sui_types::deny_list_v1::check_coin_deny_list_v1;
126use sui_types::digests::ChainIdentifier;
127use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
128use sui_types::effects::{
129 InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
130 TransactionEvents, VerifiedSignedTransactionEffects,
131};
132use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
133use sui_types::event::{Event, EventID};
134use sui_types::executable_transaction::VerifiedExecutableTransaction;
135use sui_types::gas::{GasCostSummary, SuiGasStatus};
136use sui_types::inner_temporary_store::{
137 InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
138};
139use sui_types::message_envelope::Message;
140use sui_types::messages_checkpoint::{
141 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
142 CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
143 CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
144 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
145};
146use sui_types::messages_grpc::{
147 LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
148 TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
149};
150use sui_types::metrics::{BytecodeVerifierMetrics, LimitsMetrics};
151use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
152use sui_types::storage::{
153 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
154};
155use sui_types::sui_system_state::SuiSystemStateTrait;
156use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
157use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
158use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
159use sui_types::{
160 SUI_SYSTEM_ADDRESS,
161 base_types::*,
162 committee::Committee,
163 crypto::AuthoritySignature,
164 error::{SuiError, SuiResult},
165 object::{Object, ObjectRead},
166 transaction::*,
167};
168use sui_types::{TypeTag, is_system_package};
169use typed_store::TypedStoreError;
170
171use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
172use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
173use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
174use crate::authority::authority_store_pruner::{
175 AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
176};
177use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
178use crate::authority::epoch_start_configuration::EpochStartConfiguration;
179use crate::checkpoints::CheckpointStore;
180use crate::epoch::committee_store::CommitteeStore;
181use crate::execution_cache::{
182 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
183 ObjectCacheRead, StateSyncAPI,
184};
185use crate::execution_driver::execution_process;
186use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
187use crate::metrics::LatencyObserver;
188use crate::metrics::RateTracker;
189use crate::module_cache_metrics::ResolverMetrics;
190use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
191use crate::stake_aggregator::StakeAggregator;
192use crate::subscription_handler::SubscriptionHandler;
193use crate::transaction_input_loader::TransactionInputLoader;
194
195#[cfg(msim)]
196pub use crate::checkpoints::checkpoint_executor::utils::{
197 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
198};
199
200use crate::authority::authority_store_tables::AuthorityPrunerTables;
201#[cfg(msim)]
202use sui_types::committee::CommitteeTrait;
203use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
204
205#[cfg(test)]
206#[path = "unit_tests/authority_tests.rs"]
207pub mod authority_tests;
208
209#[cfg(test)]
210#[path = "unit_tests/transaction_tests.rs"]
211pub mod transaction_tests;
212
213#[cfg(test)]
214#[path = "unit_tests/batch_transaction_tests.rs"]
215mod batch_transaction_tests;
216
217#[cfg(test)]
218#[path = "unit_tests/move_integration_tests.rs"]
219pub mod move_integration_tests;
220
221#[cfg(test)]
222#[path = "unit_tests/gas_tests.rs"]
223mod gas_tests;
224
225#[cfg(test)]
226#[path = "unit_tests/batch_verification_tests.rs"]
227mod batch_verification_tests;
228
229#[cfg(test)]
230#[path = "unit_tests/coin_deny_list_tests.rs"]
231mod coin_deny_list_tests;
232
233#[cfg(test)]
234#[path = "unit_tests/mysticeti_fastpath_execution_tests.rs"]
235mod mysticeti_fastpath_execution_tests;
236
237#[cfg(test)]
238#[path = "unit_tests/auth_unit_test_utils.rs"]
239pub mod auth_unit_test_utils;
240
241pub mod authority_test_utils;
242
243pub mod authority_per_epoch_store;
244pub mod authority_per_epoch_store_pruner;
245
246pub mod authority_store_pruner;
247pub mod authority_store_tables;
248pub mod authority_store_types;
249pub mod consensus_tx_status_cache;
250pub mod epoch_start_configuration;
251pub mod execution_time_estimator;
252pub mod shared_object_congestion_tracker;
253pub mod shared_object_version_manager;
254pub mod submitted_transaction_cache;
255pub mod test_authority_builder;
256pub mod transaction_deferral;
257pub mod transaction_reject_reason_cache;
258mod weighted_moving_average;
259
260pub(crate) mod authority_store;
261pub mod backpressure;
262
263pub struct AuthorityMetrics {
265 tx_orders: IntCounter,
266 total_certs: IntCounter,
267 total_cert_attempts: IntCounter,
268 total_effects: IntCounter,
269 pub shared_obj_tx: IntCounter,
271 sponsored_tx: IntCounter,
272 tx_already_processed: IntCounter,
273 num_input_objs: Histogram,
274 num_shared_objects: Histogram,
276 batch_size: Histogram,
277
278 authority_state_handle_vote_transaction_latency: Histogram,
279
280 execute_certificate_latency_single_writer: Histogram,
281 execute_certificate_latency_shared_object: Histogram,
282 await_transaction_latency: Histogram,
283
284 internal_execution_latency: Histogram,
285 execution_load_input_objects_latency: Histogram,
286 prepare_certificate_latency: Histogram,
287 commit_certificate_latency: Histogram,
288 db_checkpoint_latency: Histogram,
289
290 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
292 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
293 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
294 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
295
296 pub(crate) execution_driver_executed_transactions: IntCounter,
297 pub(crate) execution_driver_paused_transactions: IntCounter,
298 pub(crate) execution_driver_dispatch_queue: IntGauge,
299 pub(crate) execution_queueing_delay_s: Histogram,
300 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
301 pub(crate) execution_gas_latency_ratio: Histogram,
302
303 pub(crate) skipped_consensus_txns: IntCounter,
304 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
305
306 pub(crate) authority_overload_status: IntGauge,
307 pub(crate) authority_load_shedding_percentage: IntGauge,
308
309 pub(crate) transaction_overload_sources: IntCounterVec,
310
311 post_processing_total_events_emitted: IntCounter,
313 post_processing_total_tx_indexed: IntCounter,
314 post_processing_total_tx_had_event_processed: IntCounter,
315 post_processing_total_failures: IntCounter,
316
317 pub consensus_handler_processed: IntCounterVec,
319 pub consensus_handler_transaction_sizes: HistogramVec,
320 pub consensus_handler_num_low_scoring_authorities: IntGauge,
321 pub consensus_handler_scores: IntGaugeVec,
322 pub consensus_handler_deferred_transactions: IntCounter,
323 pub consensus_handler_congested_transactions: IntCounter,
324 pub consensus_handler_cancelled_transactions: IntCounter,
325 pub consensus_handler_max_object_costs: IntGaugeVec,
326 pub consensus_committed_subdags: IntCounterVec,
327 pub consensus_committed_messages: IntGaugeVec,
328 pub consensus_committed_user_transactions: IntGaugeVec,
329 pub consensus_finalized_user_transactions: IntGaugeVec,
330 pub consensus_rejected_user_transactions: IntGaugeVec,
331 pub consensus_calculated_throughput: IntGauge,
332 pub consensus_calculated_throughput_profile: IntGauge,
333 pub consensus_block_handler_block_processed: IntCounter,
334 pub consensus_block_handler_txn_processed: IntCounterVec,
335 pub consensus_block_handler_fastpath_executions: IntCounter,
336 pub consensus_timestamp_bias: Histogram,
337
338 pub limits_metrics: Arc<LimitsMetrics>,
339
340 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
342
343 pub zklogin_sig_count: IntCounter,
345 pub multisig_sig_count: IntCounter,
347
348 pub execution_queueing_latency: LatencyObserver,
351
352 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
359
360 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
363}
364
365const POSITIVE_INT_BUCKETS: &[f64] = &[
367 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
368 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
369 7000000., 10000000.,
370];
371
372const LATENCY_SEC_BUCKETS: &[f64] = &[
373 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
374 10., 20., 30., 60., 90.,
375];
376
377const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
379 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
380 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
381];
382
383const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
386 -2.0, 0.0, 0.2, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40, 0.45, 0.50, 0.80,
387 2.0, 10.0, 30.0,
388];
389
390const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
391 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
392 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
393];
394
395pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;
396
397pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_secs(2);
401
402impl AuthorityMetrics {
403 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
404 let execute_certificate_latency = register_histogram_vec_with_registry!(
405 "authority_state_execute_certificate_latency",
406 "Latency of executing certificates, including waiting for inputs",
407 &["tx_type"],
408 LATENCY_SEC_BUCKETS.to_vec(),
409 registry,
410 )
411 .unwrap();
412
413 let execute_certificate_latency_single_writer =
414 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
415 let execute_certificate_latency_shared_object =
416 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
417
418 Self {
419 tx_orders: register_int_counter_with_registry!(
420 "total_transaction_orders",
421 "Total number of transaction orders",
422 registry,
423 )
424 .unwrap(),
425 total_certs: register_int_counter_with_registry!(
426 "total_transaction_certificates",
427 "Total number of transaction certificates handled",
428 registry,
429 )
430 .unwrap(),
431 total_cert_attempts: register_int_counter_with_registry!(
432 "total_handle_certificate_attempts",
433 "Number of calls to handle_certificate",
434 registry,
435 )
436 .unwrap(),
437 total_effects: register_int_counter_with_registry!(
439 "total_transaction_effects",
440 "Total number of transaction effects produced",
441 registry,
442 )
443 .unwrap(),
444
445 shared_obj_tx: register_int_counter_with_registry!(
446 "num_shared_obj_tx",
447 "Number of transactions involving shared objects",
448 registry,
449 )
450 .unwrap(),
451
452 sponsored_tx: register_int_counter_with_registry!(
453 "num_sponsored_tx",
454 "Number of sponsored transactions",
455 registry,
456 )
457 .unwrap(),
458
459 tx_already_processed: register_int_counter_with_registry!(
460 "num_tx_already_processed",
461 "Number of transaction orders already processed previously",
462 registry,
463 )
464 .unwrap(),
465 num_input_objs: register_histogram_with_registry!(
466 "num_input_objects",
467 "Distribution of number of input TX objects per TX",
468 POSITIVE_INT_BUCKETS.to_vec(),
469 registry,
470 )
471 .unwrap(),
472 num_shared_objects: register_histogram_with_registry!(
473 "num_shared_objects",
474 "Number of shared input objects per TX",
475 POSITIVE_INT_BUCKETS.to_vec(),
476 registry,
477 )
478 .unwrap(),
479 batch_size: register_histogram_with_registry!(
480 "batch_size",
481 "Distribution of size of transaction batch",
482 POSITIVE_INT_BUCKETS.to_vec(),
483 registry,
484 )
485 .unwrap(),
486 authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
487 "authority_state_handle_vote_transaction_latency",
488 "Latency of voting on transactions without signing",
489 LATENCY_SEC_BUCKETS.to_vec(),
490 registry,
491 )
492 .unwrap(),
493 execute_certificate_latency_single_writer,
494 execute_certificate_latency_shared_object,
495 await_transaction_latency: register_histogram_with_registry!(
496 "await_transaction_latency",
497 "Latency of awaiting user transaction execution, including waiting for inputs",
498 LATENCY_SEC_BUCKETS.to_vec(),
499 registry,
500 )
501 .unwrap(),
502 internal_execution_latency: register_histogram_with_registry!(
503 "authority_state_internal_execution_latency",
504 "Latency of actual certificate executions",
505 LATENCY_SEC_BUCKETS.to_vec(),
506 registry,
507 )
508 .unwrap(),
509 execution_load_input_objects_latency: register_histogram_with_registry!(
510 "authority_state_execution_load_input_objects_latency",
511 "Latency of loading input objects for execution",
512 LOW_LATENCY_SEC_BUCKETS.to_vec(),
513 registry,
514 )
515 .unwrap(),
516 prepare_certificate_latency: register_histogram_with_registry!(
517 "authority_state_prepare_certificate_latency",
518 "Latency of executing certificates, before committing the results",
519 LATENCY_SEC_BUCKETS.to_vec(),
520 registry,
521 )
522 .unwrap(),
523 commit_certificate_latency: register_histogram_with_registry!(
524 "authority_state_commit_certificate_latency",
525 "Latency of committing certificate execution results",
526 LATENCY_SEC_BUCKETS.to_vec(),
527 registry,
528 )
529 .unwrap(),
530 db_checkpoint_latency: register_histogram_with_registry!(
531 "db_checkpoint_latency",
532 "Latency of checkpointing dbs",
533 LATENCY_SEC_BUCKETS.to_vec(),
534 registry,
535 ).unwrap(),
536 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
537 "transaction_manager_num_enqueued_certificates",
538 "Current number of certificates enqueued to ExecutionScheduler",
539 &["result"],
540 registry,
541 )
542 .unwrap(),
543 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
544 "transaction_manager_num_pending_certificates",
545 "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
546 registry,
547 )
548 .unwrap(),
549 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
550 "transaction_manager_num_executing_certificates",
551 "Number of executing certificates, including queued and actually running certificates",
552 registry,
553 )
554 .unwrap(),
555 authority_overload_status: register_int_gauge_with_registry!(
556 "authority_overload_status",
557 "Whether authority is current experiencing overload and enters load shedding mode.",
558 registry)
559 .unwrap(),
560 authority_load_shedding_percentage: register_int_gauge_with_registry!(
561 "authority_load_shedding_percentage",
562 "The percentage of transactions is shed when the authority is in load shedding mode.",
563 registry)
564 .unwrap(),
565 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
566 "transaction_manager_transaction_queue_age_s",
567 "Time spent in waiting for transaction in the queue",
568 LATENCY_SEC_BUCKETS.to_vec(),
569 registry,
570 )
571 .unwrap(),
572 transaction_overload_sources: register_int_counter_vec_with_registry!(
573 "transaction_overload_sources",
574 "Number of times each source indicates transaction overload.",
575 &["source"],
576 registry)
577 .unwrap(),
578 execution_driver_executed_transactions: register_int_counter_with_registry!(
579 "execution_driver_executed_transactions",
580 "Cumulative number of transaction executed by execution driver",
581 registry,
582 )
583 .unwrap(),
584 execution_driver_paused_transactions: register_int_counter_with_registry!(
585 "execution_driver_paused_transactions",
586 "Cumulative number of transactions paused by execution driver",
587 registry,
588 )
589 .unwrap(),
590 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
591 "execution_driver_dispatch_queue",
592 "Number of transaction pending in execution driver dispatch queue",
593 registry,
594 )
595 .unwrap(),
596 execution_queueing_delay_s: register_histogram_with_registry!(
597 "execution_queueing_delay_s",
598 "Queueing delay between a transaction is ready for execution until it starts executing.",
599 LATENCY_SEC_BUCKETS.to_vec(),
600 registry
601 )
602 .unwrap(),
603 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
604 "prepare_cert_gas_latency_ratio",
605 "The ratio of computation gas divided by VM execution latency.",
606 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
607 registry
608 )
609 .unwrap(),
610 execution_gas_latency_ratio: register_histogram_with_registry!(
611 "execution_gas_latency_ratio",
612 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
613 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
614 registry
615 )
616 .unwrap(),
617 skipped_consensus_txns: register_int_counter_with_registry!(
618 "skipped_consensus_txns",
619 "Total number of consensus transactions skipped",
620 registry,
621 )
622 .unwrap(),
623 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
624 "skipped_consensus_txns_cache_hit",
625 "Total number of consensus transactions skipped because of local cache hit",
626 registry,
627 )
628 .unwrap(),
629 post_processing_total_events_emitted: register_int_counter_with_registry!(
630 "post_processing_total_events_emitted",
631 "Total number of events emitted in post processing",
632 registry,
633 )
634 .unwrap(),
635 post_processing_total_tx_indexed: register_int_counter_with_registry!(
636 "post_processing_total_tx_indexed",
637 "Total number of txes indexed in post processing",
638 registry,
639 )
640 .unwrap(),
641 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
642 "post_processing_total_tx_had_event_processed",
643 "Total number of txes finished event processing in post processing",
644 registry,
645 )
646 .unwrap(),
647 post_processing_total_failures: register_int_counter_with_registry!(
648 "post_processing_total_failures",
649 "Total number of failure in post processing",
650 registry,
651 )
652 .unwrap(),
653 consensus_handler_processed: register_int_counter_vec_with_registry!(
654 "consensus_handler_processed",
655 "Number of transactions processed by consensus handler",
656 &["class"],
657 registry
658 ).unwrap(),
659 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
660 "consensus_handler_transaction_sizes",
661 "Sizes of each type of transactions processed by consensus handler",
662 &["class"],
663 POSITIVE_INT_BUCKETS.to_vec(),
664 registry
665 ).unwrap(),
666 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
667 "consensus_handler_num_low_scoring_authorities",
668 "Number of low scoring authorities based on reputation scores from consensus",
669 registry
670 ).unwrap(),
671 consensus_handler_scores: register_int_gauge_vec_with_registry!(
672 "consensus_handler_scores",
673 "scores from consensus for each authority",
674 &["authority"],
675 registry,
676 ).unwrap(),
677 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
678 "consensus_handler_deferred_transactions",
679 "Number of transactions deferred by consensus handler",
680 registry,
681 ).unwrap(),
682 consensus_handler_congested_transactions: register_int_counter_with_registry!(
683 "consensus_handler_congested_transactions",
684 "Number of transactions deferred by consensus handler due to congestion",
685 registry,
686 ).unwrap(),
687 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
688 "consensus_handler_cancelled_transactions",
689 "Number of transactions cancelled by consensus handler",
690 registry,
691 ).unwrap(),
692 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
693 "consensus_handler_max_congestion_control_object_costs",
694 "Max object costs for congestion control in the current consensus commit",
695 &["commit_type"],
696 registry,
697 ).unwrap(),
698 consensus_committed_subdags: register_int_counter_vec_with_registry!(
699 "consensus_committed_subdags",
700 "Number of committed subdags, sliced by author",
701 &["authority"],
702 registry,
703 ).unwrap(),
704 consensus_committed_messages: register_int_gauge_vec_with_registry!(
705 "consensus_committed_messages",
706 "Total number of committed consensus messages, sliced by author",
707 &["authority"],
708 registry,
709 ).unwrap(),
710 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
711 "consensus_committed_user_transactions",
712 "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
713 &["authority"],
714 registry,
715 ).unwrap(),
716 consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
717 "consensus_finalized_user_transactions",
718 "Number of user transactions finalized, sliced by submitter",
719 &["authority"],
720 registry,
721 ).unwrap(),
722 consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
723 "consensus_rejected_user_transactions",
724 "Number of user transactions rejected, sliced by submitter",
725 &["authority"],
726 registry,
727 ).unwrap(),
728 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
729 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
730 zklogin_sig_count: register_int_counter_with_registry!(
731 "zklogin_sig_count",
732 "Count of zkLogin signatures",
733 registry,
734 )
735 .unwrap(),
736 multisig_sig_count: register_int_counter_with_registry!(
737 "multisig_sig_count",
738 "Count of zkLogin signatures",
739 registry,
740 )
741 .unwrap(),
742 consensus_calculated_throughput: register_int_gauge_with_registry!(
743 "consensus_calculated_throughput",
744 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
745 registry,
746 ).unwrap(),
747 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
748 "consensus_calculated_throughput_profile",
749 "The current active calculated throughput profile",
750 registry
751 ).unwrap(),
752 consensus_block_handler_block_processed: register_int_counter_with_registry!(
753 "consensus_block_handler_block_processed",
754 "Number of blocks processed by consensus block handler.",
755 registry
756 ).unwrap(),
757 consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
758 "consensus_block_handler_txn_processed",
759 "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
760 &["outcome"],
761 registry
762 ).unwrap(),
763 consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
764 "consensus_block_handler_fastpath_executions",
765 "Number of fastpath transactions sent for execution by consensus transaction handler",
766 registry,
767 ).unwrap(),
768 consensus_timestamp_bias: register_histogram_with_registry!(
769 "consensus_timestamp_bias",
770 "Bias/delay from consensus timestamp to system time",
771 TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
772 registry
773 ).unwrap(),
774 execution_queueing_latency: LatencyObserver::new(),
775 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
776 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
777 }
778 }
779}
780
781pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
788
789#[derive(Debug, Clone)]
792pub struct ExecutionEnv {
793 pub assigned_versions: AssignedVersions,
795 pub expected_effects_digest: Option<TransactionEffectsDigest>,
798 pub scheduling_source: SchedulingSource,
800 pub funds_withdraw_status: FundsWithdrawStatus,
803 pub barrier_dependencies: Vec<TransactionDigest>,
806}
807
808impl Default for ExecutionEnv {
809 fn default() -> Self {
810 Self {
811 assigned_versions: Default::default(),
812 expected_effects_digest: None,
813 scheduling_source: SchedulingSource::NonFastPath,
814 funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
815 barrier_dependencies: Default::default(),
816 }
817 }
818}
819
820impl ExecutionEnv {
821 pub fn new() -> Self {
822 Default::default()
823 }
824
825 pub fn with_scheduling_source(mut self, scheduling_source: SchedulingSource) -> Self {
826 self.scheduling_source = scheduling_source;
827 self
828 }
829
830 pub fn with_expected_effects_digest(
831 mut self,
832 expected_effects_digest: TransactionEffectsDigest,
833 ) -> Self {
834 self.expected_effects_digest = Some(expected_effects_digest);
835 self
836 }
837
838 pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
839 self.assigned_versions = assigned_versions;
840 self
841 }
842
843 pub fn with_insufficient_funds(mut self) -> Self {
844 self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
845 self
846 }
847
848 pub fn with_barrier_dependencies(
849 mut self,
850 barrier_dependencies: BTreeSet<TransactionDigest>,
851 ) -> Self {
852 self.barrier_dependencies = barrier_dependencies.into_iter().collect();
853 self
854 }
855}
856
857#[derive(Debug)]
858pub struct ForkRecoveryState {
859 transaction_overrides:
861 parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
862}
863
864impl Default for ForkRecoveryState {
865 fn default() -> Self {
866 Self {
867 transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
868 }
869 }
870}
871
872impl ForkRecoveryState {
873 pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
874 let Some(config) = config else {
875 return Ok(Self::default());
876 };
877
878 let mut transaction_overrides = HashMap::new();
879 for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
880 let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
881 SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
882 })?;
883 let effects_digest =
884 TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
885 SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
886 })?;
887 transaction_overrides.insert(tx_digest, effects_digest);
888 }
889
890 Ok(Self {
891 transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
892 })
893 }
894
895 pub fn get_transaction_override(
896 &self,
897 tx_digest: &TransactionDigest,
898 ) -> Option<TransactionEffectsDigest> {
899 self.transaction_overrides.read().get(tx_digest).copied()
900 }
901}
902
903pub struct AuthorityState {
904 pub name: AuthorityName,
907 pub secret: StableSyncAuthoritySigner,
909
910 input_loader: TransactionInputLoader,
912 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
913 coin_reservation_resolver: Arc<CoinReservationResolver>,
914
915 epoch_store: ArcSwap<AuthorityPerEpochStore>,
916
917 execution_lock: RwLock<EpochId>,
922
923 pub indexes: Option<Arc<IndexStore>>,
924 pub rpc_index: Option<Arc<RpcIndexStore>>,
925
926 pub subscription_handler: Arc<SubscriptionHandler>,
927 pub checkpoint_store: Arc<CheckpointStore>,
928
929 committee_store: Arc<CommitteeStore>,
930
931 execution_scheduler: Arc<ExecutionScheduler>,
933
934 #[allow(unused)]
936 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
937
938 pub metrics: Arc<AuthorityMetrics>,
939 _pruner: AuthorityStorePruner,
940 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
941
942 db_checkpoint_config: DBCheckpointConfig,
944
945 pub config: NodeConfig,
946
947 pub overload_info: AuthorityOverloadInfo,
949
950 chain_identifier: ChainIdentifier,
952
953 pub(crate) congestion_tracker: Arc<CongestionTracker>,
954
955 pub traffic_controller: Option<Arc<TrafficController>>,
957
958 fork_recovery_state: Option<ForkRecoveryState>,
960
961 notify_epoch: tokio::sync::watch::Sender<EpochId>,
963}
964
965impl AuthorityState {
972 pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
973 epoch_store.committee().authority_exists(&self.name)
974 }
975
976 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
977 !self.is_validator(epoch_store)
978 }
979
980 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
981 &self.committee_store
982 }
983
984 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
985 self.committee_store.clone()
986 }
987
988 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
989 &self.config.authority_overload_config
990 }
991
992 pub fn get_epoch_state_commitments(
993 &self,
994 epoch: EpochId,
995 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
996 self.checkpoint_store.get_epoch_state_commitments(epoch)
997 }
998
999 fn handle_transaction_deny_checks(
1000 &self,
1001 transaction: &VerifiedTransaction,
1002 epoch_store: &Arc<AuthorityPerEpochStore>,
1003 ) -> SuiResult<CheckedInputObjects> {
1004 let tx_digest = transaction.digest();
1005 let tx_data = transaction.data().transaction_data();
1006
1007 let input_object_kinds = tx_data.input_objects()?;
1008 let receiving_objects_refs = tx_data.receiving_objects();
1009
1010 sui_transaction_checks::deny::check_transaction_for_signing(
1014 tx_data,
1015 transaction.tx_signatures(),
1016 &input_object_kinds,
1017 &receiving_objects_refs,
1018 &self.config.transaction_deny_config,
1019 self.get_backing_package_store().as_ref(),
1020 )?;
1021
1022 let withdraws = tx_data.process_funds_withdrawals_for_signing(
1023 self.chain_identifier,
1024 self.coin_reservation_resolver.as_ref(),
1025 )?;
1026
1027 self.execution_cache_trait_pointers
1028 .account_funds_read
1029 .check_amounts_available(&withdraws)?;
1030
1031 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1032 Some(tx_digest),
1033 &input_object_kinds,
1034 &receiving_objects_refs,
1035 epoch_store.epoch(),
1036 )?;
1037
1038 let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1039 epoch_store.protocol_config(),
1040 epoch_store.reference_gas_price(),
1041 tx_data,
1042 input_objects,
1043 &receiving_objects,
1044 &self.metrics.bytecode_verifier_metrics,
1045 &self.config.verifier_signing_config,
1046 )?;
1047
1048 self.handle_coin_deny_list_checks(
1049 tx_data,
1050 &checked_input_objects,
1051 &receiving_objects,
1052 epoch_store,
1053 )?;
1054
1055 Ok(checked_input_objects)
1056 }
1057
1058 fn handle_coin_deny_list_checks(
1059 &self,
1060 tx_data: &TransactionData,
1061 checked_input_objects: &CheckedInputObjects,
1062 receiving_objects: &ReceivingObjects,
1063 epoch_store: &Arc<AuthorityPerEpochStore>,
1064 ) -> SuiResult<()> {
1065 let funds_withdraw_types = tx_data
1066 .get_funds_withdrawals()
1067 .into_iter()
1068 .filter_map(|withdraw| {
1069 withdraw
1070 .type_arg
1071 .get_balance_type_param()
1072 .map(|ty| ty.to_canonical_string(false))
1073 })
1074 .collect::<BTreeSet<_>>();
1075
1076 if epoch_store.coin_deny_list_v1_enabled() {
1077 check_coin_deny_list_v1(
1078 tx_data.sender(),
1079 checked_input_objects,
1080 receiving_objects,
1081 funds_withdraw_types.clone(),
1082 &self.get_object_store(),
1083 )?;
1084 }
1085
1086 if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1087 check_coin_deny_list_v2_during_signing(
1088 tx_data.sender(),
1089 checked_input_objects,
1090 receiving_objects,
1091 funds_withdraw_types.clone(),
1092 &self.get_object_store(),
1093 )?;
1094 }
1095
1096 Ok(())
1097 }
1098
1099 #[instrument(level = "trace", skip_all)]
1102 fn handle_transaction_impl(
1103 &self,
1104 transaction: VerifiedTransaction,
1105 sign: bool,
1106 epoch_store: &Arc<AuthorityPerEpochStore>,
1107 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
1108 let _execution_lock = self.execution_lock_for_signing()?;
1110
1111 let checked_input_objects =
1112 self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1113
1114 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1115
1116 let tx_digest = *transaction.digest();
1117 let signed_transaction = if sign {
1118 Some(VerifiedSignedTransaction::new(
1119 epoch_store.epoch(),
1120 transaction,
1121 self.name,
1122 &*self.secret,
1123 ))
1124 } else {
1125 None
1126 };
1127
1128 if epoch_store.protocol_config().disable_preconsensus_locking() {
1129 self.get_cache_writer()
1136 .validate_owned_object_versions(&owned_objects)?;
1137 } else {
1138 self.get_cache_writer().acquire_transaction_locks(
1143 epoch_store,
1144 &owned_objects,
1145 tx_digest,
1146 signed_transaction.clone(),
1147 )?;
1148 }
1149
1150 Ok(signed_transaction)
1151 }
1152
1153 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1163 pub fn handle_vote_transaction(
1164 &self,
1165 epoch_store: &Arc<AuthorityPerEpochStore>,
1166 transaction: VerifiedTransaction,
1167 ) -> SuiResult<()> {
1168 debug!("handle_vote_transaction");
1169
1170 let _metrics_guard = self
1171 .metrics
1172 .authority_state_handle_vote_transaction_latency
1173 .start_timer();
1174 self.metrics.tx_orders.inc();
1175
1176 if !epoch_store
1180 .get_reconfig_state_read_lock_guard()
1181 .should_accept_user_certs()
1182 {
1183 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1184 }
1185
1186 let tx_digest = *transaction.digest();
1191 if epoch_store.is_recently_finalized(&tx_digest)
1192 || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
1193 {
1194 return Ok(());
1195 }
1196
1197 if self
1198 .get_transaction_cache_reader()
1199 .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
1200 {
1201 return Err(SuiErrorKind::TransactionAlreadyExecuted {
1202 digest: (*transaction.digest()),
1203 }
1204 .into());
1205 }
1206
1207 let result =
1208 self.handle_transaction_impl(transaction, false , epoch_store)?;
1209 assert!(
1210 result.is_none(),
1211 "handle_transaction_impl should not return a signed transaction when sign is false"
1212 );
1213 Ok(())
1214 }
1215
1216 pub fn check_transaction_validity(
1225 &self,
1226 epoch_store: &Arc<AuthorityPerEpochStore>,
1227 transaction: &VerifiedTransaction,
1228 enforce_live_input_objects: bool,
1229 ) -> SuiResult<()> {
1230 if !epoch_store
1231 .get_reconfig_state_read_lock_guard()
1232 .should_accept_user_certs()
1233 {
1234 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1235 }
1236
1237 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1238
1239 let checked_input_objects =
1240 self.handle_transaction_deny_checks(transaction, epoch_store)?;
1241
1242 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1245 let cache_reader = self.get_object_cache_reader();
1246
1247 for obj_ref in &owned_objects {
1248 if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1249 if obj_ref.1 < live_object.version() {
1252 return Err(SuiErrorKind::UserInputError {
1253 error: UserInputError::ObjectVersionUnavailableForConsumption {
1254 provided_obj_ref: *obj_ref,
1255 current_version: live_object.version(),
1256 },
1257 }
1258 .into());
1259 }
1260
1261 if enforce_live_input_objects && live_object.version() < obj_ref.1 {
1262 return Err(SuiErrorKind::UserInputError {
1264 error: UserInputError::ObjectVersionUnavailableForConsumption {
1265 provided_obj_ref: *obj_ref,
1266 current_version: live_object.version(),
1267 },
1268 }
1269 .into());
1270 }
1271
1272 if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1274 return Err(SuiErrorKind::UserInputError {
1275 error: UserInputError::InvalidObjectDigest {
1276 object_id: obj_ref.0,
1277 expected_digest: live_object.digest(),
1278 },
1279 }
1280 .into());
1281 }
1282 }
1283 }
1284
1285 Ok(())
1286 }
1287
1288 pub fn check_system_overload_at_signing(&self) -> bool {
1289 self.config
1290 .authority_overload_config
1291 .check_system_overload_at_signing
1292 }
1293
1294 pub fn check_system_overload_at_execution(&self) -> bool {
1295 self.config
1296 .authority_overload_config
1297 .check_system_overload_at_execution
1298 }
1299
1300 pub(crate) fn check_system_overload(
1301 &self,
1302 consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
1303 tx_data: &SenderSignedData,
1304 do_authority_overload_check: bool,
1305 ) -> SuiResult {
1306 if do_authority_overload_check {
1307 self.check_authority_overload(tx_data).tap_err(|_| {
1308 self.update_overload_metrics("execution_queue");
1309 })?;
1310 }
1311 self.execution_scheduler
1312 .check_execution_overload(self.overload_config(), tx_data)
1313 .tap_err(|_| {
1314 self.update_overload_metrics("execution_pending");
1315 })?;
1316 consensus_overload_checker
1317 .check_consensus_overload()
1318 .tap_err(|_| {
1319 self.update_overload_metrics("consensus");
1320 })?;
1321
1322 let pending_tx_count = self
1323 .get_cache_commit()
1324 .approximate_pending_transaction_count();
1325 if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1326 return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1327 retry_after_secs: 10,
1328 }
1329 .into());
1330 }
1331
1332 Ok(())
1333 }
1334
1335 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1336 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1337 return Ok(());
1338 }
1339
1340 let load_shedding_percentage = self
1341 .overload_info
1342 .load_shedding_percentage
1343 .load(Ordering::Relaxed);
1344 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1345 }
1346
1347 fn update_overload_metrics(&self, source: &str) {
1348 self.metrics
1349 .transaction_overload_sources
1350 .with_label_values(&[source])
1351 .inc();
1352 }
1353
1354 pub(crate) async fn wait_for_fastpath_dependency_objects(
1358 &self,
1359 transaction: &VerifiedTransaction,
1360 epoch: EpochId,
1361 ) -> SuiResult<bool> {
1362 let txn_data = transaction.data().transaction_data();
1363 let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1364
1365 let fastpath_dependency_objects: Vec<_> = move_objects
1367 .into_iter()
1368 .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1369 .chain(
1370 packages
1371 .into_iter()
1372 .map(|package_id| InputKey::Package { id: package_id }),
1373 )
1374 .collect();
1375 let receiving_keys: HashSet<_> = receiving_objects
1376 .into_iter()
1377 .filter_map(|receiving_obj_ref| {
1378 self.should_wait_for_dependency_object(receiving_obj_ref)
1379 })
1380 .collect();
1381 if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1382 return Ok(true);
1383 }
1384
1385 let max_wait = if cfg!(msim) {
1388 Duration::from_millis(200)
1389 } else {
1390 WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1391 };
1392
1393 match timeout(
1394 max_wait,
1395 self.get_object_cache_reader().notify_read_input_objects(
1396 &fastpath_dependency_objects,
1397 &receiving_keys,
1398 epoch,
1399 ),
1400 )
1401 .await
1402 {
1403 Ok(()) => Ok(true),
1404 Err(_) => Ok(false),
1407 }
1408 }
1409
1410 fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1417 let (obj_id, cur_version, _digest) = obj_ref;
1418 let Some(latest_obj_ref) = self
1419 .get_object_cache_reader()
1420 .get_latest_object_ref_or_tombstone(obj_id)
1421 else {
1422 return Some(InputKey::VersionedObject {
1424 id: FullObjectID::new(obj_id, None),
1425 version: cur_version,
1426 });
1427 };
1428 let latest_digest = latest_obj_ref.2;
1429 if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1430 return None;
1432 }
1433 let latest_version = latest_obj_ref.1;
1434 if cur_version <= latest_version {
1435 return None;
1438 }
1439 Some(InputKey::VersionedObject {
1441 id: FullObjectID::new(obj_id, None),
1442 version: cur_version,
1443 })
1444 }
1445
1446 #[instrument(level = "trace", skip_all)]
1451 pub async fn wait_for_certificate_execution(
1452 &self,
1453 certificate: &VerifiedCertificate,
1454 epoch_store: &Arc<AuthorityPerEpochStore>,
1455 ) -> SuiResult<TransactionEffects> {
1456 self.wait_for_transaction_execution(
1457 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1458 epoch_store,
1459 )
1460 .await
1461 }
1462
1463 #[instrument(level = "trace", skip_all)]
1467 pub async fn wait_for_transaction_execution(
1468 &self,
1469 transaction: &VerifiedExecutableTransaction,
1470 epoch_store: &Arc<AuthorityPerEpochStore>,
1471 ) -> SuiResult<TransactionEffects> {
1472 let _metrics_guard = if transaction.is_consensus_tx() {
1473 self.metrics
1474 .execute_certificate_latency_shared_object
1475 .start_timer()
1476 } else {
1477 self.metrics
1478 .execute_certificate_latency_single_writer
1479 .start_timer()
1480 };
1481 trace!("execute_transaction");
1482
1483 self.metrics.total_cert_attempts.inc();
1484
1485 if !transaction.is_consensus_tx()
1486 && !epoch_store.protocol_config().disable_preconsensus_locking()
1487 {
1488 self.execution_scheduler.enqueue(
1496 vec![(
1497 Schedulable::Transaction(transaction.clone()),
1498 ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
1499 )],
1500 epoch_store,
1501 );
1502 }
1503
1504 epoch_store
1507 .within_alive_epoch(self.notify_read_effects(
1508 "AuthorityState::wait_for_transaction_execution",
1509 *transaction.digest(),
1510 ))
1511 .await
1512 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1513 .and_then(|r| r)
1514 }
1515
1516 pub async fn await_transaction_effects(
1520 &self,
1521 digest: TransactionDigest,
1522 epoch_store: &Arc<AuthorityPerEpochStore>,
1523 ) -> SuiResult<TransactionEffects> {
1524 let _metrics_guard = self.metrics.await_transaction_latency.start_timer();
1525 debug!("await_transaction");
1526
1527 epoch_store
1528 .within_alive_epoch(
1529 self.notify_read_effects("AuthorityState::await_transaction_effects", digest),
1530 )
1531 .await
1532 .map_err(|_| SuiErrorKind::EpochEnded(epoch_store.epoch()).into())
1533 .and_then(|r| r)
1534 }
1535
1536 #[instrument(level = "trace", skip_all)]
1548 pub async fn try_execute_immediately(
1549 &self,
1550 certificate: &VerifiedExecutableTransaction,
1551 mut execution_env: ExecutionEnv,
1552 epoch_store: &Arc<AuthorityPerEpochStore>,
1553 ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
1554 let _scope = monitored_scope("Execution::try_execute_immediately");
1555 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1556
1557 let tx_digest = certificate.digest();
1558
1559 if let Some(fork_recovery) = &self.fork_recovery_state
1560 && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
1561 {
1562 warn!(
1563 ?tx_digest,
1564 original_digest = ?execution_env.expected_effects_digest,
1565 override_digest = ?override_digest,
1566 "Applying fork recovery override for transaction effects digest"
1567 );
1568 execution_env.expected_effects_digest = Some(override_digest);
1569 }
1570
1571 let tx_guard = epoch_store.acquire_tx_guard(certificate);
1573
1574 let tx_cache_reader = self.get_transaction_cache_reader();
1575 if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
1576 if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
1577 assert_eq!(
1578 effects.digest(),
1579 expected_effects_digest,
1580 "Unexpected effects digest for transaction {:?}",
1581 tx_digest
1582 );
1583 }
1584 tx_guard.release();
1585 return ExecutionOutput::Success((effects, None));
1586 }
1587
1588 let execution_start_time = Instant::now();
1589
1590 let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
1595 else {
1596 tx_guard.release();
1597 return ExecutionOutput::EpochEnded;
1598 };
1599 if *execution_guard != epoch_store.epoch() {
1604 tx_guard.release();
1605 info!("The epoch of the execution_guard doesn't match the epoch store");
1606 return ExecutionOutput::EpochEnded;
1607 }
1608
1609 let scheduling_source = execution_env.scheduling_source;
1610 let accumulator_version = execution_env.assigned_versions.accumulator_version;
1611 let mysticeti_fp_outputs = if epoch_store.protocol_config().mysticeti_fastpath() {
1612 tx_cache_reader.get_mysticeti_fastpath_outputs(tx_digest)
1613 } else {
1614 None
1615 };
1616
1617 let (transaction_outputs, timings, execution_error_opt) = if let Some(outputs) =
1618 mysticeti_fp_outputs
1619 {
1620 assert!(
1621 !certificate.is_consensus_tx(),
1622 "Mysticeti fastpath executed transactions cannot be consensus transactions"
1623 );
1624 debug!(
1629 ?tx_digest,
1630 "Mysticeti fastpath certified transaction outputs found in cache, skipping execution and committing"
1631 );
1632 (outputs, None, None)
1633 } else {
1634 let (transaction_outputs, timings, execution_error_opt) = match self
1635 .process_certificate(
1636 &tx_guard,
1637 &execution_guard,
1638 certificate,
1639 execution_env,
1640 epoch_store,
1641 ) {
1642 ExecutionOutput::Success(result) => result,
1643 output => return output.unwrap_err(),
1644 };
1645 (
1646 Arc::new(transaction_outputs),
1647 Some(timings),
1648 execution_error_opt,
1649 )
1650 };
1651
1652 fail_point!("crash");
1653
1654 let effects = transaction_outputs.effects.clone();
1655 debug!(
1656 ?tx_digest,
1657 fx_digest=?effects.digest(),
1658 "process_certificate succeeded in {:.3}ms",
1659 (execution_start_time.elapsed().as_micros() as f64) / 1000.0
1660 );
1661
1662 if scheduling_source == SchedulingSource::MysticetiFastPath {
1663 self.get_cache_writer()
1664 .write_fastpath_transaction_outputs(transaction_outputs);
1665 } else {
1666 let commit_result =
1667 self.commit_certificate(certificate, transaction_outputs, epoch_store);
1668 if let Err(err) = commit_result {
1669 error!(?tx_digest, "Error committing transaction: {err}");
1670 tx_guard.release();
1671 return ExecutionOutput::Fatal(err);
1672 }
1673 }
1674
1675 if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
1676 certificate.data().transaction_data().kind()
1677 {
1678 if let Some(err) = &execution_error_opt {
1679 debug_fatal!("Authenticator state update failed: {:?}", err);
1680 }
1681 epoch_store.update_authenticator_state(auth_state);
1682
1683 if cfg!(debug_assertions) {
1685 let authenticator_state = get_authenticator_state(self.get_object_store())
1686 .expect("Read cannot fail")
1687 .expect("Authenticator state must exist");
1688
1689 let mut sys_jwks: Vec<_> = authenticator_state
1690 .active_jwks
1691 .into_iter()
1692 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1693 .collect();
1694 let mut active_jwks: Vec<_> = epoch_store
1695 .signature_verifier
1696 .get_jwks()
1697 .into_iter()
1698 .collect();
1699 sys_jwks.sort();
1700 active_jwks.sort();
1701
1702 assert_eq!(sys_jwks, active_jwks);
1703 }
1704 }
1705
1706 if certificate
1710 .transaction_data()
1711 .kind()
1712 .is_accumulator_barrier_settle_tx()
1713 {
1714 let next_accumulator_version = accumulator_version.unwrap().next();
1717 self.execution_scheduler
1718 .settle_object_funds(next_accumulator_version);
1719 }
1720
1721 tx_guard.commit_tx();
1722
1723 let elapsed = execution_start_time.elapsed();
1724 if let Some(timings) = timings {
1725 epoch_store.record_local_execution_time(
1726 certificate.data().transaction_data(),
1727 &effects,
1728 timings,
1729 elapsed,
1730 );
1731 }
1732
1733 let elapsed_us = elapsed.as_micros() as f64;
1734 if elapsed_us > 0.0 {
1735 self.metrics
1736 .execution_gas_latency_ratio
1737 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
1738 };
1739 ExecutionOutput::Success((effects, execution_error_opt))
1740 }
1741
1742 pub fn read_objects_for_execution(
1743 &self,
1744 tx_lock: &CertLockGuard,
1745 certificate: &VerifiedExecutableTransaction,
1746 assigned_shared_object_versions: &AssignedVersions,
1747 epoch_store: &Arc<AuthorityPerEpochStore>,
1748 ) -> SuiResult<InputObjects> {
1749 let _scope = monitored_scope("Execution::load_input_objects");
1750 let _metrics_guard = self
1751 .metrics
1752 .execution_load_input_objects_latency
1753 .start_timer();
1754 let input_objects = &certificate.data().transaction_data().input_objects()?;
1755 self.input_loader.read_objects_for_execution(
1756 &certificate.key(),
1757 tx_lock,
1758 input_objects,
1759 assigned_shared_object_versions,
1760 epoch_store.epoch(),
1761 )
1762 }
1763
1764 pub async fn try_execute_for_test(
1767 &self,
1768 certificate: &VerifiedCertificate,
1769 execution_env: ExecutionEnv,
1770 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1771 let epoch_store = self.epoch_store_for_testing();
1772 let (effects, execution_error_opt) = self
1773 .try_execute_immediately(
1774 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1775 execution_env,
1776 &epoch_store,
1777 )
1778 .await
1779 .unwrap();
1780 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1781 (signed_effects, execution_error_opt)
1782 }
1783
1784 pub async fn try_execute_executable_for_test(
1785 &self,
1786 executable: &VerifiedExecutableTransaction,
1787 execution_env: ExecutionEnv,
1788 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1789 let epoch_store = self.epoch_store_for_testing();
1790 let (effects, execution_error_opt) = self
1791 .try_execute_immediately(executable, execution_env, &epoch_store)
1792 .await
1793 .unwrap();
1794 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1795 (signed_effects, execution_error_opt)
1796 }
1797
1798 pub async fn notify_read_effects(
1799 &self,
1800 task_name: &'static str,
1801 digest: TransactionDigest,
1802 ) -> SuiResult<TransactionEffects> {
1803 Ok(self
1804 .get_transaction_cache_reader()
1805 .notify_read_executed_effects(task_name, &[digest])
1806 .await
1807 .pop()
1808 .expect("must return correct number of effects"))
1809 }
1810
1811 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1812 self.get_object_cache_reader()
1813 .check_owned_objects_are_live(owned_object_refs)
1814 }
1815
1816 pub(crate) fn debug_dump_transaction_state(
1821 &self,
1822 tx_digest: &TransactionDigest,
1823 effects: &TransactionEffects,
1824 expected_effects_digest: TransactionEffectsDigest,
1825 inner_temporary_store: &InnerTemporaryStore,
1826 certificate: &VerifiedExecutableTransaction,
1827 debug_dump_config: &StateDebugDumpConfig,
1828 ) -> SuiResult<PathBuf> {
1829 let dump_dir = debug_dump_config
1830 .dump_file_directory
1831 .as_ref()
1832 .cloned()
1833 .unwrap_or(std::env::temp_dir());
1834 let epoch_store = self.load_epoch_store_one_call_per_task();
1835
1836 NodeStateDump::new(
1837 tx_digest,
1838 effects,
1839 expected_effects_digest,
1840 self.get_object_store().as_ref(),
1841 &epoch_store,
1842 inner_temporary_store,
1843 certificate,
1844 )?
1845 .write_to_file(&dump_dir)
1846 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1847 }
1848
1849 #[instrument(level = "trace", skip_all)]
1850 pub(crate) fn process_certificate(
1851 &self,
1852 tx_guard: &CertTxGuard,
1853 execution_guard: &ExecutionLockReadGuard<'_>,
1854 certificate: &VerifiedExecutableTransaction,
1855 execution_env: ExecutionEnv,
1856 epoch_store: &Arc<AuthorityPerEpochStore>,
1857 ) -> ExecutionOutput<(
1858 TransactionOutputs,
1859 Vec<ExecutionTiming>,
1860 Option<ExecutionError>,
1861 )> {
1862 let _scope = monitored_scope("Execution::process_certificate");
1863 let tx_digest = *certificate.digest();
1864
1865 let input_objects = match self.read_objects_for_execution(
1866 tx_guard.as_lock_guard(),
1867 certificate,
1868 &execution_env.assigned_versions,
1869 epoch_store,
1870 ) {
1871 Ok(objects) => objects,
1872 Err(e) => return ExecutionOutput::Fatal(e),
1873 };
1874
1875 let expected_effects_digest = match execution_env.expected_effects_digest {
1876 Some(expected_effects_digest) => Some(expected_effects_digest),
1877 None => {
1878 match epoch_store.get_signed_effects_digest(&tx_digest) {
1883 Ok(digest) => digest,
1884 Err(e) => return ExecutionOutput::Fatal(e),
1885 }
1886 }
1887 };
1888
1889 fail_point_if!("correlated-crash-process-certificate", || {
1890 if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1891 sui_simulator::task::kill_current_node(None);
1892 }
1893 });
1894
1895 self.execute_certificate(
1900 execution_guard,
1901 certificate,
1902 input_objects,
1903 expected_effects_digest,
1904 execution_env,
1905 epoch_store,
1906 )
1907 }
1908
1909 pub async fn reconfigure_traffic_control(
1910 &self,
1911 params: TrafficControlReconfigParams,
1912 ) -> Result<TrafficControlReconfigParams, SuiError> {
1913 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1914 traffic_controller.admin_reconfigure(params).await
1915 } else {
1916 Err(SuiErrorKind::InvalidAdminRequest(
1917 "Traffic controller is not configured on this node".to_string(),
1918 )
1919 .into())
1920 }
1921 }
1922
1923 #[instrument(level = "trace", skip_all)]
1924 fn commit_certificate(
1925 &self,
1926 certificate: &VerifiedExecutableTransaction,
1927 transaction_outputs: Arc<TransactionOutputs>,
1928 epoch_store: &Arc<AuthorityPerEpochStore>,
1929 ) -> SuiResult {
1930 let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
1931 monitored_scope("Execution::commit_certificate");
1932 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1933
1934 let tx_digest = certificate.digest();
1935
1936 epoch_store.insert_executed_in_epoch(tx_digest);
1939 let key = certificate.key();
1940 if !matches!(key, TransactionKey::Digest(_)) {
1941 epoch_store.insert_tx_key(key, *tx_digest)?;
1942 }
1943
1944 fail_point!("crash");
1946
1947 self.get_cache_writer()
1948 .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);
1949
1950 if certificate.transaction_data().is_end_of_epoch_tx() {
1951 self.get_object_cache_reader()
1954 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1955 }
1956
1957 Ok(())
1958 }
1959
1960 fn update_metrics(
1961 &self,
1962 certificate: &VerifiedExecutableTransaction,
1963 inner_temporary_store: &InnerTemporaryStore,
1964 effects: &TransactionEffects,
1965 ) {
1966 if certificate.has_zklogin_sig() {
1968 self.metrics.zklogin_sig_count.inc();
1969 } else if certificate.has_upgraded_multisig() {
1970 self.metrics.multisig_sig_count.inc();
1971 }
1972
1973 self.metrics.total_effects.inc();
1974 self.metrics.total_certs.inc();
1975
1976 let consensus_object_count = effects.input_consensus_objects().len();
1977 if consensus_object_count > 0 {
1978 self.metrics.shared_obj_tx.inc();
1979 }
1980
1981 if certificate.is_sponsored_tx() {
1982 self.metrics.sponsored_tx.inc();
1983 }
1984
1985 let input_object_count = inner_temporary_store.input_objects.len();
1986 self.metrics
1987 .num_input_objs
1988 .observe(input_object_count as f64);
1989 self.metrics
1990 .num_shared_objects
1991 .observe(consensus_object_count as f64);
1992 self.metrics.batch_size.observe(
1993 certificate
1994 .data()
1995 .intent_message()
1996 .value
1997 .kind()
1998 .num_commands() as f64,
1999 );
2000 }
2001
2002 #[instrument(level = "trace", skip_all)]
2011 fn execute_certificate(
2012 &self,
2013 _execution_guard: &ExecutionLockReadGuard<'_>,
2014 certificate: &VerifiedExecutableTransaction,
2015 input_objects: InputObjects,
2016 expected_effects_digest: Option<TransactionEffectsDigest>,
2017 execution_env: ExecutionEnv,
2018 epoch_store: &Arc<AuthorityPerEpochStore>,
2019 ) -> ExecutionOutput<(
2020 TransactionOutputs,
2021 Vec<ExecutionTiming>,
2022 Option<ExecutionError>,
2023 )> {
2024 let _scope = monitored_scope("Execution::prepare_certificate");
2025 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
2026 let prepare_certificate_start_time = tokio::time::Instant::now();
2027
2028 let tx_data = certificate.data().transaction_data();
2030
2031 if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
2032 return ExecutionOutput::Fatal(e);
2033 }
2034
2035 let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
2039 certificate,
2040 input_objects,
2041 epoch_store.protocol_config(),
2042 epoch_store.reference_gas_price(),
2043 ) {
2044 Ok(result) => result,
2045 Err(e) => return ExecutionOutput::Fatal(e),
2046 };
2047
2048 let owned_object_refs = input_objects.inner().filter_owned_objects();
2049 if let Err(e) = self.check_owned_locks(&owned_object_refs) {
2050 return ExecutionOutput::Fatal(e);
2051 }
2052 let tx_digest = *certificate.digest();
2053 let protocol_config = epoch_store.protocol_config();
2054 let transaction_data = &certificate.data().intent_message().value;
2055 let (kind, signer, gas_data) = transaction_data.execution_parts();
2056 let early_execution_error = get_early_execution_error(
2057 &tx_digest,
2058 &input_objects,
2059 self.config.certificate_deny_config.certificate_deny_set(),
2060 &execution_env.funds_withdraw_status,
2061 );
2062 let execution_params = match early_execution_error {
2063 Some(error) => ExecutionOrEarlyError::Err(error),
2064 None => ExecutionOrEarlyError::Ok(()),
2065 };
2066
2067 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2068
2069 #[allow(unused_mut)]
2070 let (inner_temp_store, _, mut effects, timings, execution_error_opt) =
2071 epoch_store.executor().execute_transaction_to_effects(
2072 &tracking_store,
2073 protocol_config,
2074 self.metrics.limits_metrics.clone(),
2075 self.config
2078 .expensive_safety_check_config
2079 .enable_deep_per_tx_sui_conservation_check(),
2080 execution_params,
2081 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2082 epoch_store
2083 .epoch_start_config()
2084 .epoch_data()
2085 .epoch_start_timestamp(),
2086 input_objects,
2087 gas_data,
2088 gas_status,
2089 kind,
2090 signer,
2091 tx_digest,
2092 &mut None,
2093 );
2094
2095 if !self
2096 .execution_scheduler
2097 .should_commit_object_funds_withdraws(
2098 certificate,
2099 &effects,
2100 &execution_env,
2101 epoch_store,
2102 )
2103 {
2104 return ExecutionOutput::RetryLater;
2105 }
2106
2107 if let Some(expected_effects_digest) = expected_effects_digest
2108 && effects.digest() != expected_effects_digest
2109 {
2110 match self.debug_dump_transaction_state(
2112 &tx_digest,
2113 &effects,
2114 expected_effects_digest,
2115 &inner_temp_store,
2116 certificate,
2117 &self.config.state_debug_dump_config,
2118 ) {
2119 Ok(out_path) => {
2120 info!(
2121 "Dumped node state for transaction {} to {}",
2122 tx_digest,
2123 out_path.as_path().display().to_string()
2124 );
2125 }
2126 Err(e) => {
2127 error!("Error dumping state for transaction {}: {e}", tx_digest);
2128 }
2129 }
2130 let expected_effects = self
2131 .get_transaction_cache_reader()
2132 .get_effects(&expected_effects_digest);
2133 error!(
2134 ?tx_digest,
2135 ?expected_effects_digest,
2136 actual_effects = ?effects,
2137 expected_effects = ?expected_effects,
2138 "fork detected!"
2139 );
2140 if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2141 tx_digest,
2142 expected_effects_digest,
2143 effects.digest(),
2144 ) {
2145 error!("Failed to record transaction fork: {e}");
2146 }
2147
2148 fail_point_if!("kill_transaction_fork_node", || {
2149 #[cfg(msim)]
2150 {
2151 tracing::error!(
2152 fatal = true,
2153 "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2154 tx_digest
2155 );
2156 sui_simulator::task::shutdown_current_node();
2157 }
2158 });
2159
2160 fatal!(
2161 "Transaction {} is expected to have effects digest {}, but got {}!",
2162 tx_digest,
2163 expected_effects_digest,
2164 effects.digest()
2165 );
2166 }
2167
2168 fail_point_arg!("simulate_fork_during_execution", |(
2169 forked_validators,
2170 full_halt,
2171 effects_overrides,
2172 fork_probability,
2173 ): (
2174 std::sync::Arc<
2175 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2176 >,
2177 bool,
2178 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2179 f32,
2180 )| {
2181 #[cfg(msim)]
2182 self.simulate_fork_during_execution(
2183 certificate,
2184 epoch_store,
2185 &mut effects,
2186 forked_validators,
2187 full_halt,
2188 effects_overrides,
2189 fork_probability,
2190 );
2191 });
2192
2193 let unchanged_loaded_runtime_objects =
2194 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2195 certificate.transaction_data(),
2196 &effects,
2197 &tracking_store.into_read_objects(),
2198 );
2199
2200 let _ = self
2202 .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2203 .tap_err(|e| {
2204 self.metrics.post_processing_total_failures.inc();
2205 error!(?tx_digest, "tx post processing failed: {e}");
2206 });
2207
2208 self.update_metrics(certificate, &inner_temp_store, &effects);
2209
2210 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2211 certificate.clone().into_unsigned(),
2212 effects,
2213 inner_temp_store,
2214 unchanged_loaded_runtime_objects,
2215 );
2216
2217 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2218 if elapsed > 0.0 {
2219 self.metrics.prepare_cert_gas_latency_ratio.observe(
2220 transaction_outputs
2221 .effects
2222 .gas_cost_summary()
2223 .computation_cost as f64
2224 / elapsed,
2225 );
2226 }
2227
2228 ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2229 }
2230
2231 pub fn prepare_certificate_for_benchmark(
2232 &self,
2233 certificate: &VerifiedExecutableTransaction,
2234 input_objects: InputObjects,
2235 epoch_store: &Arc<AuthorityPerEpochStore>,
2236 ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2237 let lock = RwLock::new(epoch_store.epoch());
2238 let execution_guard = lock.try_read().unwrap();
2239
2240 let (transaction_outputs, _timings, execution_error_opt) = self
2241 .execute_certificate(
2242 &execution_guard,
2243 certificate,
2244 input_objects,
2245 None,
2246 ExecutionEnv::default(),
2247 epoch_store,
2248 )
2249 .unwrap();
2250 Ok((transaction_outputs, execution_error_opt))
2251 }
2252
2253 #[instrument(skip_all)]
2254 #[allow(clippy::type_complexity)]
2255 pub async fn dry_exec_transaction(
2256 &self,
2257 transaction: TransactionData,
2258 transaction_digest: TransactionDigest,
2259 ) -> SuiResult<(
2260 DryRunTransactionBlockResponse,
2261 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2262 TransactionEffects,
2263 Option<ObjectID>,
2264 )> {
2265 let epoch_store = self.load_epoch_store_one_call_per_task();
2266 if !self.is_fullnode(&epoch_store) {
2267 return Err(SuiErrorKind::UnsupportedFeatureError {
2268 error: "dry-exec is only supported on fullnodes".to_string(),
2269 }
2270 .into());
2271 }
2272
2273 if transaction.kind().is_system_tx() {
2274 return Err(SuiErrorKind::UnsupportedFeatureError {
2275 error: "dry-exec does not support system transactions".to_string(),
2276 }
2277 .into());
2278 }
2279
2280 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2281 }
2282
2283 #[allow(clippy::type_complexity)]
2284 pub fn dry_exec_transaction_for_benchmark(
2285 &self,
2286 transaction: TransactionData,
2287 transaction_digest: TransactionDigest,
2288 ) -> SuiResult<(
2289 DryRunTransactionBlockResponse,
2290 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2291 TransactionEffects,
2292 Option<ObjectID>,
2293 )> {
2294 let epoch_store = self.load_epoch_store_one_call_per_task();
2295 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2296 }
2297
2298 #[allow(clippy::type_complexity)]
2299 fn dry_exec_transaction_impl(
2300 &self,
2301 epoch_store: &AuthorityPerEpochStore,
2302 transaction: TransactionData,
2303 transaction_digest: TransactionDigest,
2304 ) -> SuiResult<(
2305 DryRunTransactionBlockResponse,
2306 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2307 TransactionEffects,
2308 Option<ObjectID>,
2309 )> {
2310 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2312
2313 let input_object_kinds = transaction.input_objects()?;
2314 let receiving_object_refs = transaction.receiving_objects();
2315
2316 sui_transaction_checks::deny::check_transaction_for_signing(
2317 &transaction,
2318 &[],
2319 &input_object_kinds,
2320 &receiving_object_refs,
2321 &self.config.transaction_deny_config,
2322 self.get_backing_package_store().as_ref(),
2323 )?;
2324
2325 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2326 None,
2328 &input_object_kinds,
2329 &receiving_object_refs,
2330 epoch_store.epoch(),
2331 )?;
2332
2333 let mut gas_data = transaction.gas_data().clone();
2335 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2336 let sender = transaction.sender();
2337 const MIST_TO_SUI: u64 = 1_000_000_000;
2339 const DRY_RUN_SUI: u64 = 1_000_000_000;
2340 let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
2341 let gas_object_id = ObjectID::random();
2342 let gas_object = Object::new_move(
2343 MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
2344 Owner::AddressOwner(sender),
2345 TransactionDigest::genesis_marker(),
2346 );
2347 let gas_object_ref = gas_object.compute_object_reference();
2348 gas_data.payment = vec![gas_object_ref];
2349 (
2350 sui_transaction_checks::check_transaction_input_with_given_gas(
2351 epoch_store.protocol_config(),
2352 epoch_store.reference_gas_price(),
2353 &transaction,
2354 input_objects,
2355 receiving_objects,
2356 gas_object,
2357 &self.metrics.bytecode_verifier_metrics,
2358 &self.config.verifier_signing_config,
2359 )?,
2360 Some(gas_object_id),
2361 )
2362 } else {
2363 (
2364 sui_transaction_checks::check_transaction_input(
2365 epoch_store.protocol_config(),
2366 epoch_store.reference_gas_price(),
2367 &transaction,
2368 input_objects,
2369 &receiving_objects,
2370 &self.metrics.bytecode_verifier_metrics,
2371 &self.config.verifier_signing_config,
2372 )?,
2373 None,
2374 )
2375 };
2376
2377 let protocol_config = epoch_store.protocol_config();
2378 let (kind, signer, _) = transaction.execution_parts();
2379
2380 let silent = true;
2381 let executor = sui_execution::executor(protocol_config, silent)
2382 .expect("Creating an executor should not fail here");
2383
2384 let expensive_checks = false;
2385 let early_execution_error = get_early_execution_error(
2386 &transaction_digest,
2387 &checked_input_objects,
2388 self.config.certificate_deny_config.certificate_deny_set(),
2389 &FundsWithdrawStatus::MaybeSufficient,
2395 );
2396 let execution_params = match early_execution_error {
2397 Some(error) => ExecutionOrEarlyError::Err(error),
2398 None => ExecutionOrEarlyError::Ok(()),
2399 };
2400 let (inner_temp_store, _, effects, _timings, execution_error) = executor
2401 .execute_transaction_to_effects(
2402 self.get_backing_store().as_ref(),
2403 protocol_config,
2404 self.metrics.limits_metrics.clone(),
2405 expensive_checks,
2406 execution_params,
2407 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2408 epoch_store
2409 .epoch_start_config()
2410 .epoch_data()
2411 .epoch_start_timestamp(),
2412 checked_input_objects,
2413 gas_data,
2414 gas_status,
2415 kind,
2416 signer,
2417 transaction_digest,
2418 &mut None,
2419 );
2420 let tx_digest = *effects.transaction_digest();
2421
2422 let module_cache =
2423 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2424
2425 let mut layout_resolver =
2426 epoch_store
2427 .executor()
2428 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2429 &inner_temp_store,
2430 self.get_backing_package_store(),
2431 )));
2432 let object_changes = Vec::new();
2434
2435 let balance_changes = Vec::new();
2437
2438 let written_with_kind = effects
2439 .created()
2440 .into_iter()
2441 .map(|(oref, _)| (oref, WriteKind::Create))
2442 .chain(
2443 effects
2444 .unwrapped()
2445 .into_iter()
2446 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2447 )
2448 .chain(
2449 effects
2450 .mutated()
2451 .into_iter()
2452 .map(|(oref, _)| (oref, WriteKind::Mutate)),
2453 )
2454 .map(|(oref, kind)| {
2455 let obj = inner_temp_store.written.get(&oref.0).unwrap();
2456 (oref.0, (oref, obj.clone(), kind))
2458 })
2459 .collect();
2460
2461 let execution_error_source = execution_error
2462 .as_ref()
2463 .err()
2464 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2465
2466 Ok((
2467 DryRunTransactionBlockResponse {
2468 suggested_gas_price: self
2469 .congestion_tracker
2470 .get_suggested_gas_prices(&transaction),
2471 input: SuiTransactionBlockData::try_from_with_module_cache(
2472 transaction,
2473 &module_cache,
2474 )
2475 .map_err(|e| SuiErrorKind::TransactionSerializationError {
2476 error: format!(
2477 "Failed to convert transaction to SuiTransactionBlockData: {}",
2478 e
2479 ),
2480 })?, effects: effects.clone().try_into()?,
2482 events: SuiTransactionBlockEvents::try_from(
2483 inner_temp_store.events.clone(),
2484 tx_digest,
2485 None,
2486 layout_resolver.as_mut(),
2487 )?,
2488 object_changes,
2489 balance_changes,
2490 execution_error_source,
2491 },
2492 written_with_kind,
2493 effects,
2494 mock_gas,
2495 ))
2496 }
2497
2498 pub fn simulate_transaction(
2499 &self,
2500 mut transaction: TransactionData,
2501 checks: TransactionChecks,
2502 allow_mock_gas_coin: bool,
2503 ) -> SuiResult<SimulateTransactionResult> {
2504 if transaction.kind().is_system_tx() {
2505 return Err(SuiErrorKind::UnsupportedFeatureError {
2506 error: "simulate does not support system transactions".to_string(),
2507 }
2508 .into());
2509 }
2510
2511 let epoch_store = self.load_epoch_store_one_call_per_task();
2512 if !self.is_fullnode(&epoch_store) {
2513 return Err(SuiErrorKind::UnsupportedFeatureError {
2514 error: "simulate is only supported on fullnodes".to_string(),
2515 }
2516 .into());
2517 }
2518
2519 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2521
2522 let input_object_kinds = transaction.input_objects()?;
2523 let receiving_object_refs = transaction.receiving_objects();
2524
2525 sui_transaction_checks::deny::check_transaction_for_signing(
2526 &transaction,
2527 &[],
2528 &input_object_kinds,
2529 &receiving_object_refs,
2530 &self.config.transaction_deny_config,
2531 self.get_backing_package_store().as_ref(),
2532 )?;
2533
2534 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2535 None,
2537 &input_object_kinds,
2538 &receiving_object_refs,
2539 epoch_store.epoch(),
2540 )?;
2541
2542 let mock_gas_id = if allow_mock_gas_coin && transaction.gas().is_empty() {
2544 let mock_gas_object = Object::new_move(
2545 MoveObject::new_gas_coin(
2546 OBJECT_START_VERSION,
2547 ObjectID::MAX,
2548 DEV_INSPECT_GAS_COIN_VALUE,
2549 ),
2550 Owner::AddressOwner(transaction.gas_data().owner),
2551 TransactionDigest::genesis_marker(),
2552 );
2553 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2554 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2555 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2556 Some(mock_gas_object.id())
2557 } else {
2558 None
2559 };
2560
2561 let protocol_config = epoch_store.protocol_config();
2562
2563 let (gas_status, checked_input_objects) = if checks.enabled() {
2564 sui_transaction_checks::check_transaction_input(
2565 epoch_store.protocol_config(),
2566 epoch_store.reference_gas_price(),
2567 &transaction,
2568 input_objects,
2569 &receiving_objects,
2570 &self.metrics.bytecode_verifier_metrics,
2571 &self.config.verifier_signing_config,
2572 )?
2573 } else {
2574 let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2575 protocol_config,
2576 transaction.kind(),
2577 input_objects,
2578 receiving_objects,
2579 )?;
2580 let gas_status = SuiGasStatus::new(
2581 transaction.gas_budget(),
2582 transaction.gas_price(),
2583 epoch_store.reference_gas_price(),
2584 protocol_config,
2585 )?;
2586
2587 (gas_status, checked_input_objects)
2588 };
2589
2590 let executor = sui_execution::executor(
2592 protocol_config,
2593 true, )
2595 .expect("Creating an executor should not fail here");
2596
2597 let (kind, signer, gas_data) = transaction.execution_parts();
2598 let early_execution_error = get_early_execution_error(
2599 &transaction.digest(),
2600 &checked_input_objects,
2601 self.config.certificate_deny_config.certificate_deny_set(),
2602 &FundsWithdrawStatus::MaybeSufficient,
2604 );
2605 let execution_params = match early_execution_error {
2606 Some(error) => ExecutionOrEarlyError::Err(error),
2607 None => ExecutionOrEarlyError::Ok(()),
2608 };
2609
2610 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2611
2612 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2613 &tracking_store,
2614 protocol_config,
2615 self.metrics.limits_metrics.clone(),
2616 false, execution_params,
2618 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2619 epoch_store
2620 .epoch_start_config()
2621 .epoch_data()
2622 .epoch_start_timestamp(),
2623 checked_input_objects,
2624 gas_data,
2625 gas_status,
2626 kind,
2627 signer,
2628 transaction.digest(),
2629 checks.disabled(),
2630 );
2631
2632 let loaded_runtime_objects = tracking_store.into_read_objects();
2633 let unchanged_loaded_runtime_objects =
2634 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2635 &transaction,
2636 &effects,
2637 &loaded_runtime_objects,
2638 );
2639
2640 let object_set = {
2641 let objects = {
2642 let mut objects = loaded_runtime_objects;
2643
2644 for o in inner_temp_store
2645 .input_objects
2646 .into_values()
2647 .chain(inner_temp_store.written.into_values())
2648 {
2649 objects.insert(o);
2650 }
2651
2652 objects
2653 };
2654
2655 let object_keys = sui_types::storage::get_transaction_object_set(
2656 &transaction,
2657 &effects,
2658 &unchanged_loaded_runtime_objects,
2659 );
2660
2661 let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2662 for k in object_keys {
2663 if let Some(o) = objects.get(&k) {
2664 set.insert(o.clone());
2665 }
2666 }
2667
2668 set
2669 };
2670
2671 Ok(SimulateTransactionResult {
2672 objects: object_set,
2673 events: effects.events_digest().map(|_| inner_temp_store.events),
2674 effects,
2675 execution_result,
2676 mock_gas_id,
2677 unchanged_loaded_runtime_objects,
2678 })
2679 }
2680
2681 #[allow(clippy::collapsible_else_if)]
2683 #[instrument(skip_all)]
2684 pub async fn dev_inspect_transaction_block(
2685 &self,
2686 sender: SuiAddress,
2687 transaction_kind: TransactionKind,
2688 gas_price: Option<u64>,
2689 gas_budget: Option<u64>,
2690 gas_sponsor: Option<SuiAddress>,
2691 gas_objects: Option<Vec<ObjectRef>>,
2692 show_raw_txn_data_and_effects: Option<bool>,
2693 skip_checks: Option<bool>,
2694 ) -> SuiResult<DevInspectResults> {
2695 let epoch_store = self.load_epoch_store_one_call_per_task();
2696
2697 if !self.is_fullnode(&epoch_store) {
2698 return Err(SuiErrorKind::UnsupportedFeatureError {
2699 error: "dev-inspect is only supported on fullnodes".to_string(),
2700 }
2701 .into());
2702 }
2703
2704 if transaction_kind.is_system_tx() {
2705 return Err(SuiErrorKind::UnsupportedFeatureError {
2706 error: "system transactions are not supported".to_string(),
2707 }
2708 .into());
2709 }
2710
2711 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2712 let skip_checks = skip_checks.unwrap_or(true);
2713 let reference_gas_price = epoch_store.reference_gas_price();
2714 let protocol_config = epoch_store.protocol_config();
2715 let max_tx_gas = protocol_config.max_tx_gas();
2716
2717 let price = gas_price.unwrap_or(reference_gas_price);
2718 let budget = gas_budget.unwrap_or(max_tx_gas);
2719 let owner = gas_sponsor.unwrap_or(sender);
2720 let payment = gas_objects.unwrap_or_default();
2722 let mut transaction = TransactionData::V1(TransactionDataV1 {
2723 kind: transaction_kind.clone(),
2724 sender,
2725 gas_data: GasData {
2726 payment,
2727 owner,
2728 price,
2729 budget,
2730 },
2731 expiration: TransactionExpiration::None,
2732 });
2733
2734 let raw_txn_data = if show_raw_txn_data_and_effects {
2735 bcs::to_bytes(&transaction).map_err(|_| {
2736 SuiErrorKind::TransactionSerializationError {
2737 error: "Failed to serialize transaction during dev inspect".to_string(),
2738 }
2739 })?
2740 } else {
2741 vec![]
2742 };
2743
2744 transaction.validity_check_no_gas_check(protocol_config)?;
2745
2746 let input_object_kinds = transaction.input_objects()?;
2747 let receiving_object_refs = transaction.receiving_objects();
2748
2749 sui_transaction_checks::deny::check_transaction_for_signing(
2750 &transaction,
2751 &[],
2752 &input_object_kinds,
2753 &receiving_object_refs,
2754 &self.config.transaction_deny_config,
2755 self.get_backing_package_store().as_ref(),
2756 )?;
2757
2758 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2759 None,
2761 &input_object_kinds,
2762 &receiving_object_refs,
2763 epoch_store.epoch(),
2764 )?;
2765
2766 let (gas_status, checked_input_objects) = if skip_checks {
2767 if transaction.gas().is_empty() {
2771 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2773 DEV_INSPECT_GAS_COIN_VALUE,
2774 transaction.gas_owner(),
2775 );
2776 let gas_object_ref = dummy_gas_object.compute_object_reference();
2777 transaction.gas_data_mut().payment = vec![gas_object_ref];
2778 input_objects.push(ObjectReadResult::new(
2779 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2780 dummy_gas_object.into(),
2781 ));
2782 }
2783 let checked_input_objects = sui_transaction_checks::check_dev_inspect_input(
2784 protocol_config,
2785 &transaction_kind,
2786 input_objects,
2787 receiving_objects,
2788 )?;
2789 let gas_status = SuiGasStatus::new(
2790 max_tx_gas,
2791 transaction.gas_price(),
2792 reference_gas_price,
2793 protocol_config,
2794 )?;
2795
2796 (gas_status, checked_input_objects)
2797 } else {
2798 if transaction.gas().is_empty() {
2801 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2803 DEV_INSPECT_GAS_COIN_VALUE,
2804 transaction.gas_owner(),
2805 );
2806 let gas_object_ref = dummy_gas_object.compute_object_reference();
2807 transaction.gas_data_mut().payment = vec![gas_object_ref];
2808 sui_transaction_checks::check_transaction_input_with_given_gas(
2809 epoch_store.protocol_config(),
2810 epoch_store.reference_gas_price(),
2811 &transaction,
2812 input_objects,
2813 receiving_objects,
2814 dummy_gas_object,
2815 &self.metrics.bytecode_verifier_metrics,
2816 &self.config.verifier_signing_config,
2817 )?
2818 } else {
2819 sui_transaction_checks::check_transaction_input(
2820 epoch_store.protocol_config(),
2821 epoch_store.reference_gas_price(),
2822 &transaction,
2823 input_objects,
2824 &receiving_objects,
2825 &self.metrics.bytecode_verifier_metrics,
2826 &self.config.verifier_signing_config,
2827 )?
2828 }
2829 };
2830
2831 let executor = sui_execution::executor(protocol_config, true)
2832 .expect("Creating an executor should not fail here");
2833 let gas_data = transaction.gas_data().clone();
2834 let intent_msg = IntentMessage::new(
2835 Intent {
2836 version: IntentVersion::V0,
2837 scope: IntentScope::TransactionData,
2838 app_id: AppId::Sui,
2839 },
2840 transaction,
2841 );
2842 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2843 let early_execution_error = get_early_execution_error(
2844 &transaction_digest,
2845 &checked_input_objects,
2846 self.config.certificate_deny_config.certificate_deny_set(),
2847 &FundsWithdrawStatus::MaybeSufficient,
2849 );
2850 let execution_params = match early_execution_error {
2851 Some(error) => ExecutionOrEarlyError::Err(error),
2852 None => ExecutionOrEarlyError::Ok(()),
2853 };
2854 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2855 self.get_backing_store().as_ref(),
2856 protocol_config,
2857 self.metrics.limits_metrics.clone(),
2858 false,
2859 execution_params,
2860 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2861 epoch_store
2862 .epoch_start_config()
2863 .epoch_data()
2864 .epoch_start_timestamp(),
2865 checked_input_objects,
2866 gas_data,
2867 gas_status,
2868 transaction_kind,
2869 sender,
2870 transaction_digest,
2871 skip_checks,
2872 );
2873
2874 let raw_effects = if show_raw_txn_data_and_effects {
2875 bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2876 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2877 })?
2878 } else {
2879 vec![]
2880 };
2881
2882 let mut layout_resolver =
2883 epoch_store
2884 .executor()
2885 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2886 &inner_temp_store,
2887 self.get_backing_package_store(),
2888 )));
2889
2890 DevInspectResults::new(
2891 effects,
2892 inner_temp_store.events.clone(),
2893 execution_result,
2894 raw_txn_data,
2895 raw_effects,
2896 layout_resolver.as_mut(),
2897 )
2898 }
2899
2900 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2902 let epoch_store = self.epoch_store_for_testing();
2903 Ok(epoch_store.reference_gas_price())
2904 }
2905
2906 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2907 self.get_transaction_cache_reader()
2908 .is_tx_already_executed(digest)
2909 }
2910
2911 #[instrument(level = "debug", skip_all, err(level = "debug"))]
2912 fn index_tx(
2913 &self,
2914 indexes: &IndexStore,
2915 digest: &TransactionDigest,
2916 cert: &VerifiedExecutableTransaction,
2918 effects: &TransactionEffects,
2919 events: &TransactionEvents,
2920 timestamp_ms: u64,
2921 tx_coins: Option<TxCoins>,
2922 written: &WrittenObjects,
2923 inner_temporary_store: &InnerTemporaryStore,
2924 epoch_store: &Arc<AuthorityPerEpochStore>,
2925 ) -> SuiResult<u64> {
2926 let changes = self
2927 .process_object_index(effects, written, inner_temporary_store, epoch_store)
2928 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2929
2930 indexes.index_tx(
2931 cert.data().intent_message().value.sender(),
2932 cert.data()
2933 .intent_message()
2934 .value
2935 .input_objects()?
2936 .iter()
2937 .map(|o| o.object_id()),
2938 effects
2939 .all_changed_objects()
2940 .into_iter()
2941 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2942 cert.data()
2943 .intent_message()
2944 .value
2945 .move_calls()
2946 .into_iter()
2947 .map(|(_cmd_idx, package, module, function)| {
2948 (*package, module.to_owned(), function.to_owned())
2949 }),
2950 events,
2951 changes,
2952 digest,
2953 timestamp_ms,
2954 tx_coins,
2955 effects.accumulator_events(),
2956 )
2957 }
2958
2959 #[cfg(msim)]
2960 fn simulate_fork_during_execution(
2961 &self,
2962 certificate: &VerifiedExecutableTransaction,
2963 epoch_store: &Arc<AuthorityPerEpochStore>,
2964 effects: &mut TransactionEffects,
2965 forked_validators: std::sync::Arc<
2966 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2967 >,
2968 full_halt: bool,
2969 effects_overrides: std::sync::Arc<
2970 std::sync::Mutex<std::collections::BTreeMap<String, String>>,
2971 >,
2972 fork_probability: f32,
2973 ) {
2974 use std::cell::RefCell;
2975 thread_local! {
2976 static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
2977 }
2978 if !certificate.data().intent_message().value.is_system_tx() {
2979 let committee = epoch_store.committee();
2980 let cur_stake = (**committee).weight(&self.name);
2981 if cur_stake > 0 {
2982 TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
2983 let already_forked = forked_validators
2984 .lock()
2985 .ok()
2986 .map(|set| set.contains(&self.name))
2987 .unwrap_or(false);
2988
2989 if !already_forked {
2990 let should_fork = if full_halt {
2991 *total_stake <= committee.validity_threshold()
2993 } else {
2994 *total_stake + cur_stake < committee.validity_threshold()
2996 };
2997
2998 if should_fork {
2999 *total_stake += cur_stake;
3000
3001 if let Ok(mut external_set) = forked_validators.lock() {
3002 external_set.insert(self.name);
3003 info!("forked_validators: {:?}", external_set);
3004 }
3005 }
3006 }
3007
3008 if let Ok(external_set) = forked_validators.lock() {
3009 if external_set.contains(&self.name) {
3010 let tx_digest = certificate.digest().to_string();
3016 if let Ok(mut overrides) = effects_overrides.lock() {
3017 if overrides.contains_key(&tx_digest)
3018 || overrides.is_empty()
3019 && sui_simulator::random::deterministic_probability(
3020 &tx_digest,
3021 fork_probability,
3022 )
3023 {
3024 let original_effects_digest = effects.digest().to_string();
3025 overrides
3026 .insert(tx_digest.clone(), original_effects_digest.clone());
3027 info!(
3028 ?tx_digest,
3029 ?original_effects_digest,
3030 "Captured forked effects digest for transaction"
3031 );
3032 effects.gas_cost_summary_mut_for_testing().computation_cost +=
3033 1;
3034 }
3035 }
3036 }
3037 }
3038 });
3039 }
3040 }
3041 }
3042
3043 fn process_object_index(
3044 &self,
3045 effects: &TransactionEffects,
3046 written: &WrittenObjects,
3047 inner_temporary_store: &InnerTemporaryStore,
3048 epoch_store: &Arc<AuthorityPerEpochStore>,
3049 ) -> SuiResult<ObjectIndexChanges> {
3050 let mut layout_resolver =
3051 epoch_store
3052 .executor()
3053 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3054 inner_temporary_store,
3055 self.get_backing_package_store(),
3056 )));
3057
3058 let modified_at_version = effects
3059 .modified_at_versions()
3060 .into_iter()
3061 .collect::<HashMap<_, _>>();
3062
3063 let tx_digest = effects.transaction_digest();
3064 let mut deleted_owners = vec![];
3065 let mut deleted_dynamic_fields = vec![];
3066 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
3067 let old_version = modified_at_version.get(&id).unwrap();
3068 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
3071 |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
3072 ) {
3073 Owner::AddressOwner(addr)
3074 | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
3075 Owner::ObjectOwner(object_id) => {
3076 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
3077 }
3078 _ => {}
3079 }
3080 }
3081
3082 let mut new_owners = vec![];
3083 let mut new_dynamic_fields = vec![];
3084
3085 for (oref, owner, kind) in effects.all_changed_objects() {
3086 let id = &oref.0;
3087 if let WriteKind::Mutate = kind {
3089 let Some(old_version) = modified_at_version.get(id) else {
3090 panic!(
3091 "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
3092 tx_digest
3093 );
3094 };
3095 let Some(old_object) = self.get_object_store().get_object_by_key(id, *old_version)
3098 else {
3099 panic!(
3100 "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
3101 tx_digest, id, old_version
3102 );
3103 };
3104 if old_object.owner != owner {
3105 match old_object.owner {
3106 Owner::AddressOwner(addr)
3107 | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3108 deleted_owners.push((addr, *id));
3109 }
3110 Owner::ObjectOwner(object_id) => {
3111 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
3112 }
3113 _ => {}
3114 }
3115 }
3116 }
3117
3118 match owner {
3119 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3120 let new_object = written.get(id).unwrap_or_else(
3122 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3123 );
3124 assert_eq!(
3125 new_object.version(),
3126 oref.1,
3127 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3128 tx_digest,
3129 id,
3130 new_object.version(),
3131 oref.1
3132 );
3133
3134 let type_ = new_object
3135 .type_()
3136 .map(|type_| ObjectType::Struct(type_.clone()))
3137 .unwrap_or(ObjectType::Package);
3138
3139 new_owners.push((
3140 (addr, *id),
3141 ObjectInfo {
3142 object_id: *id,
3143 version: oref.1,
3144 digest: oref.2,
3145 type_,
3146 owner,
3147 previous_transaction: *effects.transaction_digest(),
3148 },
3149 ));
3150 }
3151 Owner::ObjectOwner(owner) => {
3152 let new_object = written.get(id).unwrap_or_else(
3153 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3154 );
3155 assert_eq!(
3156 new_object.version(),
3157 oref.1,
3158 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3159 tx_digest,
3160 id,
3161 new_object.version(),
3162 oref.1
3163 );
3164
3165 let Some(df_info) = self
3166 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
3167 .unwrap_or_else(|e| {
3168 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
3169 None
3170 }
3171 )
3172 else {
3173 continue;
3175 };
3176 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3177 }
3178 _ => {}
3179 }
3180 }
3181
3182 Ok(ObjectIndexChanges {
3183 deleted_owners,
3184 deleted_dynamic_fields,
3185 new_owners,
3186 new_dynamic_fields,
3187 })
3188 }
3189
3190 fn try_create_dynamic_field_info(
3191 &self,
3192 o: &Object,
3193 written: &WrittenObjects,
3194 resolver: &mut dyn LayoutResolver,
3195 ) -> SuiResult<Option<DynamicFieldInfo>> {
3196 let Some(move_object) = o.data.try_as_move().cloned() else {
3198 return Ok(None);
3199 };
3200
3201 if !move_object.type_().is_dynamic_field() {
3203 return Ok(None);
3204 }
3205
3206 let layout = resolver
3207 .get_annotated_layout(&move_object.type_().clone().into())?
3208 .into_layout();
3209
3210 let field =
3211 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3212 SuiErrorKind::ObjectDeserializationError {
3213 error: e.to_string(),
3214 }
3215 })?;
3216
3217 let type_ = field.kind;
3218 let name_type: TypeTag = field.name_layout.into();
3219 let bcs_name = field.name_bytes.to_owned();
3220
3221 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3222 .map_err(|e| {
3223 warn!("{e}");
3224 SuiErrorKind::ObjectDeserializationError {
3225 error: e.to_string(),
3226 }
3227 })?;
3228
3229 let name = DynamicFieldName {
3230 type_: name_type,
3231 value: SuiMoveValue::from(name_value).to_json_value(),
3232 };
3233
3234 let value_metadata = field.value_metadata().map_err(|e| {
3235 warn!("{e}");
3236 SuiErrorKind::ObjectDeserializationError {
3237 error: e.to_string(),
3238 }
3239 })?;
3240
3241 Ok(Some(match value_metadata {
3242 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3243 name,
3244 bcs_name,
3245 type_,
3246 object_type: object_type.to_canonical_string(true),
3247 object_id: o.id(),
3248 version: o.version(),
3249 digest: o.digest(),
3250 },
3251
3252 DFV::ValueMetadata::DynamicObjectField(object_id) => {
3253 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3257 let version = object.version();
3258 let digest = object.digest();
3259 let object_type = object.data.type_().unwrap().clone();
3260 (version, digest, object_type)
3261 } else {
3262 let object = self
3264 .get_object_store()
3265 .get_object_by_key(&object_id, o.version())
3266 .ok_or_else(|| UserInputError::ObjectNotFound {
3267 object_id,
3268 version: Some(o.version()),
3269 })?;
3270 let version = object.version();
3271 let digest = object.digest();
3272 let object_type = object.data.type_().unwrap().clone();
3273 (version, digest, object_type)
3274 };
3275
3276 DynamicFieldInfo {
3277 name,
3278 bcs_name,
3279 type_,
3280 object_type: object_type.to_string(),
3281 object_id,
3282 version,
3283 digest,
3284 }
3285 }
3286 }))
3287 }
3288
3289 #[instrument(level = "trace", skip_all, err(level = "debug"))]
3290 fn post_process_one_tx(
3291 &self,
3292 certificate: &VerifiedExecutableTransaction,
3293 effects: &TransactionEffects,
3294 inner_temporary_store: &InnerTemporaryStore,
3295 epoch_store: &Arc<AuthorityPerEpochStore>,
3296 ) -> SuiResult {
3297 if self.indexes.is_none() {
3298 return Ok(());
3299 }
3300
3301 let _scope = monitored_scope("Execution::post_process_one_tx");
3302
3303 let tx_digest = certificate.digest();
3304 let timestamp_ms = Self::unixtime_now_ms();
3305 let events = &inner_temporary_store.events;
3306 let written = &inner_temporary_store.written;
3307 let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
3308 effects,
3309 inner_temporary_store,
3310 epoch_store,
3311 );
3312
3313 if let Some(indexes) = &self.indexes {
3315 let _ = self
3316 .index_tx(
3317 indexes.as_ref(),
3318 tx_digest,
3319 certificate,
3320 effects,
3321 events,
3322 timestamp_ms,
3323 tx_coins,
3324 written,
3325 inner_temporary_store,
3326 epoch_store,
3327 )
3328 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
3329 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3330 .expect("Indexing tx should not fail");
3331
3332 let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3333 let events = self.make_transaction_block_events(
3334 events.clone(),
3335 *tx_digest,
3336 timestamp_ms,
3337 epoch_store,
3338 inner_temporary_store,
3339 )?;
3340 self.subscription_handler
3342 .process_tx(certificate.data().transaction_data(), &effects, &events)
3343 .tap_ok(|_| {
3344 self.metrics
3345 .post_processing_total_tx_had_event_processed
3346 .inc()
3347 })
3348 .tap_err(|e| {
3349 warn!(
3350 ?tx_digest,
3351 "Post processing - Couldn't process events for tx: {}", e
3352 )
3353 })?;
3354
3355 self.metrics
3356 .post_processing_total_events_emitted
3357 .inc_by(events.data.len() as u64);
3358 };
3359 Ok(())
3360 }
3361
3362 fn make_transaction_block_events(
3363 &self,
3364 transaction_events: TransactionEvents,
3365 digest: TransactionDigest,
3366 timestamp_ms: u64,
3367 epoch_store: &Arc<AuthorityPerEpochStore>,
3368 inner_temporary_store: &InnerTemporaryStore,
3369 ) -> SuiResult<SuiTransactionBlockEvents> {
3370 let mut layout_resolver =
3371 epoch_store
3372 .executor()
3373 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3374 inner_temporary_store,
3375 self.get_backing_package_store(),
3376 )));
3377 SuiTransactionBlockEvents::try_from(
3378 transaction_events,
3379 digest,
3380 Some(timestamp_ms),
3381 layout_resolver.as_mut(),
3382 )
3383 }
3384
3385 pub fn unixtime_now_ms() -> u64 {
3386 let now = SystemTime::now()
3387 .duration_since(UNIX_EPOCH)
3388 .expect("Time went backwards")
3389 .as_millis();
3390 u64::try_from(now).expect("Travelling in time machine")
3391 }
3392
3393 #[instrument(level = "trace", skip_all)]
3397 pub async fn handle_transaction_info_request(
3398 &self,
3399 request: TransactionInfoRequest,
3400 ) -> SuiResult<TransactionInfoResponse> {
3401 let epoch_store = self.load_epoch_store_one_call_per_task();
3402 let (transaction, status) = self
3403 .get_transaction_status(&request.transaction_digest, &epoch_store)?
3404 .ok_or(SuiErrorKind::TransactionNotFound {
3405 digest: request.transaction_digest,
3406 })?;
3407 Ok(TransactionInfoResponse {
3408 transaction,
3409 status,
3410 })
3411 }
3412
3413 #[instrument(level = "trace", skip_all)]
3414 pub async fn handle_object_info_request(
3415 &self,
3416 request: ObjectInfoRequest,
3417 ) -> SuiResult<ObjectInfoResponse> {
3418 let epoch_store = self.load_epoch_store_one_call_per_task();
3419
3420 let requested_object_seq = match request.request_kind {
3421 ObjectInfoRequestKind::LatestObjectInfo => {
3422 let (_, seq, _) = self
3423 .get_object_or_tombstone(request.object_id)
3424 .await
3425 .ok_or_else(|| {
3426 SuiError::from(UserInputError::ObjectNotFound {
3427 object_id: request.object_id,
3428 version: None,
3429 })
3430 })?;
3431 seq
3432 }
3433 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3434 };
3435
3436 let object = self
3437 .get_object_store()
3438 .get_object_by_key(&request.object_id, requested_object_seq)
3439 .ok_or_else(|| {
3440 SuiError::from(UserInputError::ObjectNotFound {
3441 object_id: request.object_id,
3442 version: Some(requested_object_seq),
3443 })
3444 })?;
3445
3446 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3447 (request.generate_layout, object.data.try_as_move())
3448 {
3449 Some(into_struct_layout(
3450 self.load_epoch_store_one_call_per_task()
3451 .executor()
3452 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3453 .get_annotated_layout(&move_obj.type_().clone().into())?,
3454 )?)
3455 } else {
3456 None
3457 };
3458
3459 let lock = if !object.is_address_owned() {
3460 None
3462 } else {
3463 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3464 .await?
3465 .map(|s| s.into_inner())
3466 };
3467
3468 Ok(ObjectInfoResponse {
3469 object,
3470 layout,
3471 lock_for_debugging: lock,
3472 })
3473 }
3474
3475 #[instrument(level = "trace", skip_all)]
3476 pub fn handle_checkpoint_request(
3477 &self,
3478 request: &CheckpointRequest,
3479 ) -> SuiResult<CheckpointResponse> {
3480 let summary = match request.sequence_number {
3481 Some(seq) => self
3482 .checkpoint_store
3483 .get_checkpoint_by_sequence_number(seq)?,
3484 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3485 }
3486 .map(|v| v.into_inner());
3487 let contents = match &summary {
3488 Some(s) => self
3489 .checkpoint_store
3490 .get_checkpoint_contents(&s.content_digest)?,
3491 None => None,
3492 };
3493 Ok(CheckpointResponse {
3494 checkpoint: summary,
3495 contents,
3496 })
3497 }
3498
3499 #[instrument(level = "trace", skip_all)]
3500 pub fn handle_checkpoint_request_v2(
3501 &self,
3502 request: &CheckpointRequestV2,
3503 ) -> SuiResult<CheckpointResponseV2> {
3504 let summary = if request.certified {
3505 let summary = match request.sequence_number {
3506 Some(seq) => self
3507 .checkpoint_store
3508 .get_checkpoint_by_sequence_number(seq)?,
3509 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3510 }
3511 .map(|v| v.into_inner());
3512 summary.map(CheckpointSummaryResponse::Certified)
3513 } else {
3514 let summary = match request.sequence_number {
3515 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3516 None => self
3517 .checkpoint_store
3518 .get_latest_locally_computed_checkpoint()?,
3519 };
3520 summary.map(CheckpointSummaryResponse::Pending)
3521 };
3522 let contents = match &summary {
3523 Some(s) => self
3524 .checkpoint_store
3525 .get_checkpoint_contents(&s.content_digest())?,
3526 None => None,
3527 };
3528 Ok(CheckpointResponseV2 {
3529 checkpoint: summary,
3530 contents,
3531 })
3532 }
3533
3534 fn check_protocol_version(
3535 supported_protocol_versions: SupportedProtocolVersions,
3536 current_version: ProtocolVersion,
3537 ) {
3538 info!("current protocol version is now {:?}", current_version);
3539 info!("supported versions are: {:?}", supported_protocol_versions);
3540 if !supported_protocol_versions.is_version_supported(current_version) {
3541 let msg = format!(
3542 "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3543 current_version, supported_protocol_versions,
3544 );
3545
3546 error!("{}", msg);
3547 eprintln!("{}", msg);
3548
3549 #[cfg(not(msim))]
3550 std::process::exit(1);
3551
3552 #[cfg(msim)]
3553 sui_simulator::task::shutdown_current_node();
3554 }
3555 }
3556
3557 #[allow(clippy::disallowed_methods)] #[allow(clippy::too_many_arguments)]
3559 pub async fn new(
3560 name: AuthorityName,
3561 secret: StableSyncAuthoritySigner,
3562 supported_protocol_versions: SupportedProtocolVersions,
3563 store: Arc<AuthorityStore>,
3564 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3565 epoch_store: Arc<AuthorityPerEpochStore>,
3566 committee_store: Arc<CommitteeStore>,
3567 indexes: Option<Arc<IndexStore>>,
3568 rpc_index: Option<Arc<RpcIndexStore>>,
3569 checkpoint_store: Arc<CheckpointStore>,
3570 prometheus_registry: &Registry,
3571 genesis_objects: &[Object],
3572 db_checkpoint_config: &DBCheckpointConfig,
3573 config: NodeConfig,
3574 chain_identifier: ChainIdentifier,
3575 pruner_db: Option<Arc<AuthorityPrunerTables>>,
3576 policy_config: Option<PolicyConfig>,
3577 firewall_config: Option<RemoteFirewallConfig>,
3578 pruner_watermarks: Arc<PrunerWatermarks>,
3579 ) -> Arc<Self> {
3580 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3581
3582 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3583 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3584 let execution_scheduler = Arc::new(ExecutionScheduler::new(
3585 execution_cache_trait_pointers.object_cache_reader.clone(),
3586 execution_cache_trait_pointers.account_funds_read.clone(),
3587 execution_cache_trait_pointers
3588 .transaction_cache_reader
3589 .clone(),
3590 tx_ready_certificates,
3591 &epoch_store,
3592 metrics.clone(),
3593 ));
3594 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3595
3596 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3597 epoch_store.get_parent_path(),
3598 &config.authority_store_pruning_config,
3599 );
3600 let _pruner = AuthorityStorePruner::new(
3601 store.perpetual_tables.clone(),
3602 checkpoint_store.clone(),
3603 rpc_index.clone(),
3604 indexes.clone(),
3605 config.authority_store_pruning_config.clone(),
3606 epoch_store.committee().authority_exists(&name),
3607 epoch_store.epoch_start_state().epoch_duration_ms(),
3608 prometheus_registry,
3609 pruner_db,
3610 pruner_watermarks,
3611 );
3612 let input_loader =
3613 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3614 let epoch = epoch_store.epoch();
3615 let traffic_controller_metrics =
3616 Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3617 let traffic_controller = if let Some(policy_config) = policy_config {
3618 Some(Arc::new(
3619 TrafficController::init(
3620 policy_config,
3621 traffic_controller_metrics,
3622 firewall_config.clone(),
3623 )
3624 .await,
3625 ))
3626 } else {
3627 None
3628 };
3629
3630 let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3631 ForkRecoveryState::new(Some(fork_config))
3632 .expect("Failed to initialize fork recovery state")
3633 });
3634
3635 let coin_reservation_resolver = Arc::new(CoinReservationResolver::new(
3636 execution_cache_trait_pointers.child_object_resolver.clone(),
3637 ));
3638
3639 let state = Arc::new(AuthorityState {
3640 name,
3641 secret,
3642 execution_lock: RwLock::new(epoch),
3643 epoch_store: ArcSwap::new(epoch_store.clone()),
3644 input_loader,
3645 execution_cache_trait_pointers,
3646 coin_reservation_resolver,
3647 indexes,
3648 rpc_index,
3649 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3650 checkpoint_store,
3651 committee_store,
3652 execution_scheduler,
3653 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3654 metrics,
3655 _pruner,
3656 _authority_per_epoch_pruner,
3657 db_checkpoint_config: db_checkpoint_config.clone(),
3658 config,
3659 overload_info: AuthorityOverloadInfo::default(),
3660 chain_identifier,
3661 congestion_tracker: Arc::new(CongestionTracker::new()),
3662 traffic_controller,
3663 fork_recovery_state,
3664 notify_epoch: tokio::sync::watch::channel(epoch).0,
3665 });
3666
3667 let state_clone = Arc::downgrade(&state);
3668 spawn_monitored_task!(fix_indexes(state_clone));
3669 let authority_state = Arc::downgrade(&state);
3671 spawn_monitored_task!(execution_process(
3672 authority_state,
3673 rx_ready_certificates,
3674 rx_execution_shutdown,
3675 ));
3676 state
3678 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3679 .expect("Error indexing genesis objects.");
3680
3681 if epoch_store
3682 .protocol_config()
3683 .enable_multi_epoch_transaction_expiration()
3684 && epoch_store.epoch() > 0
3685 {
3686 use typed_store::Map;
3687 let previous_epoch = epoch_store.epoch() - 1;
3688 let start_key = (previous_epoch, TransactionDigest::ZERO);
3689 let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
3690 let has_previous_epoch_data = store
3691 .perpetual_tables
3692 .executed_transaction_digests
3693 .safe_range_iter(start_key..end_key)
3694 .next()
3695 .is_some();
3696
3697 if !has_previous_epoch_data {
3698 panic!(
3699 "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
3700 This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
3701 Please restore from a snapshot using the latest version of sui-tool.",
3702 previous_epoch
3703 );
3704 }
3705 }
3706
3707 state
3708 }
3709
3710 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3712 &self.execution_cache_trait_pointers.object_cache_reader
3713 }
3714
3715 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3716 &self.execution_cache_trait_pointers.transaction_cache_reader
3717 }
3718
3719 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3720 &self.execution_cache_trait_pointers.cache_writer
3721 }
3722
3723 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3724 &self.execution_cache_trait_pointers.backing_store
3725 }
3726
3727 pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3728 &self.execution_cache_trait_pointers.child_object_resolver
3729 }
3730
3731 pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
3732 &self.execution_cache_trait_pointers.account_funds_read
3733 }
3734
3735 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3736 &self.execution_cache_trait_pointers.backing_package_store
3737 }
3738
3739 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3740 &self.execution_cache_trait_pointers.object_store
3741 }
3742
3743 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3744 &self.execution_cache_trait_pointers.reconfig_api
3745 }
3746
3747 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3748 &self.execution_cache_trait_pointers.global_state_hash_store
3749 }
3750
3751 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3752 &self.execution_cache_trait_pointers.checkpoint_cache
3753 }
3754
3755 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3756 &self.execution_cache_trait_pointers.state_sync_store
3757 }
3758
3759 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3760 &self.execution_cache_trait_pointers.cache_commit
3761 }
3762
3763 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3764 self.execution_cache_trait_pointers
3765 .testing_api
3766 .database_for_testing()
3767 }
3768
3769 pub fn cache_for_testing(&self) -> &WritebackCache {
3770 self.execution_cache_trait_pointers
3771 .testing_api
3772 .cache_for_testing()
3773 }
3774
3775 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3776 &self,
3777 config: NodeConfig,
3778 metrics: Arc<AuthorityStorePruningMetrics>,
3779 ) -> anyhow::Result<()> {
3780 use crate::authority::authority_store_pruner::PrunerWatermarks;
3781 let watermarks = Arc::new(PrunerWatermarks::default());
3782 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3783 &self.database_for_testing().perpetual_tables,
3784 &self.checkpoint_store,
3785 self.rpc_index.as_deref(),
3786 None,
3787 config.authority_store_pruning_config,
3788 metrics,
3789 EPOCH_DURATION_MS_FOR_TESTING,
3790 &watermarks,
3791 )
3792 .await
3793 }
3794
3795 pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
3796 &self.execution_scheduler
3797 }
3798
3799 fn create_owner_index_if_empty(
3800 &self,
3801 genesis_objects: &[Object],
3802 epoch_store: &Arc<AuthorityPerEpochStore>,
3803 ) -> SuiResult {
3804 let Some(index_store) = &self.indexes else {
3805 return Ok(());
3806 };
3807 if !index_store.is_empty() {
3808 return Ok(());
3809 }
3810
3811 let mut new_owners = vec![];
3812 let mut new_dynamic_fields = vec![];
3813 let mut layout_resolver = epoch_store
3814 .executor()
3815 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3816 for o in genesis_objects.iter() {
3817 match o.owner {
3818 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3819 new_owners.push((
3820 (addr, o.id()),
3821 ObjectInfo::new(&o.compute_object_reference(), o),
3822 ))
3823 }
3824 Owner::ObjectOwner(object_id) => {
3825 let id = o.id();
3826 let Some(info) = self.try_create_dynamic_field_info(
3827 o,
3828 &BTreeMap::new(),
3829 layout_resolver.as_mut(),
3830 )?
3831 else {
3832 continue;
3833 };
3834 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3835 }
3836 _ => {}
3837 }
3838 }
3839
3840 index_store.insert_genesis_objects(ObjectIndexChanges {
3841 deleted_owners: vec![],
3842 deleted_dynamic_fields: vec![],
3843 new_owners,
3844 new_dynamic_fields,
3845 })
3846 }
3847
3848 pub fn execution_lock_for_executable_transaction(
3852 &self,
3853 transaction: &VerifiedExecutableTransaction,
3854 ) -> Option<ExecutionLockReadGuard<'_>> {
3855 let lock = self.execution_lock.try_read().ok()?;
3856 if *lock == transaction.auth_sig().epoch() {
3857 Some(lock)
3858 } else {
3859 None
3861 }
3862 }
3863
3864 pub fn execution_lock_for_signing(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
3869 self.execution_lock
3870 .try_read()
3871 .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
3872 }
3873
3874 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3875 self.execution_lock.write().await
3876 }
3877
3878 #[instrument(level = "error", skip_all)]
3879 pub async fn reconfigure(
3880 &self,
3881 cur_epoch_store: &AuthorityPerEpochStore,
3882 supported_protocol_versions: SupportedProtocolVersions,
3883 new_committee: Committee,
3884 epoch_start_configuration: EpochStartConfiguration,
3885 state_hasher: Arc<GlobalStateHasher>,
3886 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3887 epoch_last_checkpoint: CheckpointSequenceNumber,
3888 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
3889 Self::check_protocol_version(
3890 supported_protocol_versions,
3891 epoch_start_configuration
3892 .epoch_start_state()
3893 .protocol_version(),
3894 );
3895
3896 self.committee_store.insert_new_committee(&new_committee)?;
3897
3898 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3900
3901 cur_epoch_store.epoch_terminated().await;
3903
3904 {
3908 let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
3909 if state.should_accept_user_certs() {
3910 cur_epoch_store.close_user_certs(state);
3917 }
3918 }
3920
3921 self.get_reconfig_api()
3922 .clear_state_end_of_epoch(&execution_lock);
3923 self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
3924 self.maybe_reaccumulate_state_hash(
3925 cur_epoch_store,
3926 epoch_start_configuration
3927 .epoch_start_state()
3928 .protocol_version(),
3929 );
3930 self.get_reconfig_api()
3931 .set_epoch_start_configuration(&epoch_start_configuration);
3932 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
3933 && self
3934 .db_checkpoint_config
3935 .perform_db_checkpoints_at_epoch_end
3936 {
3937 let checkpoint_indexes = self
3938 .db_checkpoint_config
3939 .perform_index_db_checkpoints_at_epoch_end
3940 .unwrap_or(false);
3941 let current_epoch = cur_epoch_store.epoch();
3942 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
3943 self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
3944 }
3945
3946 self.get_reconfig_api()
3947 .reconfigure_cache(&epoch_start_configuration)
3948 .await;
3949
3950 let new_epoch = new_committee.epoch;
3951 let new_epoch_store = self
3952 .reopen_epoch_db(
3953 cur_epoch_store,
3954 new_committee,
3955 epoch_start_configuration,
3956 expensive_safety_check_config,
3957 epoch_last_checkpoint,
3958 )
3959 .await?;
3960 assert_eq!(new_epoch_store.epoch(), new_epoch);
3961 self.execution_scheduler
3962 .reconfigure(&new_epoch_store, self.get_account_funds_read());
3963 *execution_lock = new_epoch;
3964
3965 self.notify_epoch(new_epoch);
3966 Ok(new_epoch_store)
3970 }
3971
3972 fn notify_epoch(&self, new_epoch: EpochId) {
3973 self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
3974 }
3975
3976 pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
3977 let mut rx = self.notify_epoch.subscribe();
3978 loop {
3979 let epoch = *rx.borrow();
3980 if epoch >= target_epoch {
3981 return Ok(epoch);
3982 }
3983 rx.changed().await?;
3984 }
3985 }
3986
3987 pub async fn settle_accumulator_for_testing(&self, effects: &[TransactionEffects]) {
3988 let accumulator_version = self
3989 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
3990 .await
3991 .unwrap()
3992 .version();
3993 let ckpt_seq = accumulator_version.value();
3996 let builder = AccumulatorSettlementTxBuilder::new(
3997 Some(self.get_transaction_cache_reader().as_ref()),
3998 effects,
3999 ckpt_seq,
4000 0,
4001 );
4002 let balance_changes = builder.collect_funds_changes();
4003 let epoch_store = self.epoch_store_for_testing();
4004 let epoch = epoch_store.epoch();
4005 let accumulator_root_obj_initial_shared_version = epoch_store
4006 .epoch_start_config()
4007 .accumulator_root_obj_initial_shared_version()
4008 .unwrap();
4009 let settlements = builder.build_tx(
4010 epoch_store.protocol_config(),
4011 epoch,
4012 accumulator_root_obj_initial_shared_version,
4013 ckpt_seq,
4014 ckpt_seq,
4015 );
4016
4017 let settlements: Vec<_> = settlements
4018 .into_iter()
4019 .map(|tx| {
4020 VerifiedExecutableTransaction::new_system(
4021 VerifiedTransaction::new_system_transaction(tx),
4022 epoch,
4023 )
4024 })
4025 .collect();
4026
4027 let assigned_versions = epoch_store
4028 .assign_shared_object_versions_for_tests(
4029 self.get_object_cache_reader().as_ref(),
4030 &settlements,
4031 )
4032 .unwrap();
4033 let version_map = assigned_versions.into_map();
4034
4035 let mut settlement_effects = Vec::with_capacity(settlements.len());
4036 for tx in settlements {
4037 let env = ExecutionEnv::new()
4038 .with_assigned_versions(version_map.get(&tx.key()).unwrap().clone());
4039 let (effects, _) = self
4040 .try_execute_immediately(&tx, env, &epoch_store)
4041 .await
4042 .unwrap();
4043 assert!(effects.status().is_ok());
4044 settlement_effects.push(effects);
4045 }
4046
4047 let barrier = accumulators::build_accumulator_barrier_tx(
4048 epoch,
4049 accumulator_root_obj_initial_shared_version,
4050 ckpt_seq,
4051 &settlement_effects,
4052 );
4053 let barrier = VerifiedExecutableTransaction::new_system(
4054 VerifiedTransaction::new_system_transaction(barrier),
4055 epoch,
4056 );
4057
4058 let assigned_versions = epoch_store
4059 .assign_shared_object_versions_for_tests(
4060 self.get_object_cache_reader().as_ref(),
4061 std::slice::from_ref(&barrier),
4062 )
4063 .unwrap();
4064 let version_map = assigned_versions.into_map();
4065
4066 let env = ExecutionEnv::new()
4067 .with_assigned_versions(version_map.get(&barrier.key()).unwrap().clone());
4068 let (effects, _) = self
4069 .try_execute_immediately(&barrier, env, &epoch_store)
4070 .await
4071 .unwrap();
4072 assert!(effects.status().is_ok());
4073
4074 let next_accumulator_version = accumulator_version.next();
4075 self.execution_scheduler
4076 .settle_address_funds(FundsSettlement {
4077 funds_changes: balance_changes,
4078 next_accumulator_version,
4079 });
4080 }
4082
4083 pub async fn reconfigure_for_testing(&self) {
4087 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
4088 let epoch_store = self.epoch_store_for_testing().clone();
4089 let protocol_config = epoch_store.protocol_config().clone();
4090 let _guard =
4097 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
4098 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
4099 self.get_backing_package_store().clone(),
4100 self.get_object_store().clone(),
4101 &self.config.expensive_safety_check_config,
4102 self.checkpoint_store
4103 .get_epoch_last_checkpoint(epoch_store.epoch())
4104 .unwrap()
4105 .map(|c| *c.sequence_number())
4106 .unwrap_or_default(),
4107 );
4108 self.execution_scheduler
4109 .reconfigure(&new_epoch_store, self.get_account_funds_read());
4110 let new_epoch = new_epoch_store.epoch();
4111 self.epoch_store.store(new_epoch_store);
4112 epoch_store.epoch_terminated().await;
4113
4114 *execution_lock = new_epoch;
4115 }
4116
4117 #[instrument(level = "error", skip_all)]
4120 fn maybe_reaccumulate_state_hash(
4121 &self,
4122 cur_epoch_store: &AuthorityPerEpochStore,
4123 new_protocol_version: ProtocolVersion,
4124 ) {
4125 self.get_reconfig_api()
4126 .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4127 }
4128
4129 #[instrument(level = "error", skip_all)]
4130 fn check_system_consistency(
4131 &self,
4132 cur_epoch_store: &AuthorityPerEpochStore,
4133 state_hasher: Arc<GlobalStateHasher>,
4134 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4135 ) {
4136 info!(
4137 "Performing sui conservation consistency check for epoch {}",
4138 cur_epoch_store.epoch()
4139 );
4140
4141 if let Err(err) = self
4142 .get_reconfig_api()
4143 .expensive_check_sui_conservation(cur_epoch_store)
4144 {
4145 if cfg!(debug_assertions) {
4146 panic!("{}", err);
4147 } else {
4148 warn!("Sui conservation consistency check failed: {}", err);
4151 }
4152 } else {
4153 info!("Sui conservation consistency check passed");
4154 }
4155
4156 if expensive_safety_check_config.enable_state_consistency_check() {
4158 info!(
4159 "Performing state consistency check for epoch {}",
4160 cur_epoch_store.epoch()
4161 );
4162 self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4163 }
4164
4165 if expensive_safety_check_config.enable_secondary_index_checks()
4166 && let Some(indexes) = self.indexes.clone()
4167 {
4168 verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
4169 .expect("secondary indexes are inconsistent");
4170 }
4171 }
4172
4173 fn expensive_check_is_consistent_state(
4174 &self,
4175 state_hasher: Arc<GlobalStateHasher>,
4176 cur_epoch_store: &AuthorityPerEpochStore,
4177 ) {
4178 let live_object_set_hash = state_hasher.digest_live_object_set(
4179 !cur_epoch_store
4180 .protocol_config()
4181 .simplified_unwrap_then_delete(),
4182 );
4183
4184 let root_state_hash: ECMHLiveObjectSetDigest = self
4185 .get_global_state_hash_store()
4186 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4187 .expect("Retrieving root state hash cannot fail")
4188 .expect("Root state hash for epoch must exist")
4189 .1
4190 .digest()
4191 .into();
4192
4193 let is_inconsistent = root_state_hash != live_object_set_hash;
4194 if is_inconsistent {
4195 debug_fatal!(
4196 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4197 root_state_hash,
4198 live_object_set_hash
4199 );
4200 } else {
4201 info!("State consistency check passed");
4202 }
4203
4204 state_hasher.set_inconsistent_state(is_inconsistent);
4205 }
4206
4207 pub fn current_epoch_for_testing(&self) -> EpochId {
4208 self.epoch_store_for_testing().epoch()
4209 }
4210
4211 #[instrument(level = "error", skip_all)]
4212 pub fn checkpoint_all_dbs(
4213 &self,
4214 checkpoint_path: &Path,
4215 cur_epoch_store: &AuthorityPerEpochStore,
4216 checkpoint_indexes: bool,
4217 ) -> SuiResult {
4218 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4219 let current_epoch = cur_epoch_store.epoch();
4220
4221 if checkpoint_path.exists() {
4222 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4223 return Ok(());
4224 }
4225
4226 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4227 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4228
4229 if checkpoint_path_tmp.exists() {
4230 fs::remove_dir_all(&checkpoint_path_tmp)
4231 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4232 }
4233
4234 fs::create_dir_all(&checkpoint_path_tmp)
4235 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4236 fs::create_dir(&store_checkpoint_path_tmp)
4237 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4238
4239 self.checkpoint_store
4242 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4243
4244 self.get_reconfig_api()
4245 .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4246
4247 self.committee_store
4248 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4249
4250 if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4251 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4252 }
4253
4254 fs::rename(checkpoint_path_tmp, checkpoint_path)
4255 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4256 Ok(())
4257 }
4258
4259 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4265 self.epoch_store.load()
4266 }
4267
4268 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4270 self.load_epoch_store_one_call_per_task()
4271 }
4272
4273 pub fn clone_committee_for_testing(&self) -> Committee {
4274 Committee::clone(self.epoch_store_for_testing().committee())
4275 }
4276
4277 #[instrument(level = "trace", skip_all)]
4278 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4279 self.get_object_store().get_object(object_id)
4280 }
4281
4282 pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4283 Ok(self
4284 .get_object(&SUI_SYSTEM_ADDRESS.into())
4285 .await
4286 .expect("framework object should always exist")
4287 .compute_object_reference())
4288 }
4289
4290 pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4292 self.get_object_cache_reader()
4293 .get_sui_system_state_object_unsafe()
4294 }
4295
4296 #[instrument(level = "trace", skip_all)]
4297 fn get_transaction_checkpoint_sequence(
4298 &self,
4299 digest: &TransactionDigest,
4300 epoch_store: &AuthorityPerEpochStore,
4301 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
4302 epoch_store.get_transaction_checkpoint(digest)
4303 }
4304
4305 #[instrument(level = "trace", skip_all)]
4306 pub fn get_checkpoint_by_sequence_number(
4307 &self,
4308 sequence_number: CheckpointSequenceNumber,
4309 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4310 Ok(self
4311 .checkpoint_store
4312 .get_checkpoint_by_sequence_number(sequence_number)?)
4313 }
4314
4315 #[instrument(level = "trace", skip_all)]
4316 pub fn get_transaction_checkpoint_for_tests(
4317 &self,
4318 digest: &TransactionDigest,
4319 epoch_store: &AuthorityPerEpochStore,
4320 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4321 let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4322 let Some(checkpoint) = checkpoint else {
4323 return Ok(None);
4324 };
4325 let checkpoint = self
4326 .checkpoint_store
4327 .get_checkpoint_by_sequence_number(checkpoint)?;
4328 Ok(checkpoint)
4329 }
4330
4331 #[instrument(level = "trace", skip_all)]
4332 pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4333 Ok(
4334 match self
4335 .get_object_cache_reader()
4336 .get_latest_object_or_tombstone(*object_id)
4337 {
4338 Some((_, ObjectOrTombstone::Object(object))) => {
4339 let layout = self.get_object_layout(&object)?;
4340 ObjectRead::Exists(object.compute_object_reference(), object, layout)
4341 }
4342 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4343 None => ObjectRead::NotExists(*object_id),
4344 },
4345 )
4346 }
4347
4348 pub fn get_chain_identifier(&self) -> ChainIdentifier {
4350 self.chain_identifier
4351 }
4352
4353 pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
4354 self.fork_recovery_state.as_ref()
4355 }
4356
4357 #[instrument(level = "trace", skip_all)]
4358 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4359 where
4360 T: DeserializeOwned,
4361 {
4362 let o = self.get_object_read(object_id)?.into_object()?;
4363 if let Some(move_object) = o.data.try_as_move() {
4364 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4365 SuiErrorKind::ObjectDeserializationError {
4366 error: format!("{e}"),
4367 }
4368 })?)
4369 } else {
4370 Err(SuiErrorKind::ObjectDeserializationError {
4371 error: format!("Provided object : [{object_id}] is not a Move object."),
4372 }
4373 .into())
4374 }
4375 }
4376
4377 #[instrument(level = "trace", skip_all)]
4383 pub fn get_past_object_read(
4384 &self,
4385 object_id: &ObjectID,
4386 version: SequenceNumber,
4387 ) -> SuiResult<PastObjectRead> {
4388 let Some(obj_ref) = self
4390 .get_object_cache_reader()
4391 .get_latest_object_ref_or_tombstone(*object_id)
4392 else {
4393 return Ok(PastObjectRead::ObjectNotExists(*object_id));
4394 };
4395
4396 if version > obj_ref.1 {
4397 return Ok(PastObjectRead::VersionTooHigh {
4398 object_id: *object_id,
4399 asked_version: version,
4400 latest_version: obj_ref.1,
4401 });
4402 }
4403
4404 if version < obj_ref.1 {
4405 return Ok(match self.read_object_at_version(object_id, version)? {
4407 Some((object, layout)) => {
4408 let obj_ref = object.compute_object_reference();
4409 PastObjectRead::VersionFound(obj_ref, object, layout)
4410 }
4411
4412 None => PastObjectRead::VersionNotFound(*object_id, version),
4413 });
4414 }
4415
4416 if !obj_ref.2.is_alive() {
4417 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4418 }
4419
4420 match self.read_object_at_version(object_id, obj_ref.1)? {
4421 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4422 None => {
4423 debug_fatal!(
4424 "Object with in parent_entry is missing from object store, datastore is \
4425 inconsistent"
4426 );
4427 Err(UserInputError::ObjectNotFound {
4428 object_id: *object_id,
4429 version: Some(obj_ref.1),
4430 }
4431 .into())
4432 }
4433 }
4434 }
4435
4436 #[instrument(level = "trace", skip_all)]
4437 fn read_object_at_version(
4438 &self,
4439 object_id: &ObjectID,
4440 version: SequenceNumber,
4441 ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4442 let Some(object) = self
4443 .get_object_cache_reader()
4444 .get_object_by_key(object_id, version)
4445 else {
4446 return Ok(None);
4447 };
4448
4449 let layout = self.get_object_layout(&object)?;
4450 Ok(Some((object, layout)))
4451 }
4452
4453 fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4454 let layout = object
4455 .data
4456 .try_as_move()
4457 .map(|object| {
4458 into_struct_layout(
4459 self.load_epoch_store_one_call_per_task()
4460 .executor()
4461 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
4463 .get_annotated_layout(&object.type_().clone().into())?,
4464 )
4465 })
4466 .transpose()?;
4467 Ok(layout)
4468 }
4469
4470 fn get_owner_at_version(
4471 &self,
4472 object_id: &ObjectID,
4473 version: SequenceNumber,
4474 ) -> SuiResult<Owner> {
4475 self.get_object_store()
4476 .get_object_by_key(object_id, version)
4477 .ok_or_else(|| {
4478 SuiError::from(UserInputError::ObjectNotFound {
4479 object_id: *object_id,
4480 version: Some(version),
4481 })
4482 })
4483 .map(|o| o.owner.clone())
4484 }
4485
4486 #[instrument(level = "trace", skip_all)]
4487 pub fn get_owner_objects(
4488 &self,
4489 owner: SuiAddress,
4490 cursor: Option<ObjectID>,
4492 limit: usize,
4493 filter: Option<SuiObjectDataFilter>,
4494 ) -> SuiResult<Vec<ObjectInfo>> {
4495 if let Some(indexes) = &self.indexes {
4496 indexes.get_owner_objects(owner, cursor, limit, filter)
4497 } else {
4498 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4499 }
4500 }
4501
4502 #[instrument(level = "trace", skip_all)]
4503 pub fn get_owned_coins_iterator_with_cursor(
4504 &self,
4505 owner: SuiAddress,
4506 cursor: (String, u64, ObjectID),
4508 limit: usize,
4509 one_coin_type_only: bool,
4510 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4511 if let Some(indexes) = &self.indexes {
4512 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4513 } else {
4514 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4515 }
4516 }
4517
4518 #[instrument(level = "trace", skip_all)]
4519 pub fn get_owner_objects_iterator(
4520 &self,
4521 owner: SuiAddress,
4522 cursor: Option<ObjectID>,
4524 filter: Option<SuiObjectDataFilter>,
4525 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4526 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4527 if let Some(indexes) = &self.indexes {
4528 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4529 } else {
4530 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4531 }
4532 }
4533
4534 #[instrument(level = "trace", skip_all)]
4535 pub async fn get_move_objects<T>(
4536 &self,
4537 owner: SuiAddress,
4538 type_: MoveObjectType,
4539 ) -> SuiResult<Vec<T>>
4540 where
4541 T: DeserializeOwned,
4542 {
4543 let object_ids = self
4544 .get_owner_objects_iterator(owner, None, None)?
4545 .filter(|o| match &o.type_ {
4546 ObjectType::Struct(s) => &type_ == s,
4547 ObjectType::Package => false,
4548 })
4549 .map(|info| ObjectKey(info.object_id, info.version))
4550 .collect::<Vec<_>>();
4551 let mut move_objects = vec![];
4552
4553 let objects = self
4554 .get_object_store()
4555 .multi_get_objects_by_key(&object_ids);
4556
4557 for (o, id) in objects.into_iter().zip(object_ids) {
4558 let object = o.ok_or_else(|| {
4559 SuiError::from(UserInputError::ObjectNotFound {
4560 object_id: id.0,
4561 version: Some(id.1),
4562 })
4563 })?;
4564 let move_object = object.data.try_as_move().ok_or_else(|| {
4565 SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4566 })?;
4567 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4568 SuiErrorKind::ObjectDeserializationError {
4569 error: format!("{e}"),
4570 }
4571 })?);
4572 }
4573 Ok(move_objects)
4574 }
4575
4576 #[instrument(level = "trace", skip_all)]
4577 pub fn get_dynamic_fields(
4578 &self,
4579 owner: ObjectID,
4580 cursor: Option<ObjectID>,
4582 limit: usize,
4583 ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4584 Ok(self
4585 .get_dynamic_fields_iterator(owner, cursor)?
4586 .take(limit)
4587 .collect::<Result<Vec<_>, _>>()?)
4588 }
4589
4590 fn get_dynamic_fields_iterator(
4591 &self,
4592 owner: ObjectID,
4593 cursor: Option<ObjectID>,
4595 ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4596 {
4597 if let Some(indexes) = &self.indexes {
4598 indexes.get_dynamic_fields_iterator(owner, cursor)
4599 } else {
4600 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4601 }
4602 }
4603
4604 #[instrument(level = "trace", skip_all)]
4605 pub fn get_dynamic_field_object_id(
4606 &self,
4607 owner: ObjectID,
4608 name_type: TypeTag,
4609 name_bcs_bytes: &[u8],
4610 ) -> SuiResult<Option<ObjectID>> {
4611 if let Some(indexes) = &self.indexes {
4612 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4613 } else {
4614 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4615 }
4616 }
4617
4618 #[instrument(level = "trace", skip_all)]
4619 pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
4620 Ok(self.get_indexes()?.next_sequence_number())
4621 }
4622
4623 #[instrument(level = "trace", skip_all)]
4624 pub async fn get_executed_transaction_and_effects(
4625 &self,
4626 digest: TransactionDigest,
4627 kv_store: Arc<TransactionKeyValueStore>,
4628 ) -> SuiResult<(Transaction, TransactionEffects)> {
4629 let transaction = kv_store.get_tx(digest).await?;
4630 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4631 Ok((transaction, effects))
4632 }
4633
4634 #[instrument(level = "trace", skip_all)]
4635 pub fn multi_get_checkpoint_by_sequence_number(
4636 &self,
4637 sequence_numbers: &[CheckpointSequenceNumber],
4638 ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
4639 Ok(self
4640 .checkpoint_store
4641 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4642 }
4643
4644 #[instrument(level = "trace", skip_all)]
4645 pub fn get_transaction_events(
4646 &self,
4647 digest: &TransactionDigest,
4648 ) -> SuiResult<TransactionEvents> {
4649 self.get_transaction_cache_reader()
4650 .get_events(digest)
4651 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4652 }
4653
4654 pub fn get_transaction_input_objects(
4655 &self,
4656 effects: &TransactionEffects,
4657 ) -> SuiResult<Vec<Object>> {
4658 sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4659 .map_err(Into::into)
4660 }
4661
4662 pub fn get_transaction_output_objects(
4663 &self,
4664 effects: &TransactionEffects,
4665 ) -> SuiResult<Vec<Object>> {
4666 sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4667 .map_err(Into::into)
4668 }
4669
4670 fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4671 match &self.indexes {
4672 Some(i) => Ok(i.clone()),
4673 None => Err(SuiErrorKind::UnsupportedFeatureError {
4674 error: "extended object indexing is not enabled on this server".into(),
4675 }
4676 .into()),
4677 }
4678 }
4679
4680 pub async fn get_transactions_for_tests(
4681 self: &Arc<Self>,
4682 filter: Option<TransactionFilter>,
4683 cursor: Option<TransactionDigest>,
4684 limit: Option<usize>,
4685 reverse: bool,
4686 ) -> SuiResult<Vec<TransactionDigest>> {
4687 let metrics = KeyValueStoreMetrics::new_for_tests();
4688 let kv_store = Arc::new(TransactionKeyValueStore::new(
4689 "rocksdb",
4690 metrics,
4691 self.clone(),
4692 ));
4693 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4694 .await
4695 }
4696
4697 #[instrument(level = "trace", skip_all)]
4698 pub async fn get_transactions(
4699 &self,
4700 kv_store: &Arc<TransactionKeyValueStore>,
4701 filter: Option<TransactionFilter>,
4702 cursor: Option<TransactionDigest>,
4704 limit: Option<usize>,
4705 reverse: bool,
4706 ) -> SuiResult<Vec<TransactionDigest>> {
4707 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4708 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4709 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4710 if reverse {
4711 let iter = iter
4712 .rev()
4713 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4714 .skip(usize::from(cursor.is_some()));
4715 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4716 } else {
4717 let iter = iter
4718 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4719 .skip(usize::from(cursor.is_some()));
4720 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4721 }
4722 }
4723 self.get_indexes()?
4724 .get_transactions(filter, cursor, limit, reverse)
4725 }
4726
4727 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4728 &self.checkpoint_store
4729 }
4730
4731 pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4732 self.get_checkpoint_store()
4733 .get_highest_executed_checkpoint_seq_number()?
4734 .ok_or(
4735 SuiErrorKind::UserInputError {
4736 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4737 }
4738 .into(),
4739 )
4740 }
4741
4742 #[cfg(msim)]
4743 pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
4744 self.database_for_testing()
4745 .perpetual_tables
4746 .get_highest_pruned_checkpoint()
4747 .map(|c| c.unwrap_or(0))
4748 .map_err(Into::into)
4749 }
4750
4751 #[instrument(level = "trace", skip_all)]
4752 pub fn get_checkpoint_summary_by_sequence_number(
4753 &self,
4754 sequence_number: CheckpointSequenceNumber,
4755 ) -> SuiResult<CheckpointSummary> {
4756 let verified_checkpoint = self
4757 .get_checkpoint_store()
4758 .get_checkpoint_by_sequence_number(sequence_number)?;
4759 match verified_checkpoint {
4760 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4761 None => Err(SuiErrorKind::UserInputError {
4762 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4763 }
4764 .into()),
4765 }
4766 }
4767
4768 #[instrument(level = "trace", skip_all)]
4769 pub fn get_checkpoint_summary_by_digest(
4770 &self,
4771 digest: CheckpointDigest,
4772 ) -> SuiResult<CheckpointSummary> {
4773 let verified_checkpoint = self
4774 .get_checkpoint_store()
4775 .get_checkpoint_by_digest(&digest)?;
4776 match verified_checkpoint {
4777 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4778 None => Err(SuiErrorKind::UserInputError {
4779 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4780 }
4781 .into()),
4782 }
4783 }
4784
4785 #[instrument(level = "trace", skip_all)]
4786 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
4787 if is_system_package(package_id) {
4788 return self.find_genesis_txn_digest();
4789 }
4790 Ok(self
4791 .get_object_read(&package_id)?
4792 .into_object()?
4793 .previous_transaction)
4794 }
4795
4796 #[instrument(level = "trace", skip_all)]
4797 pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
4798 let summary = self
4799 .get_verified_checkpoint_by_sequence_number(0)?
4800 .into_message();
4801 let content = self.get_checkpoint_contents(summary.content_digest)?;
4802 let genesis_transaction = content.enumerate_transactions(&summary).next();
4803 Ok(genesis_transaction
4804 .ok_or(SuiErrorKind::UserInputError {
4805 error: UserInputError::GenesisTransactionNotFound,
4806 })?
4807 .1
4808 .transaction)
4809 }
4810
4811 #[instrument(level = "trace", skip_all)]
4812 pub fn get_verified_checkpoint_by_sequence_number(
4813 &self,
4814 sequence_number: CheckpointSequenceNumber,
4815 ) -> SuiResult<VerifiedCheckpoint> {
4816 let verified_checkpoint = self
4817 .get_checkpoint_store()
4818 .get_checkpoint_by_sequence_number(sequence_number)?;
4819 match verified_checkpoint {
4820 Some(verified_checkpoint) => Ok(verified_checkpoint),
4821 None => Err(SuiErrorKind::UserInputError {
4822 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4823 }
4824 .into()),
4825 }
4826 }
4827
4828 #[instrument(level = "trace", skip_all)]
4829 pub fn get_verified_checkpoint_summary_by_digest(
4830 &self,
4831 digest: CheckpointDigest,
4832 ) -> SuiResult<VerifiedCheckpoint> {
4833 let verified_checkpoint = self
4834 .get_checkpoint_store()
4835 .get_checkpoint_by_digest(&digest)?;
4836 match verified_checkpoint {
4837 Some(verified_checkpoint) => Ok(verified_checkpoint),
4838 None => Err(SuiErrorKind::UserInputError {
4839 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4840 }
4841 .into()),
4842 }
4843 }
4844
4845 #[instrument(level = "trace", skip_all)]
4846 pub fn get_checkpoint_contents(
4847 &self,
4848 digest: CheckpointContentsDigest,
4849 ) -> SuiResult<CheckpointContents> {
4850 self.get_checkpoint_store()
4851 .get_checkpoint_contents(&digest)?
4852 .ok_or(
4853 SuiErrorKind::UserInputError {
4854 error: UserInputError::CheckpointContentsNotFound(digest),
4855 }
4856 .into(),
4857 )
4858 }
4859
4860 #[instrument(level = "trace", skip_all)]
4861 pub fn get_checkpoint_contents_by_sequence_number(
4862 &self,
4863 sequence_number: CheckpointSequenceNumber,
4864 ) -> SuiResult<CheckpointContents> {
4865 let verified_checkpoint = self
4866 .get_checkpoint_store()
4867 .get_checkpoint_by_sequence_number(sequence_number)?;
4868 match verified_checkpoint {
4869 Some(verified_checkpoint) => {
4870 let content_digest = verified_checkpoint.into_inner().content_digest;
4871 self.get_checkpoint_contents(content_digest)
4872 }
4873 None => Err(SuiErrorKind::UserInputError {
4874 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4875 }
4876 .into()),
4877 }
4878 }
4879
4880 #[instrument(level = "trace", skip_all)]
4881 pub async fn query_events(
4882 &self,
4883 kv_store: &Arc<TransactionKeyValueStore>,
4884 query: EventFilter,
4885 cursor: Option<EventID>,
4887 limit: usize,
4888 descending: bool,
4889 ) -> SuiResult<Vec<SuiEvent>> {
4890 let index_store = self.get_indexes()?;
4891
4892 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4894 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4895 SuiErrorKind::TransactionNotFound {
4896 digest: cursor.tx_digest,
4897 },
4898 )?;
4899 (tx_seq, cursor.event_seq as usize)
4900 } else if descending {
4901 (u64::MAX, usize::MAX)
4902 } else {
4903 (0, 0)
4904 };
4905
4906 let limit = limit + 1;
4907 let mut event_keys = match query {
4908 EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
4909 EventFilter::Transaction(digest) => {
4910 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4911 }
4912 EventFilter::MoveModule { package, module } => {
4913 let module_id = ModuleId::new(package.into(), module);
4914 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4915 }
4916 EventFilter::MoveEventType(struct_name) => index_store
4917 .events_by_move_event_struct_name(
4918 &struct_name,
4919 tx_num,
4920 event_num,
4921 limit,
4922 descending,
4923 )?,
4924 EventFilter::Sender(sender) => {
4925 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4926 }
4927 EventFilter::TimeRange {
4928 start_time,
4929 end_time,
4930 } => index_store
4931 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4932 EventFilter::MoveEventModule { package, module } => index_store
4933 .events_by_move_event_module(
4934 &ModuleId::new(package.into(), module),
4935 tx_num,
4936 event_num,
4937 limit,
4938 descending,
4939 )?,
4940 EventFilter::Any(_) => {
4942 return Err(SuiErrorKind::UserInputError {
4943 error: UserInputError::Unsupported(
4944 "'Any' queries are not supported by the fullnode.".to_string(),
4945 ),
4946 }
4947 .into());
4948 }
4949 };
4950
4951 if cursor.is_some() {
4954 if !event_keys.is_empty() {
4955 event_keys.remove(0);
4956 }
4957 } else {
4958 event_keys.truncate(limit - 1);
4959 }
4960
4961 let transaction_digests = event_keys
4963 .iter()
4964 .map(|(_, digest, _, _)| *digest)
4965 .collect::<HashSet<_>>()
4966 .into_iter()
4967 .collect::<Vec<_>>();
4968
4969 let events = kv_store
4970 .multi_get_events_by_tx_digests(&transaction_digests)
4971 .await?;
4972
4973 let events_map: HashMap<_, _> =
4974 transaction_digests.iter().zip(events.into_iter()).collect();
4975
4976 let stored_events = event_keys
4977 .into_iter()
4978 .map(|k| {
4979 (
4980 k,
4981 events_map
4982 .get(&k.1)
4983 .expect("fetched digest is missing")
4984 .clone()
4985 .and_then(|e| e.data.get(k.2).cloned()),
4986 )
4987 })
4988 .map(
4989 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4990 event
4991 .map(|e| (e, tx_digest, event_seq, timestamp))
4992 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
4993 },
4994 )
4995 .collect::<Result<Vec<_>, _>>()?;
4996
4997 let epoch_store = self.load_epoch_store_one_call_per_task();
4998 let backing_store = self.get_backing_package_store().as_ref();
4999 let mut layout_resolver = epoch_store
5000 .executor()
5001 .type_layout_resolver(Box::new(backing_store));
5002 let mut events = vec![];
5003 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
5004 events.push(SuiEvent::try_from(
5005 e.clone(),
5006 tx_digest,
5007 event_seq as u64,
5008 Some(timestamp),
5009 layout_resolver.get_annotated_layout(&e.type_)?,
5010 )?)
5011 }
5012 Ok(events)
5013 }
5014
5015 pub async fn insert_genesis_object(&self, object: Object) {
5016 self.get_reconfig_api().insert_genesis_object(object);
5017 }
5018
5019 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
5020 futures::future::join_all(
5021 objects
5022 .iter()
5023 .map(|o| self.insert_genesis_object(o.clone())),
5024 )
5025 .await;
5026 }
5027
5028 #[instrument(level = "trace", skip_all)]
5030 pub fn get_transaction_status(
5031 &self,
5032 transaction_digest: &TransactionDigest,
5033 epoch_store: &Arc<AuthorityPerEpochStore>,
5034 ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
5035 if let Some(effects) =
5037 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
5038 {
5039 if let Some(transaction) = self
5040 .get_transaction_cache_reader()
5041 .get_transaction_block(transaction_digest)
5042 {
5043 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
5044 let events = if effects.events_digest().is_some() {
5045 self.get_transaction_events(effects.transaction_digest())?
5046 } else {
5047 TransactionEvents::default()
5048 };
5049 return Ok(Some((
5050 (*transaction).clone().into_message(),
5051 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
5052 )));
5053 } else {
5054 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
5058 }
5059 }
5060 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
5061 self.metrics.tx_already_processed.inc();
5062 let (transaction, sig) = signed.into_inner().into_data_and_sig();
5063 Ok(Some((transaction, TransactionStatus::Signed(sig))))
5064 } else {
5065 Ok(None)
5066 }
5067 }
5068
5069 #[instrument(level = "trace", skip_all)]
5073 pub fn get_signed_effects_and_maybe_resign(
5074 &self,
5075 transaction_digest: &TransactionDigest,
5076 epoch_store: &Arc<AuthorityPerEpochStore>,
5077 ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
5078 let effects = self
5079 .get_transaction_cache_reader()
5080 .get_executed_effects(transaction_digest);
5081 match effects {
5082 Some(effects) => {
5083 if effects.executed_epoch() != epoch_store.epoch() {
5103 debug!(
5104 tx_digest=?transaction_digest,
5105 effects_epoch=?effects.executed_epoch(),
5106 epoch=?epoch_store.epoch(),
5107 "Re-signing the effects with the current epoch"
5108 );
5109 }
5110 Ok(Some(self.sign_effects(effects, epoch_store)?))
5111 }
5112 None => Ok(None),
5113 }
5114 }
5115
5116 #[instrument(level = "trace", skip_all)]
5117 pub(crate) fn sign_effects(
5118 &self,
5119 effects: TransactionEffects,
5120 epoch_store: &Arc<AuthorityPerEpochStore>,
5121 ) -> SuiResult<VerifiedSignedTransactionEffects> {
5122 let tx_digest = *effects.transaction_digest();
5123 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5124 Some(sig) => {
5125 debug_assert!(sig.epoch == epoch_store.epoch());
5126 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5127 }
5128 _ => {
5129 let sig = AuthoritySignInfo::new(
5130 epoch_store.epoch(),
5131 &effects,
5132 Intent::sui_app(IntentScope::TransactionEffects),
5133 self.name,
5134 &*self.secret,
5135 );
5136
5137 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5138
5139 epoch_store.insert_effects_digest_and_signature(
5140 &tx_digest,
5141 effects.digest(),
5142 &sig,
5143 )?;
5144
5145 effects
5146 }
5147 };
5148
5149 Ok(VerifiedSignedTransactionEffects::new_unchecked(
5150 signed_effects,
5151 ))
5152 }
5153
5154 #[instrument(level = "trace", skip_all)]
5156 fn fullnode_only_get_tx_coins_for_indexing(
5157 &self,
5158 effects: &TransactionEffects,
5159 inner_temporary_store: &InnerTemporaryStore,
5160 epoch_store: &Arc<AuthorityPerEpochStore>,
5161 ) -> Option<TxCoins> {
5162 if self.indexes.is_none() || self.is_validator(epoch_store) {
5163 return None;
5164 }
5165 let written_coin_objects = inner_temporary_store
5166 .written
5167 .iter()
5168 .filter_map(|(k, v)| {
5169 if v.is_coin() {
5170 Some((*k, v.clone()))
5171 } else {
5172 None
5173 }
5174 })
5175 .collect();
5176 let mut input_coin_objects = inner_temporary_store
5177 .input_objects
5178 .iter()
5179 .filter_map(|(k, v)| {
5180 if v.is_coin() {
5181 Some((*k, v.clone()))
5182 } else {
5183 None
5184 }
5185 })
5186 .collect::<ObjectMap>();
5187
5188 for (object_id, version) in effects.modified_at_versions() {
5192 if inner_temporary_store
5193 .loaded_runtime_objects
5194 .contains_key(&object_id)
5195 && let Some(object) = self
5196 .get_object_store()
5197 .get_object_by_key(&object_id, version)
5198 && object.is_coin()
5199 {
5200 input_coin_objects.insert(object_id, object);
5201 }
5202 }
5203
5204 Some((input_coin_objects, written_coin_objects))
5205 }
5206
5207 #[instrument(level = "trace", skip_all)]
5215 pub async fn get_transaction_lock(
5216 &self,
5217 object_ref: &ObjectRef,
5218 epoch_store: &AuthorityPerEpochStore,
5219 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5220 let lock_info = self
5221 .get_object_cache_reader()
5222 .get_lock(*object_ref, epoch_store)?;
5223 let lock_info = match lock_info {
5224 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5225 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5226 provided_obj_ref: *object_ref,
5227 current_version: locked_ref.1,
5228 }
5229 .into());
5230 }
5231 ObjectLockStatus::Initialized => {
5232 return Ok(None);
5233 }
5234 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5235 };
5236
5237 epoch_store.get_signed_transaction(&lock_info.tx_digest)
5238 }
5239
5240 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5241 self.get_object_cache_reader().get_objects(objects)
5242 }
5243
5244 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5245 self.get_object_cache_reader()
5246 .get_latest_object_ref_or_tombstone(object_id)
5247 }
5248
5249 pub fn set_override_protocol_upgrade_buffer_stake(
5258 &self,
5259 expected_epoch: EpochId,
5260 buffer_stake_bps: u64,
5261 ) -> SuiResult {
5262 let epoch_store = self.load_epoch_store_one_call_per_task();
5263 let actual_epoch = epoch_store.epoch();
5264 if actual_epoch != expected_epoch {
5265 return Err(SuiErrorKind::WrongEpoch {
5266 expected_epoch,
5267 actual_epoch,
5268 }
5269 .into());
5270 }
5271
5272 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5273 }
5274
5275 pub fn clear_override_protocol_upgrade_buffer_stake(
5276 &self,
5277 expected_epoch: EpochId,
5278 ) -> SuiResult {
5279 let epoch_store = self.load_epoch_store_one_call_per_task();
5280 let actual_epoch = epoch_store.epoch();
5281 if actual_epoch != expected_epoch {
5282 return Err(SuiErrorKind::WrongEpoch {
5283 expected_epoch,
5284 actual_epoch,
5285 }
5286 .into());
5287 }
5288
5289 epoch_store.clear_override_protocol_upgrade_buffer_stake()
5290 }
5291
5292 pub async fn get_available_system_packages(
5295 &self,
5296 binary_config: &BinaryConfig,
5297 ) -> Vec<ObjectRef> {
5298 let mut results = vec![];
5299
5300 let system_packages = BuiltInFramework::iter_system_packages();
5301
5302 #[cfg(msim)]
5304 let extra_packages = framework_injection::get_extra_packages(self.name);
5305 #[cfg(msim)]
5306 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
5307
5308 for system_package in system_packages {
5309 let modules = system_package.modules().to_vec();
5310 #[cfg(msim)]
5312 let modules =
5313 match framework_injection::get_override_modules(&system_package.id, self.name) {
5314 Some(overrides) if overrides.is_empty() => continue,
5315 Some(overrides) => overrides,
5316 None => modules,
5317 };
5318
5319 let Some(obj_ref) = sui_framework::compare_system_package(
5320 &self.get_object_store(),
5321 &system_package.id,
5322 &modules,
5323 system_package.dependencies.to_vec(),
5324 binary_config,
5325 )
5326 .await
5327 else {
5328 return vec![];
5329 };
5330 results.push(obj_ref);
5331 }
5332
5333 results
5334 }
5335
5336 async fn get_system_package_bytes(
5350 &self,
5351 system_packages: Vec<ObjectRef>,
5352 binary_config: &BinaryConfig,
5353 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5354 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5355 let objects = self.get_objects(&ids).await;
5356
5357 let mut res = Vec::with_capacity(system_packages.len());
5358 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
5359 let prev_transaction = match object {
5360 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5361 info!("Framework {} does not need updating", system_package_ref.0);
5363 continue;
5364 }
5365
5366 Some(cur_object) => cur_object.previous_transaction,
5367 None => TransactionDigest::genesis_marker(),
5368 };
5369
5370 #[cfg(msim)]
5371 let SystemPackage {
5372 id: _,
5373 bytes,
5374 dependencies,
5375 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5376 .unwrap_or_else(|| {
5377 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5378 });
5379
5380 #[cfg(not(msim))]
5381 let SystemPackage {
5382 id: _,
5383 bytes,
5384 dependencies,
5385 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5386
5387 let modules: Vec<_> = bytes
5388 .iter()
5389 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5390 .collect();
5391
5392 let new_object = Object::new_system_package(
5393 &modules,
5394 system_package_ref.1,
5395 dependencies.clone(),
5396 prev_transaction,
5397 );
5398
5399 let new_ref = new_object.compute_object_reference();
5400 if new_ref != system_package_ref {
5401 if cfg!(msim) {
5402 debug_fatal!(
5404 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5405 );
5406 } else {
5407 error!(
5408 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5409 );
5410 }
5411 return None;
5412 }
5413
5414 res.push((system_package_ref.1, bytes, dependencies));
5415 }
5416
5417 Some(res)
5418 }
5419
5420 fn is_protocol_version_supported_v2(
5421 current_protocol_version: ProtocolVersion,
5422 proposed_protocol_version: ProtocolVersion,
5423 protocol_config: &ProtocolConfig,
5424 committee: &Committee,
5425 capabilities: Vec<AuthorityCapabilitiesV2>,
5426 mut buffer_stake_bps: u64,
5427 ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5428 if proposed_protocol_version > current_protocol_version + 1
5429 && !protocol_config.advance_to_highest_supported_protocol_version()
5430 {
5431 return None;
5432 }
5433
5434 if buffer_stake_bps > 10000 {
5435 warn!("clamping buffer_stake_bps to 10000");
5436 buffer_stake_bps = 10000;
5437 }
5438
5439 let mut desired_upgrades: Vec<_> = capabilities
5442 .into_iter()
5443 .filter_map(|mut cap| {
5444 if cap.available_system_packages.is_empty() {
5446 return None;
5447 }
5448
5449 cap.available_system_packages.sort();
5450
5451 info!(
5452 "validator {:?} supports {:?} with system packages: {:?}",
5453 cap.authority.concise(),
5454 cap.supported_protocol_versions,
5455 cap.available_system_packages,
5456 );
5457
5458 cap.supported_protocol_versions
5462 .get_version_digest(proposed_protocol_version)
5463 .map(|digest| (digest, cap.available_system_packages, cap.authority))
5464 })
5465 .collect();
5466
5467 desired_upgrades.sort();
5469 desired_upgrades
5470 .into_iter()
5471 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5472 .into_iter()
5473 .find_map(|((digest, packages), group)| {
5474 assert!(!packages.is_empty());
5476
5477 let mut stake_aggregator: StakeAggregator<(), true> =
5478 StakeAggregator::new(Arc::new(committee.clone()));
5479
5480 for (_, _, authority) in group {
5481 stake_aggregator.insert_generic(authority, ());
5482 }
5483
5484 let total_votes = stake_aggregator.total_votes();
5485 let quorum_threshold = committee.quorum_threshold();
5486 let f = committee.total_votes() - committee.quorum_threshold();
5487
5488 let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5490 let effective_threshold = quorum_threshold + buffer_stake;
5491
5492 info!(
5493 protocol_config_digest = ?digest,
5494 ?total_votes,
5495 ?quorum_threshold,
5496 ?buffer_stake_bps,
5497 ?effective_threshold,
5498 ?proposed_protocol_version,
5499 ?packages,
5500 "support for upgrade"
5501 );
5502
5503 let has_support = total_votes >= effective_threshold;
5504 has_support.then_some((proposed_protocol_version, packages))
5505 })
5506 }
5507
5508 fn choose_protocol_version_and_system_packages_v2(
5509 current_protocol_version: ProtocolVersion,
5510 protocol_config: &ProtocolConfig,
5511 committee: &Committee,
5512 capabilities: Vec<AuthorityCapabilitiesV2>,
5513 buffer_stake_bps: u64,
5514 ) -> (ProtocolVersion, Vec<ObjectRef>) {
5515 assert!(protocol_config.authority_capabilities_v2());
5516 let mut next_protocol_version = current_protocol_version;
5517 let mut system_packages = vec![];
5518
5519 while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5520 current_protocol_version,
5521 next_protocol_version + 1,
5522 protocol_config,
5523 committee,
5524 capabilities.clone(),
5525 buffer_stake_bps,
5526 ) {
5527 next_protocol_version = version;
5528 system_packages = packages;
5529 }
5530
5531 (next_protocol_version, system_packages)
5532 }
5533
5534 #[instrument(level = "debug", skip_all)]
5535 fn create_authenticator_state_tx(
5536 &self,
5537 epoch_store: &Arc<AuthorityPerEpochStore>,
5538 ) -> Option<EndOfEpochTransactionKind> {
5539 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5540 info!("authenticator state transactions not enabled");
5541 return None;
5542 }
5543
5544 let authenticator_state_exists = epoch_store.authenticator_state_exists();
5545 let tx = if authenticator_state_exists {
5546 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5547 let min_epoch =
5548 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5549 let authenticator_obj_initial_shared_version = epoch_store
5550 .epoch_start_config()
5551 .authenticator_obj_initial_shared_version()
5552 .expect("initial version must exist");
5553
5554 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5555 min_epoch,
5556 authenticator_obj_initial_shared_version,
5557 );
5558
5559 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5560
5561 tx
5562 } else {
5563 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5564 info!("Creating AuthenticatorStateCreate tx");
5565 tx
5566 };
5567 Some(tx)
5568 }
5569
5570 #[instrument(level = "debug", skip_all)]
5571 fn create_randomness_state_tx(
5572 &self,
5573 epoch_store: &Arc<AuthorityPerEpochStore>,
5574 ) -> Option<EndOfEpochTransactionKind> {
5575 if !epoch_store.protocol_config().random_beacon() {
5576 info!("randomness state transactions not enabled");
5577 return None;
5578 }
5579
5580 if epoch_store.randomness_state_exists() {
5581 return None;
5582 }
5583
5584 let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5585 info!("Creating RandomnessStateCreate tx");
5586 Some(tx)
5587 }
5588
5589 #[instrument(level = "debug", skip_all)]
5590 fn create_accumulator_root_tx(
5591 &self,
5592 epoch_store: &Arc<AuthorityPerEpochStore>,
5593 ) -> Option<EndOfEpochTransactionKind> {
5594 if !epoch_store
5595 .protocol_config()
5596 .create_root_accumulator_object()
5597 {
5598 info!("accumulator root creation not enabled");
5599 return None;
5600 }
5601
5602 if epoch_store.accumulator_root_exists() {
5603 return None;
5604 }
5605
5606 let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5607 info!("Creating AccumulatorRootCreate tx");
5608 Some(tx)
5609 }
5610
5611 #[instrument(level = "debug", skip_all)]
5612 fn create_write_accumulator_storage_cost_tx(
5613 &self,
5614 epoch_store: &Arc<AuthorityPerEpochStore>,
5615 ) -> Option<EndOfEpochTransactionKind> {
5616 if !epoch_store.accumulator_root_exists() {
5617 info!("accumulator root does not exist yet");
5618 return None;
5619 }
5620 if !epoch_store.protocol_config().enable_accumulators() {
5621 info!("accumulators not enabled");
5622 return None;
5623 }
5624
5625 let object_store = self.get_object_store();
5626 let object_count =
5627 match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
5628 Ok(Some(count)) => count,
5629 Ok(None) => return None,
5630 Err(e) => {
5631 fatal!("failed to read accumulator object count: {e}");
5632 }
5633 };
5634
5635 let storage_cost = object_count.saturating_mul(
5636 epoch_store
5637 .protocol_config()
5638 .accumulator_object_storage_cost(),
5639 );
5640
5641 let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
5642 info!(
5643 object_count,
5644 storage_cost, "Creating WriteAccumulatorStorageCost tx"
5645 );
5646 Some(tx)
5647 }
5648
5649 #[instrument(level = "debug", skip_all)]
5650 fn create_coin_registry_tx(
5651 &self,
5652 epoch_store: &Arc<AuthorityPerEpochStore>,
5653 ) -> Option<EndOfEpochTransactionKind> {
5654 if !epoch_store.protocol_config().enable_coin_registry() {
5655 info!("coin registry not enabled");
5656 return None;
5657 }
5658
5659 if epoch_store.coin_registry_exists() {
5660 return None;
5661 }
5662
5663 let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5664 info!("Creating CoinRegistryCreate tx");
5665 Some(tx)
5666 }
5667
5668 #[instrument(level = "debug", skip_all)]
5669 fn create_display_registry_tx(
5670 &self,
5671 epoch_store: &Arc<AuthorityPerEpochStore>,
5672 ) -> Option<EndOfEpochTransactionKind> {
5673 if !epoch_store.protocol_config().enable_display_registry() {
5674 info!("display registry not enabled");
5675 return None;
5676 }
5677
5678 if epoch_store.display_registry_exists() {
5679 return None;
5680 }
5681
5682 let tx = EndOfEpochTransactionKind::new_display_registry_create();
5683 info!("Creating DisplayRegistryCreate tx");
5684 Some(tx)
5685 }
5686
5687 #[instrument(level = "debug", skip_all)]
5688 fn create_bridge_tx(
5689 &self,
5690 epoch_store: &Arc<AuthorityPerEpochStore>,
5691 ) -> Option<EndOfEpochTransactionKind> {
5692 if !epoch_store.protocol_config().enable_bridge() {
5693 info!("bridge not enabled");
5694 return None;
5695 }
5696 if epoch_store.bridge_exists() {
5697 return None;
5698 }
5699 let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5700 info!("Creating Bridge Create tx");
5701 Some(tx)
5702 }
5703
5704 #[instrument(level = "debug", skip_all)]
5705 fn init_bridge_committee_tx(
5706 &self,
5707 epoch_store: &Arc<AuthorityPerEpochStore>,
5708 ) -> Option<EndOfEpochTransactionKind> {
5709 if !epoch_store.protocol_config().enable_bridge() {
5710 info!("bridge not enabled");
5711 return None;
5712 }
5713 if !epoch_store
5714 .protocol_config()
5715 .should_try_to_finalize_bridge_committee()
5716 {
5717 info!("should not try to finalize bridge committee yet");
5718 return None;
5719 }
5720 if !epoch_store.bridge_exists() {
5722 return None;
5723 }
5724
5725 if epoch_store.bridge_committee_initiated() {
5726 return None;
5727 }
5728
5729 let bridge_initial_shared_version = epoch_store
5730 .epoch_start_config()
5731 .bridge_obj_initial_shared_version()
5732 .expect("initial version must exist");
5733 let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5734 info!("Init Bridge committee tx");
5735 Some(tx)
5736 }
5737
5738 #[instrument(level = "debug", skip_all)]
5739 fn create_deny_list_state_tx(
5740 &self,
5741 epoch_store: &Arc<AuthorityPerEpochStore>,
5742 ) -> Option<EndOfEpochTransactionKind> {
5743 if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5744 return None;
5745 }
5746
5747 if epoch_store.coin_deny_list_state_exists() {
5748 return None;
5749 }
5750
5751 let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5752 info!("Creating DenyListStateCreate tx");
5753 Some(tx)
5754 }
5755
5756 #[instrument(level = "debug", skip_all)]
5757 fn create_address_alias_state_tx(
5758 &self,
5759 epoch_store: &Arc<AuthorityPerEpochStore>,
5760 ) -> Option<EndOfEpochTransactionKind> {
5761 if !epoch_store.protocol_config().address_aliases() {
5762 info!("address aliases not enabled");
5763 return None;
5764 }
5765
5766 if epoch_store.address_alias_state_exists() {
5767 return None;
5768 }
5769
5770 let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
5771 info!("Creating AddressAliasStateCreate tx");
5772 Some(tx)
5773 }
5774
5775 #[instrument(level = "debug", skip_all)]
5776 fn create_execution_time_observations_tx(
5777 &self,
5778 epoch_store: &Arc<AuthorityPerEpochStore>,
5779 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5780 last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
5781 ) -> Option<EndOfEpochTransactionKind> {
5782 let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
5783 .protocol_config()
5784 .per_object_congestion_control_mode()
5785 else {
5786 return None;
5787 };
5788
5789 let start_checkpoint = std::cmp::max(
5792 last_checkpoint_before_end_of_epoch
5793 .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
5794 epoch_store
5796 .epoch()
5797 .checked_sub(1)
5798 .map(|prev_epoch| {
5799 self.checkpoint_store
5800 .get_epoch_last_checkpoint_seq_number(prev_epoch)
5801 .expect("typed store must not fail")
5802 .expect(
5803 "sequence number of last checkpoint of preceding epoch must be saved",
5804 )
5805 + 1
5806 })
5807 .unwrap_or(1),
5808 );
5809 info!(
5810 "reading checkpoint range {:?}..={:?}",
5811 start_checkpoint, last_checkpoint_before_end_of_epoch
5812 );
5813 let sequence_numbers =
5814 (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
5815 let contents_digests: Vec<_> = self
5816 .checkpoint_store
5817 .multi_get_locally_computed_checkpoints(&sequence_numbers)
5818 .expect("typed store must not fail")
5819 .into_iter()
5820 .zip(sequence_numbers)
5821 .map(|(maybe_checkpoint, sequence_number)| {
5822 if let Some(checkpoint) = maybe_checkpoint {
5823 checkpoint.content_digest
5824 } else {
5825 self.checkpoint_store
5828 .get_checkpoint_by_sequence_number(sequence_number)
5829 .expect("typed store must not fail")
5830 .unwrap_or_else(|| {
5831 fatal!("preceding checkpoints must exist by end of epoch")
5832 })
5833 .data()
5834 .content_digest
5835 }
5836 })
5837 .collect();
5838 let tx_digests: Vec<_> = self
5839 .checkpoint_store
5840 .multi_get_checkpoint_content(&contents_digests)
5841 .expect("typed store must not fail")
5842 .into_iter()
5843 .flat_map(|maybe_contents| {
5844 maybe_contents
5845 .expect("preceding checkpoint contents must exist by end of epoch")
5846 .into_inner()
5847 .into_iter()
5848 .map(|digests| digests.transaction)
5849 })
5850 .collect();
5851 let included_execution_time_observations: HashSet<_> = self
5852 .get_transaction_cache_reader()
5853 .multi_get_transaction_blocks(&tx_digests)
5854 .into_iter()
5855 .flat_map(|maybe_tx| {
5856 if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
5857 .expect("preceding transaction must exist by end of epoch")
5858 .transaction_data()
5859 .kind()
5860 {
5861 #[allow(clippy::unnecessary_to_owned)]
5862 itertools::Either::Left(
5863 ptb.commands
5864 .to_owned()
5865 .into_iter()
5866 .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
5867 )
5868 } else {
5869 itertools::Either::Right(std::iter::empty())
5870 }
5871 })
5872 .chain(end_of_epoch_observation_keys.into_iter())
5873 .collect();
5874
5875 let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
5876 epoch_store
5877 .get_end_of_epoch_execution_time_observations()
5878 .filter_and_sort_v1(
5879 |(key, _)| included_execution_time_observations.contains(key),
5880 params.stored_observations_limit.try_into().unwrap(),
5881 ),
5882 );
5883 info!("Creating StoreExecutionTimeObservations tx");
5884 Some(tx)
5885 }
5886
5887 #[instrument(level = "error", skip_all)]
5898 pub async fn create_and_execute_advance_epoch_tx(
5899 &self,
5900 epoch_store: &Arc<AuthorityPerEpochStore>,
5901 gas_cost_summary: &GasCostSummary,
5902 checkpoint: CheckpointSequenceNumber,
5903 epoch_start_timestamp_ms: CheckpointTimestamp,
5904 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5905 last_checkpoint: CheckpointSequenceNumber,
5908 ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
5909 let mut txns = Vec::new();
5910
5911 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
5912 txns.push(tx);
5913 }
5914 if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
5915 txns.push(tx);
5916 }
5917 if let Some(tx) = self.create_bridge_tx(epoch_store) {
5918 txns.push(tx);
5919 }
5920 if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
5921 txns.push(tx);
5922 }
5923 if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
5924 txns.push(tx);
5925 }
5926 if let Some(tx) = self.create_execution_time_observations_tx(
5927 epoch_store,
5928 end_of_epoch_observation_keys,
5929 last_checkpoint,
5930 ) {
5931 txns.push(tx);
5932 }
5933 if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
5934 txns.push(tx);
5935 }
5936
5937 if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
5938 txns.push(tx);
5939 }
5940 if let Some(tx) = self.create_display_registry_tx(epoch_store) {
5941 txns.push(tx);
5942 }
5943 if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
5944 txns.push(tx);
5945 }
5946 if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
5947 txns.push(tx);
5948 }
5949
5950 let next_epoch = epoch_store.epoch() + 1;
5951
5952 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5953
5954 let (next_epoch_protocol_version, next_epoch_system_packages) =
5955 Self::choose_protocol_version_and_system_packages_v2(
5956 epoch_store.protocol_version(),
5957 epoch_store.protocol_config(),
5958 epoch_store.committee(),
5959 epoch_store
5960 .get_capabilities_v2()
5961 .expect("read capabilities from db cannot fail"),
5962 buffer_stake_bps,
5963 );
5964
5965 let config = epoch_store.protocol_config();
5968 let binary_config = config.binary_config(None);
5969 let Some(next_epoch_system_package_bytes) = self
5970 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5971 .await
5972 else {
5973 if next_epoch_protocol_version <= ProtocolVersion::MAX {
5974 debug_fatal!(
5978 "upgraded system packages {:?} are not locally available, cannot create \
5979 ChangeEpochTx. validator binary must be upgraded to the correct version!",
5980 next_epoch_system_packages
5981 );
5982 } else {
5983 error!(
5984 "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
5985 next_epoch_protocol_version
5986 );
5987 }
5988 return Err(CheckpointBuilderError::SystemPackagesMissing);
5997 };
5998
5999 let tx = if epoch_store
6000 .protocol_config()
6001 .end_of_epoch_transaction_supported()
6002 {
6003 txns.push(EndOfEpochTransactionKind::new_change_epoch(
6004 next_epoch,
6005 next_epoch_protocol_version,
6006 gas_cost_summary.storage_cost,
6007 gas_cost_summary.computation_cost,
6008 gas_cost_summary.storage_rebate,
6009 gas_cost_summary.non_refundable_storage_fee,
6010 epoch_start_timestamp_ms,
6011 next_epoch_system_package_bytes,
6012 ));
6013
6014 VerifiedTransaction::new_end_of_epoch_transaction(txns)
6015 } else {
6016 VerifiedTransaction::new_change_epoch(
6017 next_epoch,
6018 next_epoch_protocol_version,
6019 gas_cost_summary.storage_cost,
6020 gas_cost_summary.computation_cost,
6021 gas_cost_summary.storage_rebate,
6022 gas_cost_summary.non_refundable_storage_fee,
6023 epoch_start_timestamp_ms,
6024 next_epoch_system_package_bytes,
6025 )
6026 };
6027
6028 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
6029 tx.clone(),
6030 epoch_store.epoch(),
6031 checkpoint,
6032 );
6033
6034 let tx_digest = executable_tx.digest();
6035
6036 info!(
6037 ?next_epoch,
6038 ?next_epoch_protocol_version,
6039 ?next_epoch_system_packages,
6040 computation_cost=?gas_cost_summary.computation_cost,
6041 storage_cost=?gas_cost_summary.storage_cost,
6042 storage_rebate=?gas_cost_summary.storage_rebate,
6043 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
6044 ?tx_digest,
6045 "Creating advance epoch transaction"
6046 );
6047
6048 fail_point_async!("change_epoch_tx_delay");
6049 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
6050
6051 if self
6054 .get_transaction_cache_reader()
6055 .is_tx_already_executed(tx_digest)
6056 {
6057 warn!("change epoch tx has already been executed via state sync");
6058 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6059 }
6060
6061 let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
6062 else {
6063 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6064 };
6065
6066 let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
6069 self.get_object_cache_reader().as_ref(),
6070 std::iter::once(&Schedulable::Transaction(&executable_tx)),
6071 )?;
6072
6073 assert_eq!(assigned_versions.0.len(), 1);
6074 let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;
6075
6076 let input_objects = self.read_objects_for_execution(
6077 &tx_lock,
6078 &executable_tx,
6079 &assigned_versions,
6080 epoch_store,
6081 )?;
6082
6083 let (transaction_outputs, _timings, _execution_error_opt) = self
6084 .execute_certificate(
6085 &execution_guard,
6086 &executable_tx,
6087 input_objects,
6088 None,
6089 ExecutionEnv::default(),
6090 epoch_store,
6091 )
6092 .unwrap();
6093 let system_obj = get_sui_system_state(&transaction_outputs.written)
6094 .expect("change epoch tx must write to system object");
6095
6096 let effects = transaction_outputs.effects;
6097 self.get_state_sync_store()
6101 .insert_transaction_and_effects(&tx, &effects);
6102
6103 info!(
6104 "Effects summary of the change epoch transaction: {:?}",
6105 effects.summary_for_debug()
6106 );
6107 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
6108 assert!(effects.status().is_ok());
6110 Ok((system_obj, effects))
6111 }
6112
6113 #[instrument(level = "error", skip_all)]
6114 async fn reopen_epoch_db(
6115 &self,
6116 cur_epoch_store: &AuthorityPerEpochStore,
6117 new_committee: Committee,
6118 epoch_start_configuration: EpochStartConfiguration,
6119 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6120 epoch_last_checkpoint: CheckpointSequenceNumber,
6121 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6122 let new_epoch = new_committee.epoch;
6123 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6124 assert_eq!(
6125 epoch_start_configuration.epoch_start_state().epoch(),
6126 new_committee.epoch
6127 );
6128 fail_point!("before-open-new-epoch-store");
6129 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6130 self.name,
6131 new_committee,
6132 epoch_start_configuration,
6133 self.get_backing_package_store().clone(),
6134 self.get_object_store().clone(),
6135 expensive_safety_check_config,
6136 epoch_last_checkpoint,
6137 )?;
6138 self.epoch_store.store(new_epoch_store.clone());
6139 Ok(new_epoch_store)
6140 }
6141
6142 #[cfg(test)]
6143 pub(crate) fn iter_live_object_set_for_testing(
6144 &self,
6145 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6146 let include_wrapped_object = !self
6147 .epoch_store_for_testing()
6148 .protocol_config()
6149 .simplified_unwrap_then_delete();
6150 self.get_global_state_hash_store()
6151 .iter_cached_live_object_set_for_testing(include_wrapped_object)
6152 }
6153
6154 #[cfg(test)]
6155 pub(crate) fn shutdown_execution_for_test(&self) {
6156 self.tx_execution_shutdown
6157 .lock()
6158 .take()
6159 .unwrap()
6160 .send(())
6161 .unwrap();
6162 }
6163
6164 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6166 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6167 self.get_object_cache_reader()
6168 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6169 self.get_reconfig_api()
6170 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6171 Ok(())
6172 }
6173}
6174
6175pub struct RandomnessRoundReceiver {
6176 authority_state: Arc<AuthorityState>,
6177 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6178}
6179
6180impl RandomnessRoundReceiver {
6181 pub fn spawn(
6182 authority_state: Arc<AuthorityState>,
6183 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6184 ) -> JoinHandle<()> {
6185 let rrr = RandomnessRoundReceiver {
6186 authority_state,
6187 randomness_rx,
6188 };
6189 spawn_monitored_task!(rrr.run())
6190 }
6191
6192 async fn run(mut self) {
6193 info!("RandomnessRoundReceiver event loop started");
6194
6195 loop {
6196 tokio::select! {
6197 maybe_recv = self.randomness_rx.recv() => {
6198 if let Some((epoch, round, bytes)) = maybe_recv {
6199 self.handle_new_randomness(epoch, round, bytes).await;
6200 } else {
6201 break;
6202 }
6203 },
6204 }
6205 }
6206
6207 info!("RandomnessRoundReceiver event loop ended");
6208 }
6209
6210 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
6211 async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
6212 fail_point_async!("randomness-delay");
6213
6214 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
6215 if epoch_store.epoch() != epoch {
6216 warn!(
6217 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
6218 epoch_store.epoch()
6219 );
6220 return;
6221 }
6222 let key = TransactionKey::RandomnessRound(epoch, round);
6223 let transaction = VerifiedTransaction::new_randomness_state_update(
6224 epoch,
6225 round,
6226 bytes,
6227 epoch_store
6228 .epoch_start_config()
6229 .randomness_obj_initial_shared_version()
6230 .expect("randomness state obj must exist"),
6231 );
6232 debug!(
6233 "created randomness state update transaction with digest: {:?}",
6234 transaction.digest()
6235 );
6236 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
6237 let digest = *transaction.digest();
6238
6239 self.authority_state
6244 .get_cache_commit()
6245 .persist_transaction(&transaction);
6246
6247 if epoch_store.insert_tx_key(key, digest).is_err() {
6249 warn!("epoch ended while handling new randomness");
6250 }
6251
6252 let authority_state = self.authority_state.clone();
6253 spawn_monitored_task!(async move {
6254 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
6261 let result = tokio::time::timeout(
6262 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
6263 authority_state
6264 .get_transaction_cache_reader()
6265 .notify_read_executed_effects(
6266 "RandomnessRoundReceiver::notify_read_executed_effects_first",
6267 &[digest],
6268 ),
6269 )
6270 .await;
6271 let mut effects = match result {
6272 Ok(result) => result,
6273 Err(_) => {
6274 if cfg!(debug_assertions) {
6275 panic!(
6277 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6278 );
6279 }
6280 warn!(
6281 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6282 );
6283 authority_state
6285 .get_transaction_cache_reader()
6286 .notify_read_executed_effects(
6287 "RandomnessRoundReceiver::notify_read_executed_effects_second",
6288 &[digest],
6289 )
6290 .await
6291 }
6292 };
6293
6294 let effects = effects.pop().expect("should return effects");
6295 if *effects.status() != ExecutionStatus::Success {
6296 fatal!(
6297 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
6298 );
6299 }
6300 debug!(
6301 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
6302 );
6303 });
6304 }
6305}
6306
6307#[async_trait]
6308impl TransactionKeyValueStoreTrait for AuthorityState {
6309 #[instrument(skip(self))]
6310 async fn multi_get(
6311 &self,
6312 transactions: &[TransactionDigest],
6313 effects: &[TransactionDigest],
6314 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6315 let txns = if !transactions.is_empty() {
6316 self.get_transaction_cache_reader()
6317 .multi_get_transaction_blocks(transactions)
6318 .into_iter()
6319 .map(|t| t.map(|t| (*t).clone().into_inner()))
6320 .collect()
6321 } else {
6322 vec![]
6323 };
6324
6325 let fx = if !effects.is_empty() {
6326 self.get_transaction_cache_reader()
6327 .multi_get_executed_effects(effects)
6328 } else {
6329 vec![]
6330 };
6331
6332 Ok((txns, fx))
6333 }
6334
6335 #[instrument(skip(self))]
6336 async fn multi_get_checkpoints(
6337 &self,
6338 checkpoint_summaries: &[CheckpointSequenceNumber],
6339 checkpoint_contents: &[CheckpointSequenceNumber],
6340 checkpoint_summaries_by_digest: &[CheckpointDigest],
6341 ) -> SuiResult<(
6342 Vec<Option<CertifiedCheckpointSummary>>,
6343 Vec<Option<CheckpointContents>>,
6344 Vec<Option<CertifiedCheckpointSummary>>,
6345 )> {
6346 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6348 let store = self.get_checkpoint_store();
6349 for seq in checkpoint_summaries {
6350 let checkpoint = store
6351 .get_checkpoint_by_sequence_number(*seq)?
6352 .map(|c| c.into_inner());
6353
6354 summaries.push(checkpoint);
6355 }
6356
6357 let mut contents = Vec::with_capacity(checkpoint_contents.len());
6358 for seq in checkpoint_contents {
6359 let checkpoint = store
6360 .get_checkpoint_by_sequence_number(*seq)?
6361 .and_then(|summary| {
6362 store
6363 .get_checkpoint_contents(&summary.content_digest)
6364 .expect("db read cannot fail")
6365 });
6366 contents.push(checkpoint);
6367 }
6368
6369 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6370 for digest in checkpoint_summaries_by_digest {
6371 let checkpoint = store
6372 .get_checkpoint_by_digest(digest)?
6373 .map(|c| c.into_inner());
6374 summaries_by_digest.push(checkpoint);
6375 }
6376 Ok((summaries, contents, summaries_by_digest))
6377 }
6378
6379 #[instrument(skip(self))]
6380 async fn deprecated_get_transaction_checkpoint(
6381 &self,
6382 digest: TransactionDigest,
6383 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6384 Ok(self
6385 .get_checkpoint_cache()
6386 .deprecated_get_transaction_checkpoint(&digest)
6387 .map(|(_epoch, checkpoint)| checkpoint))
6388 }
6389
6390 #[instrument(skip(self))]
6391 async fn get_object(
6392 &self,
6393 object_id: ObjectID,
6394 version: VersionNumber,
6395 ) -> SuiResult<Option<Object>> {
6396 Ok(self
6397 .get_object_cache_reader()
6398 .get_object_by_key(&object_id, version))
6399 }
6400
6401 #[instrument(skip_all)]
6402 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6403 Ok(self
6404 .get_object_cache_reader()
6405 .multi_get_objects_by_key(object_keys))
6406 }
6407
6408 #[instrument(skip(self))]
6409 async fn multi_get_transaction_checkpoint(
6410 &self,
6411 digests: &[TransactionDigest],
6412 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6413 let res = self
6414 .get_checkpoint_cache()
6415 .deprecated_multi_get_transaction_checkpoint(digests);
6416
6417 Ok(res
6418 .into_iter()
6419 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6420 .collect())
6421 }
6422
6423 #[instrument(skip(self))]
6424 async fn multi_get_events_by_tx_digests(
6425 &self,
6426 digests: &[TransactionDigest],
6427 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6428 if digests.is_empty() {
6429 return Ok(vec![]);
6430 }
6431
6432 Ok(self
6433 .get_transaction_cache_reader()
6434 .multi_get_events(digests))
6435 }
6436}
6437
6438#[cfg(msim)]
6439pub mod framework_injection {
6440 use move_binary_format::CompiledModule;
6441 use std::collections::BTreeMap;
6442 use std::{cell::RefCell, collections::BTreeSet};
6443 use sui_framework::{BuiltInFramework, SystemPackage};
6444 use sui_types::base_types::{AuthorityName, ObjectID};
6445 use sui_types::is_system_package;
6446
6447 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
6448
6449 thread_local! {
6451 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
6452 }
6453
6454 type Framework = Vec<CompiledModule>;
6455
6456 pub type PackageUpgradeCallback =
6457 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
6458
6459 enum PackageOverrideConfig {
6460 Global(Framework),
6461 PerValidator(PackageUpgradeCallback),
6462 }
6463
6464 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
6465 modules
6466 .iter()
6467 .map(|m| {
6468 let mut buf = Vec::new();
6469 m.serialize_with_version(m.version, &mut buf).unwrap();
6470 buf
6471 })
6472 .collect()
6473 }
6474
6475 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
6476 OVERRIDE.with(|bs| {
6477 bs.borrow_mut()
6478 .insert(package_id, PackageOverrideConfig::Global(modules))
6479 });
6480 }
6481
6482 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
6483 OVERRIDE.with(|bs| {
6484 bs.borrow_mut()
6485 .insert(package_id, PackageOverrideConfig::PerValidator(func))
6486 });
6487 }
6488
6489 pub fn set_system_packages(packages: Vec<SystemPackage>) {
6490 OVERRIDE.with(|bs| {
6491 let mut new_packages_not_to_include: BTreeSet<_> =
6492 BuiltInFramework::all_package_ids().into_iter().collect();
6493 for pkg in &packages {
6494 new_packages_not_to_include.remove(&pkg.id);
6495 }
6496 for pkg in packages {
6497 bs.borrow_mut()
6498 .insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
6499 }
6500 for empty_pkg in new_packages_not_to_include {
6501 bs.borrow_mut()
6502 .insert(empty_pkg, PackageOverrideConfig::Global(vec![]));
6503 }
6504 });
6505 }
6506
6507 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
6508 OVERRIDE.with(|cfg| {
6509 cfg.borrow().get(package_id).and_then(|entry| match entry {
6510 PackageOverrideConfig::Global(framework) => {
6511 Some(compiled_modules_to_bytes(framework))
6512 }
6513 PackageOverrideConfig::PerValidator(func) => {
6514 func(name).map(|fw| compiled_modules_to_bytes(&fw))
6515 }
6516 })
6517 })
6518 }
6519
6520 pub fn get_override_modules(
6521 package_id: &ObjectID,
6522 name: AuthorityName,
6523 ) -> Option<Vec<CompiledModule>> {
6524 OVERRIDE.with(|cfg| {
6525 cfg.borrow().get(package_id).and_then(|entry| match entry {
6526 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
6527 PackageOverrideConfig::PerValidator(func) => func(name),
6528 })
6529 })
6530 }
6531
6532 pub fn get_override_system_package(
6533 package_id: &ObjectID,
6534 name: AuthorityName,
6535 ) -> Option<SystemPackage> {
6536 let bytes = get_override_bytes(package_id, name)?;
6537 let dependencies = if is_system_package(*package_id) {
6538 BuiltInFramework::get_package_by_id(package_id)
6539 .dependencies
6540 .to_vec()
6541 } else {
6542 BuiltInFramework::all_package_ids()
6544 };
6545 Some(SystemPackage {
6546 id: *package_id,
6547 bytes,
6548 dependencies,
6549 })
6550 }
6551
6552 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6553 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids().into_iter());
6554 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
6555 cfg.borrow()
6556 .keys()
6557 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6558 .collect()
6559 });
6560
6561 extra
6562 .into_iter()
6563 .map(|package| SystemPackage {
6564 id: package,
6565 bytes: get_override_bytes(&package, name).unwrap(),
6566 dependencies: BuiltInFramework::all_package_ids(),
6567 })
6568 .collect()
6569 }
6570}
6571
6572#[derive(Debug, Serialize, Deserialize, Clone)]
6573pub struct ObjDumpFormat {
6574 pub id: ObjectID,
6575 pub version: VersionNumber,
6576 pub digest: ObjectDigest,
6577 pub object: Object,
6578}
6579
6580impl ObjDumpFormat {
6581 fn new(object: Object) -> Self {
6582 let oref = object.compute_object_reference();
6583 Self {
6584 id: oref.0,
6585 version: oref.1,
6586 digest: oref.2,
6587 object,
6588 }
6589 }
6590}
6591
6592#[derive(Debug, Serialize, Deserialize, Clone)]
6593pub struct NodeStateDump {
6594 pub tx_digest: TransactionDigest,
6595 pub sender_signed_data: SenderSignedData,
6596 pub executed_epoch: u64,
6597 pub reference_gas_price: u64,
6598 pub protocol_version: u64,
6599 pub epoch_start_timestamp_ms: u64,
6600 pub computed_effects: TransactionEffects,
6601 pub expected_effects_digest: TransactionEffectsDigest,
6602 pub relevant_system_packages: Vec<ObjDumpFormat>,
6603 pub shared_objects: Vec<ObjDumpFormat>,
6604 pub loaded_child_objects: Vec<ObjDumpFormat>,
6605 pub modified_at_versions: Vec<ObjDumpFormat>,
6606 pub runtime_reads: Vec<ObjDumpFormat>,
6607 pub input_objects: Vec<ObjDumpFormat>,
6608}
6609
6610impl NodeStateDump {
6611 pub fn new(
6612 tx_digest: &TransactionDigest,
6613 effects: &TransactionEffects,
6614 expected_effects_digest: TransactionEffectsDigest,
6615 object_store: &dyn ObjectStore,
6616 epoch_store: &Arc<AuthorityPerEpochStore>,
6617 inner_temporary_store: &InnerTemporaryStore,
6618 certificate: &VerifiedExecutableTransaction,
6619 ) -> SuiResult<Self> {
6620 let executed_epoch = epoch_store.epoch();
6622 let reference_gas_price = epoch_store.reference_gas_price();
6623 let epoch_start_config = epoch_store.epoch_start_config();
6624 let protocol_version = epoch_store.protocol_version().as_u64();
6625 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6626
6627 let mut relevant_system_packages = Vec::new();
6629 for sys_package_id in BuiltInFramework::all_package_ids() {
6630 if let Some(w) = object_store.get_object(&sys_package_id) {
6631 relevant_system_packages.push(ObjDumpFormat::new(w))
6632 }
6633 }
6634
6635 let mut shared_objects = Vec::new();
6637 for kind in effects.input_consensus_objects() {
6638 match kind {
6639 InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6640 if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6641 shared_objects.push(ObjDumpFormat::new(w))
6642 }
6643 }
6644 InputConsensusObject::ReadConsensusStreamEnded(..)
6645 | InputConsensusObject::MutateConsensusStreamEnded(..)
6646 | InputConsensusObject::Cancelled(..) => (), }
6648 }
6649
6650 let mut loaded_child_objects = Vec::new();
6653 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6654 if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6655 loaded_child_objects.push(ObjDumpFormat::new(w))
6656 }
6657 }
6658
6659 let mut modified_at_versions = Vec::new();
6661 for (id, ver) in effects.modified_at_versions() {
6662 if let Some(w) = object_store.get_object_by_key(&id, ver) {
6663 modified_at_versions.push(ObjDumpFormat::new(w))
6664 }
6665 }
6666
6667 let mut runtime_reads = Vec::new();
6670 for obj in inner_temporary_store
6671 .runtime_packages_loaded_from_db
6672 .values()
6673 {
6674 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6675 }
6676
6677 Ok(Self {
6680 tx_digest: *tx_digest,
6681 executed_epoch,
6682 reference_gas_price,
6683 epoch_start_timestamp_ms,
6684 protocol_version,
6685 relevant_system_packages,
6686 shared_objects,
6687 loaded_child_objects,
6688 modified_at_versions,
6689 runtime_reads,
6690 sender_signed_data: certificate.clone().into_message(),
6691 input_objects: inner_temporary_store
6692 .input_objects
6693 .values()
6694 .map(|o| ObjDumpFormat::new(o.clone()))
6695 .collect(),
6696 computed_effects: effects.clone(),
6697 expected_effects_digest,
6698 })
6699 }
6700
6701 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6702 let mut objects = Vec::new();
6703 objects.extend(self.relevant_system_packages.clone());
6704 objects.extend(self.shared_objects.clone());
6705 objects.extend(self.loaded_child_objects.clone());
6706 objects.extend(self.modified_at_versions.clone());
6707 objects.extend(self.runtime_reads.clone());
6708 objects.extend(self.input_objects.clone());
6709 objects
6710 }
6711
6712 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6713 let file_name = format!(
6714 "{}_{}_NODE_DUMP.json",
6715 self.tx_digest,
6716 AuthorityState::unixtime_now_ms()
6717 );
6718 let mut path = path.to_path_buf();
6719 path.push(&file_name);
6720 let mut file = File::create(path.clone())?;
6721 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6722 Ok(path)
6723 }
6724
6725 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6726 let file = File::open(path)?;
6727 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6728 }
6729}