1use crate::accumulators::coin_reservations::CachingCoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::transaction_rewriting::rewrite_transaction_for_coin_reservations;
10use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
11use crate::checkpoints::CheckpointBuilderError;
12use crate::checkpoints::CheckpointBuilderResult;
13use crate::congestion_tracker::CongestionTracker;
14use crate::execution_cache::ExecutionCacheTraitPointers;
15use crate::execution_cache::TransactionCacheRead;
16use crate::execution_cache::writeback_cache::WritebackCache;
17use crate::execution_scheduler::ExecutionScheduler;
18use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
19use crate::gasless_rate_limiter::ConsensusGaslessCounter;
20use crate::jsonrpc_index::CoinIndexKey2;
21use crate::rpc_index::RpcIndexStore;
22use crate::traffic_controller::TrafficController;
23use crate::traffic_controller::metrics::TrafficControllerMetrics;
24use crate::transaction_outputs::TransactionOutputs;
25use crate::verify_indexes::{fix_indexes, verify_indexes};
26use arc_swap::{ArcSwap, ArcSwapOption, Guard};
27use async_trait::async_trait;
28use authority_per_epoch_store::CertLockGuard;
29use dashmap::DashMap;
30use fastcrypto::encoding::Base58;
31use fastcrypto::encoding::Encoding;
32use fastcrypto::hash::MultisetHash;
33use itertools::Itertools;
34use move_binary_format::CompiledModule;
35use move_binary_format::binary_config::BinaryConfig;
36use move_core_types::annotated_value::MoveStructLayout;
37use move_core_types::language_storage::ModuleId;
38use mysten_common::ZipDebugEqIteratorExt;
39use mysten_common::{assert_reachable, fatal};
40use parking_lot::Mutex;
41use prometheus::{
42 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
43 register_histogram_vec_with_registry, register_histogram_with_registry,
44 register_int_counter_vec_with_registry, register_int_counter_with_registry,
45 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
46};
47use serde::de::DeserializeOwned;
48use serde::{Deserialize, Serialize};
49use shared_object_version_manager::AssignedVersions;
50use shared_object_version_manager::Schedulable;
51use std::collections::BTreeMap;
52use std::collections::BTreeSet;
53use std::fs::File;
54use std::io::Write;
55use std::path::{Path, PathBuf};
56use std::sync::atomic::Ordering;
57use std::time::Duration;
58use std::time::Instant;
59use std::time::SystemTime;
60use std::time::UNIX_EPOCH;
61use std::{
62 collections::{HashMap, HashSet},
63 fs,
64 pin::Pin,
65 str::FromStr,
66 sync::Arc,
67 vec,
68};
69use sui_config::NodeConfig;
70use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
71use sui_execution::Executor;
72use sui_protocol_config::PerObjectCongestionControlMode;
73use sui_types::accumulator_root::AccumulatorObjId;
74use sui_types::crypto::RandomnessRound;
75use sui_types::dynamic_field::visitor as DFV;
76use sui_types::execution::ExecutionOutput;
77use sui_types::execution::ExecutionTimeObservationKey;
78use sui_types::execution::ExecutionTiming;
79use sui_types::execution_params::ExecutionOrEarlyError;
80use sui_types::execution_params::FundsWithdrawStatus;
81use sui_types::execution_params::get_early_execution_error;
82use sui_types::execution_status::ExecutionStatus;
83use sui_types::inner_temporary_store::PackageStoreWithFallback;
84use sui_types::layout_resolver::LayoutResolver;
85use sui_types::layout_resolver::into_struct_layout;
86use sui_types::messages_consensus::AuthorityCapabilitiesV2;
87use sui_types::object::bounded_visitor::BoundedVisitor;
88use sui_types::storage::ChildObjectResolver;
89use sui_types::storage::InputKey;
90use sui_types::storage::TrackingBackingStore;
91use sui_types::traffic_control::{
92 PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
93};
94use sui_types::transaction_executor::SimulateTransactionResult;
95use sui_types::transaction_executor::TransactionChecks;
96use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
97use tap::TapFallible;
98use tokio::sync::RwLock;
99use tokio::sync::mpsc::unbounded_channel;
100use tokio::sync::watch::error::RecvError;
101use tokio::sync::{mpsc, oneshot};
102use tokio::task::JoinHandle;
103use tokio::time::timeout;
104use tracing::{debug, error, info, instrument, warn};
105
106use self::authority_store::ExecutionLockWriteGuard;
107use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
108pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
109use mysten_metrics::{monitored_scope, spawn_monitored_task};
110
111use crate::jsonrpc_index::IndexStore;
112use crate::jsonrpc_index::{
113 CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
114};
115use mysten_common::{debug_fatal, debug_fatal_no_invariant};
116use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
117use sui_config::genesis::Genesis;
118use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
119use sui_framework::{BuiltInFramework, SystemPackage};
120use sui_json_rpc_types::{
121 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
122 SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
123 SuiTransactionBlockEvents, TransactionFilter,
124};
125use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
126use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
127use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
128use sui_types::accumulator_root::AccumulatorValue;
129use sui_types::authenticator_state::get_authenticator_state;
130use sui_types::balance::Balance;
131use sui_types::coin_reservation;
132use sui_types::committee::{EpochId, ProtocolVersion};
133use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
134use sui_types::deny_list_v1::check_coin_deny_list_v1;
135use sui_types::digests::ChainIdentifier;
136use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
137use sui_types::effects::{
138 InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
139 TransactionEvents, VerifiedSignedTransactionEffects,
140};
141use sui_types::error::{ExecutionError, ExecutionErrorTrait, SuiErrorKind, UserInputError};
142use sui_types::event::EventID;
143use sui_types::executable_transaction::VerifiedExecutableTransaction;
144use sui_types::execution_status::ExecutionErrorKind;
145use sui_types::gas::{GasCostSummary, SuiGasStatus};
146use sui_types::inner_temporary_store::{
147 InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
148};
149use sui_types::message_envelope::Message;
150use sui_types::messages_checkpoint::{
151 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
152 CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
153 CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
154 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
155};
156use sui_types::messages_grpc::{
157 LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
158 TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
159};
160use sui_types::metrics::{BytecodeVerifierMetrics, ExecutionMetrics};
161use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
162use sui_types::signature::GenericSignature;
163use sui_types::storage::{
164 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
165};
166use sui_types::sui_system_state::SuiSystemStateTrait;
167use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
168use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
169use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
170use sui_types::{
171 SUI_SYSTEM_ADDRESS,
172 base_types::*,
173 committee::Committee,
174 crypto::AuthoritySignature,
175 error::{SuiError, SuiResult},
176 object::{Object, ObjectRead},
177 transaction::*,
178};
179use sui_types::{TypeTag, is_system_package};
180use typed_store::TypedStoreError;
181use typed_store::rocks::StagedBatch;
182
183use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
184use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
185use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
186use crate::authority::authority_store_pruner::{
187 AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
188};
189use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
190use crate::authority::epoch_start_configuration::EpochStartConfiguration;
191use crate::checkpoints::CheckpointStore;
192use crate::epoch::committee_store::CommitteeStore;
193use crate::execution_cache::{
194 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
195 ObjectCacheRead, StateSyncAPI,
196};
197use crate::execution_driver::execution_process;
198use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
199use crate::metrics::LatencyObserver;
200use crate::metrics::RateTracker;
201use crate::module_cache_metrics::ResolverMetrics;
202use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
203use crate::stake_aggregator::StakeAggregator;
204use crate::subscription_handler::SubscriptionHandler;
205use crate::transaction_input_loader::TransactionInputLoader;
206
207#[cfg(msim)]
208pub use crate::checkpoints::checkpoint_executor::utils::{
209 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
210};
211
212#[cfg(msim)]
213use sui_types::committee::CommitteeTrait;
214use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
215
216#[cfg(test)]
217#[path = "unit_tests/authority_tests.rs"]
218pub mod authority_tests;
219
220#[cfg(test)]
221#[path = "unit_tests/transaction_tests.rs"]
222pub mod transaction_tests;
223
224#[cfg(test)]
225#[path = "unit_tests/batch_transaction_tests.rs"]
226mod batch_transaction_tests;
227
228#[cfg(test)]
229#[path = "unit_tests/move_integration_tests.rs"]
230pub mod move_integration_tests;
231
232#[cfg(test)]
233#[path = "unit_tests/gas_tests.rs"]
234mod gas_tests;
235
236#[cfg(test)]
237#[path = "unit_tests/gas_data_tests.rs"]
238mod gas_data_tests;
239
240#[cfg(test)]
241#[path = "unit_tests/batch_verification_tests.rs"]
242mod batch_verification_tests;
243
244#[cfg(test)]
245#[path = "unit_tests/coin_deny_list_tests.rs"]
246mod coin_deny_list_tests;
247
248#[cfg(test)]
249#[path = "unit_tests/auth_unit_test_utils.rs"]
250pub mod auth_unit_test_utils;
251
252pub mod authority_test_utils;
253
254pub mod authority_per_epoch_store;
255pub mod authority_per_epoch_store_pruner;
256
257pub mod authority_store_pruner;
258pub mod authority_store_tables;
259pub mod authority_store_types;
260pub mod congestion_log;
261pub mod consensus_tx_status_cache;
262pub(crate) mod epoch_marker_key;
263pub mod epoch_start_configuration;
264pub mod execution_time_estimator;
265pub mod shared_object_congestion_tracker;
266pub mod shared_object_version_manager;
267pub mod submitted_transaction_cache;
268pub mod test_authority_builder;
269pub mod transaction_deferral;
270pub mod transaction_reject_reason_cache;
271mod weighted_moving_average;
272
273pub(crate) mod authority_store;
274pub mod backpressure;
275
276pub struct AuthorityMetrics {
278 tx_orders: IntCounter,
279 total_certs: IntCounter,
280 total_effects: IntCounter,
281 pub shared_obj_tx: IntCounter,
283 sponsored_tx: IntCounter,
284 tx_already_processed: IntCounter,
285 num_input_objs: Histogram,
286 num_shared_objects: Histogram,
288 batch_size: Histogram,
289
290 authority_state_handle_vote_transaction_latency: Histogram,
291
292 internal_execution_latency: Histogram,
293 execution_load_input_objects_latency: Histogram,
294 prepare_certificate_latency: Histogram,
295 commit_certificate_latency: Histogram,
296 db_checkpoint_latency: Histogram,
297
298 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
300 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
301 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
302 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
303
304 pub(crate) execution_driver_executed_transactions: IntCounter,
305 pub(crate) execution_driver_paused_transactions: IntCounter,
306 pub(crate) execution_driver_dispatch_queue: IntGauge,
307 pub(crate) execution_queueing_delay_s: Histogram,
308 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
309 pub(crate) execution_gas_latency_ratio: Histogram,
310
311 pub(crate) skipped_consensus_txns: IntCounter,
312 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
313 pub(crate) consensus_handler_duplicate_tx_count: Histogram,
314
315 pub(crate) authority_overload_status: IntGauge,
316 pub(crate) authority_load_shedding_percentage: IntGauge,
317
318 pub(crate) transaction_overload_sources: IntCounterVec,
319
320 post_processing_total_events_emitted: IntCounter,
322 post_processing_total_tx_indexed: IntCounter,
323 post_processing_total_tx_had_event_processed: IntCounter,
324 post_processing_total_failures: IntCounter,
325
326 pub consensus_handler_processed: IntCounterVec,
328 pub consensus_handler_transaction_sizes: HistogramVec,
329 pub consensus_handler_deferred_transactions: IntCounter,
330 pub consensus_handler_congested_transactions: IntCounter,
331 pub consensus_handler_unpaid_amplification_deferrals: IntCounter,
332 pub consensus_handler_cancelled_transactions: IntCounter,
333 pub consensus_handler_max_object_costs: IntGaugeVec,
334 pub consensus_committed_subdags: IntCounterVec,
335 pub accumulator_deposits: IntCounter,
336 pub accumulator_withdrawals: IntCounter,
337 pub consensus_committed_messages: IntGaugeVec,
338 pub consensus_committed_user_transactions: IntGaugeVec,
339 pub consensus_finalized_user_transactions: IntGaugeVec,
340 pub consensus_rejected_user_transactions: IntGaugeVec,
341 pub consensus_calculated_throughput: IntGauge,
342 pub consensus_calculated_throughput_profile: IntGauge,
343 pub consensus_block_handler_block_processed: IntCounter,
344 pub consensus_block_handler_txn_processed: IntCounterVec,
345 pub consensus_block_handler_fastpath_executions: IntCounter,
346 pub consensus_timestamp_bias: Histogram,
347
348 pub execution_metrics: Arc<ExecutionMetrics>,
349
350 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
352
353 pub zklogin_sig_count: IntCounter,
355 pub multisig_sig_count: IntCounter,
357
358 pub execution_queueing_latency: LatencyObserver,
361
362 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
369
370 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
373}
374
375const POSITIVE_INT_BUCKETS: &[f64] = &[
377 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
378 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
379 7000000., 10000000.,
380];
381
382const LATENCY_SEC_BUCKETS: &[f64] = &[
383 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
384 10., 20., 30., 60., 90.,
385];
386
387const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
389 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
390 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
391];
392
393const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
396 -2.0, 0.0, 0.2, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40, 0.45, 0.50, 0.80,
397 2.0, 10.0, 30.0,
398];
399
400const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
401 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
402 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
403];
404
405pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;
406
407pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_secs(2);
411
412impl AuthorityMetrics {
413 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
414 Self {
415 tx_orders: register_int_counter_with_registry!(
416 "total_transaction_orders",
417 "Total number of transaction orders",
418 registry,
419 )
420 .unwrap(),
421 total_certs: register_int_counter_with_registry!(
422 "total_transaction_certificates",
423 "Total number of transaction certificates handled",
424 registry,
425 )
426 .unwrap(),
427 total_effects: register_int_counter_with_registry!(
429 "total_transaction_effects",
430 "Total number of transaction effects produced",
431 registry,
432 )
433 .unwrap(),
434
435 shared_obj_tx: register_int_counter_with_registry!(
436 "num_shared_obj_tx",
437 "Number of transactions involving shared objects",
438 registry,
439 )
440 .unwrap(),
441
442 sponsored_tx: register_int_counter_with_registry!(
443 "num_sponsored_tx",
444 "Number of sponsored transactions",
445 registry,
446 )
447 .unwrap(),
448
449 tx_already_processed: register_int_counter_with_registry!(
450 "num_tx_already_processed",
451 "Number of transaction orders already processed previously",
452 registry,
453 )
454 .unwrap(),
455 num_input_objs: register_histogram_with_registry!(
456 "num_input_objects",
457 "Distribution of number of input TX objects per TX",
458 POSITIVE_INT_BUCKETS.to_vec(),
459 registry,
460 )
461 .unwrap(),
462 num_shared_objects: register_histogram_with_registry!(
463 "num_shared_objects",
464 "Number of shared input objects per TX",
465 POSITIVE_INT_BUCKETS.to_vec(),
466 registry,
467 )
468 .unwrap(),
469 batch_size: register_histogram_with_registry!(
470 "batch_size",
471 "Distribution of size of transaction batch",
472 POSITIVE_INT_BUCKETS.to_vec(),
473 registry,
474 )
475 .unwrap(),
476 authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
477 "authority_state_handle_vote_transaction_latency",
478 "Latency of voting on transactions without signing",
479 LATENCY_SEC_BUCKETS.to_vec(),
480 registry,
481 )
482 .unwrap(),
483 internal_execution_latency: register_histogram_with_registry!(
484 "authority_state_internal_execution_latency",
485 "Latency of actual certificate executions",
486 LATENCY_SEC_BUCKETS.to_vec(),
487 registry,
488 )
489 .unwrap(),
490 execution_load_input_objects_latency: register_histogram_with_registry!(
491 "authority_state_execution_load_input_objects_latency",
492 "Latency of loading input objects for execution",
493 LOW_LATENCY_SEC_BUCKETS.to_vec(),
494 registry,
495 )
496 .unwrap(),
497 prepare_certificate_latency: register_histogram_with_registry!(
498 "authority_state_prepare_certificate_latency",
499 "Latency of executing certificates, before committing the results",
500 LATENCY_SEC_BUCKETS.to_vec(),
501 registry,
502 )
503 .unwrap(),
504 commit_certificate_latency: register_histogram_with_registry!(
505 "authority_state_commit_certificate_latency",
506 "Latency of committing certificate execution results",
507 LATENCY_SEC_BUCKETS.to_vec(),
508 registry,
509 )
510 .unwrap(),
511 db_checkpoint_latency: register_histogram_with_registry!(
512 "db_checkpoint_latency",
513 "Latency of checkpointing dbs",
514 LATENCY_SEC_BUCKETS.to_vec(),
515 registry,
516 ).unwrap(),
517 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
518 "transaction_manager_num_enqueued_certificates",
519 "Current number of certificates enqueued to ExecutionScheduler",
520 &["result"],
521 registry,
522 )
523 .unwrap(),
524 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
525 "transaction_manager_num_pending_certificates",
526 "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
527 registry,
528 )
529 .unwrap(),
530 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
531 "transaction_manager_num_executing_certificates",
532 "Number of executing certificates, including queued and actually running certificates",
533 registry,
534 )
535 .unwrap(),
536 authority_overload_status: register_int_gauge_with_registry!(
537 "authority_overload_status",
538 "Whether authority is current experiencing overload and enters load shedding mode.",
539 registry)
540 .unwrap(),
541 authority_load_shedding_percentage: register_int_gauge_with_registry!(
542 "authority_load_shedding_percentage",
543 "The percentage of transactions is shed when the authority is in load shedding mode.",
544 registry)
545 .unwrap(),
546 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
547 "transaction_manager_transaction_queue_age_s",
548 "Time spent in waiting for transaction in the queue",
549 LATENCY_SEC_BUCKETS.to_vec(),
550 registry,
551 )
552 .unwrap(),
553 transaction_overload_sources: register_int_counter_vec_with_registry!(
554 "transaction_overload_sources",
555 "Number of times each source indicates transaction overload.",
556 &["source"],
557 registry)
558 .unwrap(),
559 execution_driver_executed_transactions: register_int_counter_with_registry!(
560 "execution_driver_executed_transactions",
561 "Cumulative number of transaction executed by execution driver",
562 registry,
563 )
564 .unwrap(),
565 execution_driver_paused_transactions: register_int_counter_with_registry!(
566 "execution_driver_paused_transactions",
567 "Cumulative number of transactions paused by execution driver",
568 registry,
569 )
570 .unwrap(),
571 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
572 "execution_driver_dispatch_queue",
573 "Number of transaction pending in execution driver dispatch queue",
574 registry,
575 )
576 .unwrap(),
577 execution_queueing_delay_s: register_histogram_with_registry!(
578 "execution_queueing_delay_s",
579 "Queueing delay between a transaction is ready for execution until it starts executing.",
580 LATENCY_SEC_BUCKETS.to_vec(),
581 registry
582 )
583 .unwrap(),
584 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
585 "prepare_cert_gas_latency_ratio",
586 "The ratio of computation gas divided by VM execution latency.",
587 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
588 registry
589 )
590 .unwrap(),
591 execution_gas_latency_ratio: register_histogram_with_registry!(
592 "execution_gas_latency_ratio",
593 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
594 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
595 registry
596 )
597 .unwrap(),
598 skipped_consensus_txns: register_int_counter_with_registry!(
599 "skipped_consensus_txns",
600 "Total number of consensus transactions skipped",
601 registry,
602 )
603 .unwrap(),
604 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
605 "skipped_consensus_txns_cache_hit",
606 "Total number of consensus transactions skipped because of local cache hit",
607 registry,
608 )
609 .unwrap(),
610 consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
611 "consensus_handler_duplicate_tx_count",
612 "Number of times each transaction appears in its first consensus commit",
613 POSITIVE_INT_BUCKETS.to_vec(),
614 registry,
615 )
616 .unwrap(),
617 post_processing_total_events_emitted: register_int_counter_with_registry!(
618 "post_processing_total_events_emitted",
619 "Total number of events emitted in post processing",
620 registry,
621 )
622 .unwrap(),
623 post_processing_total_tx_indexed: register_int_counter_with_registry!(
624 "post_processing_total_tx_indexed",
625 "Total number of txes indexed in post processing",
626 registry,
627 )
628 .unwrap(),
629 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
630 "post_processing_total_tx_had_event_processed",
631 "Total number of txes finished event processing in post processing",
632 registry,
633 )
634 .unwrap(),
635 post_processing_total_failures: register_int_counter_with_registry!(
636 "post_processing_total_failures",
637 "Total number of failure in post processing",
638 registry,
639 )
640 .unwrap(),
641 consensus_handler_processed: register_int_counter_vec_with_registry!(
642 "consensus_handler_processed",
643 "Number of transactions processed by consensus handler",
644 &["class"],
645 registry
646 ).unwrap(),
647 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
648 "consensus_handler_transaction_sizes",
649 "Sizes of each type of transactions processed by consensus handler",
650 &["class"],
651 POSITIVE_INT_BUCKETS.to_vec(),
652 registry
653 ).unwrap(),
654 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
655 "consensus_handler_deferred_transactions",
656 "Number of transactions deferred by consensus handler",
657 registry,
658 ).unwrap(),
659 consensus_handler_congested_transactions: register_int_counter_with_registry!(
660 "consensus_handler_congested_transactions",
661 "Number of transactions deferred by consensus handler due to congestion",
662 registry,
663 ).unwrap(),
664 consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
665 "consensus_handler_unpaid_amplification_deferrals",
666 "Number of transactions deferred due to unpaid consensus amplification",
667 registry,
668 ).unwrap(),
669 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
670 "consensus_handler_cancelled_transactions",
671 "Number of transactions cancelled by consensus handler",
672 registry,
673 ).unwrap(),
674 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
675 "consensus_handler_max_congestion_control_object_costs",
676 "Max object costs for congestion control in the current consensus commit",
677 &["commit_type"],
678 registry,
679 ).unwrap(),
680 consensus_committed_subdags: register_int_counter_vec_with_registry!(
681 "consensus_committed_subdags",
682 "Number of committed subdags, sliced by author",
683 &["authority"],
684 registry,
685 ).unwrap(),
686 accumulator_deposits: register_int_counter_with_registry!(
687 "accumulator_deposits",
688 "Total accumulator deposit (merge) events processed in settlement",
689 registry,
690 ).unwrap(),
691 accumulator_withdrawals: register_int_counter_with_registry!(
692 "accumulator_withdrawals",
693 "Total accumulator withdrawal (split) events processed in settlement",
694 registry,
695 ).unwrap(),
696 consensus_committed_messages: register_int_gauge_vec_with_registry!(
697 "consensus_committed_messages",
698 "Total number of committed consensus messages, sliced by author",
699 &["authority"],
700 registry,
701 ).unwrap(),
702 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
703 "consensus_committed_user_transactions",
704 "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
705 &["authority"],
706 registry,
707 ).unwrap(),
708 consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
709 "consensus_finalized_user_transactions",
710 "Number of user transactions finalized, sliced by submitter",
711 &["authority"],
712 registry,
713 ).unwrap(),
714 consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
715 "consensus_rejected_user_transactions",
716 "Number of user transactions rejected, sliced by submitter",
717 &["authority"],
718 registry,
719 ).unwrap(),
720 execution_metrics: Arc::new(ExecutionMetrics::new(registry)),
721 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
722 zklogin_sig_count: register_int_counter_with_registry!(
723 "zklogin_sig_count",
724 "Count of zkLogin signatures",
725 registry,
726 )
727 .unwrap(),
728 multisig_sig_count: register_int_counter_with_registry!(
729 "multisig_sig_count",
730 "Count of zkLogin signatures",
731 registry,
732 )
733 .unwrap(),
734 consensus_calculated_throughput: register_int_gauge_with_registry!(
735 "consensus_calculated_throughput",
736 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
737 registry,
738 ).unwrap(),
739 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
740 "consensus_calculated_throughput_profile",
741 "The current active calculated throughput profile",
742 registry
743 ).unwrap(),
744 consensus_block_handler_block_processed: register_int_counter_with_registry!(
745 "consensus_block_handler_block_processed",
746 "Number of blocks processed by consensus block handler.",
747 registry
748 ).unwrap(),
749 consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
750 "consensus_block_handler_txn_processed",
751 "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
752 &["outcome"],
753 registry
754 ).unwrap(),
755 consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
756 "consensus_block_handler_fastpath_executions",
757 "Number of fastpath transactions sent for execution by consensus transaction handler",
758 registry,
759 ).unwrap(),
760 consensus_timestamp_bias: register_histogram_with_registry!(
761 "consensus_timestamp_bias",
762 "Bias/delay from consensus timestamp to system time",
763 TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
764 registry
765 ).unwrap(),
766 execution_queueing_latency: LatencyObserver::new(),
767 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
768 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
769 }
770 }
771}
772
773pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
780
781#[derive(Debug, Clone)]
784pub struct ExecutionEnv {
785 pub assigned_versions: AssignedVersions,
787 pub expected_effects_digest: Option<TransactionEffectsDigest>,
790 pub funds_withdraw_status: FundsWithdrawStatus,
793 pub barrier_dependencies: Vec<TransactionDigest>,
796}
797
798impl Default for ExecutionEnv {
799 fn default() -> Self {
800 Self {
801 assigned_versions: Default::default(),
802 expected_effects_digest: None,
803 funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
804 barrier_dependencies: Default::default(),
805 }
806 }
807}
808
809impl ExecutionEnv {
810 pub fn new() -> Self {
811 Default::default()
812 }
813
814 pub fn with_expected_effects_digest(
815 mut self,
816 expected_effects_digest: TransactionEffectsDigest,
817 ) -> Self {
818 self.expected_effects_digest = Some(expected_effects_digest);
819 self
820 }
821
822 pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
823 self.assigned_versions = assigned_versions;
824 self
825 }
826
827 pub fn with_insufficient_funds(mut self) -> Self {
828 self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
829 self
830 }
831
832 pub fn with_barrier_dependencies(
833 mut self,
834 barrier_dependencies: BTreeSet<TransactionDigest>,
835 ) -> Self {
836 self.barrier_dependencies = barrier_dependencies.into_iter().collect();
837 self
838 }
839}
840
841#[derive(Debug)]
842pub struct ForkRecoveryState {
843 transaction_overrides:
845 parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
846}
847
848impl Default for ForkRecoveryState {
849 fn default() -> Self {
850 Self {
851 transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
852 }
853 }
854}
855
856impl ForkRecoveryState {
857 pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
858 let Some(config) = config else {
859 return Ok(Self::default());
860 };
861
862 let mut transaction_overrides = HashMap::new();
863 for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
864 let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
865 SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
866 })?;
867 let effects_digest =
868 TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
869 SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
870 })?;
871 transaction_overrides.insert(tx_digest, effects_digest);
872 }
873
874 Ok(Self {
875 transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
876 })
877 }
878
879 pub fn get_transaction_override(
880 &self,
881 tx_digest: &TransactionDigest,
882 ) -> Option<TransactionEffectsDigest> {
883 self.transaction_overrides.read().get(tx_digest).copied()
884 }
885}
886
887pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
888
889pub struct AuthorityState {
890 pub name: AuthorityName,
893 pub secret: StableSyncAuthoritySigner,
895
896 input_loader: TransactionInputLoader,
898 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
899 coin_reservation_resolver: Arc<CachingCoinReservationResolver>,
900
901 epoch_store: ArcSwap<AuthorityPerEpochStore>,
902
903 execution_lock: RwLock<EpochId>,
908
909 pub indexes: Option<Arc<IndexStore>>,
910 pub rpc_index: Option<Arc<RpcIndexStore>>,
911
912 pub subscription_handler: Arc<SubscriptionHandler>,
913 pub checkpoint_store: Arc<CheckpointStore>,
914
915 committee_store: Arc<CommitteeStore>,
916
917 execution_scheduler: Arc<ExecutionScheduler>,
919
920 #[allow(unused)]
922 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
923
924 pub metrics: Arc<AuthorityMetrics>,
925 _pruner: AuthorityStorePruner,
926 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
927
928 db_checkpoint_config: DBCheckpointConfig,
930
931 pub config: NodeConfig,
932
933 pub overload_info: AuthorityOverloadInfo,
935
936 chain_identifier: ChainIdentifier,
938
939 pub(crate) congestion_tracker: Arc<CongestionTracker>,
940
941 pub(crate) consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
943
944 pub traffic_controller: Option<Arc<TrafficController>>,
946
947 fork_recovery_state: Option<ForkRecoveryState>,
949
950 notify_epoch: tokio::sync::watch::Sender<EpochId>,
952
953 pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
954 object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,
955
956 pending_post_processing:
960 Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,
961
962 post_processing_semaphore: Arc<tokio::sync::Semaphore>,
965}
966
967impl AuthorityState {
974 pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
975 epoch_store.committee().authority_exists(&self.name)
976 }
977
978 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
979 !self.is_validator(epoch_store)
980 }
981
982 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
983 &self.committee_store
984 }
985
986 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
987 self.committee_store.clone()
988 }
989
990 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
991 &self.config.authority_overload_config
992 }
993
994 pub fn get_epoch_state_commitments(
995 &self,
996 epoch: EpochId,
997 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
998 self.checkpoint_store.get_epoch_state_commitments(epoch)
999 }
1000
1001 fn pre_object_load_checks(
1004 &self,
1005 tx_data: &TransactionData,
1006 tx_signatures: &[GenericSignature],
1007 input_object_kinds: &[InputObjectKind],
1008 receiving_objects_refs: &[ObjectRef],
1009 protocol_config: &ProtocolConfig,
1010 ) -> SuiResult<BTreeMap<AccumulatorObjId, (u64, TypeTag)>> {
1011 sui_transaction_checks::deny::check_transaction_for_signing(
1015 tx_data,
1016 tx_signatures,
1017 input_object_kinds,
1018 receiving_objects_refs,
1019 &self.config.transaction_deny_config,
1020 self.get_backing_package_store().as_ref(),
1021 )?;
1022
1023 let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1024 self.chain_identifier,
1025 self.coin_reservation_resolver.as_ref(),
1026 )?;
1027
1028 self.execution_cache_trait_pointers
1029 .account_funds_read
1030 .check_amounts_available(&declared_withdrawals)?;
1031
1032 if protocol_config.gasless_verify_remaining_balance() && tx_data.is_gasless_transaction() {
1033 let min_amounts =
1034 sui_types::transaction::get_gasless_allowed_token_types(protocol_config);
1035 self.execution_cache_trait_pointers
1036 .account_funds_read
1037 .check_remaining_amounts_after_withdrawal(&declared_withdrawals, &min_amounts)?;
1038 }
1039
1040 Ok(declared_withdrawals)
1041 }
1042
1043 fn handle_transaction_deny_checks(
1044 &self,
1045 transaction: &VerifiedTransaction,
1046 epoch_store: &Arc<AuthorityPerEpochStore>,
1047 ) -> SuiResult<CheckedInputObjects> {
1048 let tx_digest = transaction.digest();
1049 let tx_data = transaction.data().transaction_data();
1050
1051 let input_object_kinds = tx_data.input_objects()?;
1052 let receiving_objects_refs = tx_data.receiving_objects();
1053
1054 self.pre_object_load_checks(
1055 tx_data,
1056 transaction.tx_signatures(),
1057 &input_object_kinds,
1058 &receiving_objects_refs,
1059 epoch_store.protocol_config(),
1060 )?;
1061
1062 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1063 Some(tx_digest),
1064 &input_object_kinds,
1065 &receiving_objects_refs,
1066 epoch_store.epoch(),
1067 )?;
1068
1069 let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1070 epoch_store.protocol_config(),
1071 epoch_store.reference_gas_price(),
1072 tx_data,
1073 input_objects,
1074 &receiving_objects,
1075 &self.metrics.bytecode_verifier_metrics,
1076 &self.config.verifier_signing_config,
1077 )?;
1078
1079 self.handle_coin_deny_list_checks(
1080 tx_data,
1081 &checked_input_objects,
1082 &receiving_objects,
1083 epoch_store,
1084 )?;
1085
1086 Ok(checked_input_objects)
1087 }
1088
1089 fn handle_coin_deny_list_checks(
1090 &self,
1091 tx_data: &TransactionData,
1092 checked_input_objects: &CheckedInputObjects,
1093 receiving_objects: &ReceivingObjects,
1094 epoch_store: &Arc<AuthorityPerEpochStore>,
1095 ) -> SuiResult<()> {
1096 use sui_types::balance::Balance;
1097
1098 let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1099 self.chain_identifier,
1100 self.coin_reservation_resolver.as_ref(),
1101 )?;
1102
1103 let funds_withdraw_types = declared_withdrawals
1104 .values()
1105 .filter_map(|(_, type_tag)| {
1106 Balance::maybe_get_balance_type_param(type_tag)
1107 .map(|ty| ty.to_canonical_string(false))
1108 })
1109 .collect::<BTreeSet<_>>();
1110
1111 if epoch_store.coin_deny_list_v1_enabled() {
1112 check_coin_deny_list_v1(
1113 tx_data.sender(),
1114 checked_input_objects,
1115 receiving_objects,
1116 funds_withdraw_types.clone(),
1117 &self.get_object_store(),
1118 )?;
1119 }
1120
1121 if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1122 check_coin_deny_list_v2_during_signing(
1123 tx_data.sender(),
1124 checked_input_objects,
1125 receiving_objects,
1126 funds_withdraw_types.clone(),
1127 &self.get_object_store(),
1128 )?;
1129 }
1130
1131 Ok(())
1132 }
1133
1134 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1144 pub fn handle_vote_transaction(
1145 &self,
1146 epoch_store: &Arc<AuthorityPerEpochStore>,
1147 transaction: VerifiedTransaction,
1148 ) -> SuiResult<()> {
1149 debug!("handle_vote_transaction");
1150
1151 let _metrics_guard = self
1152 .metrics
1153 .authority_state_handle_vote_transaction_latency
1154 .start_timer();
1155 self.metrics.tx_orders.inc();
1156
1157 if !epoch_store
1161 .get_reconfig_state_read_lock_guard()
1162 .should_accept_user_certs()
1163 {
1164 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1165 }
1166
1167 let tx_digest = *transaction.digest();
1172 if epoch_store.is_recently_finalized(&tx_digest)
1173 || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
1174 {
1175 assert_reachable!("transaction recently executed");
1176 return Ok(());
1177 }
1178
1179 if self
1180 .get_transaction_cache_reader()
1181 .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
1182 {
1183 assert_reachable!("transaction executed in last epoch");
1184 return Err(SuiErrorKind::TransactionAlreadyExecuted {
1185 digest: (*transaction.digest()),
1186 }
1187 .into());
1188 }
1189
1190 let _execution_lock = self.execution_lock_for_validation()?;
1192
1193 let checked_input_objects =
1194 self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1195
1196 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1197
1198 self.get_cache_writer()
1202 .validate_owned_object_versions(&owned_objects)
1203 }
1204
1205 pub fn check_transaction_validity(
1214 &self,
1215 epoch_store: &Arc<AuthorityPerEpochStore>,
1216 transaction: &VerifiedTransaction,
1217 enforce_live_input_objects: bool,
1218 ) -> SuiResult<()> {
1219 if !epoch_store
1220 .get_reconfig_state_read_lock_guard()
1221 .should_accept_user_certs()
1222 {
1223 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1224 }
1225
1226 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1227
1228 let checked_input_objects =
1229 self.handle_transaction_deny_checks(transaction, epoch_store)?;
1230
1231 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1233 let cache_reader = self.get_object_cache_reader();
1234
1235 for obj_ref in &owned_objects {
1236 if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1237 if obj_ref.1 < live_object.version() {
1240 return Err(SuiErrorKind::UserInputError {
1241 error: UserInputError::ObjectVersionUnavailableForConsumption {
1242 provided_obj_ref: *obj_ref,
1243 current_version: live_object.version(),
1244 },
1245 }
1246 .into());
1247 }
1248
1249 if enforce_live_input_objects && live_object.version() < obj_ref.1 {
1250 return Err(SuiErrorKind::UserInputError {
1252 error: UserInputError::ObjectVersionUnavailableForConsumption {
1253 provided_obj_ref: *obj_ref,
1254 current_version: live_object.version(),
1255 },
1256 }
1257 .into());
1258 }
1259
1260 if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1262 return Err(SuiErrorKind::UserInputError {
1263 error: UserInputError::InvalidObjectDigest {
1264 object_id: obj_ref.0,
1265 expected_digest: live_object.digest(),
1266 },
1267 }
1268 .into());
1269 }
1270 }
1271 }
1272
1273 Ok(())
1274 }
1275
1276 pub fn check_system_overload_at_signing(&self) -> bool {
1277 self.config
1278 .authority_overload_config
1279 .check_system_overload_at_signing
1280 }
1281
1282 pub fn check_system_overload_at_execution(&self) -> bool {
1283 self.config
1284 .authority_overload_config
1285 .check_system_overload_at_execution
1286 }
1287
1288 pub(crate) fn check_system_overload(
1289 &self,
1290 tx_data: &SenderSignedData,
1291 do_authority_overload_check: bool,
1292 ) -> SuiResult {
1293 if do_authority_overload_check {
1294 self.check_authority_overload(tx_data).tap_err(|_| {
1295 self.update_overload_metrics("execution_queue");
1296 })?;
1297 }
1298 self.execution_scheduler
1299 .check_execution_overload(self.overload_config(), tx_data)
1300 .tap_err(|_| {
1301 self.update_overload_metrics("execution_pending");
1302 })?;
1303 let pending_tx_count = self
1306 .get_cache_commit()
1307 .approximate_pending_transaction_count();
1308 if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1309 return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1310 retry_after_secs: 10,
1311 }
1312 .into());
1313 }
1314
1315 Ok(())
1316 }
1317
1318 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1319 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1320 return Ok(());
1321 }
1322
1323 let load_shedding_percentage = self
1324 .overload_info
1325 .load_shedding_percentage
1326 .load(Ordering::Relaxed);
1327 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1328 }
1329
1330 pub(crate) fn update_overload_metrics(&self, source: &str) {
1331 self.metrics
1332 .transaction_overload_sources
1333 .with_label_values(&[source])
1334 .inc();
1335 }
1336
1337 pub(crate) async fn wait_for_fastpath_dependency_objects(
1341 &self,
1342 transaction: &VerifiedTransaction,
1343 epoch: EpochId,
1344 ) -> SuiResult<bool> {
1345 let txn_data = transaction.data().transaction_data();
1346 let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1347
1348 let fastpath_dependency_objects: Vec<_> = move_objects
1350 .into_iter()
1351 .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1352 .chain(
1353 packages
1354 .into_iter()
1355 .map(|package_id| InputKey::Package { id: package_id }),
1356 )
1357 .collect();
1358 let receiving_keys: HashSet<_> = receiving_objects
1359 .into_iter()
1360 .filter_map(|receiving_obj_ref| {
1361 self.should_wait_for_dependency_object(receiving_obj_ref)
1362 })
1363 .collect();
1364 if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1365 return Ok(true);
1366 }
1367
1368 let max_wait = if cfg!(msim) {
1371 Duration::from_millis(200)
1372 } else {
1373 WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1374 };
1375
1376 match timeout(
1377 max_wait,
1378 self.get_object_cache_reader().notify_read_input_objects(
1379 &fastpath_dependency_objects,
1380 &receiving_keys,
1381 epoch,
1382 ),
1383 )
1384 .await
1385 {
1386 Ok(()) => Ok(true),
1387 Err(_) => Ok(false),
1390 }
1391 }
1392
1393 fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1400 let (obj_id, cur_version, _digest) = obj_ref;
1401 let Some(latest_obj_ref) = self
1402 .get_object_cache_reader()
1403 .get_latest_object_ref_or_tombstone(obj_id)
1404 else {
1405 return Some(InputKey::VersionedObject {
1407 id: FullObjectID::new(obj_id, None),
1408 version: cur_version,
1409 });
1410 };
1411 let latest_digest = latest_obj_ref.2;
1412 if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1413 return None;
1415 }
1416 let latest_version = latest_obj_ref.1;
1417 if cur_version <= latest_version {
1418 return None;
1421 }
1422 Some(InputKey::VersionedObject {
1424 id: FullObjectID::new(obj_id, None),
1425 version: cur_version,
1426 })
1427 }
1428
1429 #[instrument(level = "trace", skip_all)]
1434 pub async fn wait_for_transaction_execution_for_testing(
1435 &self,
1436 transaction: &VerifiedExecutableTransaction,
1437 ) -> TransactionEffects {
1438 self.notify_read_effects_for_testing(
1439 "AuthorityState::wait_for_transaction_execution_for_testing",
1440 *transaction.digest(),
1441 )
1442 .await
1443 }
1444
1445 #[instrument(level = "trace", skip_all)]
1457 pub async fn try_execute_immediately(
1458 &self,
1459 certificate: &VerifiedExecutableTransaction,
1460 mut execution_env: ExecutionEnv,
1461 epoch_store: &Arc<AuthorityPerEpochStore>,
1462 ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
1463 let _scope = monitored_scope("Execution::try_execute_immediately");
1464 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1465
1466 let tx_digest = certificate.digest();
1467
1468 if let Some(fork_recovery) = &self.fork_recovery_state
1469 && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
1470 {
1471 warn!(
1472 ?tx_digest,
1473 original_digest = ?execution_env.expected_effects_digest,
1474 override_digest = ?override_digest,
1475 "Applying fork recovery override for transaction effects digest"
1476 );
1477 execution_env.expected_effects_digest = Some(override_digest);
1478 }
1479
1480 let tx_guard = epoch_store.acquire_tx_guard(certificate);
1482
1483 let tx_cache_reader = self.get_transaction_cache_reader();
1484 if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
1485 if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
1486 assert_eq!(
1487 effects.digest(),
1488 expected_effects_digest,
1489 "Unexpected effects digest for transaction {:?}",
1490 tx_digest
1491 );
1492 }
1493 tx_guard.release();
1494 return ExecutionOutput::Success((effects, None));
1495 }
1496
1497 let execution_start_time = Instant::now();
1498
1499 let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
1504 else {
1505 tx_guard.release();
1506 return ExecutionOutput::EpochEnded;
1507 };
1508 if *execution_guard != epoch_store.epoch() {
1513 tx_guard.release();
1514 info!("The epoch of the execution_guard doesn't match the epoch store");
1515 return ExecutionOutput::EpochEnded;
1516 }
1517
1518 let accumulator_version = execution_env.assigned_versions.accumulator_version;
1519
1520 let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
1521 &tx_guard,
1522 &execution_guard,
1523 certificate,
1524 execution_env,
1525 epoch_store,
1526 ) {
1527 ExecutionOutput::Success(result) => result,
1528 output => return output.unwrap_err(),
1529 };
1530 let transaction_outputs = Arc::new(transaction_outputs);
1531
1532 fail_point!("crash");
1533
1534 let effects = transaction_outputs.effects.clone();
1535 debug!(
1536 ?tx_digest,
1537 fx_digest=?effects.digest(),
1538 "process_certificate succeeded in {:.3}ms",
1539 (execution_start_time.elapsed().as_micros() as f64) / 1000.0
1540 );
1541
1542 let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
1543 if let Err(err) = commit_result {
1544 error!(?tx_digest, "Error committing transaction: {err}");
1545 tx_guard.release();
1546 return ExecutionOutput::Fatal(err);
1547 }
1548
1549 if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
1550 certificate.data().transaction_data().kind()
1551 {
1552 if let Some(err) = &execution_error_opt {
1553 debug_fatal!("Authenticator state update failed: {:?}", err);
1554 }
1555 epoch_store.update_authenticator_state(auth_state);
1556
1557 if cfg!(debug_assertions) {
1559 let authenticator_state = get_authenticator_state(self.get_object_store())
1560 .expect("Read cannot fail")
1561 .expect("Authenticator state must exist");
1562
1563 let mut sys_jwks: Vec<_> = authenticator_state
1564 .active_jwks
1565 .into_iter()
1566 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1567 .collect();
1568 let mut active_jwks: Vec<_> = epoch_store
1569 .signature_verifier
1570 .get_jwks()
1571 .into_iter()
1572 .collect();
1573 sys_jwks.sort();
1574 active_jwks.sort();
1575
1576 assert_eq!(sys_jwks, active_jwks);
1577 }
1578 }
1579
1580 if certificate
1584 .transaction_data()
1585 .kind()
1586 .is_accumulator_barrier_settle_tx()
1587 {
1588 let object_funds_checker = self.object_funds_checker.load();
1589 if let Some(object_funds_checker) = object_funds_checker.as_ref() {
1590 let next_accumulator_version = accumulator_version.unwrap().next();
1593 object_funds_checker.settle_accumulator_version(next_accumulator_version);
1594 }
1595 }
1596
1597 tx_guard.commit_tx();
1598
1599 let elapsed = execution_start_time.elapsed();
1600 epoch_store.record_local_execution_time(
1601 certificate.data().transaction_data(),
1602 &effects,
1603 timings,
1604 elapsed,
1605 );
1606
1607 let elapsed_us = elapsed.as_micros() as f64;
1608 if elapsed_us > 0.0 {
1609 self.metrics
1610 .execution_gas_latency_ratio
1611 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
1612 };
1613 ExecutionOutput::Success((effects, execution_error_opt))
1614 }
1615
1616 pub fn read_objects_for_execution(
1617 &self,
1618 tx_lock: &CertLockGuard,
1619 certificate: &VerifiedExecutableTransaction,
1620 assigned_shared_object_versions: &AssignedVersions,
1621 epoch_store: &Arc<AuthorityPerEpochStore>,
1622 ) -> SuiResult<InputObjects> {
1623 let _scope = monitored_scope("Execution::load_input_objects");
1624 let _metrics_guard = self
1625 .metrics
1626 .execution_load_input_objects_latency
1627 .start_timer();
1628 let input_objects = &certificate.data().transaction_data().input_objects()?;
1629 self.input_loader.read_objects_for_execution(
1630 &certificate.key(),
1631 tx_lock,
1632 input_objects,
1633 assigned_shared_object_versions,
1634 epoch_store.epoch(),
1635 )
1636 }
1637
1638 pub async fn try_execute_for_test(
1641 &self,
1642 certificate: &VerifiedCertificate,
1643 execution_env: ExecutionEnv,
1644 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1645 let epoch_store = self.epoch_store_for_testing();
1646 let (effects, execution_error_opt) = self
1647 .try_execute_immediately(
1648 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1649 execution_env,
1650 &epoch_store,
1651 )
1652 .await
1653 .unwrap();
1654 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1655 (signed_effects, execution_error_opt)
1656 }
1657
1658 pub async fn try_execute_executable_for_test(
1659 &self,
1660 executable: &VerifiedExecutableTransaction,
1661 execution_env: ExecutionEnv,
1662 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1663 let epoch_store = self.epoch_store_for_testing();
1664 let (effects, execution_error_opt) = self
1665 .try_execute_immediately(executable, execution_env, &epoch_store)
1666 .await
1667 .unwrap();
1668 self.flush_post_processing(executable.digest()).await;
1669 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1670 (signed_effects, execution_error_opt)
1671 }
1672
1673 pub async fn notify_read_effects_for_testing(
1678 &self,
1679 task_name: &'static str,
1680 digest: TransactionDigest,
1681 ) -> TransactionEffects {
1682 self.get_transaction_cache_reader()
1683 .notify_read_executed_effects(task_name, &[digest])
1684 .await
1685 .pop()
1686 .expect("must return correct number of effects")
1687 }
1688
1689 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1690 self.get_object_cache_reader()
1691 .check_owned_objects_are_live(owned_object_refs)
1692 }
1693
1694 pub(crate) fn debug_dump_transaction_state(
1699 &self,
1700 tx_digest: &TransactionDigest,
1701 effects: &TransactionEffects,
1702 expected_effects_digest: TransactionEffectsDigest,
1703 inner_temporary_store: &InnerTemporaryStore,
1704 certificate: &VerifiedExecutableTransaction,
1705 debug_dump_config: &StateDebugDumpConfig,
1706 ) -> SuiResult<PathBuf> {
1707 let dump_dir = debug_dump_config
1708 .dump_file_directory
1709 .as_ref()
1710 .cloned()
1711 .unwrap_or(std::env::temp_dir());
1712 let epoch_store = self.load_epoch_store_one_call_per_task();
1713
1714 NodeStateDump::new(
1715 tx_digest,
1716 effects,
1717 expected_effects_digest,
1718 self.get_object_store().as_ref(),
1719 &epoch_store,
1720 inner_temporary_store,
1721 certificate,
1722 )?
1723 .write_to_file(&dump_dir)
1724 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1725 }
1726
1727 #[instrument(level = "trace", skip_all)]
1728 pub(crate) fn process_certificate(
1729 &self,
1730 tx_guard: &CertTxGuard,
1731 execution_guard: &ExecutionLockReadGuard<'_>,
1732 certificate: &VerifiedExecutableTransaction,
1733 execution_env: ExecutionEnv,
1734 epoch_store: &Arc<AuthorityPerEpochStore>,
1735 ) -> ExecutionOutput<(
1736 TransactionOutputs,
1737 Vec<ExecutionTiming>,
1738 Option<ExecutionError>,
1739 )> {
1740 let _scope = monitored_scope("Execution::process_certificate");
1741 let tx_digest = *certificate.digest();
1742
1743 let input_objects = match self.read_objects_for_execution(
1744 tx_guard.as_lock_guard(),
1745 certificate,
1746 &execution_env.assigned_versions,
1747 epoch_store,
1748 ) {
1749 Ok(objects) => objects,
1750 Err(e) => return ExecutionOutput::Fatal(e),
1751 };
1752
1753 let expected_effects_digest = match execution_env.expected_effects_digest {
1754 Some(expected_effects_digest) => Some(expected_effects_digest),
1755 None => {
1756 match epoch_store.get_signed_effects_digest(&tx_digest) {
1761 Ok(digest) => digest,
1762 Err(e) => return ExecutionOutput::Fatal(e),
1763 }
1764 }
1765 };
1766
1767 fail_point_if!("correlated-crash-process-certificate", || {
1768 if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1769 sui_simulator::task::kill_current_node(None);
1770 }
1771 });
1772
1773 self.execute_certificate(
1778 execution_guard,
1779 certificate,
1780 input_objects,
1781 expected_effects_digest,
1782 execution_env,
1783 epoch_store,
1784 )
1785 }
1786
1787 pub async fn reconfigure_traffic_control(
1788 &self,
1789 params: TrafficControlReconfigParams,
1790 ) -> Result<TrafficControlReconfigParams, SuiError> {
1791 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1792 traffic_controller.admin_reconfigure(params).await
1793 } else {
1794 Err(SuiErrorKind::InvalidAdminRequest(
1795 "Traffic controller is not configured on this node".to_string(),
1796 )
1797 .into())
1798 }
1799 }
1800
1801 #[instrument(level = "trace", skip_all)]
1802 fn commit_certificate(
1803 &self,
1804 certificate: &VerifiedExecutableTransaction,
1805 transaction_outputs: Arc<TransactionOutputs>,
1806 epoch_store: &Arc<AuthorityPerEpochStore>,
1807 ) -> SuiResult {
1808 let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
1809 monitored_scope("Execution::commit_certificate");
1810 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1811
1812 let tx_digest = certificate.digest();
1813
1814 epoch_store.insert_executed_in_epoch(tx_digest);
1817 let key = certificate.key();
1818 if !matches!(key, TransactionKey::Digest(_)) {
1819 epoch_store.insert_tx_key(key, *tx_digest)?;
1820 }
1821
1822 fail_point!("crash");
1824
1825 self.get_cache_writer()
1826 .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);
1827
1828 if let Some(settlement_key) = certificate
1833 .transaction_data()
1834 .kind()
1835 .accumulator_barrier_settlement_key()
1836 {
1837 epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
1838 }
1839
1840 if certificate.transaction_data().is_end_of_epoch_tx() {
1841 self.get_object_cache_reader()
1844 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1845 }
1846
1847 Ok(())
1848 }
1849
1850 fn update_metrics(
1851 &self,
1852 certificate: &VerifiedExecutableTransaction,
1853 inner_temporary_store: &InnerTemporaryStore,
1854 effects: &TransactionEffects,
1855 ) {
1856 if certificate.has_zklogin_sig() {
1858 self.metrics.zklogin_sig_count.inc();
1859 } else if certificate.has_upgraded_multisig() {
1860 self.metrics.multisig_sig_count.inc();
1861 }
1862
1863 self.metrics.total_effects.inc();
1864 self.metrics.total_certs.inc();
1865
1866 let consensus_object_count = effects.input_consensus_objects().len();
1867 if consensus_object_count > 0 {
1868 self.metrics.shared_obj_tx.inc();
1869 }
1870
1871 if certificate.is_sponsored_tx() {
1872 self.metrics.sponsored_tx.inc();
1873 }
1874
1875 let input_object_count = inner_temporary_store.input_objects.len();
1876 self.metrics
1877 .num_input_objs
1878 .observe(input_object_count as f64);
1879 self.metrics
1880 .num_shared_objects
1881 .observe(consensus_object_count as f64);
1882 self.metrics.batch_size.observe(
1883 certificate
1884 .data()
1885 .intent_message()
1886 .value
1887 .kind()
1888 .num_commands() as f64,
1889 );
1890 }
1891
1892 fn execute_transaction_to_effects(
1896 &self,
1897 executor: &dyn Executor,
1898 store: &dyn BackingStore,
1899 protocol_config: &ProtocolConfig,
1900 enable_expensive_checks: bool,
1901 execution_params: ExecutionOrEarlyError,
1902 epoch_id: &EpochId,
1903 epoch_timestamp_ms: u64,
1904 input_objects: CheckedInputObjects,
1905 gas_data: GasData,
1906 gas_status: SuiGasStatus,
1907 sender: SuiAddress,
1908 mut kind: TransactionKind,
1909 signer: SuiAddress,
1910 tx_digest: TransactionDigest,
1911 accumulator_version: Option<SequenceNumber>,
1912 ) -> (
1913 InnerTemporaryStore,
1914 SuiGasStatus,
1915 TransactionEffects,
1916 Vec<ExecutionTiming>,
1917 Result<(), ExecutionError>,
1918 ) {
1919 let rewritten_inputs = rewrite_transaction_for_coin_reservations(
1920 self.chain_identifier,
1921 &*self.coin_reservation_resolver,
1922 sender,
1923 &mut kind,
1924 accumulator_version,
1925 );
1926
1927 let (inner_temp_store, gas_status, effects, timings, execution_error) = executor
1928 .execute_transaction_to_effects_and_execution_error(
1930 store,
1931 protocol_config,
1932 self.metrics.execution_metrics.clone(),
1933 enable_expensive_checks,
1934 execution_params,
1935 epoch_id,
1936 epoch_timestamp_ms,
1937 input_objects,
1938 gas_data,
1939 gas_status,
1940 kind,
1941 rewritten_inputs,
1942 signer,
1943 tx_digest,
1944 &mut None,
1945 );
1946
1947 (
1948 inner_temp_store,
1949 gas_status,
1950 effects,
1951 timings,
1952 execution_error,
1953 )
1954 }
1955
1956 #[instrument(level = "trace", skip_all)]
1965 fn execute_certificate(
1966 &self,
1967 _execution_guard: &ExecutionLockReadGuard<'_>,
1968 certificate: &VerifiedExecutableTransaction,
1969 input_objects: InputObjects,
1970 expected_effects_digest: Option<TransactionEffectsDigest>,
1971 execution_env: ExecutionEnv,
1972 epoch_store: &Arc<AuthorityPerEpochStore>,
1973 ) -> ExecutionOutput<(
1974 TransactionOutputs,
1975 Vec<ExecutionTiming>,
1976 Option<ExecutionError>,
1977 )> {
1978 let _scope = monitored_scope("Execution::prepare_certificate");
1979 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1980 let prepare_certificate_start_time = tokio::time::Instant::now();
1981
1982 let tx_data = certificate.data().transaction_data();
1984
1985 if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
1986 return ExecutionOutput::Fatal(e);
1987 }
1988
1989 let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
1993 certificate,
1994 input_objects,
1995 epoch_store.protocol_config(),
1996 epoch_store.reference_gas_price(),
1997 ) {
1998 Ok(result) => result,
1999 Err(e) => return ExecutionOutput::Fatal(e),
2000 };
2001
2002 let owned_object_refs = input_objects.inner().filter_owned_objects();
2003 if let Err(e) = self.check_owned_locks(&owned_object_refs) {
2004 return ExecutionOutput::Fatal(e);
2005 }
2006 let tx_digest = *certificate.digest();
2007 let protocol_config = epoch_store.protocol_config();
2008 let transaction_data = &certificate.data().intent_message().value;
2009 let sender = transaction_data.sender();
2010 let (kind, signer, gas_data) = transaction_data.execution_parts();
2011 let early_execution_error = get_early_execution_error(
2012 &tx_digest,
2013 &input_objects,
2014 self.config.certificate_deny_config.certificate_deny_set(),
2015 &execution_env.funds_withdraw_status,
2016 );
2017 let execution_params = match early_execution_error {
2018 Some(error) => ExecutionOrEarlyError::Err(error),
2019 None => ExecutionOrEarlyError::Ok(()),
2020 };
2021
2022 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2023
2024 #[allow(unused_mut)]
2025 let (inner_temp_store, _, mut effects, timings, execution_error_opt) = self
2026 .execute_transaction_to_effects(
2027 &**epoch_store.executor(),
2028 &tracking_store,
2029 protocol_config,
2030 self.config
2033 .expensive_safety_check_config
2034 .enable_deep_per_tx_sui_conservation_check(),
2035 execution_params,
2036 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2037 epoch_store
2038 .epoch_start_config()
2039 .epoch_data()
2040 .epoch_start_timestamp(),
2041 input_objects,
2042 gas_data,
2043 gas_status,
2044 sender,
2045 kind,
2046 signer,
2047 tx_digest,
2048 execution_env.assigned_versions.accumulator_version,
2049 );
2050
2051 let object_funds_checker = self.object_funds_checker.load();
2052 if let Some(object_funds_checker) = object_funds_checker.as_ref()
2053 && !object_funds_checker.should_commit_object_funds_withdraws(
2054 certificate,
2055 effects.status(),
2056 &inner_temp_store.accumulator_running_max_withdraws,
2057 &execution_env,
2058 self.get_account_funds_read(),
2059 &self.execution_scheduler,
2060 epoch_store,
2061 )
2062 {
2063 assert_reachable!("retry object withdraw later");
2064 return ExecutionOutput::RetryLater;
2065 }
2066
2067 if let Some(expected_effects_digest) = expected_effects_digest
2068 && effects.digest() != expected_effects_digest
2069 {
2070 match self.debug_dump_transaction_state(
2072 &tx_digest,
2073 &effects,
2074 expected_effects_digest,
2075 &inner_temp_store,
2076 certificate,
2077 &self.config.state_debug_dump_config,
2078 ) {
2079 Ok(out_path) => {
2080 info!(
2081 "Dumped node state for transaction {} to {}",
2082 tx_digest,
2083 out_path.as_path().display().to_string()
2084 );
2085 }
2086 Err(e) => {
2087 error!("Error dumping state for transaction {}: {e}", tx_digest);
2088 }
2089 }
2090 let expected_effects = self
2091 .get_transaction_cache_reader()
2092 .get_effects(&expected_effects_digest);
2093 error!(
2094 ?tx_digest,
2095 ?expected_effects_digest,
2096 actual_effects = ?effects,
2097 expected_effects = ?expected_effects,
2098 "fork detected!"
2099 );
2100 if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2101 tx_digest,
2102 expected_effects_digest,
2103 effects.digest(),
2104 ) {
2105 error!("Failed to record transaction fork: {e}");
2106 }
2107
2108 fail_point_if!("kill_transaction_fork_node", || {
2109 #[cfg(msim)]
2110 {
2111 tracing::error!(
2112 fatal = true,
2113 "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2114 tx_digest
2115 );
2116 sui_simulator::task::shutdown_current_node();
2117 }
2118 });
2119
2120 fatal!(
2121 "Transaction {} is expected to have effects digest {}, but got {}!",
2122 tx_digest,
2123 expected_effects_digest,
2124 effects.digest()
2125 );
2126 }
2127
2128 fail_point_arg!("simulate_fork_during_execution", |(
2129 forked_validators,
2130 full_halt,
2131 effects_overrides,
2132 fork_probability,
2133 ): (
2134 std::sync::Arc<
2135 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2136 >,
2137 bool,
2138 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2139 f32,
2140 )| {
2141 #[cfg(msim)]
2142 self.simulate_fork_during_execution(
2143 certificate,
2144 epoch_store,
2145 &mut effects,
2146 forked_validators,
2147 full_halt,
2148 effects_overrides,
2149 fork_probability,
2150 );
2151 });
2152
2153 let unchanged_loaded_runtime_objects =
2154 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2155 certificate.transaction_data(),
2156 &effects,
2157 &tracking_store.into_read_objects(),
2158 );
2159
2160 let _ = self
2162 .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2163 .tap_err(|e| {
2164 self.metrics.post_processing_total_failures.inc();
2165 error!(?tx_digest, "tx post processing failed: {e}");
2166 });
2167
2168 self.update_metrics(certificate, &inner_temp_store, &effects);
2169
2170 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2171 certificate.clone().into_unsigned(),
2172 effects,
2173 inner_temp_store,
2174 unchanged_loaded_runtime_objects,
2175 );
2176
2177 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2178 if elapsed > 0.0 {
2179 self.metrics.prepare_cert_gas_latency_ratio.observe(
2180 transaction_outputs
2181 .effects
2182 .gas_cost_summary()
2183 .computation_cost as f64
2184 / elapsed,
2185 );
2186 }
2187
2188 ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2189 }
2190
2191 pub fn prepare_certificate_for_benchmark(
2192 &self,
2193 certificate: &VerifiedExecutableTransaction,
2194 input_objects: InputObjects,
2195 epoch_store: &Arc<AuthorityPerEpochStore>,
2196 ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2197 let lock = RwLock::new(epoch_store.epoch());
2198 let execution_guard = lock.try_read().unwrap();
2199
2200 let (transaction_outputs, _timings, execution_error_opt) = self
2201 .execute_certificate(
2202 &execution_guard,
2203 certificate,
2204 input_objects,
2205 None,
2206 ExecutionEnv::default(),
2207 epoch_store,
2208 )
2209 .unwrap();
2210 Ok((transaction_outputs, execution_error_opt))
2211 }
2212
2213 #[instrument(skip_all)]
2214 #[allow(clippy::type_complexity)]
2215 pub async fn dry_exec_transaction(
2216 &self,
2217 transaction: TransactionData,
2218 transaction_digest: TransactionDigest,
2219 ) -> SuiResult<(
2220 DryRunTransactionBlockResponse,
2221 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2222 TransactionEffects,
2223 Option<ObjectID>,
2224 )> {
2225 let epoch_store = self.load_epoch_store_one_call_per_task();
2226 if !self.is_fullnode(&epoch_store) {
2227 return Err(SuiErrorKind::UnsupportedFeatureError {
2228 error: "dry-exec is only supported on fullnodes".to_string(),
2229 }
2230 .into());
2231 }
2232
2233 if transaction.kind().is_system_tx() {
2234 return Err(SuiErrorKind::UnsupportedFeatureError {
2235 error: "dry-exec does not support system transactions".to_string(),
2236 }
2237 .into());
2238 }
2239
2240 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2241 }
2242
2243 #[allow(clippy::type_complexity)]
2244 pub fn dry_exec_transaction_for_benchmark(
2245 &self,
2246 transaction: TransactionData,
2247 transaction_digest: TransactionDigest,
2248 ) -> SuiResult<(
2249 DryRunTransactionBlockResponse,
2250 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2251 TransactionEffects,
2252 Option<ObjectID>,
2253 )> {
2254 let epoch_store = self.load_epoch_store_one_call_per_task();
2255 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2256 }
2257
2258 #[allow(clippy::type_complexity)]
2259 fn dry_exec_transaction_impl(
2260 &self,
2261 epoch_store: &AuthorityPerEpochStore,
2262 transaction: TransactionData,
2263 transaction_digest: TransactionDigest,
2264 ) -> SuiResult<(
2265 DryRunTransactionBlockResponse,
2266 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2267 TransactionEffects,
2268 Option<ObjectID>,
2269 )> {
2270 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2272
2273 let input_object_kinds = transaction.input_objects()?;
2274 let receiving_object_refs = transaction.receiving_objects();
2275
2276 sui_transaction_checks::deny::check_transaction_for_signing(
2277 &transaction,
2278 &[],
2279 &input_object_kinds,
2280 &receiving_object_refs,
2281 &self.config.transaction_deny_config,
2282 self.get_backing_package_store().as_ref(),
2283 )?;
2284
2285 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2286 None,
2288 &input_object_kinds,
2289 &receiving_object_refs,
2290 epoch_store.epoch(),
2291 )?;
2292
2293 let mut gas_data = transaction.gas_data().clone();
2295 let is_gasless =
2296 epoch_store.protocol_config().enable_gasless() && transaction.is_gasless_transaction();
2297 let ((gas_status, checked_input_objects), mock_gas) = if is_gasless {
2298 (
2300 sui_transaction_checks::check_transaction_input(
2301 epoch_store.protocol_config(),
2302 epoch_store.reference_gas_price(),
2303 &transaction,
2304 input_objects,
2305 &receiving_objects,
2306 &self.metrics.bytecode_verifier_metrics,
2307 &self.config.verifier_signing_config,
2308 )?,
2309 None,
2310 )
2311 } else if transaction.gas().is_empty() {
2312 let sender = transaction.sender();
2313 const MIST_TO_SUI: u64 = 1_000_000_000;
2315 const DRY_RUN_SUI: u64 = 1_000_000_000;
2316 let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
2317 let gas_object_id = ObjectID::random();
2318 let gas_object = Object::new_move(
2319 MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
2320 Owner::AddressOwner(sender),
2321 TransactionDigest::genesis_marker(),
2322 );
2323 let gas_object_ref = gas_object.compute_object_reference();
2324 gas_data.payment = vec![gas_object_ref];
2325 (
2326 sui_transaction_checks::check_transaction_input_with_given_gas(
2327 epoch_store.protocol_config(),
2328 epoch_store.reference_gas_price(),
2329 &transaction,
2330 input_objects,
2331 receiving_objects,
2332 gas_object,
2333 &self.metrics.bytecode_verifier_metrics,
2334 &self.config.verifier_signing_config,
2335 )?,
2336 Some(gas_object_id),
2337 )
2338 } else {
2339 (
2340 sui_transaction_checks::check_transaction_input(
2341 epoch_store.protocol_config(),
2342 epoch_store.reference_gas_price(),
2343 &transaction,
2344 input_objects,
2345 &receiving_objects,
2346 &self.metrics.bytecode_verifier_metrics,
2347 &self.config.verifier_signing_config,
2348 )?,
2349 None,
2350 )
2351 };
2352
2353 let protocol_config = epoch_store.protocol_config();
2354 let (kind, signer, _) = transaction.execution_parts();
2355
2356 let silent = true;
2357 let executor = sui_execution::executor(protocol_config, silent)
2358 .expect("Creating an executor should not fail here");
2359
2360 let expensive_checks = false;
2361 let early_execution_error = get_early_execution_error(
2362 &transaction_digest,
2363 &checked_input_objects,
2364 self.config.certificate_deny_config.certificate_deny_set(),
2365 &FundsWithdrawStatus::MaybeSufficient,
2371 );
2372 let execution_params = match early_execution_error {
2373 Some(error) => ExecutionOrEarlyError::Err(error),
2374 None => ExecutionOrEarlyError::Ok(()),
2375 };
2376
2377 let (inner_temp_store, _, effects, _timings, execution_error) = self
2378 .execute_transaction_to_effects(
2379 executor.as_ref(),
2380 self.get_backing_store().as_ref(),
2381 protocol_config,
2382 expensive_checks,
2383 execution_params,
2384 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2385 epoch_store
2386 .epoch_start_config()
2387 .epoch_data()
2388 .epoch_start_timestamp(),
2389 checked_input_objects,
2390 gas_data,
2391 gas_status,
2392 transaction.sender(),
2393 kind,
2394 signer,
2395 transaction_digest,
2396 None,
2398 );
2399
2400 let tx_digest = *effects.transaction_digest();
2401
2402 let module_cache =
2403 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2404
2405 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2406 epoch_store.protocol_config(),
2407 Box::new(PackageStoreWithFallback::new(
2408 &inner_temp_store,
2409 self.get_backing_package_store(),
2410 )),
2411 );
2412 let object_changes = Vec::new();
2414
2415 let balance_changes = Vec::new();
2417
2418 let written_with_kind = effects
2419 .created()
2420 .into_iter()
2421 .map(|(oref, _)| (oref, WriteKind::Create))
2422 .chain(
2423 effects
2424 .unwrapped()
2425 .into_iter()
2426 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2427 )
2428 .chain(
2429 effects
2430 .mutated()
2431 .into_iter()
2432 .map(|(oref, _)| (oref, WriteKind::Mutate)),
2433 )
2434 .map(|(oref, kind)| {
2435 let obj = inner_temp_store.written.get(&oref.0).unwrap();
2436 (oref.0, (oref, obj.clone(), kind))
2438 })
2439 .collect();
2440
2441 let execution_error_source = execution_error
2442 .as_ref()
2443 .err()
2444 .and_then(|e| e.source_ref().as_ref().map(|e| e.to_string()));
2445
2446 Ok((
2447 DryRunTransactionBlockResponse {
2448 suggested_gas_price: self
2449 .congestion_tracker
2450 .get_suggested_gas_prices(&transaction),
2451 input: SuiTransactionBlockData::try_from_with_module_cache(
2452 transaction,
2453 &module_cache,
2454 )
2455 .map_err(|e| SuiErrorKind::TransactionSerializationError {
2456 error: format!(
2457 "Failed to convert transaction to SuiTransactionBlockData: {}",
2458 e
2459 ),
2460 })?, effects: effects.clone().try_into()?,
2462 events: SuiTransactionBlockEvents::try_from(
2463 inner_temp_store.events.clone(),
2464 tx_digest,
2465 None,
2466 layout_resolver.as_mut(),
2467 )?,
2468 object_changes,
2469 balance_changes,
2470 execution_error_source,
2471 },
2472 written_with_kind,
2473 effects,
2474 mock_gas,
2475 ))
2476 }
2477
2478 pub fn simulate_transaction(
2479 &self,
2480 mut transaction: TransactionData,
2481 checks: TransactionChecks,
2482 allow_mock_gas_coin: bool,
2483 ) -> SuiResult<SimulateTransactionResult> {
2484 if transaction.kind().is_system_tx() {
2485 return Err(SuiErrorKind::UnsupportedFeatureError {
2486 error: "simulate does not support system transactions".to_string(),
2487 }
2488 .into());
2489 }
2490
2491 let epoch_store = self.load_epoch_store_one_call_per_task();
2492 if !self.is_fullnode(&epoch_store) {
2493 return Err(SuiErrorKind::UnsupportedFeatureError {
2494 error: "simulate is only supported on fullnodes".to_string(),
2495 }
2496 .into());
2497 }
2498
2499 let dev_inspect = checks.disabled();
2500 if dev_inspect && self.config.dev_inspect_disabled {
2501 return Err(SuiErrorKind::UnsupportedFeatureError {
2502 error: "simulate with checks disabled is not allowed on this node".to_string(),
2503 }
2504 .into());
2505 }
2506
2507 let protocol_config = epoch_store.protocol_config();
2510 if !protocol_config.enable_coin_reservation_obj_refs()
2511 && transaction.gas().iter().any(|obj_ref| {
2512 sui_types::coin_reservation::ParsedDigest::is_coin_reservation_digest(&obj_ref.2)
2513 })
2514 {
2515 return Err(SuiErrorKind::UnsupportedFeatureError {
2516 error:
2517 "coin reservations in gas payment are not supported at this protocol version"
2518 .to_string(),
2519 }
2520 .into());
2521 }
2522
2523 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2525
2526 let input_object_kinds = transaction.input_objects()?;
2527 let receiving_object_refs = transaction.receiving_objects();
2528
2529 let is_gasless = protocol_config.enable_gasless() && transaction.is_gasless_transaction();
2534 let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() && !is_gasless
2535 {
2536 let obj = Object::new_move(
2537 MoveObject::new_gas_coin(
2538 OBJECT_START_VERSION,
2539 ObjectID::MAX,
2540 DEV_INSPECT_GAS_COIN_VALUE,
2541 ),
2542 Owner::AddressOwner(transaction.gas_data().owner),
2543 TransactionDigest::genesis_marker(),
2544 );
2545 transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
2546 Some(obj)
2547 } else {
2548 None
2549 };
2550
2551 let declared_withdrawals = self.pre_object_load_checks(
2552 &transaction,
2553 &[],
2554 &input_object_kinds,
2555 &receiving_object_refs,
2556 epoch_store.protocol_config(),
2557 )?;
2558 let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();
2559
2560 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2561 None,
2563 &input_object_kinds,
2564 &receiving_object_refs,
2565 epoch_store.epoch(),
2566 )?;
2567
2568 let mock_gas_id = mock_gas_object.map(|obj| {
2570 let id = obj.id();
2571 input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
2572 id
2573 });
2574
2575 let protocol_config = epoch_store.protocol_config();
2576
2577 let (gas_status, checked_input_objects) = if dev_inspect {
2578 sui_transaction_checks::check_dev_inspect_input(
2579 protocol_config,
2580 &transaction,
2581 input_objects,
2582 receiving_objects,
2583 epoch_store.reference_gas_price(),
2584 )?
2585 } else {
2586 sui_transaction_checks::check_transaction_input(
2587 epoch_store.protocol_config(),
2588 epoch_store.reference_gas_price(),
2589 &transaction,
2590 input_objects,
2591 &receiving_objects,
2592 &self.metrics.bytecode_verifier_metrics,
2593 &self.config.verifier_signing_config,
2594 )?
2595 };
2596
2597 let executor = sui_execution::executor(
2599 protocol_config,
2600 true, )
2602 .expect("Creating an executor should not fail here");
2603
2604 let (mut kind, signer, gas_data) = transaction.execution_parts();
2605 let rewritten_inputs = rewrite_transaction_for_coin_reservations(
2606 self.chain_identifier,
2607 &*self.coin_reservation_resolver,
2608 signer,
2609 &mut kind,
2610 None,
2611 );
2612 let early_execution_error = get_early_execution_error(
2613 &transaction.digest(),
2614 &checked_input_objects,
2615 self.config.certificate_deny_config.certificate_deny_set(),
2616 &FundsWithdrawStatus::MaybeSufficient,
2617 );
2618 let execution_params = match early_execution_error {
2619 Some(error) => ExecutionOrEarlyError::Err(error),
2620 None => ExecutionOrEarlyError::Ok(()),
2621 };
2622
2623 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2624
2625 let cloned_input_objects = checked_input_objects.clone();
2627 let cloned_gas = gas_data.clone();
2628 let cloned_kind = kind.clone();
2629 let tx_digest = transaction.digest();
2630 let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
2631 let epoch_timestamp_ms = epoch_store
2632 .epoch_start_config()
2633 .epoch_data()
2634 .epoch_start_timestamp();
2635 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2636 &tracking_store,
2637 protocol_config,
2638 self.metrics.execution_metrics.clone(),
2639 false, execution_params,
2641 &epoch_id,
2642 epoch_timestamp_ms,
2643 checked_input_objects,
2644 gas_data,
2645 gas_status,
2646 kind,
2647 rewritten_inputs.clone(),
2648 signer,
2649 tx_digest,
2650 dev_inspect,
2651 );
2652
2653 let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
2655 let has_insufficient_object_funds = inner_temp_store
2656 .accumulator_running_max_withdraws
2657 .iter()
2658 .filter(|(id, _)| !address_funds.contains(id))
2659 .any(|(id, max_withdraw)| {
2660 let (balance, _) = self.get_account_funds_read().get_latest_account_amount(id);
2661 balance < *max_withdraw
2662 });
2663
2664 if has_insufficient_object_funds {
2665 let retry_gas_status = SuiGasStatus::new(
2666 cloned_gas.budget,
2667 cloned_gas.price,
2668 epoch_store.reference_gas_price(),
2669 protocol_config,
2670 )?;
2671 let (store, _, effects, result) = executor.dev_inspect_transaction(
2672 &tracking_store,
2673 protocol_config,
2674 self.metrics.execution_metrics.clone(),
2675 false,
2676 ExecutionOrEarlyError::Err(ExecutionErrorKind::InsufficientFundsForWithdraw),
2677 &epoch_id,
2678 epoch_timestamp_ms,
2679 cloned_input_objects,
2680 cloned_gas,
2681 retry_gas_status,
2682 cloned_kind,
2683 rewritten_inputs,
2684 signer,
2685 tx_digest,
2686 dev_inspect,
2687 );
2688 (store, effects, result)
2689 } else {
2690 (inner_temp_store, effects, execution_result)
2691 }
2692 } else {
2693 (inner_temp_store, effects, execution_result)
2694 };
2695
2696 let loaded_runtime_objects = tracking_store.into_read_objects();
2697 let unchanged_loaded_runtime_objects =
2698 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2699 &transaction,
2700 &effects,
2701 &loaded_runtime_objects,
2702 );
2703
2704 let object_set = {
2705 let objects = {
2706 let mut objects = loaded_runtime_objects;
2707
2708 for o in inner_temp_store
2709 .input_objects
2710 .into_values()
2711 .chain(inner_temp_store.written.into_values())
2712 {
2713 objects.insert(o);
2714 }
2715
2716 objects
2717 };
2718
2719 let object_keys = sui_types::storage::get_transaction_object_set(
2720 &transaction,
2721 &effects,
2722 &unchanged_loaded_runtime_objects,
2723 );
2724
2725 let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2726 for k in object_keys {
2727 if let Some(o) = objects.get(&k) {
2728 set.insert(o.clone());
2729 }
2730 }
2731
2732 set
2733 };
2734
2735 Ok(SimulateTransactionResult {
2736 objects: object_set,
2737 events: effects.events_digest().map(|_| inner_temp_store.events),
2738 effects,
2739 execution_result,
2740 mock_gas_id,
2741 unchanged_loaded_runtime_objects,
2742 suggested_gas_price: self
2743 .congestion_tracker
2744 .get_suggested_gas_prices(&transaction),
2745 })
2746 }
2747
2748 #[allow(clippy::collapsible_else_if)]
2750 #[instrument(skip_all)]
2751 pub async fn dev_inspect_transaction_block(
2752 &self,
2753 sender: SuiAddress,
2754 transaction_kind: TransactionKind,
2755 gas_price: Option<u64>,
2756 gas_budget: Option<u64>,
2757 gas_sponsor: Option<SuiAddress>,
2758 gas_objects: Option<Vec<ObjectRef>>,
2759 show_raw_txn_data_and_effects: Option<bool>,
2760 skip_checks: Option<bool>,
2761 ) -> SuiResult<DevInspectResults> {
2762 let epoch_store = self.load_epoch_store_one_call_per_task();
2763
2764 if !self.is_fullnode(&epoch_store) {
2765 return Err(SuiErrorKind::UnsupportedFeatureError {
2766 error: "dev-inspect is only supported on fullnodes".to_string(),
2767 }
2768 .into());
2769 }
2770
2771 if transaction_kind.is_system_tx() {
2772 return Err(SuiErrorKind::UnsupportedFeatureError {
2773 error: "system transactions are not supported".to_string(),
2774 }
2775 .into());
2776 }
2777
2778 let skip_checks = skip_checks.unwrap_or(true);
2779 if skip_checks && self.config.dev_inspect_disabled {
2780 return Err(SuiErrorKind::UnsupportedFeatureError {
2781 error: "dev-inspect with skip_checks is not allowed on this node".to_string(),
2782 }
2783 .into());
2784 }
2785
2786 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2787 let reference_gas_price = epoch_store.reference_gas_price();
2788 let protocol_config = epoch_store.protocol_config();
2789 let max_tx_gas = protocol_config.max_tx_gas();
2790
2791 let price = gas_price.unwrap_or(reference_gas_price);
2792 let budget = gas_budget.unwrap_or(max_tx_gas);
2793 let owner = gas_sponsor.unwrap_or(sender);
2794 let payment = gas_objects.unwrap_or_default();
2796 let mut transaction = TransactionData::V1(TransactionDataV1 {
2797 kind: transaction_kind.clone(),
2798 sender,
2799 gas_data: GasData {
2800 payment,
2801 owner,
2802 price,
2803 budget,
2804 },
2805 expiration: TransactionExpiration::None,
2806 });
2807
2808 let raw_txn_data = if show_raw_txn_data_and_effects {
2809 bcs::to_bytes(&transaction).map_err(|_| {
2810 SuiErrorKind::TransactionSerializationError {
2811 error: "Failed to serialize transaction during dev inspect".to_string(),
2812 }
2813 })?
2814 } else {
2815 vec![]
2816 };
2817
2818 transaction.validity_check_no_gas_check(protocol_config)?;
2819
2820 let input_object_kinds = transaction.input_objects()?;
2821 let receiving_object_refs = transaction.receiving_objects();
2822
2823 sui_transaction_checks::deny::check_transaction_for_signing(
2824 &transaction,
2825 &[],
2826 &input_object_kinds,
2827 &receiving_object_refs,
2828 &self.config.transaction_deny_config,
2829 self.get_backing_package_store().as_ref(),
2830 )?;
2831
2832 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2833 None,
2835 &input_object_kinds,
2836 &receiving_object_refs,
2837 epoch_store.epoch(),
2838 )?;
2839
2840 let (gas_status, checked_input_objects) = if skip_checks {
2841 if transaction.gas().is_empty() {
2845 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2847 DEV_INSPECT_GAS_COIN_VALUE,
2848 transaction.gas_owner(),
2849 );
2850 let gas_object_ref = dummy_gas_object.compute_object_reference();
2851 transaction.gas_data_mut().payment = vec![gas_object_ref];
2852 input_objects.push(ObjectReadResult::new(
2853 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2854 dummy_gas_object.into(),
2855 ));
2856 }
2857 sui_transaction_checks::check_dev_inspect_input(
2858 protocol_config,
2859 &transaction,
2860 input_objects,
2861 receiving_objects,
2862 reference_gas_price,
2863 )?
2864 } else {
2865 if transaction.gas().is_empty() {
2868 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2870 DEV_INSPECT_GAS_COIN_VALUE,
2871 transaction.gas_owner(),
2872 );
2873 let gas_object_ref = dummy_gas_object.compute_object_reference();
2874 transaction.gas_data_mut().payment = vec![gas_object_ref];
2875 sui_transaction_checks::check_transaction_input_with_given_gas(
2876 epoch_store.protocol_config(),
2877 epoch_store.reference_gas_price(),
2878 &transaction,
2879 input_objects,
2880 receiving_objects,
2881 dummy_gas_object,
2882 &self.metrics.bytecode_verifier_metrics,
2883 &self.config.verifier_signing_config,
2884 )?
2885 } else {
2886 sui_transaction_checks::check_transaction_input(
2887 epoch_store.protocol_config(),
2888 epoch_store.reference_gas_price(),
2889 &transaction,
2890 input_objects,
2891 &receiving_objects,
2892 &self.metrics.bytecode_verifier_metrics,
2893 &self.config.verifier_signing_config,
2894 )?
2895 }
2896 };
2897
2898 let executor = sui_execution::executor(protocol_config, true)
2899 .expect("Creating an executor should not fail here");
2900 let gas_data = transaction.gas_data().clone();
2901 let intent_msg = IntentMessage::new(
2902 Intent {
2903 version: IntentVersion::V0,
2904 scope: IntentScope::TransactionData,
2905 app_id: AppId::Sui,
2906 },
2907 transaction,
2908 );
2909 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2910 let early_execution_error = get_early_execution_error(
2911 &transaction_digest,
2912 &checked_input_objects,
2913 self.config.certificate_deny_config.certificate_deny_set(),
2914 &FundsWithdrawStatus::MaybeSufficient,
2916 );
2917 let execution_params = match early_execution_error {
2918 Some(error) => ExecutionOrEarlyError::Err(error),
2919 None => ExecutionOrEarlyError::Ok(()),
2920 };
2921
2922 let mut transaction_kind = transaction_kind;
2923 let rewritten_inputs = rewrite_transaction_for_coin_reservations(
2924 self.chain_identifier,
2925 &*self.coin_reservation_resolver,
2926 sender,
2927 &mut transaction_kind,
2928 None,
2929 );
2930 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2931 self.get_backing_store().as_ref(),
2932 protocol_config,
2933 self.metrics.execution_metrics.clone(),
2934 false,
2935 execution_params,
2936 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2937 epoch_store
2938 .epoch_start_config()
2939 .epoch_data()
2940 .epoch_start_timestamp(),
2941 checked_input_objects,
2942 gas_data,
2943 gas_status,
2944 transaction_kind,
2945 rewritten_inputs,
2946 sender,
2947 transaction_digest,
2948 skip_checks,
2949 );
2950
2951 let raw_effects = if show_raw_txn_data_and_effects {
2952 bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2953 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2954 })?
2955 } else {
2956 vec![]
2957 };
2958
2959 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2960 epoch_store.protocol_config(),
2961 Box::new(PackageStoreWithFallback::new(
2962 &inner_temp_store,
2963 self.get_backing_package_store(),
2964 )),
2965 );
2966
2967 DevInspectResults::new(
2968 effects,
2969 inner_temp_store.events.clone(),
2970 execution_result,
2971 raw_txn_data,
2972 raw_effects,
2973 layout_resolver.as_mut(),
2974 )
2975 }
2976
2977 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2979 let epoch_store = self.epoch_store_for_testing();
2980 Ok(epoch_store.reference_gas_price())
2981 }
2982
2983 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2984 self.get_transaction_cache_reader()
2985 .is_tx_already_executed(digest)
2986 }
2987
2988 #[instrument(level = "debug", skip_all, err(level = "debug"))]
2989 fn index_tx(
2990 sequence: u64,
2991 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2992 object_store: &Arc<dyn ObjectStore + Send + Sync>,
2993 indexes: &IndexStore,
2994 digest: &TransactionDigest,
2995 cert: &VerifiedExecutableTransaction,
2997 effects: &TransactionEffects,
2998 events: &TransactionEvents,
2999 timestamp_ms: u64,
3000 tx_coins: Option<TxCoins>,
3001 written: &WrittenObjects,
3002 inner_temporary_store: &InnerTemporaryStore,
3003 epoch_store: &Arc<AuthorityPerEpochStore>,
3004 acquire_locks: bool,
3005 ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
3006 let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
3007 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
3008
3009 indexes.index_tx(
3010 sequence,
3011 cert.data().intent_message().value.sender(),
3012 cert.data()
3013 .intent_message()
3014 .value
3015 .input_objects()?
3016 .iter()
3017 .map(|o| o.object_id()),
3018 effects
3019 .all_changed_objects()
3020 .into_iter()
3021 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
3022 cert.data()
3023 .intent_message()
3024 .value
3025 .move_calls()
3026 .into_iter()
3027 .map(|(_cmd_idx, package, module, function)| {
3028 (*package, module.to_owned(), function.to_owned())
3029 }),
3030 events,
3031 changes,
3032 digest,
3033 timestamp_ms,
3034 tx_coins,
3035 effects.accumulator_events(),
3036 acquire_locks,
3037 )
3038 }
3039
3040 #[cfg(msim)]
3041 fn simulate_fork_during_execution(
3042 &self,
3043 certificate: &VerifiedExecutableTransaction,
3044 epoch_store: &Arc<AuthorityPerEpochStore>,
3045 effects: &mut TransactionEffects,
3046 forked_validators: std::sync::Arc<
3047 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
3048 >,
3049 full_halt: bool,
3050 effects_overrides: std::sync::Arc<
3051 std::sync::Mutex<std::collections::BTreeMap<String, String>>,
3052 >,
3053 fork_probability: f32,
3054 ) {
3055 use std::cell::RefCell;
3056 thread_local! {
3057 static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
3058 }
3059 if !certificate.data().intent_message().value.is_system_tx() {
3060 let committee = epoch_store.committee();
3061 let cur_stake = (**committee).weight(&self.name);
3062 if cur_stake > 0 {
3063 TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
3064 let already_forked = forked_validators
3065 .lock()
3066 .ok()
3067 .map(|set| set.contains(&self.name))
3068 .unwrap_or(false);
3069
3070 if !already_forked {
3071 let should_fork = if full_halt {
3072 *total_stake <= committee.validity_threshold()
3074 } else {
3075 *total_stake + cur_stake < committee.validity_threshold()
3077 };
3078
3079 if should_fork {
3080 *total_stake += cur_stake;
3081
3082 if let Ok(mut external_set) = forked_validators.lock() {
3083 external_set.insert(self.name);
3084 info!("forked_validators: {:?}", external_set);
3085 }
3086 }
3087 }
3088
3089 if let Ok(external_set) = forked_validators.lock() {
3090 if external_set.contains(&self.name) {
3091 let tx_digest = certificate.digest().to_string();
3097 if let Ok(mut overrides) = effects_overrides.lock() {
3098 if overrides.contains_key(&tx_digest)
3099 || overrides.is_empty()
3100 && sui_simulator::random::deterministic_probability(
3101 &tx_digest,
3102 fork_probability,
3103 )
3104 {
3105 let original_effects_digest = effects.digest().to_string();
3106 overrides
3107 .insert(tx_digest.clone(), original_effects_digest.clone());
3108 info!(
3109 ?tx_digest,
3110 ?original_effects_digest,
3111 "Captured forked effects digest for transaction"
3112 );
3113 effects.gas_cost_summary_mut_for_testing().computation_cost +=
3114 1;
3115 }
3116 }
3117 }
3118 }
3119 });
3120 }
3121 }
3122 }
3123
3124 fn process_object_index(
3125 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3126 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3127 effects: &TransactionEffects,
3128 written: &WrittenObjects,
3129 inner_temporary_store: &InnerTemporaryStore,
3130 epoch_store: &Arc<AuthorityPerEpochStore>,
3131 ) -> SuiResult<ObjectIndexChanges> {
3132 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3133 epoch_store.protocol_config(),
3134 Box::new(PackageStoreWithFallback::new(
3135 inner_temporary_store,
3136 backing_package_store,
3137 )),
3138 );
3139
3140 let modified_at_version = effects
3141 .modified_at_versions()
3142 .into_iter()
3143 .collect::<HashMap<_, _>>();
3144
3145 let tx_digest = effects.transaction_digest();
3146 let mut deleted_owners = vec![];
3147 let mut deleted_dynamic_fields = vec![];
3148 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
3149 let old_version = modified_at_version.get(&id).unwrap();
3150 match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
3153 |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
3154 ) {
3155 Owner::AddressOwner(addr)
3156 | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
3157 Owner::ObjectOwner(object_id) => {
3158 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
3159 }
3160 _ => {}
3161 }
3162 }
3163
3164 let mut new_owners = vec![];
3165 let mut new_dynamic_fields = vec![];
3166
3167 for (oref, owner, kind) in effects.all_changed_objects() {
3168 let id = &oref.0;
3169 if let WriteKind::Mutate = kind {
3171 let Some(old_version) = modified_at_version.get(id) else {
3172 panic!(
3173 "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
3174 tx_digest
3175 );
3176 };
3177 let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
3180 panic!(
3181 "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
3182 tx_digest, id, old_version
3183 );
3184 };
3185 if old_object.owner != owner {
3186 match old_object.owner {
3187 Owner::AddressOwner(addr)
3188 | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3189 deleted_owners.push((addr, *id));
3190 }
3191 Owner::ObjectOwner(object_id) => {
3192 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
3193 }
3194 _ => {}
3195 }
3196 }
3197 }
3198
3199 match owner {
3200 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3201 let new_object = written.get(id).unwrap_or_else(
3203 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3204 );
3205 assert_eq!(
3206 new_object.version(),
3207 oref.1,
3208 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3209 tx_digest,
3210 id,
3211 new_object.version(),
3212 oref.1
3213 );
3214
3215 let type_ = new_object
3216 .type_()
3217 .map(|type_| ObjectType::Struct(type_.clone()))
3218 .unwrap_or(ObjectType::Package);
3219
3220 new_owners.push((
3221 (addr, *id),
3222 ObjectInfo {
3223 object_id: *id,
3224 version: oref.1,
3225 digest: oref.2,
3226 type_,
3227 owner,
3228 previous_transaction: *effects.transaction_digest(),
3229 },
3230 ));
3231 }
3232 Owner::ObjectOwner(owner) => {
3233 let new_object = written.get(id).unwrap_or_else(
3234 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3235 );
3236 assert_eq!(
3237 new_object.version(),
3238 oref.1,
3239 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3240 tx_digest,
3241 id,
3242 new_object.version(),
3243 oref.1
3244 );
3245
3246 let Some(df_info) = Self::try_create_dynamic_field_info(
3247 object_store,
3248 new_object,
3249 written,
3250 layout_resolver.as_mut(),
3251 )
3252 .unwrap_or_else(|e| {
3253 error!(
3254 "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
3255 e, new_object
3256 );
3257 None
3258 }) else {
3259 continue;
3261 };
3262 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3263 }
3264 _ => {}
3265 }
3266 }
3267
3268 Ok(ObjectIndexChanges {
3269 deleted_owners,
3270 deleted_dynamic_fields,
3271 new_owners,
3272 new_dynamic_fields,
3273 })
3274 }
3275
3276 fn try_create_dynamic_field_info(
3277 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3278 o: &Object,
3279 written: &WrittenObjects,
3280 resolver: &mut dyn LayoutResolver,
3281 ) -> SuiResult<Option<DynamicFieldInfo>> {
3282 let Some(move_object) = o.data.try_as_move().cloned() else {
3284 return Ok(None);
3285 };
3286
3287 if !move_object.type_().is_dynamic_field() {
3289 return Ok(None);
3290 }
3291
3292 let layout = resolver
3293 .get_annotated_layout(&move_object.type_().clone().into())?
3294 .into_layout();
3295
3296 let field =
3297 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3298 SuiErrorKind::ObjectDeserializationError {
3299 error: e.to_string(),
3300 }
3301 })?;
3302
3303 let type_ = field.kind;
3304 let name_type: TypeTag = field.name_layout.into();
3305 let bcs_name = field.name_bytes.to_owned();
3306
3307 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3308 .map_err(|e| {
3309 warn!("{e}");
3310 SuiErrorKind::ObjectDeserializationError {
3311 error: e.to_string(),
3312 }
3313 })?;
3314
3315 let name = DynamicFieldName {
3316 type_: name_type,
3317 value: SuiMoveValue::from(name_value).to_json_value(),
3318 };
3319
3320 let value_metadata = field.value_metadata().map_err(|e| {
3321 warn!("{e}");
3322 SuiErrorKind::ObjectDeserializationError {
3323 error: e.to_string(),
3324 }
3325 })?;
3326
3327 Ok(Some(match value_metadata {
3328 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3329 name,
3330 bcs_name,
3331 type_,
3332 object_type: object_type.to_canonical_string(true),
3333 object_id: o.id(),
3334 version: o.version(),
3335 digest: o.digest(),
3336 },
3337
3338 DFV::ValueMetadata::DynamicObjectField(object_id) => {
3339 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3343 let version = object.version();
3344 let digest = object.digest();
3345 let object_type = object.data.type_().unwrap().clone();
3346 (version, digest, object_type)
3347 } else {
3348 let object = object_store
3350 .get_object_by_key(&object_id, o.version())
3351 .ok_or_else(|| UserInputError::ObjectNotFound {
3352 object_id,
3353 version: Some(o.version()),
3354 })?;
3355 let version = object.version();
3356 let digest = object.digest();
3357 let object_type = object.data.type_().unwrap().clone();
3358 (version, digest, object_type)
3359 };
3360
3361 DynamicFieldInfo {
3362 name,
3363 bcs_name,
3364 type_,
3365 object_type: object_type.to_string(),
3366 object_id,
3367 version,
3368 digest,
3369 }
3370 }
3371 }))
3372 }
3373
3374 #[instrument(level = "trace", skip_all, err(level = "debug"))]
3375 fn post_process_one_tx(
3376 &self,
3377 certificate: &VerifiedExecutableTransaction,
3378 effects: &TransactionEffects,
3379 inner_temporary_store: &InnerTemporaryStore,
3380 epoch_store: &Arc<AuthorityPerEpochStore>,
3381 ) -> SuiResult {
3382 let Some(indexes) = &self.indexes else {
3383 return Ok(());
3384 };
3385
3386 let tx_digest = *certificate.digest();
3387
3388 let sequence = indexes.allocate_sequence_number();
3390
3391 if self.config.sync_post_process_one_tx {
3392 let result = Self::post_process_one_tx_impl(
3397 sequence,
3398 indexes,
3399 &self.subscription_handler,
3400 &self.metrics,
3401 self.name,
3402 self.get_backing_package_store(),
3403 self.get_object_store(),
3404 certificate,
3405 effects,
3406 inner_temporary_store,
3407 epoch_store,
3408 true, );
3410
3411 match result {
3412 Ok((raw_batch, cache_updates_with_locks)) => {
3413 let mut db_batch = indexes.new_db_batch();
3414 db_batch
3415 .concat(vec![raw_batch])
3416 .expect("failed to absorb raw index batch");
3417 let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
3419 cache_updates_with_locks;
3420 indexes
3421 .commit_index_batch(db_batch, vec![inner])
3422 .expect("failed to commit index batch");
3423 }
3424 Err(e) => {
3425 self.metrics.post_processing_total_failures.inc();
3426 error!(?tx_digest, "tx post processing failed: {e}");
3427 return Err(e);
3428 }
3429 }
3430
3431 return Ok(());
3432 }
3433
3434 let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
3435 self.pending_post_processing.insert(tx_digest, done_rx);
3436
3437 let indexes = indexes.clone();
3438 let subscription_handler = self.subscription_handler.clone();
3439 let metrics = self.metrics.clone();
3440 let name = self.name;
3441 let backing_package_store = self.get_backing_package_store().clone();
3442 let object_store = self.get_object_store().clone();
3443 let semaphore = self.post_processing_semaphore.clone();
3444
3445 let certificate = certificate.clone();
3446 let effects = effects.clone();
3447 let inner_temporary_store = inner_temporary_store.clone();
3448 let epoch_store = epoch_store.clone();
3449
3450 tokio::spawn(async move {
3452 let permit = {
3453 let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
3454 semaphore
3455 .acquire_owned()
3456 .await
3457 .expect("post-processing semaphore should not be closed")
3458 };
3459
3460 let _ = tokio::task::spawn_blocking(move || {
3461 let _permit = permit;
3462
3463 let result = Self::post_process_one_tx_impl(
3464 sequence,
3465 &indexes,
3466 &subscription_handler,
3467 &metrics,
3468 name,
3469 &backing_package_store,
3470 &object_store,
3471 &certificate,
3472 &effects,
3473 &inner_temporary_store,
3474 &epoch_store,
3475 false, );
3477
3478 match result {
3479 Ok((raw_batch, cache_updates_with_locks)) => {
3480 fail_point!("crash-after-post-process-one-tx");
3481 let output = (raw_batch, cache_updates_with_locks.into_inner());
3482 let _ = done_tx.send(output);
3483 }
3484 Err(e) => {
3485 metrics.post_processing_total_failures.inc();
3486 error!(?tx_digest, "tx post processing failed: {e}");
3487 }
3488 }
3489 })
3490 .await;
3491 });
3492
3493 Ok(())
3494 }
3495
3496 fn post_process_one_tx_impl(
3497 sequence: u64,
3498 indexes: &Arc<IndexStore>,
3499 subscription_handler: &Arc<SubscriptionHandler>,
3500 metrics: &Arc<AuthorityMetrics>,
3501 name: AuthorityName,
3502 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3503 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3504 certificate: &VerifiedExecutableTransaction,
3505 effects: &TransactionEffects,
3506 inner_temporary_store: &InnerTemporaryStore,
3507 epoch_store: &Arc<AuthorityPerEpochStore>,
3508 acquire_locks: bool,
3509 ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
3510 let _scope = monitored_scope("Execution::post_process_one_tx");
3511
3512 let tx_digest = certificate.digest();
3513 let timestamp_ms = Self::unixtime_now_ms();
3514 let events = &inner_temporary_store.events;
3515 let written = &inner_temporary_store.written;
3516 let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
3517 name,
3518 object_store,
3519 effects,
3520 inner_temporary_store,
3521 epoch_store,
3522 );
3523
3524 let (raw_batch, cache_updates) = Self::index_tx(
3525 sequence,
3526 backing_package_store,
3527 object_store,
3528 indexes,
3529 tx_digest,
3530 certificate,
3531 effects,
3532 events,
3533 timestamp_ms,
3534 tx_coins,
3535 written,
3536 inner_temporary_store,
3537 epoch_store,
3538 acquire_locks,
3539 )
3540 .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
3541 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3542 .expect("Indexing tx should not fail");
3543
3544 let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3545 let events = Self::make_transaction_block_events(
3546 backing_package_store,
3547 events.clone(),
3548 *tx_digest,
3549 timestamp_ms,
3550 epoch_store,
3551 inner_temporary_store,
3552 )?;
3553 subscription_handler
3555 .process_tx(certificate.data().transaction_data(), &effects, &events)
3556 .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
3557 .tap_err(|e| {
3558 warn!(
3559 ?tx_digest,
3560 "Post processing - Couldn't process events for tx: {}", e
3561 )
3562 })?;
3563
3564 metrics
3565 .post_processing_total_events_emitted
3566 .inc_by(events.data.len() as u64);
3567
3568 Ok((raw_batch, cache_updates))
3569 }
3570
3571 fn make_transaction_block_events(
3572 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3573 transaction_events: TransactionEvents,
3574 digest: TransactionDigest,
3575 timestamp_ms: u64,
3576 epoch_store: &Arc<AuthorityPerEpochStore>,
3577 inner_temporary_store: &InnerTemporaryStore,
3578 ) -> SuiResult<SuiTransactionBlockEvents> {
3579 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3580 epoch_store.protocol_config(),
3581 Box::new(PackageStoreWithFallback::new(
3582 inner_temporary_store,
3583 backing_package_store,
3584 )),
3585 );
3586 SuiTransactionBlockEvents::try_from(
3587 transaction_events,
3588 digest,
3589 Some(timestamp_ms),
3590 layout_resolver.as_mut(),
3591 )
3592 }
3593
3594 pub fn unixtime_now_ms() -> u64 {
3595 let now = SystemTime::now()
3596 .duration_since(UNIX_EPOCH)
3597 .expect("Time went backwards")
3598 .as_millis();
3599 u64::try_from(now).expect("Travelling in time machine")
3600 }
3601
3602 #[instrument(level = "trace", skip_all)]
3606 pub async fn handle_transaction_info_request(
3607 &self,
3608 request: TransactionInfoRequest,
3609 ) -> SuiResult<TransactionInfoResponse> {
3610 let epoch_store = self.load_epoch_store_one_call_per_task();
3611 let (transaction, status) = self
3612 .get_transaction_status(&request.transaction_digest, &epoch_store)?
3613 .ok_or(SuiErrorKind::TransactionNotFound {
3614 digest: request.transaction_digest,
3615 })?;
3616 Ok(TransactionInfoResponse {
3617 transaction,
3618 status,
3619 })
3620 }
3621
3622 #[instrument(level = "trace", skip_all)]
3623 pub async fn handle_object_info_request(
3624 &self,
3625 request: ObjectInfoRequest,
3626 ) -> SuiResult<ObjectInfoResponse> {
3627 let epoch_store = self.load_epoch_store_one_call_per_task();
3628
3629 let requested_object_seq = match request.request_kind {
3630 ObjectInfoRequestKind::LatestObjectInfo => {
3631 let (_, seq, _) = self
3632 .get_object_or_tombstone(request.object_id)
3633 .await
3634 .ok_or_else(|| {
3635 SuiError::from(UserInputError::ObjectNotFound {
3636 object_id: request.object_id,
3637 version: None,
3638 })
3639 })?;
3640 seq
3641 }
3642 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3643 };
3644
3645 let object = self
3646 .get_object_store()
3647 .get_object_by_key(&request.object_id, requested_object_seq)
3648 .ok_or_else(|| {
3649 SuiError::from(UserInputError::ObjectNotFound {
3650 object_id: request.object_id,
3651 version: Some(requested_object_seq),
3652 })
3653 })?;
3654
3655 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3656 (request.generate_layout, object.data.try_as_move())
3657 {
3658 let epoch_store = self.load_epoch_store_one_call_per_task();
3659 Some(into_struct_layout(
3660 epoch_store
3661 .executor()
3662 .type_layout_resolver(
3663 epoch_store.protocol_config(),
3664 Box::new(self.get_backing_package_store().as_ref()),
3665 )
3666 .get_annotated_layout(&move_obj.type_().clone().into())?,
3667 )?)
3668 } else {
3669 None
3670 };
3671
3672 let lock = if !object.is_address_owned() {
3673 None
3675 } else {
3676 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3677 .await?
3678 .map(|s| s.into_inner())
3679 };
3680
3681 Ok(ObjectInfoResponse {
3682 object,
3683 layout,
3684 lock_for_debugging: lock,
3685 })
3686 }
3687
3688 #[instrument(level = "trace", skip_all)]
3689 pub fn handle_checkpoint_request(
3690 &self,
3691 request: &CheckpointRequest,
3692 ) -> SuiResult<CheckpointResponse> {
3693 let summary = match request.sequence_number {
3694 Some(seq) => self
3695 .checkpoint_store
3696 .get_checkpoint_by_sequence_number(seq)?,
3697 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3698 }
3699 .map(|v| v.into_inner());
3700 let contents = match &summary {
3701 Some(s) => self
3702 .checkpoint_store
3703 .get_checkpoint_contents(&s.content_digest)?,
3704 None => None,
3705 };
3706 Ok(CheckpointResponse {
3707 checkpoint: summary,
3708 contents,
3709 })
3710 }
3711
3712 #[instrument(level = "trace", skip_all)]
3713 pub fn handle_checkpoint_request_v2(
3714 &self,
3715 request: &CheckpointRequestV2,
3716 ) -> SuiResult<CheckpointResponseV2> {
3717 let summary = if request.certified {
3718 let summary = match request.sequence_number {
3719 Some(seq) => self
3720 .checkpoint_store
3721 .get_checkpoint_by_sequence_number(seq)?,
3722 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3723 }
3724 .map(|v| v.into_inner());
3725 summary.map(CheckpointSummaryResponse::Certified)
3726 } else {
3727 let summary = match request.sequence_number {
3728 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3729 None => self
3730 .checkpoint_store
3731 .get_latest_locally_computed_checkpoint()?,
3732 };
3733 summary.map(CheckpointSummaryResponse::Pending)
3734 };
3735 let contents = match &summary {
3736 Some(s) => self
3737 .checkpoint_store
3738 .get_checkpoint_contents(&s.content_digest())?,
3739 None => None,
3740 };
3741 Ok(CheckpointResponseV2 {
3742 checkpoint: summary,
3743 contents,
3744 })
3745 }
3746
3747 fn check_protocol_version(
3748 supported_protocol_versions: SupportedProtocolVersions,
3749 current_version: ProtocolVersion,
3750 ) {
3751 info!("current protocol version is now {:?}", current_version);
3752 info!("supported versions are: {:?}", supported_protocol_versions);
3753 if !supported_protocol_versions.is_version_supported(current_version) {
3754 let msg = format!(
3755 "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3756 current_version, supported_protocol_versions,
3757 );
3758
3759 error!("{}", msg);
3760 eprintln!("{}", msg);
3761
3762 #[cfg(not(msim))]
3763 std::process::exit(1);
3764
3765 #[cfg(msim)]
3766 sui_simulator::task::shutdown_current_node();
3767 }
3768 }
3769
3770 #[allow(clippy::disallowed_methods)] #[allow(clippy::too_many_arguments)]
3772 pub async fn new(
3773 name: AuthorityName,
3774 secret: StableSyncAuthoritySigner,
3775 supported_protocol_versions: SupportedProtocolVersions,
3776 store: Arc<AuthorityStore>,
3777 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3778 epoch_store: Arc<AuthorityPerEpochStore>,
3779 committee_store: Arc<CommitteeStore>,
3780 indexes: Option<Arc<IndexStore>>,
3781 rpc_index: Option<Arc<RpcIndexStore>>,
3782 checkpoint_store: Arc<CheckpointStore>,
3783 prometheus_registry: &Registry,
3784 genesis_objects: &[Object],
3785 db_checkpoint_config: &DBCheckpointConfig,
3786 config: NodeConfig,
3787 chain_identifier: ChainIdentifier,
3788 policy_config: Option<PolicyConfig>,
3789 firewall_config: Option<RemoteFirewallConfig>,
3790 pruner_watermarks: Arc<PrunerWatermarks>,
3791 ) -> Arc<Self> {
3792 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3793
3794 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3795 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3796 let execution_scheduler = Arc::new(ExecutionScheduler::new(
3797 execution_cache_trait_pointers.object_cache_reader.clone(),
3798 execution_cache_trait_pointers.account_funds_read.clone(),
3799 execution_cache_trait_pointers
3800 .transaction_cache_reader
3801 .clone(),
3802 tx_ready_certificates,
3803 &epoch_store,
3804 config.funds_withdraw_scheduler_type,
3805 metrics.clone(),
3806 prometheus_registry,
3807 ));
3808 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3809
3810 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3811 epoch_store.get_parent_path(),
3812 &config.authority_store_pruning_config,
3813 );
3814 let _pruner = AuthorityStorePruner::new(
3815 store.perpetual_tables.clone(),
3816 checkpoint_store.clone(),
3817 rpc_index.clone(),
3818 indexes.clone(),
3819 config.authority_store_pruning_config.clone(),
3820 epoch_store.committee().authority_exists(&name),
3821 epoch_store.epoch_start_state().epoch_duration_ms(),
3822 prometheus_registry,
3823 pruner_watermarks,
3824 );
3825 let input_loader =
3826 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3827 let epoch = epoch_store.epoch();
3828 let traffic_controller_metrics =
3829 Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3830 let traffic_controller = if let Some(policy_config) = policy_config {
3831 Some(Arc::new(
3832 TrafficController::init(
3833 policy_config,
3834 traffic_controller_metrics,
3835 firewall_config.clone(),
3836 )
3837 .await,
3838 ))
3839 } else {
3840 None
3841 };
3842
3843 let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3844 ForkRecoveryState::new(Some(fork_config))
3845 .expect("Failed to initialize fork recovery state")
3846 });
3847
3848 let coin_reservation_resolver = Arc::new(CachingCoinReservationResolver::new(
3849 execution_cache_trait_pointers.child_object_resolver.clone(),
3850 ));
3851
3852 let object_funds_checker_metrics =
3853 Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
3854 let state = Arc::new(AuthorityState {
3855 name,
3856 secret,
3857 execution_lock: RwLock::new(epoch),
3858 epoch_store: ArcSwap::new(epoch_store.clone()),
3859 input_loader,
3860 execution_cache_trait_pointers,
3861 coin_reservation_resolver,
3862 indexes,
3863 rpc_index,
3864 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3865 checkpoint_store,
3866 committee_store,
3867 execution_scheduler,
3868 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3869 metrics,
3870 _pruner,
3871 _authority_per_epoch_pruner,
3872 db_checkpoint_config: db_checkpoint_config.clone(),
3873 config,
3874 overload_info: AuthorityOverloadInfo::default(),
3875 chain_identifier,
3876 congestion_tracker: Arc::new(CongestionTracker::new()),
3877 consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
3878 traffic_controller,
3879 fork_recovery_state,
3880 notify_epoch: tokio::sync::watch::channel(epoch).0,
3881 object_funds_checker: ArcSwapOption::empty(),
3882 object_funds_checker_metrics,
3883 pending_post_processing: Arc::new(DashMap::new()),
3884 post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
3885 });
3886 state.init_object_funds_checker().await;
3887
3888 let state_clone = Arc::downgrade(&state);
3889 spawn_monitored_task!(fix_indexes(state_clone));
3890 let authority_state = Arc::downgrade(&state);
3892 spawn_monitored_task!(execution_process(
3893 authority_state,
3894 rx_ready_certificates,
3895 rx_execution_shutdown,
3896 ));
3897 state
3899 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3900 .expect("Error indexing genesis objects.");
3901
3902 if epoch_store
3903 .protocol_config()
3904 .enable_multi_epoch_transaction_expiration()
3905 && epoch_store.epoch() > 0
3906 {
3907 use typed_store::Map;
3908 let previous_epoch = epoch_store.epoch() - 1;
3909 let start_key = (previous_epoch, TransactionDigest::ZERO);
3910 let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
3911 let has_previous_epoch_data = store
3912 .perpetual_tables
3913 .executed_transaction_digests
3914 .safe_range_iter(start_key..end_key)
3915 .next()
3916 .is_some();
3917
3918 if !has_previous_epoch_data {
3919 panic!(
3920 "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
3921 This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
3922 Please restore from a snapshot using the latest version of sui-tool.",
3923 previous_epoch
3924 );
3925 }
3926 }
3927
3928 state
3929 }
3930
3931 async fn init_object_funds_checker(&self) {
3932 let epoch_store = self.epoch_store.load();
3933 if self.is_validator(&epoch_store)
3934 && epoch_store.protocol_config().enable_object_funds_withdraw()
3935 {
3936 if self.object_funds_checker.load().is_none() {
3937 let inner = self
3938 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
3939 .await
3940 .map(|o| {
3941 Arc::new(ObjectFundsChecker::new(
3942 o.version(),
3943 self.object_funds_checker_metrics.clone(),
3944 ))
3945 });
3946 self.object_funds_checker.store(inner);
3947 }
3948 } else {
3949 self.object_funds_checker.store(None);
3950 }
3951 }
3952
3953 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3955 &self.execution_cache_trait_pointers.object_cache_reader
3956 }
3957
3958 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3959 &self.execution_cache_trait_pointers.transaction_cache_reader
3960 }
3961
3962 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3963 &self.execution_cache_trait_pointers.cache_writer
3964 }
3965
3966 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3967 &self.execution_cache_trait_pointers.backing_store
3968 }
3969
3970 pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3971 &self.execution_cache_trait_pointers.child_object_resolver
3972 }
3973
3974 pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
3975 &self.execution_cache_trait_pointers.account_funds_read
3976 }
3977
3978 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3979 &self.execution_cache_trait_pointers.backing_package_store
3980 }
3981
3982 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3983 &self.execution_cache_trait_pointers.object_store
3984 }
3985
3986 pub fn pending_post_processing(
3987 &self,
3988 ) -> &Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>> {
3989 &self.pending_post_processing
3990 }
3991
3992 pub async fn await_post_processing(
3993 &self,
3994 tx_digest: &TransactionDigest,
3995 ) -> Option<PostProcessingOutput> {
3996 if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
3997 rx.await.ok()
3999 } else {
4000 None
4002 }
4003 }
4004
4005 pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
4009 if let Some(indexes) = &self.indexes
4010 && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
4011 {
4012 let mut db_batch = indexes.new_db_batch();
4013 db_batch
4014 .concat(vec![raw_batch])
4015 .expect("failed to build index batch");
4016 indexes
4017 .commit_index_batch(db_batch, vec![cache_updates])
4018 .expect("failed to commit index batch");
4019 }
4020 }
4021
4022 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
4023 &self.execution_cache_trait_pointers.reconfig_api
4024 }
4025
4026 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
4027 &self.execution_cache_trait_pointers.global_state_hash_store
4028 }
4029
4030 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
4031 &self.execution_cache_trait_pointers.checkpoint_cache
4032 }
4033
4034 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
4035 &self.execution_cache_trait_pointers.state_sync_store
4036 }
4037
4038 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
4039 &self.execution_cache_trait_pointers.cache_commit
4040 }
4041
4042 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
4043 self.execution_cache_trait_pointers
4044 .testing_api
4045 .database_for_testing()
4046 }
4047
4048 pub fn cache_for_testing(&self) -> &WritebackCache {
4049 self.execution_cache_trait_pointers
4050 .testing_api
4051 .cache_for_testing()
4052 }
4053
4054 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
4055 &self,
4056 config: NodeConfig,
4057 metrics: Arc<AuthorityStorePruningMetrics>,
4058 ) -> anyhow::Result<()> {
4059 use crate::authority::authority_store_pruner::PrunerWatermarks;
4060 let watermarks = Arc::new(PrunerWatermarks::default());
4061 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
4062 &self.database_for_testing().perpetual_tables,
4063 &self.checkpoint_store,
4064 self.rpc_index.as_deref(),
4065 config.authority_store_pruning_config,
4066 metrics,
4067 EPOCH_DURATION_MS_FOR_TESTING,
4068 &watermarks,
4069 )
4070 .await
4071 }
4072
4073 pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
4074 &self.execution_scheduler
4075 }
4076
4077 fn create_owner_index_if_empty(
4078 &self,
4079 genesis_objects: &[Object],
4080 epoch_store: &Arc<AuthorityPerEpochStore>,
4081 ) -> SuiResult {
4082 let Some(index_store) = &self.indexes else {
4083 return Ok(());
4084 };
4085 if !index_store.is_empty() {
4086 return Ok(());
4087 }
4088
4089 let mut new_owners = vec![];
4090 let mut new_dynamic_fields = vec![];
4091 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
4092 epoch_store.protocol_config(),
4093 Box::new(self.get_backing_package_store().as_ref()),
4094 );
4095 for o in genesis_objects.iter() {
4096 match o.owner {
4097 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
4098 new_owners.push((
4099 (addr, o.id()),
4100 ObjectInfo::new(&o.compute_object_reference(), o),
4101 ))
4102 }
4103 Owner::ObjectOwner(object_id) => {
4104 let id = o.id();
4105 let Some(info) = Self::try_create_dynamic_field_info(
4106 self.get_object_store(),
4107 o,
4108 &BTreeMap::new(),
4109 layout_resolver.as_mut(),
4110 )?
4111 else {
4112 continue;
4113 };
4114 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
4115 }
4116 _ => {}
4117 }
4118 }
4119
4120 index_store.insert_genesis_objects(ObjectIndexChanges {
4121 deleted_owners: vec![],
4122 deleted_dynamic_fields: vec![],
4123 new_owners,
4124 new_dynamic_fields,
4125 })
4126 }
4127
4128 pub fn execution_lock_for_executable_transaction(
4132 &self,
4133 transaction: &VerifiedExecutableTransaction,
4134 ) -> Option<ExecutionLockReadGuard<'_>> {
4135 let lock = self.execution_lock.try_read().ok()?;
4136 if *lock == transaction.auth_sig().epoch() {
4137 Some(lock)
4138 } else {
4139 None
4141 }
4142 }
4143
4144 fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
4147 self.execution_lock
4148 .try_read()
4149 .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
4150 }
4151
4152 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
4153 self.execution_lock.write().await
4154 }
4155
4156 #[instrument(level = "error", skip_all)]
4157 pub async fn reconfigure(
4158 &self,
4159 cur_epoch_store: &AuthorityPerEpochStore,
4160 supported_protocol_versions: SupportedProtocolVersions,
4161 new_committee: Committee,
4162 epoch_start_configuration: EpochStartConfiguration,
4163 state_hasher: Arc<GlobalStateHasher>,
4164 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4165 epoch_last_checkpoint: CheckpointSequenceNumber,
4166 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
4167 Self::check_protocol_version(
4168 supported_protocol_versions,
4169 epoch_start_configuration
4170 .epoch_start_state()
4171 .protocol_version(),
4172 );
4173
4174 self.committee_store.insert_new_committee(&new_committee)?;
4175
4176 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
4178
4179 cur_epoch_store.epoch_terminated().await;
4181
4182 {
4186 let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
4187 if state.should_accept_user_certs() {
4188 cur_epoch_store.close_user_certs(state);
4195 }
4196 }
4198
4199 self.get_reconfig_api()
4200 .clear_state_end_of_epoch(&execution_lock);
4201 self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
4202 self.maybe_reaccumulate_state_hash(
4203 cur_epoch_store,
4204 epoch_start_configuration
4205 .epoch_start_state()
4206 .protocol_version(),
4207 );
4208 self.get_reconfig_api()
4209 .set_epoch_start_configuration(&epoch_start_configuration);
4210 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
4211 && self
4212 .db_checkpoint_config
4213 .perform_db_checkpoints_at_epoch_end
4214 {
4215 let checkpoint_indexes = self
4216 .db_checkpoint_config
4217 .perform_index_db_checkpoints_at_epoch_end
4218 .unwrap_or(false);
4219 let current_epoch = cur_epoch_store.epoch();
4220 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
4221 self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
4222 }
4223
4224 self.get_reconfig_api()
4225 .reconfigure_cache(&epoch_start_configuration)
4226 .await;
4227
4228 let new_epoch = new_committee.epoch;
4229 let new_epoch_store = self
4230 .reopen_epoch_db(
4231 cur_epoch_store,
4232 new_committee,
4233 epoch_start_configuration,
4234 expensive_safety_check_config,
4235 epoch_last_checkpoint,
4236 )
4237 .await?;
4238 assert_eq!(new_epoch_store.epoch(), new_epoch);
4239 self.execution_scheduler
4240 .reconfigure(&new_epoch_store, self.get_account_funds_read());
4241 self.init_object_funds_checker().await;
4242 *execution_lock = new_epoch;
4243
4244 self.notify_epoch(new_epoch);
4245 Ok(new_epoch_store)
4249 }
4250
4251 fn notify_epoch(&self, new_epoch: EpochId) {
4252 self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
4253 }
4254
4255 pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4256 let mut rx = self.notify_epoch.subscribe();
4257 loop {
4258 let epoch = *rx.borrow();
4259 if epoch >= target_epoch {
4260 return Ok(epoch);
4261 }
4262 rx.changed().await?;
4263 }
4264 }
4265
4266 pub async fn settle_accumulator_for_testing(
4270 &self,
4271 effects: &[TransactionEffects],
4272 checkpoint_seq: Option<u64>,
4273 ) -> Vec<(VerifiedExecutableTransaction, ExecutionEnv)> {
4274 let accumulator_version = self
4275 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
4276 .await
4277 .unwrap()
4278 .version();
4279 let ckpt_seq = checkpoint_seq.unwrap_or_else(|| accumulator_version.value());
4281 let builder = AccumulatorSettlementTxBuilder::new(
4282 Some(self.get_transaction_cache_reader().as_ref()),
4283 effects,
4284 ckpt_seq,
4285 0,
4286 );
4287 let balance_changes = builder.collect_funds_changes();
4288 let epoch_store = self.epoch_store_for_testing();
4289 let epoch = epoch_store.epoch();
4290 let accumulator_root_obj_initial_shared_version = epoch_store
4291 .epoch_start_config()
4292 .accumulator_root_obj_initial_shared_version()
4293 .unwrap();
4294 let settlements = builder.build_tx(
4295 epoch_store.protocol_config(),
4296 epoch,
4297 accumulator_root_obj_initial_shared_version,
4298 ckpt_seq,
4299 ckpt_seq,
4300 );
4301
4302 let settlements: Vec<_> = settlements
4303 .into_iter()
4304 .map(|tx| {
4305 VerifiedExecutableTransaction::new_system(
4306 VerifiedTransaction::new_system_transaction(tx),
4307 epoch,
4308 )
4309 })
4310 .collect();
4311
4312 let assigned_versions = epoch_store
4313 .assign_shared_object_versions_for_tests(
4314 self.get_object_cache_reader().as_ref(),
4315 &settlements,
4316 )
4317 .unwrap();
4318 let version_map = assigned_versions.into_map();
4319
4320 let mut replay_txns = Vec::new();
4321 let mut settlement_effects = Vec::with_capacity(settlements.len());
4322 for tx in settlements {
4323 let assigned = version_map.get(&tx.key()).unwrap().clone();
4324 let env = ExecutionEnv::new().with_assigned_versions(assigned);
4325 let (effects, _) = self
4326 .try_execute_immediately(&tx.clone(), env.clone(), &epoch_store)
4327 .await
4328 .unwrap();
4329 assert!(effects.status().is_ok());
4330 replay_txns.push((tx, env));
4331 settlement_effects.push(effects);
4332 }
4333
4334 let barrier = accumulators::build_accumulator_barrier_tx(
4335 epoch,
4336 accumulator_root_obj_initial_shared_version,
4337 ckpt_seq,
4338 &settlement_effects,
4339 );
4340 let barrier = VerifiedExecutableTransaction::new_system(
4341 VerifiedTransaction::new_system_transaction(barrier),
4342 epoch,
4343 );
4344
4345 let assigned_versions = epoch_store
4346 .assign_shared_object_versions_for_tests(
4347 self.get_object_cache_reader().as_ref(),
4348 std::slice::from_ref(&barrier),
4349 )
4350 .unwrap();
4351 let version_map = assigned_versions.into_map();
4352
4353 let barrier_assigned = version_map.get(&barrier.key()).unwrap().clone();
4354 let env = ExecutionEnv::new().with_assigned_versions(barrier_assigned);
4355 let (effects, _) = self
4356 .try_execute_immediately(&barrier.clone(), env.clone(), &epoch_store)
4357 .await
4358 .unwrap();
4359 assert!(effects.status().is_ok());
4360 replay_txns.push((barrier, env));
4361
4362 let next_accumulator_version = accumulator_version.next();
4363 self.execution_scheduler
4364 .settle_address_funds(FundsSettlement {
4365 funds_changes: balance_changes,
4366 next_accumulator_version,
4367 });
4368 replay_txns
4371 }
4372
4373 pub async fn replay_settlement_for_testing(
4376 &self,
4377 txns: &[(VerifiedExecutableTransaction, ExecutionEnv)],
4378 ) {
4379 let epoch_store = self.epoch_store_for_testing();
4380 for (tx, env) in txns {
4381 let (effects, _) = self
4382 .try_execute_immediately(tx, env.clone(), &epoch_store)
4383 .await
4384 .unwrap();
4385 assert!(effects.status().is_ok());
4386 }
4387 }
4388
4389 pub async fn reconfigure_for_testing(&self) {
4393 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
4394 let epoch_store = self.epoch_store_for_testing().clone();
4395 let protocol_config = epoch_store.protocol_config().clone();
4396 let _guard =
4403 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
4404 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
4405 self.get_backing_package_store().clone(),
4406 self.get_object_store().clone(),
4407 &self.config.expensive_safety_check_config,
4408 self.checkpoint_store
4409 .get_epoch_last_checkpoint(epoch_store.epoch())
4410 .unwrap()
4411 .map(|c| *c.sequence_number())
4412 .unwrap_or_default(),
4413 );
4414 self.execution_scheduler
4415 .reconfigure(&new_epoch_store, self.get_account_funds_read());
4416 let new_epoch = new_epoch_store.epoch();
4417 self.epoch_store.store(new_epoch_store);
4418 epoch_store.epoch_terminated().await;
4419
4420 *execution_lock = new_epoch;
4421 }
4422
4423 #[instrument(level = "error", skip_all)]
4426 fn maybe_reaccumulate_state_hash(
4427 &self,
4428 cur_epoch_store: &AuthorityPerEpochStore,
4429 new_protocol_version: ProtocolVersion,
4430 ) {
4431 self.get_reconfig_api()
4432 .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4433 }
4434
4435 #[instrument(level = "error", skip_all)]
4436 fn check_system_consistency(
4437 &self,
4438 cur_epoch_store: &AuthorityPerEpochStore,
4439 state_hasher: Arc<GlobalStateHasher>,
4440 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4441 ) {
4442 info!(
4443 "Performing sui conservation consistency check for epoch {}",
4444 cur_epoch_store.epoch()
4445 );
4446
4447 if let Err(err) = self
4448 .get_reconfig_api()
4449 .expensive_check_sui_conservation(cur_epoch_store)
4450 {
4451 if cfg!(debug_assertions) {
4452 panic!("{}", err);
4453 } else {
4454 warn!("Sui conservation consistency check failed: {}", err);
4457 }
4458 } else {
4459 info!("Sui conservation consistency check passed");
4460 }
4461
4462 if expensive_safety_check_config.enable_state_consistency_check() {
4464 info!(
4465 "Performing state consistency check for epoch {}",
4466 cur_epoch_store.epoch()
4467 );
4468 self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4469 }
4470
4471 if expensive_safety_check_config.enable_secondary_index_checks()
4472 && let Some(indexes) = self.indexes.clone()
4473 {
4474 verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
4475 .expect("secondary indexes are inconsistent");
4476 }
4477
4478 if expensive_safety_check_config.enable_secondary_index_checks()
4482 && let Some(indexes) = self.indexes.clone()
4483 {
4484 let epoch = cur_epoch_store.epoch();
4485 let first_checkpoint = if epoch == 0 {
4489 0
4490 } else {
4491 self.checkpoint_store
4492 .get_epoch_last_checkpoint_seq_number(epoch - 1)
4493 .expect("Failed to get previous epoch's last checkpoint")
4494 .expect("Previous epoch's last checkpoint missing")
4495 + 1
4496 };
4497 let highest_executed = self
4498 .checkpoint_store
4499 .get_highest_executed_checkpoint_seq_number()
4500 .expect("Failed to get highest executed checkpoint")
4501 .expect("No executed checkpoints");
4502
4503 info!(
4504 "Verifying checkpointed transactions are in transactions_seq \
4505 (checkpoints {first_checkpoint}..={highest_executed})"
4506 );
4507 for seq in first_checkpoint..=highest_executed {
4508 let checkpoint = self
4509 .checkpoint_store
4510 .get_checkpoint_by_sequence_number(seq)
4511 .expect("Failed to get checkpoint")
4512 .expect("Checkpoint missing");
4513 let contents = self
4514 .checkpoint_store
4515 .get_checkpoint_contents(&checkpoint.content_digest)
4516 .expect("Failed to get checkpoint contents")
4517 .expect("Checkpoint contents missing");
4518 for digests in contents.iter() {
4519 let tx_digest = digests.transaction;
4520 assert!(
4521 indexes
4522 .get_transaction_seq(&tx_digest)
4523 .expect("Failed to read transactions_seq")
4524 .is_some(),
4525 "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
4526 );
4527 }
4528 }
4529 info!("All checkpointed transactions verified in transactions_seq");
4530 }
4531 }
4532
4533 fn expensive_check_is_consistent_state(
4534 &self,
4535 state_hasher: Arc<GlobalStateHasher>,
4536 cur_epoch_store: &AuthorityPerEpochStore,
4537 ) {
4538 let live_object_set_hash = state_hasher.digest_live_object_set(
4539 !cur_epoch_store
4540 .protocol_config()
4541 .simplified_unwrap_then_delete(),
4542 );
4543
4544 let root_state_hash: ECMHLiveObjectSetDigest = self
4545 .get_global_state_hash_store()
4546 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4547 .expect("Retrieving root state hash cannot fail")
4548 .expect("Root state hash for epoch must exist")
4549 .1
4550 .digest()
4551 .into();
4552
4553 let is_inconsistent = root_state_hash != live_object_set_hash;
4554 if is_inconsistent {
4555 debug_fatal!(
4556 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4557 root_state_hash,
4558 live_object_set_hash
4559 );
4560 } else {
4561 info!("State consistency check passed");
4562 }
4563
4564 state_hasher.set_inconsistent_state(is_inconsistent);
4565 }
4566
4567 pub fn current_epoch_for_testing(&self) -> EpochId {
4568 self.epoch_store_for_testing().epoch()
4569 }
4570
4571 #[instrument(level = "error", skip_all)]
4572 pub fn checkpoint_all_dbs(
4573 &self,
4574 checkpoint_path: &Path,
4575 cur_epoch_store: &AuthorityPerEpochStore,
4576 checkpoint_indexes: bool,
4577 ) -> SuiResult {
4578 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4579 let current_epoch = cur_epoch_store.epoch();
4580
4581 if checkpoint_path.exists() {
4582 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4583 return Ok(());
4584 }
4585
4586 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4587 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4588
4589 if checkpoint_path_tmp.exists() {
4590 fs::remove_dir_all(&checkpoint_path_tmp)
4591 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4592 }
4593
4594 fs::create_dir_all(&checkpoint_path_tmp)
4595 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4596 fs::create_dir(&store_checkpoint_path_tmp)
4597 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4598
4599 self.checkpoint_store
4602 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4603
4604 self.get_reconfig_api()
4605 .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4606
4607 self.committee_store
4608 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4609
4610 if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4611 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4612 }
4613
4614 fs::rename(checkpoint_path_tmp, checkpoint_path)
4615 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4616 Ok(())
4617 }
4618
4619 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4625 self.epoch_store.load()
4626 }
4627
4628 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4630 self.load_epoch_store_one_call_per_task()
4631 }
4632
4633 pub fn clone_committee_for_testing(&self) -> Committee {
4634 Committee::clone(self.epoch_store_for_testing().committee())
4635 }
4636
4637 #[instrument(level = "trace", skip_all)]
4638 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4639 self.get_object_store().get_object(object_id)
4640 }
4641
4642 pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4643 Ok(self
4644 .get_object(&SUI_SYSTEM_ADDRESS.into())
4645 .await
4646 .expect("framework object should always exist")
4647 .compute_object_reference())
4648 }
4649
4650 pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4652 self.get_object_cache_reader()
4653 .get_sui_system_state_object_unsafe()
4654 }
4655
4656 #[instrument(level = "trace", skip_all)]
4657 fn get_transaction_checkpoint_sequence(
4658 &self,
4659 digest: &TransactionDigest,
4660 epoch_store: &AuthorityPerEpochStore,
4661 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
4662 epoch_store.get_transaction_checkpoint(digest)
4663 }
4664
4665 #[instrument(level = "trace", skip_all)]
4666 pub fn get_checkpoint_by_sequence_number(
4667 &self,
4668 sequence_number: CheckpointSequenceNumber,
4669 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4670 Ok(self
4671 .checkpoint_store
4672 .get_checkpoint_by_sequence_number(sequence_number)?)
4673 }
4674
4675 #[instrument(level = "trace", skip_all)]
4676 pub fn get_transaction_checkpoint_for_tests(
4677 &self,
4678 digest: &TransactionDigest,
4679 epoch_store: &AuthorityPerEpochStore,
4680 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4681 let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4682 let Some(checkpoint) = checkpoint else {
4683 return Ok(None);
4684 };
4685 let checkpoint = self
4686 .checkpoint_store
4687 .get_checkpoint_by_sequence_number(checkpoint)?;
4688 Ok(checkpoint)
4689 }
4690
4691 #[instrument(level = "trace", skip_all)]
4692 pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4693 Ok(
4694 match self
4695 .get_object_cache_reader()
4696 .get_latest_object_or_tombstone(*object_id)
4697 {
4698 Some((_, ObjectOrTombstone::Object(object))) => {
4699 let layout = self.get_object_layout(&object)?;
4700 ObjectRead::Exists(object.compute_object_reference(), object, layout)
4701 }
4702 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4703 None => ObjectRead::NotExists(*object_id),
4704 },
4705 )
4706 }
4707
4708 pub fn get_chain_identifier(&self) -> ChainIdentifier {
4710 self.chain_identifier
4711 }
4712
4713 pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
4714 self.fork_recovery_state.as_ref()
4715 }
4716
4717 #[instrument(level = "trace", skip_all)]
4718 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4719 where
4720 T: DeserializeOwned,
4721 {
4722 let o = self.get_object_read(object_id)?.into_object()?;
4723 if let Some(move_object) = o.data.try_as_move() {
4724 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4725 SuiErrorKind::ObjectDeserializationError {
4726 error: format!("{e}"),
4727 }
4728 })?)
4729 } else {
4730 Err(SuiErrorKind::ObjectDeserializationError {
4731 error: format!("Provided object : [{object_id}] is not a Move object."),
4732 }
4733 .into())
4734 }
4735 }
4736
4737 #[instrument(level = "trace", skip_all)]
4743 pub fn get_past_object_read(
4744 &self,
4745 object_id: &ObjectID,
4746 version: SequenceNumber,
4747 ) -> SuiResult<PastObjectRead> {
4748 let Some(obj_ref) = self
4750 .get_object_cache_reader()
4751 .get_latest_object_ref_or_tombstone(*object_id)
4752 else {
4753 return Ok(PastObjectRead::ObjectNotExists(*object_id));
4754 };
4755
4756 if version > obj_ref.1 {
4757 return Ok(PastObjectRead::VersionTooHigh {
4758 object_id: *object_id,
4759 asked_version: version,
4760 latest_version: obj_ref.1,
4761 });
4762 }
4763
4764 if version < obj_ref.1 {
4765 return Ok(match self.read_object_at_version(object_id, version)? {
4767 Some((object, layout)) => {
4768 let obj_ref = object.compute_object_reference();
4769 PastObjectRead::VersionFound(obj_ref, object, layout)
4770 }
4771
4772 None => PastObjectRead::VersionNotFound(*object_id, version),
4773 });
4774 }
4775
4776 if !obj_ref.2.is_alive() {
4777 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4778 }
4779
4780 match self.read_object_at_version(object_id, obj_ref.1)? {
4781 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4782 None => {
4783 debug_fatal!(
4784 "Object with in parent_entry is missing from object store, datastore is \
4785 inconsistent"
4786 );
4787 Err(UserInputError::ObjectNotFound {
4788 object_id: *object_id,
4789 version: Some(obj_ref.1),
4790 }
4791 .into())
4792 }
4793 }
4794 }
4795
4796 #[instrument(level = "trace", skip_all)]
4797 fn read_object_at_version(
4798 &self,
4799 object_id: &ObjectID,
4800 version: SequenceNumber,
4801 ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4802 let Some(object) = self
4803 .get_object_cache_reader()
4804 .get_object_by_key(object_id, version)
4805 else {
4806 return Ok(None);
4807 };
4808
4809 let layout = self.get_object_layout(&object)?;
4810 Ok(Some((object, layout)))
4811 }
4812
4813 pub fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4814 let layout = object
4815 .data
4816 .try_as_move()
4817 .map(|object| {
4818 let epoch_store = self.load_epoch_store_one_call_per_task();
4819 into_struct_layout(
4820 epoch_store
4821 .executor()
4822 .type_layout_resolver(
4824 epoch_store.protocol_config(),
4825 Box::new(self.get_backing_package_store().as_ref()),
4826 )
4827 .get_annotated_layout(&object.type_().clone().into())?,
4828 )
4829 })
4830 .transpose()?;
4831 Ok(layout)
4832 }
4833
4834 #[instrument(level = "trace", skip_all)]
4838 pub fn get_address_balance_coin_info(
4839 &self,
4840 owner: SuiAddress,
4841 balance_type: TypeTag,
4842 ) -> SuiResult<Option<(ObjectRef, u64, TransactionDigest)>> {
4843 let accumulator_id = AccumulatorValue::get_field_id(owner, &balance_type)?;
4844 let accumulator_obj = AccumulatorValue::load_object_by_id(
4845 self.get_child_object_resolver().as_ref(),
4846 None,
4847 *accumulator_id.inner(),
4848 )?;
4849
4850 let Some(accumulator_obj) = accumulator_obj else {
4851 return Ok(None);
4852 };
4853
4854 let currency_type =
4857 Balance::maybe_get_balance_type_param(&balance_type).unwrap_or(balance_type);
4858
4859 let balance = crate::accumulators::balances::get_balance(
4860 owner,
4861 self.get_child_object_resolver().as_ref(),
4862 currency_type,
4863 )?;
4864
4865 if balance == 0 {
4866 return Ok(None);
4867 };
4868
4869 let object_ref = coin_reservation::encode_object_ref(
4870 accumulator_obj.id(),
4871 accumulator_obj.version(),
4872 self.load_epoch_store_one_call_per_task().epoch(),
4873 balance,
4874 self.get_chain_identifier(),
4875 );
4876
4877 Ok(Some((
4878 object_ref,
4879 balance,
4880 accumulator_obj.previous_transaction,
4881 )))
4882 }
4883
4884 #[instrument(level = "trace", skip_all)]
4887 pub fn get_all_address_balance_coin_infos(
4888 &self,
4889 owner: SuiAddress,
4890 ) -> SuiResult<std::collections::HashMap<String, (ObjectRef, u64, TransactionDigest)>> {
4891 let indexes = self
4892 .indexes
4893 .as_ref()
4894 .ok_or(SuiErrorKind::IndexStoreNotAvailable)?;
4895
4896 let mut result = std::collections::HashMap::new();
4897 for currency_type in indexes.get_address_balance_coin_types_iter(owner) {
4898 let balance_type = sui_types::balance::Balance::type_tag(currency_type.clone());
4899 if let Some((obj_ref, balance, prev_tx)) =
4900 self.get_address_balance_coin_info(owner, balance_type)?
4901 {
4902 result.insert(currency_type.to_string(), (obj_ref, balance, prev_tx));
4905 }
4906 }
4907 Ok(result)
4908 }
4909
4910 fn get_owner_at_version(
4911 object_store: &Arc<dyn ObjectStore + Send + Sync>,
4912 object_id: &ObjectID,
4913 version: SequenceNumber,
4914 ) -> SuiResult<Owner> {
4915 object_store
4916 .get_object_by_key(object_id, version)
4917 .ok_or_else(|| {
4918 SuiError::from(UserInputError::ObjectNotFound {
4919 object_id: *object_id,
4920 version: Some(version),
4921 })
4922 })
4923 .map(|o| o.owner.clone())
4924 }
4925
4926 #[instrument(level = "trace", skip_all)]
4927 pub fn get_owner_objects(
4928 &self,
4929 owner: SuiAddress,
4930 cursor: Option<ObjectID>,
4932 limit: usize,
4933 filter: Option<SuiObjectDataFilter>,
4934 ) -> SuiResult<Vec<ObjectInfo>> {
4935 if let Some(indexes) = &self.indexes {
4936 indexes.get_owner_objects(owner, cursor, limit, filter)
4937 } else {
4938 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4939 }
4940 }
4941
4942 #[instrument(level = "trace", skip_all)]
4943 pub fn get_owned_coins_iterator_with_cursor(
4944 &self,
4945 owner: SuiAddress,
4946 cursor: (String, u64, ObjectID),
4948 limit: usize,
4949 one_coin_type_only: bool,
4950 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4951 if let Some(indexes) = &self.indexes {
4952 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4953 } else {
4954 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4955 }
4956 }
4957
4958 #[instrument(level = "trace", skip_all)]
4959 pub fn get_owner_objects_iterator(
4960 &self,
4961 owner: SuiAddress,
4962 cursor: Option<ObjectID>,
4964 filter: Option<SuiObjectDataFilter>,
4965 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4966 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4967 if let Some(indexes) = &self.indexes {
4968 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4969 } else {
4970 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4971 }
4972 }
4973
4974 #[instrument(level = "trace", skip_all)]
4975 pub async fn get_move_objects<T>(
4976 &self,
4977 owner: SuiAddress,
4978 type_: MoveObjectType,
4979 ) -> SuiResult<Vec<T>>
4980 where
4981 T: DeserializeOwned,
4982 {
4983 let object_ids = self
4984 .get_owner_objects_iterator(owner, None, None)?
4985 .filter(|o| match &o.type_ {
4986 ObjectType::Struct(s) => &type_ == s,
4987 ObjectType::Package => false,
4988 })
4989 .map(|info| ObjectKey(info.object_id, info.version))
4990 .collect::<Vec<_>>();
4991 let mut move_objects = vec![];
4992
4993 let objects = self
4994 .get_object_store()
4995 .multi_get_objects_by_key(&object_ids);
4996
4997 for (o, id) in objects.into_iter().zip_debug_eq(object_ids) {
4998 let object = o.ok_or_else(|| {
4999 SuiError::from(UserInputError::ObjectNotFound {
5000 object_id: id.0,
5001 version: Some(id.1),
5002 })
5003 })?;
5004 let move_object = object.data.try_as_move().ok_or_else(|| {
5005 SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
5006 })?;
5007 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
5008 SuiErrorKind::ObjectDeserializationError {
5009 error: format!("{e}"),
5010 }
5011 })?);
5012 }
5013 Ok(move_objects)
5014 }
5015
5016 #[instrument(level = "trace", skip_all)]
5017 pub fn get_dynamic_fields(
5018 &self,
5019 owner: ObjectID,
5020 cursor: Option<ObjectID>,
5022 limit: usize,
5023 ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
5024 Ok(self
5025 .get_dynamic_fields_iterator(owner, cursor)?
5026 .take(limit)
5027 .collect::<Result<Vec<_>, _>>()?)
5028 }
5029
5030 fn get_dynamic_fields_iterator(
5031 &self,
5032 owner: ObjectID,
5033 cursor: Option<ObjectID>,
5035 ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
5036 {
5037 if let Some(indexes) = &self.indexes {
5038 indexes.get_dynamic_fields_iterator(owner, cursor)
5039 } else {
5040 Err(SuiErrorKind::IndexStoreNotAvailable.into())
5041 }
5042 }
5043
5044 #[instrument(level = "trace", skip_all)]
5045 pub fn get_dynamic_field_object_id(
5046 &self,
5047 owner: ObjectID,
5048 name_type: TypeTag,
5049 name_bcs_bytes: &[u8],
5050 ) -> SuiResult<Option<ObjectID>> {
5051 if let Some(indexes) = &self.indexes {
5052 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
5053 } else {
5054 Err(SuiErrorKind::IndexStoreNotAvailable.into())
5055 }
5056 }
5057
5058 #[instrument(level = "trace", skip_all)]
5059 pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
5060 Ok(self.get_indexes()?.next_sequence_number())
5061 }
5062
5063 #[instrument(level = "trace", skip_all)]
5064 pub async fn get_executed_transaction_and_effects(
5065 &self,
5066 digest: TransactionDigest,
5067 kv_store: Arc<TransactionKeyValueStore>,
5068 ) -> SuiResult<(Transaction, TransactionEffects)> {
5069 let transaction = kv_store.get_tx(digest).await?;
5070 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
5071 Ok((transaction, effects))
5072 }
5073
5074 #[instrument(level = "trace", skip_all)]
5075 pub fn multi_get_checkpoint_by_sequence_number(
5076 &self,
5077 sequence_numbers: &[CheckpointSequenceNumber],
5078 ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
5079 Ok(self
5080 .checkpoint_store
5081 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
5082 }
5083
5084 #[instrument(level = "trace", skip_all)]
5085 pub fn get_transaction_events(
5086 &self,
5087 digest: &TransactionDigest,
5088 ) -> SuiResult<TransactionEvents> {
5089 self.get_transaction_cache_reader()
5090 .get_events(digest)
5091 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
5092 }
5093
5094 pub fn get_transaction_input_objects(
5095 &self,
5096 effects: &TransactionEffects,
5097 ) -> SuiResult<Vec<Object>> {
5098 sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
5099 .map_err(Into::into)
5100 }
5101
5102 pub fn get_transaction_output_objects(
5103 &self,
5104 effects: &TransactionEffects,
5105 ) -> SuiResult<Vec<Object>> {
5106 sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
5107 .map_err(Into::into)
5108 }
5109
5110 fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
5111 match &self.indexes {
5112 Some(i) => Ok(i.clone()),
5113 None => Err(SuiErrorKind::UnsupportedFeatureError {
5114 error: "extended object indexing is not enabled on this server".into(),
5115 }
5116 .into()),
5117 }
5118 }
5119
5120 pub async fn get_transactions_for_tests(
5121 self: &Arc<Self>,
5122 filter: Option<TransactionFilter>,
5123 cursor: Option<TransactionDigest>,
5124 limit: Option<usize>,
5125 reverse: bool,
5126 ) -> SuiResult<Vec<TransactionDigest>> {
5127 let metrics = KeyValueStoreMetrics::new_for_tests();
5128 let kv_store = Arc::new(TransactionKeyValueStore::new(
5129 "rocksdb",
5130 metrics,
5131 self.clone(),
5132 ));
5133 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
5134 .await
5135 }
5136
5137 #[instrument(level = "trace", skip_all)]
5138 pub async fn get_transactions(
5139 &self,
5140 kv_store: &Arc<TransactionKeyValueStore>,
5141 filter: Option<TransactionFilter>,
5142 cursor: Option<TransactionDigest>,
5144 limit: Option<usize>,
5145 reverse: bool,
5146 ) -> SuiResult<Vec<TransactionDigest>> {
5147 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
5148 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
5149 let iter = checkpoint_contents.iter().map(|c| c.transaction);
5150 if reverse {
5151 let iter = iter
5152 .rev()
5153 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
5154 .skip(usize::from(cursor.is_some()));
5155 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
5156 } else {
5157 let iter = iter
5158 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
5159 .skip(usize::from(cursor.is_some()));
5160 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
5161 }
5162 }
5163 self.get_indexes()?
5164 .get_transactions(filter, cursor, limit, reverse)
5165 }
5166
5167 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
5168 &self.checkpoint_store
5169 }
5170
5171 pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
5172 self.get_checkpoint_store()
5173 .get_highest_executed_checkpoint_seq_number()?
5174 .ok_or(
5175 SuiErrorKind::UserInputError {
5176 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
5177 }
5178 .into(),
5179 )
5180 }
5181
5182 #[cfg(msim)]
5183 pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
5184 self.database_for_testing()
5185 .perpetual_tables
5186 .get_highest_pruned_checkpoint()
5187 .map(|c| c.unwrap_or(0))
5188 .map_err(Into::into)
5189 }
5190
5191 #[instrument(level = "trace", skip_all)]
5192 pub fn get_checkpoint_summary_by_sequence_number(
5193 &self,
5194 sequence_number: CheckpointSequenceNumber,
5195 ) -> SuiResult<CheckpointSummary> {
5196 let verified_checkpoint = self
5197 .get_checkpoint_store()
5198 .get_checkpoint_by_sequence_number(sequence_number)?;
5199 match verified_checkpoint {
5200 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5201 None => Err(SuiErrorKind::UserInputError {
5202 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5203 }
5204 .into()),
5205 }
5206 }
5207
5208 #[instrument(level = "trace", skip_all)]
5209 pub fn get_checkpoint_summary_by_digest(
5210 &self,
5211 digest: CheckpointDigest,
5212 ) -> SuiResult<CheckpointSummary> {
5213 let verified_checkpoint = self
5214 .get_checkpoint_store()
5215 .get_checkpoint_by_digest(&digest)?;
5216 match verified_checkpoint {
5217 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5218 None => Err(SuiErrorKind::UserInputError {
5219 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5220 }
5221 .into()),
5222 }
5223 }
5224
5225 #[instrument(level = "trace", skip_all)]
5226 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
5227 if is_system_package(package_id) {
5228 return self.find_genesis_txn_digest();
5229 }
5230 Ok(self
5231 .get_object_read(&package_id)?
5232 .into_object()?
5233 .previous_transaction)
5234 }
5235
5236 #[instrument(level = "trace", skip_all)]
5237 pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
5238 let summary = self
5239 .get_verified_checkpoint_by_sequence_number(0)?
5240 .into_message();
5241 let content = self.get_checkpoint_contents(summary.content_digest)?;
5242 let genesis_transaction = content.enumerate_transactions(&summary).next();
5243 Ok(genesis_transaction
5244 .ok_or(SuiErrorKind::UserInputError {
5245 error: UserInputError::GenesisTransactionNotFound,
5246 })?
5247 .1
5248 .transaction)
5249 }
5250
5251 #[instrument(level = "trace", skip_all)]
5252 pub fn get_verified_checkpoint_by_sequence_number(
5253 &self,
5254 sequence_number: CheckpointSequenceNumber,
5255 ) -> SuiResult<VerifiedCheckpoint> {
5256 let verified_checkpoint = self
5257 .get_checkpoint_store()
5258 .get_checkpoint_by_sequence_number(sequence_number)?;
5259 match verified_checkpoint {
5260 Some(verified_checkpoint) => Ok(verified_checkpoint),
5261 None => Err(SuiErrorKind::UserInputError {
5262 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5263 }
5264 .into()),
5265 }
5266 }
5267
5268 #[instrument(level = "trace", skip_all)]
5269 pub fn get_verified_checkpoint_summary_by_digest(
5270 &self,
5271 digest: CheckpointDigest,
5272 ) -> SuiResult<VerifiedCheckpoint> {
5273 let verified_checkpoint = self
5274 .get_checkpoint_store()
5275 .get_checkpoint_by_digest(&digest)?;
5276 match verified_checkpoint {
5277 Some(verified_checkpoint) => Ok(verified_checkpoint),
5278 None => Err(SuiErrorKind::UserInputError {
5279 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5280 }
5281 .into()),
5282 }
5283 }
5284
5285 #[instrument(level = "trace", skip_all)]
5286 pub fn get_checkpoint_contents(
5287 &self,
5288 digest: CheckpointContentsDigest,
5289 ) -> SuiResult<CheckpointContents> {
5290 self.get_checkpoint_store()
5291 .get_checkpoint_contents(&digest)?
5292 .ok_or(
5293 SuiErrorKind::UserInputError {
5294 error: UserInputError::CheckpointContentsNotFound(digest),
5295 }
5296 .into(),
5297 )
5298 }
5299
5300 #[instrument(level = "trace", skip_all)]
5301 pub fn get_checkpoint_contents_by_sequence_number(
5302 &self,
5303 sequence_number: CheckpointSequenceNumber,
5304 ) -> SuiResult<CheckpointContents> {
5305 let verified_checkpoint = self
5306 .get_checkpoint_store()
5307 .get_checkpoint_by_sequence_number(sequence_number)?;
5308 match verified_checkpoint {
5309 Some(verified_checkpoint) => {
5310 let content_digest = verified_checkpoint.into_inner().content_digest;
5311 self.get_checkpoint_contents(content_digest)
5312 }
5313 None => Err(SuiErrorKind::UserInputError {
5314 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5315 }
5316 .into()),
5317 }
5318 }
5319
5320 #[instrument(level = "trace", skip_all)]
5321 pub async fn query_events(
5322 &self,
5323 kv_store: &Arc<TransactionKeyValueStore>,
5324 query: EventFilter,
5325 cursor: Option<EventID>,
5327 limit: usize,
5328 descending: bool,
5329 ) -> SuiResult<Vec<SuiEvent>> {
5330 let index_store = self.get_indexes()?;
5331
5332 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
5334 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
5335 SuiErrorKind::TransactionNotFound {
5336 digest: cursor.tx_digest,
5337 },
5338 )?;
5339 (tx_seq, cursor.event_seq as usize)
5340 } else if descending {
5341 (u64::MAX, usize::MAX)
5342 } else {
5343 (0, 0)
5344 };
5345
5346 let limit = limit + 1;
5347 let mut event_keys = match query {
5348 EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
5349 EventFilter::Transaction(digest) => {
5350 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
5351 }
5352 EventFilter::MoveModule { package, module } => {
5353 let module_id = ModuleId::new(package.into(), module);
5354 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
5355 }
5356 EventFilter::MoveEventType(struct_name) => index_store
5357 .events_by_move_event_struct_name(
5358 &struct_name,
5359 tx_num,
5360 event_num,
5361 limit,
5362 descending,
5363 )?,
5364 EventFilter::Sender(sender) => {
5365 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
5366 }
5367 EventFilter::TimeRange {
5368 start_time,
5369 end_time,
5370 } => index_store
5371 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
5372 EventFilter::MoveEventModule { package, module } => index_store
5373 .events_by_move_event_module(
5374 &ModuleId::new(package.into(), module),
5375 tx_num,
5376 event_num,
5377 limit,
5378 descending,
5379 )?,
5380 EventFilter::Any(_) => {
5382 return Err(SuiErrorKind::UserInputError {
5383 error: UserInputError::Unsupported(
5384 "'Any' queries are not supported by the fullnode.".to_string(),
5385 ),
5386 }
5387 .into());
5388 }
5389 };
5390
5391 if cursor.is_some() {
5394 if !event_keys.is_empty() {
5395 event_keys.remove(0);
5396 }
5397 } else {
5398 event_keys.truncate(limit - 1);
5399 }
5400
5401 let transaction_digests = event_keys
5403 .iter()
5404 .map(|(_, digest, _, _)| *digest)
5405 .collect::<HashSet<_>>()
5406 .into_iter()
5407 .collect::<Vec<_>>();
5408
5409 let events = kv_store
5410 .multi_get_events_by_tx_digests(&transaction_digests)
5411 .await?;
5412
5413 let events_map: HashMap<_, _> = transaction_digests
5414 .iter()
5415 .zip_debug_eq(events.into_iter())
5416 .collect();
5417
5418 let stored_events = event_keys
5419 .into_iter()
5420 .map(|k| {
5421 (
5422 k,
5423 events_map
5424 .get(&k.1)
5425 .expect("fetched digest is missing")
5426 .clone()
5427 .and_then(|e| e.data.get(k.2).cloned()),
5428 )
5429 })
5430 .map(
5431 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
5432 event
5433 .map(|e| (e, tx_digest, event_seq, timestamp))
5434 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
5435 },
5436 )
5437 .collect::<Result<Vec<_>, _>>()?;
5438
5439 let epoch_store = self.load_epoch_store_one_call_per_task();
5440 let backing_store = self.get_backing_package_store().as_ref();
5441 let mut layout_resolver = epoch_store
5442 .executor()
5443 .type_layout_resolver(epoch_store.protocol_config(), Box::new(backing_store));
5444 let mut events = vec![];
5445 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
5446 events.push(SuiEvent::try_from(
5447 e.clone(),
5448 tx_digest,
5449 event_seq as u64,
5450 Some(timestamp),
5451 layout_resolver.get_annotated_layout(&e.type_)?,
5452 )?)
5453 }
5454 Ok(events)
5455 }
5456
5457 pub async fn insert_genesis_object(&self, object: Object) {
5458 self.get_reconfig_api().insert_genesis_object(object);
5459 }
5460
5461 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
5462 futures::future::join_all(
5463 objects
5464 .iter()
5465 .map(|o| self.insert_genesis_object(o.clone())),
5466 )
5467 .await;
5468 }
5469
5470 #[instrument(level = "trace", skip_all)]
5472 pub fn get_transaction_status(
5473 &self,
5474 transaction_digest: &TransactionDigest,
5475 epoch_store: &Arc<AuthorityPerEpochStore>,
5476 ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
5477 if let Some(effects) =
5479 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
5480 {
5481 if let Some(transaction) = self
5482 .get_transaction_cache_reader()
5483 .get_transaction_block(transaction_digest)
5484 {
5485 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
5486 let events = if effects.events_digest().is_some() {
5487 self.get_transaction_events(effects.transaction_digest())?
5488 } else {
5489 TransactionEvents::default()
5490 };
5491 return Ok(Some((
5492 (*transaction).clone().into_message(),
5493 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
5494 )));
5495 } else {
5496 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
5500 }
5501 }
5502 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
5503 self.metrics.tx_already_processed.inc();
5504 let (transaction, sig) = signed.into_inner().into_data_and_sig();
5505 Ok(Some((transaction, TransactionStatus::Signed(sig))))
5506 } else {
5507 Ok(None)
5508 }
5509 }
5510
5511 #[instrument(level = "trace", skip_all)]
5515 pub fn get_signed_effects_and_maybe_resign(
5516 &self,
5517 transaction_digest: &TransactionDigest,
5518 epoch_store: &Arc<AuthorityPerEpochStore>,
5519 ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
5520 let effects = self
5521 .get_transaction_cache_reader()
5522 .get_executed_effects(transaction_digest);
5523 match effects {
5524 Some(effects) => {
5525 if effects.executed_epoch() != epoch_store.epoch() {
5545 debug!(
5546 tx_digest=?transaction_digest,
5547 effects_epoch=?effects.executed_epoch(),
5548 epoch=?epoch_store.epoch(),
5549 "Re-signing the effects with the current epoch"
5550 );
5551 }
5552 Ok(Some(self.sign_effects(effects, epoch_store)?))
5553 }
5554 None => Ok(None),
5555 }
5556 }
5557
5558 #[instrument(level = "trace", skip_all)]
5559 pub(crate) fn sign_effects(
5560 &self,
5561 effects: TransactionEffects,
5562 epoch_store: &Arc<AuthorityPerEpochStore>,
5563 ) -> SuiResult<VerifiedSignedTransactionEffects> {
5564 let tx_digest = *effects.transaction_digest();
5565 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5566 Some(sig) => {
5567 debug_assert!(sig.epoch == epoch_store.epoch());
5568 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5569 }
5570 _ => {
5571 let sig = AuthoritySignInfo::new(
5572 epoch_store.epoch(),
5573 &effects,
5574 Intent::sui_app(IntentScope::TransactionEffects),
5575 self.name,
5576 &*self.secret,
5577 );
5578
5579 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5580
5581 epoch_store.insert_effects_digest_and_signature(
5582 &tx_digest,
5583 effects.digest(),
5584 &sig,
5585 )?;
5586
5587 effects
5588 }
5589 };
5590
5591 Ok(VerifiedSignedTransactionEffects::new_unchecked(
5592 signed_effects,
5593 ))
5594 }
5595
5596 #[instrument(level = "trace", skip_all)]
5598 fn fullnode_only_get_tx_coins_for_indexing(
5599 name: AuthorityName,
5600 object_store: &Arc<dyn ObjectStore + Send + Sync>,
5601 effects: &TransactionEffects,
5602 inner_temporary_store: &InnerTemporaryStore,
5603 epoch_store: &Arc<AuthorityPerEpochStore>,
5604 ) -> Option<TxCoins> {
5605 if epoch_store.committee().authority_exists(&name) {
5606 return None;
5607 }
5608 let written_coin_objects = inner_temporary_store
5609 .written
5610 .iter()
5611 .filter_map(|(k, v)| {
5612 if v.is_coin() {
5613 Some((*k, v.clone()))
5614 } else {
5615 None
5616 }
5617 })
5618 .collect();
5619 let mut input_coin_objects = inner_temporary_store
5620 .input_objects
5621 .iter()
5622 .filter_map(|(k, v)| {
5623 if v.is_coin() {
5624 Some((*k, v.clone()))
5625 } else {
5626 None
5627 }
5628 })
5629 .collect::<ObjectMap>();
5630
5631 for (object_id, version) in effects.modified_at_versions() {
5635 if inner_temporary_store
5636 .loaded_runtime_objects
5637 .contains_key(&object_id)
5638 && let Some(object) = object_store.get_object_by_key(&object_id, version)
5639 && object.is_coin()
5640 {
5641 input_coin_objects.insert(object_id, object);
5642 }
5643 }
5644
5645 Some((input_coin_objects, written_coin_objects))
5646 }
5647
5648 #[instrument(level = "trace", skip_all)]
5656 pub async fn get_transaction_lock(
5657 &self,
5658 object_ref: &ObjectRef,
5659 epoch_store: &AuthorityPerEpochStore,
5660 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5661 let lock_info = self
5662 .get_object_cache_reader()
5663 .get_lock(*object_ref, epoch_store)?;
5664 let lock_info = match lock_info {
5665 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5666 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5667 provided_obj_ref: *object_ref,
5668 current_version: locked_ref.1,
5669 }
5670 .into());
5671 }
5672 ObjectLockStatus::Initialized => {
5673 return Ok(None);
5674 }
5675 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5676 };
5677
5678 epoch_store.get_signed_transaction(&lock_info.tx_digest)
5679 }
5680
5681 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5682 self.get_object_cache_reader().get_objects(objects)
5683 }
5684
5685 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5686 self.get_object_cache_reader()
5687 .get_latest_object_ref_or_tombstone(object_id)
5688 }
5689
5690 pub fn set_override_protocol_upgrade_buffer_stake(
5699 &self,
5700 expected_epoch: EpochId,
5701 buffer_stake_bps: u64,
5702 ) -> SuiResult {
5703 let epoch_store = self.load_epoch_store_one_call_per_task();
5704 let actual_epoch = epoch_store.epoch();
5705 if actual_epoch != expected_epoch {
5706 return Err(SuiErrorKind::WrongEpoch {
5707 expected_epoch,
5708 actual_epoch,
5709 }
5710 .into());
5711 }
5712
5713 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5714 }
5715
5716 pub fn clear_override_protocol_upgrade_buffer_stake(
5717 &self,
5718 expected_epoch: EpochId,
5719 ) -> SuiResult {
5720 let epoch_store = self.load_epoch_store_one_call_per_task();
5721 let actual_epoch = epoch_store.epoch();
5722 if actual_epoch != expected_epoch {
5723 return Err(SuiErrorKind::WrongEpoch {
5724 expected_epoch,
5725 actual_epoch,
5726 }
5727 .into());
5728 }
5729
5730 epoch_store.clear_override_protocol_upgrade_buffer_stake()
5731 }
5732
5733 pub async fn get_available_system_packages(
5736 &self,
5737 binary_config: &BinaryConfig,
5738 ) -> Vec<ObjectRef> {
5739 let mut results = vec![];
5740
5741 let system_packages = BuiltInFramework::iter_system_packages();
5742
5743 #[cfg(msim)]
5745 let extra_packages = framework_injection::get_extra_packages(self.name);
5746 #[cfg(msim)]
5747 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
5748
5749 for system_package in system_packages {
5750 let modules = system_package.modules().to_vec();
5751 #[cfg(msim)]
5753 let modules =
5754 match framework_injection::get_override_modules(&system_package.id, self.name) {
5755 Some(overrides) if overrides.is_empty() => continue,
5756 Some(overrides) => overrides,
5757 None => modules,
5758 };
5759
5760 let Some(obj_ref) = sui_framework::compare_system_package(
5761 &self.get_object_store(),
5762 &system_package.id,
5763 &modules,
5764 system_package.dependencies.to_vec(),
5765 binary_config,
5766 )
5767 .await
5768 else {
5769 return vec![];
5770 };
5771 results.push(obj_ref);
5772 }
5773
5774 results
5775 }
5776
5777 async fn get_system_package_bytes(
5791 &self,
5792 system_packages: Vec<ObjectRef>,
5793 binary_config: &BinaryConfig,
5794 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5795 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5796 let objects = self.get_objects(&ids).await;
5797
5798 let mut res = Vec::with_capacity(system_packages.len());
5799 for (system_package_ref, object) in system_packages.into_iter().zip_debug_eq(objects.iter())
5800 {
5801 let prev_transaction = match object {
5802 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5803 info!("Framework {} does not need updating", system_package_ref.0);
5805 continue;
5806 }
5807
5808 Some(cur_object) => cur_object.previous_transaction,
5809 None => TransactionDigest::genesis_marker(),
5810 };
5811
5812 #[cfg(msim)]
5813 let SystemPackage {
5814 id: _,
5815 bytes,
5816 dependencies,
5817 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5818 .unwrap_or_else(|| {
5819 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5820 });
5821
5822 #[cfg(not(msim))]
5823 let SystemPackage {
5824 id: _,
5825 bytes,
5826 dependencies,
5827 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5828
5829 let modules: Vec<_> = bytes
5830 .iter()
5831 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5832 .collect();
5833
5834 let new_object = Object::new_system_package(
5835 &modules,
5836 system_package_ref.1,
5837 dependencies.clone(),
5838 prev_transaction,
5839 );
5840
5841 let new_ref = new_object.compute_object_reference();
5842 if new_ref != system_package_ref {
5843 if cfg!(msim) {
5844 debug_fatal!(
5846 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5847 );
5848 } else {
5849 error!(
5850 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5851 );
5852 }
5853 return None;
5854 }
5855
5856 res.push((system_package_ref.1, bytes, dependencies));
5857 }
5858
5859 Some(res)
5860 }
5861
5862 fn is_protocol_version_supported_v2(
5863 current_protocol_version: ProtocolVersion,
5864 proposed_protocol_version: ProtocolVersion,
5865 protocol_config: &ProtocolConfig,
5866 committee: &Committee,
5867 capabilities: Vec<AuthorityCapabilitiesV2>,
5868 mut buffer_stake_bps: u64,
5869 ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5870 if proposed_protocol_version > current_protocol_version + 1
5871 && !protocol_config.advance_to_highest_supported_protocol_version()
5872 {
5873 return None;
5874 }
5875
5876 if buffer_stake_bps > 10000 {
5877 warn!("clamping buffer_stake_bps to 10000");
5878 buffer_stake_bps = 10000;
5879 }
5880
5881 let mut desired_upgrades: Vec<_> = capabilities
5884 .into_iter()
5885 .filter_map(|mut cap| {
5886 if cap.available_system_packages.is_empty() {
5888 return None;
5889 }
5890
5891 cap.available_system_packages.sort();
5892
5893 info!(
5894 "validator {:?} supports {:?} with system packages: {:?}",
5895 cap.authority.concise(),
5896 cap.supported_protocol_versions,
5897 cap.available_system_packages,
5898 );
5899
5900 cap.supported_protocol_versions
5904 .get_version_digest(proposed_protocol_version)
5905 .map(|digest| (digest, cap.available_system_packages, cap.authority))
5906 })
5907 .collect();
5908
5909 desired_upgrades.sort();
5911 desired_upgrades
5912 .into_iter()
5913 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5914 .into_iter()
5915 .find_map(|((digest, packages), group)| {
5916 assert!(!packages.is_empty());
5918
5919 let mut stake_aggregator: StakeAggregator<(), true> =
5920 StakeAggregator::new(Arc::new(committee.clone()));
5921
5922 for (_, _, authority) in group {
5923 stake_aggregator.insert_generic(authority, ());
5924 }
5925
5926 let total_votes = stake_aggregator.total_votes();
5927 let quorum_threshold = committee.quorum_threshold();
5928 let f = committee.total_votes() - committee.quorum_threshold();
5929
5930 let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5932 let effective_threshold = quorum_threshold + buffer_stake;
5933
5934 info!(
5935 protocol_config_digest = ?digest,
5936 ?total_votes,
5937 ?quorum_threshold,
5938 ?buffer_stake_bps,
5939 ?effective_threshold,
5940 ?proposed_protocol_version,
5941 ?packages,
5942 "support for upgrade"
5943 );
5944
5945 let has_support = total_votes >= effective_threshold;
5946 has_support.then_some((proposed_protocol_version, packages))
5947 })
5948 }
5949
5950 fn choose_protocol_version_and_system_packages_v2(
5951 current_protocol_version: ProtocolVersion,
5952 protocol_config: &ProtocolConfig,
5953 committee: &Committee,
5954 capabilities: Vec<AuthorityCapabilitiesV2>,
5955 buffer_stake_bps: u64,
5956 ) -> (ProtocolVersion, Vec<ObjectRef>) {
5957 assert!(protocol_config.authority_capabilities_v2());
5958 let mut next_protocol_version = current_protocol_version;
5959 let mut system_packages = vec![];
5960
5961 while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5962 current_protocol_version,
5963 next_protocol_version + 1,
5964 protocol_config,
5965 committee,
5966 capabilities.clone(),
5967 buffer_stake_bps,
5968 ) {
5969 next_protocol_version = version;
5970 system_packages = packages;
5971 }
5972
5973 (next_protocol_version, system_packages)
5974 }
5975
5976 #[instrument(level = "debug", skip_all)]
5977 fn create_authenticator_state_tx(
5978 &self,
5979 epoch_store: &Arc<AuthorityPerEpochStore>,
5980 ) -> Option<EndOfEpochTransactionKind> {
5981 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5982 info!("authenticator state transactions not enabled");
5983 return None;
5984 }
5985
5986 let authenticator_state_exists = epoch_store.authenticator_state_exists();
5987 let tx = if authenticator_state_exists {
5988 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5989 let min_epoch =
5990 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5991 let authenticator_obj_initial_shared_version = epoch_store
5992 .epoch_start_config()
5993 .authenticator_obj_initial_shared_version()
5994 .expect("initial version must exist");
5995
5996 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5997 min_epoch,
5998 authenticator_obj_initial_shared_version,
5999 );
6000
6001 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
6002
6003 tx
6004 } else {
6005 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
6006 info!("Creating AuthenticatorStateCreate tx");
6007 tx
6008 };
6009 Some(tx)
6010 }
6011
6012 #[instrument(level = "debug", skip_all)]
6013 fn create_randomness_state_tx(
6014 &self,
6015 epoch_store: &Arc<AuthorityPerEpochStore>,
6016 ) -> Option<EndOfEpochTransactionKind> {
6017 if !epoch_store.protocol_config().random_beacon() {
6018 info!("randomness state transactions not enabled");
6019 return None;
6020 }
6021
6022 if epoch_store.randomness_state_exists() {
6023 return None;
6024 }
6025
6026 let tx = EndOfEpochTransactionKind::new_randomness_state_create();
6027 info!("Creating RandomnessStateCreate tx");
6028 Some(tx)
6029 }
6030
6031 #[instrument(level = "debug", skip_all)]
6032 fn create_accumulator_root_tx(
6033 &self,
6034 epoch_store: &Arc<AuthorityPerEpochStore>,
6035 ) -> Option<EndOfEpochTransactionKind> {
6036 if !epoch_store
6037 .protocol_config()
6038 .create_root_accumulator_object()
6039 {
6040 info!("accumulator root creation not enabled");
6041 return None;
6042 }
6043
6044 if epoch_store.accumulator_root_exists() {
6045 return None;
6046 }
6047
6048 let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
6049 info!("Creating AccumulatorRootCreate tx");
6050 Some(tx)
6051 }
6052
6053 #[instrument(level = "debug", skip_all)]
6054 fn create_write_accumulator_storage_cost_tx(
6055 &self,
6056 epoch_store: &Arc<AuthorityPerEpochStore>,
6057 ) -> Option<EndOfEpochTransactionKind> {
6058 if !epoch_store.accumulator_root_exists() {
6059 info!("accumulator root does not exist yet");
6060 return None;
6061 }
6062 if !epoch_store.protocol_config().enable_accumulators() {
6063 info!("accumulators not enabled");
6064 return None;
6065 }
6066
6067 let object_store = self.get_object_store();
6068 let object_count =
6069 match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
6070 Ok(Some(count)) => count,
6071 Ok(None) => return None,
6072 Err(e) => {
6073 fatal!("failed to read accumulator object count: {e}");
6074 }
6075 };
6076
6077 let storage_cost = object_count.saturating_mul(
6078 epoch_store
6079 .protocol_config()
6080 .accumulator_object_storage_cost(),
6081 );
6082
6083 let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
6084 info!(
6085 object_count,
6086 storage_cost, "Creating WriteAccumulatorStorageCost tx"
6087 );
6088 Some(tx)
6089 }
6090
6091 #[instrument(level = "debug", skip_all)]
6092 fn create_coin_registry_tx(
6093 &self,
6094 epoch_store: &Arc<AuthorityPerEpochStore>,
6095 ) -> Option<EndOfEpochTransactionKind> {
6096 if !epoch_store.protocol_config().enable_coin_registry() {
6097 info!("coin registry not enabled");
6098 return None;
6099 }
6100
6101 if epoch_store.coin_registry_exists() {
6102 return None;
6103 }
6104
6105 let tx = EndOfEpochTransactionKind::new_coin_registry_create();
6106 info!("Creating CoinRegistryCreate tx");
6107 Some(tx)
6108 }
6109
6110 #[instrument(level = "debug", skip_all)]
6111 fn create_display_registry_tx(
6112 &self,
6113 epoch_store: &Arc<AuthorityPerEpochStore>,
6114 ) -> Option<EndOfEpochTransactionKind> {
6115 if !epoch_store.protocol_config().enable_display_registry() {
6116 info!("display registry not enabled");
6117 return None;
6118 }
6119
6120 if epoch_store.display_registry_exists() {
6121 return None;
6122 }
6123
6124 let tx = EndOfEpochTransactionKind::new_display_registry_create();
6125 info!("Creating DisplayRegistryCreate tx");
6126 Some(tx)
6127 }
6128
6129 #[instrument(level = "debug", skip_all)]
6130 fn create_bridge_tx(
6131 &self,
6132 epoch_store: &Arc<AuthorityPerEpochStore>,
6133 ) -> Option<EndOfEpochTransactionKind> {
6134 if !epoch_store.protocol_config().enable_bridge() {
6135 info!("bridge not enabled");
6136 return None;
6137 }
6138 if epoch_store.bridge_exists() {
6139 return None;
6140 }
6141 let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
6142 info!("Creating Bridge Create tx");
6143 Some(tx)
6144 }
6145
6146 #[instrument(level = "debug", skip_all)]
6147 fn init_bridge_committee_tx(
6148 &self,
6149 epoch_store: &Arc<AuthorityPerEpochStore>,
6150 ) -> Option<EndOfEpochTransactionKind> {
6151 if !epoch_store.protocol_config().enable_bridge() {
6152 info!("bridge not enabled");
6153 return None;
6154 }
6155 if !epoch_store
6156 .protocol_config()
6157 .should_try_to_finalize_bridge_committee()
6158 {
6159 info!("should not try to finalize bridge committee yet");
6160 return None;
6161 }
6162 if !epoch_store.bridge_exists() {
6164 return None;
6165 }
6166
6167 if epoch_store.bridge_committee_initiated() {
6168 return None;
6169 }
6170
6171 let bridge_initial_shared_version = epoch_store
6172 .epoch_start_config()
6173 .bridge_obj_initial_shared_version()
6174 .expect("initial version must exist");
6175 let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
6176 info!("Init Bridge committee tx");
6177 Some(tx)
6178 }
6179
6180 #[instrument(level = "debug", skip_all)]
6181 fn create_deny_list_state_tx(
6182 &self,
6183 epoch_store: &Arc<AuthorityPerEpochStore>,
6184 ) -> Option<EndOfEpochTransactionKind> {
6185 if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
6186 return None;
6187 }
6188
6189 if epoch_store.coin_deny_list_state_exists() {
6190 return None;
6191 }
6192
6193 let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
6194 info!("Creating DenyListStateCreate tx");
6195 Some(tx)
6196 }
6197
6198 #[instrument(level = "debug", skip_all)]
6199 fn create_address_alias_state_tx(
6200 &self,
6201 epoch_store: &Arc<AuthorityPerEpochStore>,
6202 ) -> Option<EndOfEpochTransactionKind> {
6203 if !epoch_store.protocol_config().address_aliases() {
6204 info!("address aliases not enabled");
6205 return None;
6206 }
6207
6208 if epoch_store.address_alias_state_exists() {
6209 return None;
6210 }
6211
6212 let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
6213 info!("Creating AddressAliasStateCreate tx");
6214 Some(tx)
6215 }
6216
6217 #[instrument(level = "debug", skip_all)]
6218 fn create_execution_time_observations_tx(
6219 &self,
6220 epoch_store: &Arc<AuthorityPerEpochStore>,
6221 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
6222 last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
6223 ) -> Option<EndOfEpochTransactionKind> {
6224 let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
6225 .protocol_config()
6226 .per_object_congestion_control_mode()
6227 else {
6228 return None;
6229 };
6230
6231 let start_checkpoint = std::cmp::max(
6234 last_checkpoint_before_end_of_epoch
6235 .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
6236 epoch_store
6238 .epoch()
6239 .checked_sub(1)
6240 .map(|prev_epoch| {
6241 self.checkpoint_store
6242 .get_epoch_last_checkpoint_seq_number(prev_epoch)
6243 .expect("typed store must not fail")
6244 .expect(
6245 "sequence number of last checkpoint of preceding epoch must be saved",
6246 )
6247 + 1
6248 })
6249 .unwrap_or(1),
6250 );
6251 info!(
6252 "reading checkpoint range {:?}..={:?}",
6253 start_checkpoint, last_checkpoint_before_end_of_epoch
6254 );
6255 let sequence_numbers =
6256 (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
6257 let contents_digests: Vec<_> = self
6258 .checkpoint_store
6259 .multi_get_locally_computed_checkpoints(&sequence_numbers)
6260 .expect("typed store must not fail")
6261 .into_iter()
6262 .zip_debug_eq(sequence_numbers)
6263 .map(|(maybe_checkpoint, sequence_number)| {
6264 if let Some(checkpoint) = maybe_checkpoint {
6265 checkpoint.content_digest
6266 } else {
6267 self.checkpoint_store
6270 .get_checkpoint_by_sequence_number(sequence_number)
6271 .expect("typed store must not fail")
6272 .unwrap_or_else(|| {
6273 fatal!("preceding checkpoints must exist by end of epoch")
6274 })
6275 .data()
6276 .content_digest
6277 }
6278 })
6279 .collect();
6280 let tx_digests: Vec<_> = self
6281 .checkpoint_store
6282 .multi_get_checkpoint_content(&contents_digests)
6283 .expect("typed store must not fail")
6284 .into_iter()
6285 .flat_map(|maybe_contents| {
6286 maybe_contents
6287 .expect("preceding checkpoint contents must exist by end of epoch")
6288 .into_inner()
6289 .into_iter()
6290 .map(|digests| digests.transaction)
6291 })
6292 .collect();
6293 let included_execution_time_observations: HashSet<_> = self
6294 .get_transaction_cache_reader()
6295 .multi_get_transaction_blocks(&tx_digests)
6296 .into_iter()
6297 .flat_map(|maybe_tx| {
6298 if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
6299 .expect("preceding transaction must exist by end of epoch")
6300 .transaction_data()
6301 .kind()
6302 {
6303 #[allow(clippy::unnecessary_to_owned)]
6304 itertools::Either::Left(
6305 ptb.commands
6306 .to_owned()
6307 .into_iter()
6308 .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
6309 )
6310 } else {
6311 itertools::Either::Right(std::iter::empty())
6312 }
6313 })
6314 .chain(end_of_epoch_observation_keys.into_iter())
6315 .collect();
6316
6317 let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
6318 epoch_store
6319 .get_end_of_epoch_execution_time_observations()
6320 .filter_and_sort_v1(
6321 |(key, _)| included_execution_time_observations.contains(key),
6322 params.stored_observations_limit.try_into().unwrap(),
6323 ),
6324 );
6325 info!("Creating StoreExecutionTimeObservations tx");
6326 Some(tx)
6327 }
6328
6329 #[instrument(level = "error", skip_all)]
6340 pub async fn create_and_execute_advance_epoch_tx(
6341 &self,
6342 epoch_store: &Arc<AuthorityPerEpochStore>,
6343 gas_cost_summary: &GasCostSummary,
6344 checkpoint: CheckpointSequenceNumber,
6345 epoch_start_timestamp_ms: CheckpointTimestamp,
6346 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
6347 last_checkpoint: CheckpointSequenceNumber,
6350 ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
6351 let mut txns = Vec::new();
6352
6353 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
6354 txns.push(tx);
6355 }
6356 if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
6357 txns.push(tx);
6358 }
6359 if let Some(tx) = self.create_bridge_tx(epoch_store) {
6360 txns.push(tx);
6361 }
6362 if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
6363 txns.push(tx);
6364 }
6365 if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
6366 txns.push(tx);
6367 }
6368 if let Some(tx) = self.create_execution_time_observations_tx(
6369 epoch_store,
6370 end_of_epoch_observation_keys,
6371 last_checkpoint,
6372 ) {
6373 txns.push(tx);
6374 }
6375 if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
6376 txns.push(tx);
6377 }
6378
6379 if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
6380 txns.push(tx);
6381 }
6382 if let Some(tx) = self.create_display_registry_tx(epoch_store) {
6383 txns.push(tx);
6384 }
6385 if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
6386 txns.push(tx);
6387 }
6388 if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
6389 txns.push(tx);
6390 }
6391
6392 let next_epoch = epoch_store.epoch() + 1;
6393
6394 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
6395
6396 let (next_epoch_protocol_version, next_epoch_system_packages) =
6397 Self::choose_protocol_version_and_system_packages_v2(
6398 epoch_store.protocol_version(),
6399 epoch_store.protocol_config(),
6400 epoch_store.committee(),
6401 epoch_store
6402 .get_capabilities_v2()
6403 .expect("read capabilities from db cannot fail"),
6404 buffer_stake_bps,
6405 );
6406
6407 let config = epoch_store.protocol_config();
6410 let binary_config = config.binary_config(None);
6411 let Some(next_epoch_system_package_bytes) = self
6412 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
6413 .await
6414 else {
6415 if next_epoch_protocol_version <= ProtocolVersion::MAX {
6416 debug_fatal!(
6420 "upgraded system packages {:?} are not locally available, cannot create \
6421 ChangeEpochTx. validator binary must be upgraded to the correct version!",
6422 next_epoch_system_packages
6423 );
6424 } else {
6425 error!(
6426 "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
6427 next_epoch_protocol_version
6428 );
6429 }
6430 return Err(CheckpointBuilderError::SystemPackagesMissing);
6439 };
6440
6441 let tx = if epoch_store
6442 .protocol_config()
6443 .end_of_epoch_transaction_supported()
6444 {
6445 txns.push(EndOfEpochTransactionKind::new_change_epoch(
6446 next_epoch,
6447 next_epoch_protocol_version,
6448 gas_cost_summary.storage_cost,
6449 gas_cost_summary.computation_cost,
6450 gas_cost_summary.storage_rebate,
6451 gas_cost_summary.non_refundable_storage_fee,
6452 epoch_start_timestamp_ms,
6453 next_epoch_system_package_bytes,
6454 ));
6455
6456 VerifiedTransaction::new_end_of_epoch_transaction(txns)
6457 } else {
6458 VerifiedTransaction::new_change_epoch(
6459 next_epoch,
6460 next_epoch_protocol_version,
6461 gas_cost_summary.storage_cost,
6462 gas_cost_summary.computation_cost,
6463 gas_cost_summary.storage_rebate,
6464 gas_cost_summary.non_refundable_storage_fee,
6465 epoch_start_timestamp_ms,
6466 next_epoch_system_package_bytes,
6467 )
6468 };
6469
6470 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
6471 tx.clone(),
6472 epoch_store.epoch(),
6473 checkpoint,
6474 );
6475
6476 let tx_digest = executable_tx.digest();
6477
6478 info!(
6479 ?next_epoch,
6480 ?next_epoch_protocol_version,
6481 ?next_epoch_system_packages,
6482 computation_cost=?gas_cost_summary.computation_cost,
6483 storage_cost=?gas_cost_summary.storage_cost,
6484 storage_rebate=?gas_cost_summary.storage_rebate,
6485 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
6486 ?tx_digest,
6487 "Creating advance epoch transaction"
6488 );
6489
6490 fail_point_async!("change_epoch_tx_delay");
6491 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
6492
6493 if self
6496 .get_transaction_cache_reader()
6497 .is_tx_already_executed(tx_digest)
6498 {
6499 warn!("change epoch tx has already been executed via state sync");
6500 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6501 }
6502
6503 let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
6504 else {
6505 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6506 };
6507
6508 let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
6511 self.get_object_cache_reader().as_ref(),
6512 std::iter::once(&Schedulable::Transaction(&executable_tx)),
6513 )?;
6514
6515 assert_eq!(assigned_versions.0.len(), 1);
6516 let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;
6517
6518 let input_objects = self.read_objects_for_execution(
6519 &tx_lock,
6520 &executable_tx,
6521 &assigned_versions,
6522 epoch_store,
6523 )?;
6524
6525 let (transaction_outputs, _timings, _execution_error_opt) = self
6526 .execute_certificate(
6527 &execution_guard,
6528 &executable_tx,
6529 input_objects,
6530 None,
6531 ExecutionEnv::default(),
6532 epoch_store,
6533 )
6534 .unwrap();
6535 let system_obj = get_sui_system_state(&transaction_outputs.written)
6536 .expect("change epoch tx must write to system object");
6537
6538 let effects = transaction_outputs.effects;
6539 self.get_state_sync_store()
6543 .insert_transaction_and_effects(&tx, &effects);
6544
6545 info!(
6546 "Effects summary of the change epoch transaction: {:?}",
6547 effects.summary_for_debug()
6548 );
6549 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
6550 assert!(effects.status().is_ok());
6552 Ok((system_obj, effects))
6553 }
6554
6555 #[instrument(level = "error", skip_all)]
6556 async fn reopen_epoch_db(
6557 &self,
6558 cur_epoch_store: &AuthorityPerEpochStore,
6559 new_committee: Committee,
6560 epoch_start_configuration: EpochStartConfiguration,
6561 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6562 epoch_last_checkpoint: CheckpointSequenceNumber,
6563 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6564 let new_epoch = new_committee.epoch;
6565 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6566 assert_eq!(
6567 epoch_start_configuration.epoch_start_state().epoch(),
6568 new_committee.epoch
6569 );
6570 fail_point!("before-open-new-epoch-store");
6571 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6572 self.name,
6573 new_committee,
6574 epoch_start_configuration,
6575 self.get_backing_package_store().clone(),
6576 self.get_object_store().clone(),
6577 expensive_safety_check_config,
6578 epoch_last_checkpoint,
6579 )?;
6580 self.epoch_store.store(new_epoch_store.clone());
6581 Ok(new_epoch_store)
6582 }
6583
6584 #[cfg(test)]
6585 pub(crate) fn iter_live_object_set_for_testing(
6586 &self,
6587 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6588 let include_wrapped_object = !self
6589 .epoch_store_for_testing()
6590 .protocol_config()
6591 .simplified_unwrap_then_delete();
6592 self.get_global_state_hash_store()
6593 .iter_cached_live_object_set_for_testing(include_wrapped_object)
6594 }
6595
6596 #[cfg(test)]
6597 pub(crate) fn shutdown_execution_for_test(&self) {
6598 self.tx_execution_shutdown
6599 .lock()
6600 .take()
6601 .unwrap()
6602 .send(())
6603 .unwrap();
6604 }
6605
6606 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6608 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6609 self.get_object_cache_reader()
6610 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6611 self.get_reconfig_api()
6612 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6613 Ok(())
6614 }
6615}
6616
6617pub struct RandomnessRoundReceiver {
6618 authority_state: Arc<AuthorityState>,
6619 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6620}
6621
6622impl RandomnessRoundReceiver {
6623 pub fn spawn(
6624 authority_state: Arc<AuthorityState>,
6625 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6626 ) -> JoinHandle<()> {
6627 let rrr = RandomnessRoundReceiver {
6628 authority_state,
6629 randomness_rx,
6630 };
6631 spawn_monitored_task!(rrr.run())
6632 }
6633
6634 async fn run(mut self) {
6635 info!("RandomnessRoundReceiver event loop started");
6636
6637 loop {
6638 tokio::select! {
6639 maybe_recv = self.randomness_rx.recv() => {
6640 if let Some((epoch, round, bytes)) = maybe_recv {
6641 self.handle_new_randomness(epoch, round, bytes).await;
6642 } else {
6643 break;
6644 }
6645 },
6646 }
6647 }
6648
6649 info!("RandomnessRoundReceiver event loop ended");
6650 }
6651
6652 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
6653 async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
6654 fail_point_async!("randomness-delay");
6655
6656 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
6657 if epoch_store.epoch() != epoch {
6658 warn!(
6659 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
6660 epoch_store.epoch()
6661 );
6662 return;
6663 }
6664 let key = TransactionKey::RandomnessRound(epoch, round);
6665 let transaction = VerifiedTransaction::new_randomness_state_update(
6666 epoch,
6667 round,
6668 bytes,
6669 epoch_store
6670 .epoch_start_config()
6671 .randomness_obj_initial_shared_version()
6672 .expect("randomness state obj must exist"),
6673 );
6674 debug!(
6675 "created randomness state update transaction with digest: {:?}",
6676 transaction.digest()
6677 );
6678 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
6679 let digest = *transaction.digest();
6680
6681 self.authority_state
6686 .get_cache_commit()
6687 .persist_transaction(&transaction);
6688
6689 if epoch_store.insert_tx_key(key, digest).is_err() {
6691 warn!("epoch ended while handling new randomness");
6692 }
6693
6694 let authority_state = self.authority_state.clone();
6695 spawn_monitored_task!(async move {
6696 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
6703 let result = tokio::time::timeout(
6704 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
6705 authority_state
6706 .get_transaction_cache_reader()
6707 .notify_read_executed_effects(
6708 "RandomnessRoundReceiver::notify_read_executed_effects_first",
6709 &[digest],
6710 ),
6711 )
6712 .await;
6713 let mut effects = match result {
6714 Ok(result) => result,
6715 Err(_) => {
6716 debug_fatal_no_invariant!(
6718 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6719 );
6720 authority_state
6722 .get_transaction_cache_reader()
6723 .notify_read_executed_effects(
6724 "RandomnessRoundReceiver::notify_read_executed_effects_second",
6725 &[digest],
6726 )
6727 .await
6728 }
6729 };
6730
6731 let effects = effects.pop().expect("should return effects");
6732 if *effects.status() != ExecutionStatus::Success {
6733 fatal!(
6734 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
6735 );
6736 }
6737 debug!(
6738 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
6739 );
6740 });
6741 }
6742}
6743
6744#[async_trait]
6745impl TransactionKeyValueStoreTrait for AuthorityState {
6746 #[instrument(skip(self))]
6747 async fn multi_get(
6748 &self,
6749 transactions: &[TransactionDigest],
6750 effects: &[TransactionDigest],
6751 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6752 let txns = if !transactions.is_empty() {
6753 self.get_transaction_cache_reader()
6754 .multi_get_transaction_blocks(transactions)
6755 .into_iter()
6756 .map(|t| t.map(|t| (*t).clone().into_inner()))
6757 .collect()
6758 } else {
6759 vec![]
6760 };
6761
6762 let fx = if !effects.is_empty() {
6763 self.get_transaction_cache_reader()
6764 .multi_get_executed_effects(effects)
6765 } else {
6766 vec![]
6767 };
6768
6769 Ok((txns, fx))
6770 }
6771
6772 #[instrument(skip(self))]
6773 async fn multi_get_checkpoints(
6774 &self,
6775 checkpoint_summaries: &[CheckpointSequenceNumber],
6776 checkpoint_contents: &[CheckpointSequenceNumber],
6777 checkpoint_summaries_by_digest: &[CheckpointDigest],
6778 ) -> SuiResult<(
6779 Vec<Option<CertifiedCheckpointSummary>>,
6780 Vec<Option<CheckpointContents>>,
6781 Vec<Option<CertifiedCheckpointSummary>>,
6782 )> {
6783 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6785 let store = self.get_checkpoint_store();
6786 for seq in checkpoint_summaries {
6787 let checkpoint = store
6788 .get_checkpoint_by_sequence_number(*seq)?
6789 .map(|c| c.into_inner());
6790
6791 summaries.push(checkpoint);
6792 }
6793
6794 let mut contents = Vec::with_capacity(checkpoint_contents.len());
6795 for seq in checkpoint_contents {
6796 let checkpoint = store
6797 .get_checkpoint_by_sequence_number(*seq)?
6798 .and_then(|summary| {
6799 store
6800 .get_checkpoint_contents(&summary.content_digest)
6801 .expect("db read cannot fail")
6802 });
6803 contents.push(checkpoint);
6804 }
6805
6806 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6807 for digest in checkpoint_summaries_by_digest {
6808 let checkpoint = store
6809 .get_checkpoint_by_digest(digest)?
6810 .map(|c| c.into_inner());
6811 summaries_by_digest.push(checkpoint);
6812 }
6813 Ok((summaries, contents, summaries_by_digest))
6814 }
6815
6816 #[instrument(skip(self))]
6817 async fn deprecated_get_transaction_checkpoint(
6818 &self,
6819 digest: TransactionDigest,
6820 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6821 Ok(self
6822 .get_checkpoint_cache()
6823 .deprecated_get_transaction_checkpoint(&digest)
6824 .map(|(_epoch, checkpoint)| checkpoint))
6825 }
6826
6827 #[instrument(skip(self))]
6828 async fn get_object(
6829 &self,
6830 object_id: ObjectID,
6831 version: VersionNumber,
6832 ) -> SuiResult<Option<Object>> {
6833 Ok(self
6834 .get_object_cache_reader()
6835 .get_object_by_key(&object_id, version))
6836 }
6837
6838 #[instrument(skip_all)]
6839 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6840 Ok(self
6841 .get_object_cache_reader()
6842 .multi_get_objects_by_key(object_keys))
6843 }
6844
6845 #[instrument(skip(self))]
6846 async fn multi_get_transaction_checkpoint(
6847 &self,
6848 digests: &[TransactionDigest],
6849 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6850 let res = self
6851 .get_checkpoint_cache()
6852 .deprecated_multi_get_transaction_checkpoint(digests);
6853
6854 Ok(res
6855 .into_iter()
6856 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6857 .collect())
6858 }
6859
6860 #[instrument(skip(self))]
6861 async fn multi_get_events_by_tx_digests(
6862 &self,
6863 digests: &[TransactionDigest],
6864 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6865 if digests.is_empty() {
6866 return Ok(vec![]);
6867 }
6868
6869 Ok(self
6870 .get_transaction_cache_reader()
6871 .multi_get_events(digests))
6872 }
6873}
6874
6875#[cfg(msim)]
6876pub mod framework_injection {
6877 use move_binary_format::CompiledModule;
6878 use std::collections::BTreeMap;
6879 use std::{cell::RefCell, collections::BTreeSet};
6880 use sui_framework::{BuiltInFramework, SystemPackage};
6881 use sui_types::base_types::{AuthorityName, ObjectID};
6882 use sui_types::is_system_package;
6883
6884 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
6885
6886 thread_local! {
6888 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
6889 }
6890
6891 type Framework = Vec<CompiledModule>;
6892
6893 pub type PackageUpgradeCallback =
6894 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
6895
6896 enum PackageOverrideConfig {
6897 Global(Framework),
6898 PerValidator(PackageUpgradeCallback),
6899 }
6900
6901 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
6902 modules
6903 .iter()
6904 .map(|m| {
6905 let mut buf = Vec::new();
6906 m.serialize_with_version(m.version, &mut buf).unwrap();
6907 buf
6908 })
6909 .collect()
6910 }
6911
6912 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
6913 OVERRIDE.with(|bs| {
6914 bs.borrow_mut()
6915 .insert(package_id, PackageOverrideConfig::Global(modules))
6916 });
6917 }
6918
6919 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
6920 OVERRIDE.with(|bs| {
6921 bs.borrow_mut()
6922 .insert(package_id, PackageOverrideConfig::PerValidator(func))
6923 });
6924 }
6925
6926 pub fn set_system_packages(packages: Vec<SystemPackage>) {
6927 OVERRIDE.with(|bs| {
6928 let mut new_packages_not_to_include: BTreeSet<_> =
6929 BuiltInFramework::all_package_ids().into_iter().collect();
6930 for pkg in &packages {
6931 new_packages_not_to_include.remove(&pkg.id);
6932 }
6933 for pkg in packages {
6934 bs.borrow_mut()
6935 .insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
6936 }
6937 for empty_pkg in new_packages_not_to_include {
6938 bs.borrow_mut()
6939 .insert(empty_pkg, PackageOverrideConfig::Global(vec![]));
6940 }
6941 });
6942 }
6943
6944 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
6945 OVERRIDE.with(|cfg| {
6946 cfg.borrow().get(package_id).and_then(|entry| match entry {
6947 PackageOverrideConfig::Global(framework) => {
6948 Some(compiled_modules_to_bytes(framework))
6949 }
6950 PackageOverrideConfig::PerValidator(func) => {
6951 func(name).map(|fw| compiled_modules_to_bytes(&fw))
6952 }
6953 })
6954 })
6955 }
6956
6957 pub fn get_override_modules(
6958 package_id: &ObjectID,
6959 name: AuthorityName,
6960 ) -> Option<Vec<CompiledModule>> {
6961 OVERRIDE.with(|cfg| {
6962 cfg.borrow().get(package_id).and_then(|entry| match entry {
6963 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
6964 PackageOverrideConfig::PerValidator(func) => func(name),
6965 })
6966 })
6967 }
6968
6969 pub fn get_override_system_package(
6970 package_id: &ObjectID,
6971 name: AuthorityName,
6972 ) -> Option<SystemPackage> {
6973 let bytes = get_override_bytes(package_id, name)?;
6974 let dependencies = if is_system_package(*package_id) {
6975 BuiltInFramework::get_package_by_id(package_id)
6976 .dependencies
6977 .to_vec()
6978 } else {
6979 BuiltInFramework::all_package_ids()
6981 };
6982 Some(SystemPackage {
6983 id: *package_id,
6984 bytes,
6985 dependencies,
6986 })
6987 }
6988
6989 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6990 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids().into_iter());
6991 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
6992 cfg.borrow()
6993 .keys()
6994 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6995 .collect()
6996 });
6997
6998 extra
6999 .into_iter()
7000 .map(|package| SystemPackage {
7001 id: package,
7002 bytes: get_override_bytes(&package, name).unwrap(),
7003 dependencies: BuiltInFramework::all_package_ids(),
7004 })
7005 .collect()
7006 }
7007}
7008
7009#[derive(Debug, Serialize, Deserialize, Clone)]
7010pub struct ObjDumpFormat {
7011 pub id: ObjectID,
7012 pub version: VersionNumber,
7013 pub digest: ObjectDigest,
7014 pub object: Object,
7015}
7016
7017impl ObjDumpFormat {
7018 fn new(object: Object) -> Self {
7019 let oref = object.compute_object_reference();
7020 Self {
7021 id: oref.0,
7022 version: oref.1,
7023 digest: oref.2,
7024 object,
7025 }
7026 }
7027}
7028
7029#[derive(Debug, Serialize, Deserialize, Clone)]
7030pub struct NodeStateDump {
7031 pub tx_digest: TransactionDigest,
7032 pub sender_signed_data: SenderSignedData,
7033 pub executed_epoch: u64,
7034 pub reference_gas_price: u64,
7035 pub protocol_version: u64,
7036 pub epoch_start_timestamp_ms: u64,
7037 pub computed_effects: TransactionEffects,
7038 pub expected_effects_digest: TransactionEffectsDigest,
7039 pub relevant_system_packages: Vec<ObjDumpFormat>,
7040 pub shared_objects: Vec<ObjDumpFormat>,
7041 pub loaded_child_objects: Vec<ObjDumpFormat>,
7042 pub modified_at_versions: Vec<ObjDumpFormat>,
7043 pub runtime_reads: Vec<ObjDumpFormat>,
7044 pub input_objects: Vec<ObjDumpFormat>,
7045}
7046
7047impl NodeStateDump {
7048 pub fn new(
7049 tx_digest: &TransactionDigest,
7050 effects: &TransactionEffects,
7051 expected_effects_digest: TransactionEffectsDigest,
7052 object_store: &dyn ObjectStore,
7053 epoch_store: &Arc<AuthorityPerEpochStore>,
7054 inner_temporary_store: &InnerTemporaryStore,
7055 certificate: &VerifiedExecutableTransaction,
7056 ) -> SuiResult<Self> {
7057 let executed_epoch = epoch_store.epoch();
7059 let reference_gas_price = epoch_store.reference_gas_price();
7060 let epoch_start_config = epoch_store.epoch_start_config();
7061 let protocol_version = epoch_store.protocol_version().as_u64();
7062 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
7063
7064 let mut relevant_system_packages = Vec::new();
7066 for sys_package_id in BuiltInFramework::all_package_ids() {
7067 if let Some(w) = object_store.get_object(&sys_package_id) {
7068 relevant_system_packages.push(ObjDumpFormat::new(w))
7069 }
7070 }
7071
7072 let mut shared_objects = Vec::new();
7074 for kind in effects.input_consensus_objects() {
7075 match kind {
7076 InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
7077 if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
7078 shared_objects.push(ObjDumpFormat::new(w))
7079 }
7080 }
7081 InputConsensusObject::ReadConsensusStreamEnded(..)
7082 | InputConsensusObject::MutateConsensusStreamEnded(..)
7083 | InputConsensusObject::Cancelled(..) => (), }
7085 }
7086
7087 let mut loaded_child_objects = Vec::new();
7090 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
7091 if let Some(w) = object_store.get_object_by_key(id, meta.version) {
7092 loaded_child_objects.push(ObjDumpFormat::new(w))
7093 }
7094 }
7095
7096 let mut modified_at_versions = Vec::new();
7098 for (id, ver) in effects.modified_at_versions() {
7099 if let Some(w) = object_store.get_object_by_key(&id, ver) {
7100 modified_at_versions.push(ObjDumpFormat::new(w))
7101 }
7102 }
7103
7104 let mut runtime_reads = Vec::new();
7107 for obj in inner_temporary_store
7108 .runtime_packages_loaded_from_db
7109 .values()
7110 {
7111 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
7112 }
7113
7114 Ok(Self {
7117 tx_digest: *tx_digest,
7118 executed_epoch,
7119 reference_gas_price,
7120 epoch_start_timestamp_ms,
7121 protocol_version,
7122 relevant_system_packages,
7123 shared_objects,
7124 loaded_child_objects,
7125 modified_at_versions,
7126 runtime_reads,
7127 sender_signed_data: certificate.clone().into_message(),
7128 input_objects: inner_temporary_store
7129 .input_objects
7130 .values()
7131 .map(|o| ObjDumpFormat::new(o.clone()))
7132 .collect(),
7133 computed_effects: effects.clone(),
7134 expected_effects_digest,
7135 })
7136 }
7137
7138 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
7139 let mut objects = Vec::new();
7140 objects.extend(self.relevant_system_packages.clone());
7141 objects.extend(self.shared_objects.clone());
7142 objects.extend(self.loaded_child_objects.clone());
7143 objects.extend(self.modified_at_versions.clone());
7144 objects.extend(self.runtime_reads.clone());
7145 objects.extend(self.input_objects.clone());
7146 objects
7147 }
7148
7149 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
7150 let file_name = format!(
7151 "{}_{}_NODE_DUMP.json",
7152 self.tx_digest,
7153 AuthorityState::unixtime_now_ms()
7154 );
7155 let mut path = path.to_path_buf();
7156 path.push(&file_name);
7157 let mut file = File::create(path.clone())?;
7158 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
7159 Ok(path)
7160 }
7161
7162 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
7163 let file = File::open(path)?;
7164 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
7165 }
7166}