1use crate::accumulators::coin_reservations::CachingCoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::transaction_rewriting::rewrite_transaction_for_coin_reservations;
10use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
11use crate::checkpoints::CheckpointBuilderError;
12use crate::checkpoints::CheckpointBuilderResult;
13use crate::congestion_tracker::CongestionTracker;
14use crate::consensus_adapter::ConsensusOverloadChecker;
15use crate::execution_cache::ExecutionCacheTraitPointers;
16use crate::execution_cache::TransactionCacheRead;
17use crate::execution_cache::writeback_cache::WritebackCache;
18use crate::execution_scheduler::ExecutionScheduler;
19use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
20use crate::jsonrpc_index::CoinIndexKey2;
21use crate::rpc_index::RpcIndexStore;
22use crate::traffic_controller::TrafficController;
23use crate::traffic_controller::metrics::TrafficControllerMetrics;
24use crate::transaction_outputs::TransactionOutputs;
25use crate::verify_indexes::{fix_indexes, verify_indexes};
26use arc_swap::{ArcSwap, ArcSwapOption, Guard};
27use async_trait::async_trait;
28use authority_per_epoch_store::CertLockGuard;
29use dashmap::DashMap;
30use fastcrypto::encoding::Base58;
31use fastcrypto::encoding::Encoding;
32use fastcrypto::hash::MultisetHash;
33use itertools::Itertools;
34use move_binary_format::CompiledModule;
35use move_binary_format::binary_config::BinaryConfig;
36use move_core_types::annotated_value::MoveStructLayout;
37use move_core_types::language_storage::ModuleId;
38use mysten_common::{assert_reachable, fatal};
39use parking_lot::Mutex;
40use prometheus::{
41 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
42 register_histogram_vec_with_registry, register_histogram_with_registry,
43 register_int_counter_vec_with_registry, register_int_counter_with_registry,
44 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
45};
46use serde::de::DeserializeOwned;
47use serde::{Deserialize, Serialize};
48use shared_object_version_manager::AssignedVersions;
49use shared_object_version_manager::Schedulable;
50use std::collections::BTreeMap;
51use std::collections::BTreeSet;
52use std::fs::File;
53use std::io::Write;
54use std::path::{Path, PathBuf};
55use std::sync::atomic::Ordering;
56use std::time::Duration;
57use std::time::Instant;
58use std::time::SystemTime;
59use std::time::UNIX_EPOCH;
60use std::{
61 collections::{HashMap, HashSet},
62 fs,
63 pin::Pin,
64 str::FromStr,
65 sync::Arc,
66 vec,
67};
68use sui_config::NodeConfig;
69use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
70use sui_execution::Executor;
71use sui_protocol_config::PerObjectCongestionControlMode;
72use sui_types::accumulator_root::AccumulatorObjId;
73use sui_types::crypto::RandomnessRound;
74use sui_types::dynamic_field::visitor as DFV;
75use sui_types::execution::ExecutionOutput;
76use sui_types::execution::ExecutionTimeObservationKey;
77use sui_types::execution::ExecutionTiming;
78use sui_types::execution_params::ExecutionOrEarlyError;
79use sui_types::execution_params::FundsWithdrawStatus;
80use sui_types::execution_params::get_early_execution_error;
81use sui_types::execution_status::ExecutionStatus;
82use sui_types::inner_temporary_store::PackageStoreWithFallback;
83use sui_types::layout_resolver::LayoutResolver;
84use sui_types::layout_resolver::into_struct_layout;
85use sui_types::messages_consensus::AuthorityCapabilitiesV2;
86use sui_types::object::bounded_visitor::BoundedVisitor;
87use sui_types::storage::ChildObjectResolver;
88use sui_types::storage::InputKey;
89use sui_types::storage::TrackingBackingStore;
90use sui_types::traffic_control::{
91 PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
92};
93use sui_types::transaction_executor::SimulateTransactionResult;
94use sui_types::transaction_executor::TransactionChecks;
95use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
96use tap::TapFallible;
97use tokio::sync::RwLock;
98use tokio::sync::mpsc::unbounded_channel;
99use tokio::sync::watch::error::RecvError;
100use tokio::sync::{mpsc, oneshot};
101use tokio::task::JoinHandle;
102use tokio::time::timeout;
103use tracing::{debug, error, info, instrument, warn};
104
105use self::authority_store::ExecutionLockWriteGuard;
106use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
107pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
108use mysten_metrics::{monitored_scope, spawn_monitored_task};
109
110use crate::jsonrpc_index::IndexStore;
111use crate::jsonrpc_index::{
112 CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
113};
114use mysten_common::{debug_fatal, debug_fatal_no_invariant};
115use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
116use sui_config::genesis::Genesis;
117use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
118use sui_framework::{BuiltInFramework, SystemPackage};
119use sui_json_rpc_types::{
120 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
121 SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
122 SuiTransactionBlockEvents, TransactionFilter,
123};
124use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
125use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
126use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
127use sui_types::accumulator_root::AccumulatorValue;
128use sui_types::authenticator_state::get_authenticator_state;
129use sui_types::balance::Balance;
130use sui_types::coin_reservation;
131use sui_types::committee::{EpochId, ProtocolVersion};
132use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
133use sui_types::deny_list_v1::check_coin_deny_list_v1;
134use sui_types::digests::ChainIdentifier;
135use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
136use sui_types::effects::{
137 InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
138 TransactionEvents, VerifiedSignedTransactionEffects,
139};
140use sui_types::error::{ExecutionError, ExecutionErrorTrait, SuiErrorKind, UserInputError};
141use sui_types::event::{Event, EventID};
142use sui_types::executable_transaction::VerifiedExecutableTransaction;
143use sui_types::execution_status::ExecutionErrorKind;
144use sui_types::gas::{GasCostSummary, SuiGasStatus};
145use sui_types::inner_temporary_store::{
146 InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
147};
148use sui_types::message_envelope::Message;
149use sui_types::messages_checkpoint::{
150 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
151 CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
152 CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
153 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
154};
155use sui_types::messages_grpc::{
156 LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
157 TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
158};
159use sui_types::metrics::{BytecodeVerifierMetrics, LimitsMetrics};
160use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
161use sui_types::signature::GenericSignature;
162use sui_types::storage::{
163 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
164};
165use sui_types::sui_system_state::SuiSystemStateTrait;
166use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
167use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
168use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
169use sui_types::{
170 SUI_SYSTEM_ADDRESS,
171 base_types::*,
172 committee::Committee,
173 crypto::AuthoritySignature,
174 error::{SuiError, SuiResult},
175 object::{Object, ObjectRead},
176 transaction::*,
177};
178use sui_types::{TypeTag, is_system_package};
179use typed_store::TypedStoreError;
180use typed_store::rocks::StagedBatch;
181
182use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
183use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
184use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
185use crate::authority::authority_store_pruner::{
186 AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
187};
188use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
189use crate::authority::epoch_start_configuration::EpochStartConfiguration;
190use crate::checkpoints::CheckpointStore;
191use crate::epoch::committee_store::CommitteeStore;
192use crate::execution_cache::{
193 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
194 ObjectCacheRead, StateSyncAPI,
195};
196use crate::execution_driver::execution_process;
197use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
198use crate::metrics::LatencyObserver;
199use crate::metrics::RateTracker;
200use crate::module_cache_metrics::ResolverMetrics;
201use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
202use crate::stake_aggregator::StakeAggregator;
203use crate::subscription_handler::SubscriptionHandler;
204use crate::transaction_input_loader::TransactionInputLoader;
205
206#[cfg(msim)]
207pub use crate::checkpoints::checkpoint_executor::utils::{
208 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
209};
210
211#[cfg(msim)]
212use sui_types::committee::CommitteeTrait;
213use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
214
215#[cfg(test)]
216#[path = "unit_tests/authority_tests.rs"]
217pub mod authority_tests;
218
219#[cfg(test)]
220#[path = "unit_tests/transaction_tests.rs"]
221pub mod transaction_tests;
222
223#[cfg(test)]
224#[path = "unit_tests/batch_transaction_tests.rs"]
225mod batch_transaction_tests;
226
227#[cfg(test)]
228#[path = "unit_tests/move_integration_tests.rs"]
229pub mod move_integration_tests;
230
231#[cfg(test)]
232#[path = "unit_tests/gas_tests.rs"]
233mod gas_tests;
234
235#[cfg(test)]
236#[path = "unit_tests/batch_verification_tests.rs"]
237mod batch_verification_tests;
238
239#[cfg(test)]
240#[path = "unit_tests/coin_deny_list_tests.rs"]
241mod coin_deny_list_tests;
242
243#[cfg(test)]
244#[path = "unit_tests/auth_unit_test_utils.rs"]
245pub mod auth_unit_test_utils;
246
247pub mod authority_test_utils;
248
249pub mod authority_per_epoch_store;
250pub mod authority_per_epoch_store_pruner;
251
252pub mod authority_store_pruner;
253pub mod authority_store_tables;
254pub mod authority_store_types;
255pub mod congestion_log;
256pub mod consensus_tx_status_cache;
257pub(crate) mod epoch_marker_key;
258pub mod epoch_start_configuration;
259pub mod execution_time_estimator;
260pub mod shared_object_congestion_tracker;
261pub mod shared_object_version_manager;
262pub mod submitted_transaction_cache;
263pub mod test_authority_builder;
264pub mod transaction_deferral;
265pub mod transaction_reject_reason_cache;
266mod weighted_moving_average;
267
268pub(crate) mod authority_store;
269pub mod backpressure;
270
271pub struct AuthorityMetrics {
273 tx_orders: IntCounter,
274 total_certs: IntCounter,
275 total_effects: IntCounter,
276 pub shared_obj_tx: IntCounter,
278 sponsored_tx: IntCounter,
279 tx_already_processed: IntCounter,
280 num_input_objs: Histogram,
281 num_shared_objects: Histogram,
283 batch_size: Histogram,
284
285 authority_state_handle_vote_transaction_latency: Histogram,
286
287 internal_execution_latency: Histogram,
288 execution_load_input_objects_latency: Histogram,
289 prepare_certificate_latency: Histogram,
290 commit_certificate_latency: Histogram,
291 db_checkpoint_latency: Histogram,
292
293 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
295 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
296 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
297 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
298
299 pub(crate) execution_driver_executed_transactions: IntCounter,
300 pub(crate) execution_driver_paused_transactions: IntCounter,
301 pub(crate) execution_driver_dispatch_queue: IntGauge,
302 pub(crate) execution_queueing_delay_s: Histogram,
303 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
304 pub(crate) execution_gas_latency_ratio: Histogram,
305
306 pub(crate) skipped_consensus_txns: IntCounter,
307 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
308 pub(crate) consensus_handler_duplicate_tx_count: Histogram,
309
310 pub(crate) authority_overload_status: IntGauge,
311 pub(crate) authority_load_shedding_percentage: IntGauge,
312
313 pub(crate) transaction_overload_sources: IntCounterVec,
314
315 post_processing_total_events_emitted: IntCounter,
317 post_processing_total_tx_indexed: IntCounter,
318 post_processing_total_tx_had_event_processed: IntCounter,
319 post_processing_total_failures: IntCounter,
320
321 pub consensus_handler_processed: IntCounterVec,
323 pub consensus_handler_transaction_sizes: HistogramVec,
324 pub consensus_handler_num_low_scoring_authorities: IntGauge,
325 pub consensus_handler_scores: IntGaugeVec,
326 pub consensus_handler_deferred_transactions: IntCounter,
327 pub consensus_handler_congested_transactions: IntCounter,
328 pub consensus_handler_unpaid_amplification_deferrals: IntCounter,
329 pub consensus_handler_cancelled_transactions: IntCounter,
330 pub consensus_handler_max_object_costs: IntGaugeVec,
331 pub consensus_committed_subdags: IntCounterVec,
332 pub consensus_committed_messages: IntGaugeVec,
333 pub consensus_committed_user_transactions: IntGaugeVec,
334 pub consensus_finalized_user_transactions: IntGaugeVec,
335 pub consensus_rejected_user_transactions: IntGaugeVec,
336 pub consensus_calculated_throughput: IntGauge,
337 pub consensus_calculated_throughput_profile: IntGauge,
338 pub consensus_block_handler_block_processed: IntCounter,
339 pub consensus_block_handler_txn_processed: IntCounterVec,
340 pub consensus_block_handler_fastpath_executions: IntCounter,
341 pub consensus_timestamp_bias: Histogram,
342
343 pub limits_metrics: Arc<LimitsMetrics>,
344
345 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
347
348 pub zklogin_sig_count: IntCounter,
350 pub multisig_sig_count: IntCounter,
352
353 pub execution_queueing_latency: LatencyObserver,
356
357 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
364
365 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
368}
369
/// Histogram buckets for positive integer quantities (object counts, batch
/// sizes, transaction sizes): a 1-2-5-7 progression per decade, 1 to 10^7.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 7.0,
    10.0, 20.0, 50.0, 70.0,
    100.0, 200.0, 500.0, 700.0,
    1000.0, 2000.0, 5000.0, 7000.0,
    10000.0, 20000.0, 50000.0, 70000.0,
    100000.0, 200000.0, 500000.0, 700000.0,
    1000000.0, 2000000.0, 5000000.0, 7000000.0,
    10000000.0,
];

/// Histogram buckets, in seconds, for general latency metrics (0.5 ms to 90 s).
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005,
    0.01, 0.025, 0.05,
    0.1, 0.25, 0.5,
    1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
    10.0, 20.0, 30.0, 60.0, 90.0,
];

/// Histogram buckets, in seconds, for operations expected to be very fast
/// (10 us to 100 s, 1-2-5 progression per decade).
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005,
    0.0001, 0.0002, 0.0005,
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0,
];

/// Histogram buckets, in seconds, for the consensus timestamp bias metric.
/// Includes a negative bucket and is densest between 0.2 s and 0.5 s.
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    -2.0, 0.0,
    0.2, 0.22, 0.24, 0.26, 0.28,
    0.30, 0.32, 0.34, 0.36, 0.38,
    0.40, 0.45, 0.50, 0.80,
    2.0, 10.0, 30.0,
];

/// Histogram buckets for the gas-per-latency ratio metrics.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0,
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0,
    1000.0, 2000.0, 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0,
    10000.0, 50000.0,
    100000.0, 1000000.0,
];

/// 10^18. NOTE(review): presumably the balance given to the synthetic gas coin
/// used for dev-inspect - confirm against the use site.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 10u64.pow(18);

/// Two seconds. NOTE(review): presumably the maximum wait for fastpath
/// transaction inputs - confirm against the use site.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_secs(2);
406
407impl AuthorityMetrics {
408 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
409 Self {
410 tx_orders: register_int_counter_with_registry!(
411 "total_transaction_orders",
412 "Total number of transaction orders",
413 registry,
414 )
415 .unwrap(),
416 total_certs: register_int_counter_with_registry!(
417 "total_transaction_certificates",
418 "Total number of transaction certificates handled",
419 registry,
420 )
421 .unwrap(),
422 total_effects: register_int_counter_with_registry!(
424 "total_transaction_effects",
425 "Total number of transaction effects produced",
426 registry,
427 )
428 .unwrap(),
429
430 shared_obj_tx: register_int_counter_with_registry!(
431 "num_shared_obj_tx",
432 "Number of transactions involving shared objects",
433 registry,
434 )
435 .unwrap(),
436
437 sponsored_tx: register_int_counter_with_registry!(
438 "num_sponsored_tx",
439 "Number of sponsored transactions",
440 registry,
441 )
442 .unwrap(),
443
444 tx_already_processed: register_int_counter_with_registry!(
445 "num_tx_already_processed",
446 "Number of transaction orders already processed previously",
447 registry,
448 )
449 .unwrap(),
450 num_input_objs: register_histogram_with_registry!(
451 "num_input_objects",
452 "Distribution of number of input TX objects per TX",
453 POSITIVE_INT_BUCKETS.to_vec(),
454 registry,
455 )
456 .unwrap(),
457 num_shared_objects: register_histogram_with_registry!(
458 "num_shared_objects",
459 "Number of shared input objects per TX",
460 POSITIVE_INT_BUCKETS.to_vec(),
461 registry,
462 )
463 .unwrap(),
464 batch_size: register_histogram_with_registry!(
465 "batch_size",
466 "Distribution of size of transaction batch",
467 POSITIVE_INT_BUCKETS.to_vec(),
468 registry,
469 )
470 .unwrap(),
471 authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
472 "authority_state_handle_vote_transaction_latency",
473 "Latency of voting on transactions without signing",
474 LATENCY_SEC_BUCKETS.to_vec(),
475 registry,
476 )
477 .unwrap(),
478 internal_execution_latency: register_histogram_with_registry!(
479 "authority_state_internal_execution_latency",
480 "Latency of actual certificate executions",
481 LATENCY_SEC_BUCKETS.to_vec(),
482 registry,
483 )
484 .unwrap(),
485 execution_load_input_objects_latency: register_histogram_with_registry!(
486 "authority_state_execution_load_input_objects_latency",
487 "Latency of loading input objects for execution",
488 LOW_LATENCY_SEC_BUCKETS.to_vec(),
489 registry,
490 )
491 .unwrap(),
492 prepare_certificate_latency: register_histogram_with_registry!(
493 "authority_state_prepare_certificate_latency",
494 "Latency of executing certificates, before committing the results",
495 LATENCY_SEC_BUCKETS.to_vec(),
496 registry,
497 )
498 .unwrap(),
499 commit_certificate_latency: register_histogram_with_registry!(
500 "authority_state_commit_certificate_latency",
501 "Latency of committing certificate execution results",
502 LATENCY_SEC_BUCKETS.to_vec(),
503 registry,
504 )
505 .unwrap(),
506 db_checkpoint_latency: register_histogram_with_registry!(
507 "db_checkpoint_latency",
508 "Latency of checkpointing dbs",
509 LATENCY_SEC_BUCKETS.to_vec(),
510 registry,
511 ).unwrap(),
512 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
513 "transaction_manager_num_enqueued_certificates",
514 "Current number of certificates enqueued to ExecutionScheduler",
515 &["result"],
516 registry,
517 )
518 .unwrap(),
519 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
520 "transaction_manager_num_pending_certificates",
521 "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
522 registry,
523 )
524 .unwrap(),
525 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
526 "transaction_manager_num_executing_certificates",
527 "Number of executing certificates, including queued and actually running certificates",
528 registry,
529 )
530 .unwrap(),
531 authority_overload_status: register_int_gauge_with_registry!(
532 "authority_overload_status",
533 "Whether authority is current experiencing overload and enters load shedding mode.",
534 registry)
535 .unwrap(),
536 authority_load_shedding_percentage: register_int_gauge_with_registry!(
537 "authority_load_shedding_percentage",
538 "The percentage of transactions is shed when the authority is in load shedding mode.",
539 registry)
540 .unwrap(),
541 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
542 "transaction_manager_transaction_queue_age_s",
543 "Time spent in waiting for transaction in the queue",
544 LATENCY_SEC_BUCKETS.to_vec(),
545 registry,
546 )
547 .unwrap(),
548 transaction_overload_sources: register_int_counter_vec_with_registry!(
549 "transaction_overload_sources",
550 "Number of times each source indicates transaction overload.",
551 &["source"],
552 registry)
553 .unwrap(),
554 execution_driver_executed_transactions: register_int_counter_with_registry!(
555 "execution_driver_executed_transactions",
556 "Cumulative number of transaction executed by execution driver",
557 registry,
558 )
559 .unwrap(),
560 execution_driver_paused_transactions: register_int_counter_with_registry!(
561 "execution_driver_paused_transactions",
562 "Cumulative number of transactions paused by execution driver",
563 registry,
564 )
565 .unwrap(),
566 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
567 "execution_driver_dispatch_queue",
568 "Number of transaction pending in execution driver dispatch queue",
569 registry,
570 )
571 .unwrap(),
572 execution_queueing_delay_s: register_histogram_with_registry!(
573 "execution_queueing_delay_s",
574 "Queueing delay between a transaction is ready for execution until it starts executing.",
575 LATENCY_SEC_BUCKETS.to_vec(),
576 registry
577 )
578 .unwrap(),
579 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
580 "prepare_cert_gas_latency_ratio",
581 "The ratio of computation gas divided by VM execution latency.",
582 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
583 registry
584 )
585 .unwrap(),
586 execution_gas_latency_ratio: register_histogram_with_registry!(
587 "execution_gas_latency_ratio",
588 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
589 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
590 registry
591 )
592 .unwrap(),
593 skipped_consensus_txns: register_int_counter_with_registry!(
594 "skipped_consensus_txns",
595 "Total number of consensus transactions skipped",
596 registry,
597 )
598 .unwrap(),
599 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
600 "skipped_consensus_txns_cache_hit",
601 "Total number of consensus transactions skipped because of local cache hit",
602 registry,
603 )
604 .unwrap(),
605 consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
606 "consensus_handler_duplicate_tx_count",
607 "Number of times each transaction appears in its first consensus commit",
608 POSITIVE_INT_BUCKETS.to_vec(),
609 registry,
610 )
611 .unwrap(),
612 post_processing_total_events_emitted: register_int_counter_with_registry!(
613 "post_processing_total_events_emitted",
614 "Total number of events emitted in post processing",
615 registry,
616 )
617 .unwrap(),
618 post_processing_total_tx_indexed: register_int_counter_with_registry!(
619 "post_processing_total_tx_indexed",
620 "Total number of txes indexed in post processing",
621 registry,
622 )
623 .unwrap(),
624 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
625 "post_processing_total_tx_had_event_processed",
626 "Total number of txes finished event processing in post processing",
627 registry,
628 )
629 .unwrap(),
630 post_processing_total_failures: register_int_counter_with_registry!(
631 "post_processing_total_failures",
632 "Total number of failure in post processing",
633 registry,
634 )
635 .unwrap(),
636 consensus_handler_processed: register_int_counter_vec_with_registry!(
637 "consensus_handler_processed",
638 "Number of transactions processed by consensus handler",
639 &["class"],
640 registry
641 ).unwrap(),
642 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
643 "consensus_handler_transaction_sizes",
644 "Sizes of each type of transactions processed by consensus handler",
645 &["class"],
646 POSITIVE_INT_BUCKETS.to_vec(),
647 registry
648 ).unwrap(),
649 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
650 "consensus_handler_num_low_scoring_authorities",
651 "Number of low scoring authorities based on reputation scores from consensus",
652 registry
653 ).unwrap(),
654 consensus_handler_scores: register_int_gauge_vec_with_registry!(
655 "consensus_handler_scores",
656 "scores from consensus for each authority",
657 &["authority"],
658 registry,
659 ).unwrap(),
660 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
661 "consensus_handler_deferred_transactions",
662 "Number of transactions deferred by consensus handler",
663 registry,
664 ).unwrap(),
665 consensus_handler_congested_transactions: register_int_counter_with_registry!(
666 "consensus_handler_congested_transactions",
667 "Number of transactions deferred by consensus handler due to congestion",
668 registry,
669 ).unwrap(),
670 consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
671 "consensus_handler_unpaid_amplification_deferrals",
672 "Number of transactions deferred due to unpaid consensus amplification",
673 registry,
674 ).unwrap(),
675 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
676 "consensus_handler_cancelled_transactions",
677 "Number of transactions cancelled by consensus handler",
678 registry,
679 ).unwrap(),
680 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
681 "consensus_handler_max_congestion_control_object_costs",
682 "Max object costs for congestion control in the current consensus commit",
683 &["commit_type"],
684 registry,
685 ).unwrap(),
686 consensus_committed_subdags: register_int_counter_vec_with_registry!(
687 "consensus_committed_subdags",
688 "Number of committed subdags, sliced by author",
689 &["authority"],
690 registry,
691 ).unwrap(),
692 consensus_committed_messages: register_int_gauge_vec_with_registry!(
693 "consensus_committed_messages",
694 "Total number of committed consensus messages, sliced by author",
695 &["authority"],
696 registry,
697 ).unwrap(),
698 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
699 "consensus_committed_user_transactions",
700 "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
701 &["authority"],
702 registry,
703 ).unwrap(),
704 consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
705 "consensus_finalized_user_transactions",
706 "Number of user transactions finalized, sliced by submitter",
707 &["authority"],
708 registry,
709 ).unwrap(),
710 consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
711 "consensus_rejected_user_transactions",
712 "Number of user transactions rejected, sliced by submitter",
713 &["authority"],
714 registry,
715 ).unwrap(),
716 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
717 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
718 zklogin_sig_count: register_int_counter_with_registry!(
719 "zklogin_sig_count",
720 "Count of zkLogin signatures",
721 registry,
722 )
723 .unwrap(),
724 multisig_sig_count: register_int_counter_with_registry!(
725 "multisig_sig_count",
726 "Count of zkLogin signatures",
727 registry,
728 )
729 .unwrap(),
730 consensus_calculated_throughput: register_int_gauge_with_registry!(
731 "consensus_calculated_throughput",
732 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
733 registry,
734 ).unwrap(),
735 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
736 "consensus_calculated_throughput_profile",
737 "The current active calculated throughput profile",
738 registry
739 ).unwrap(),
740 consensus_block_handler_block_processed: register_int_counter_with_registry!(
741 "consensus_block_handler_block_processed",
742 "Number of blocks processed by consensus block handler.",
743 registry
744 ).unwrap(),
745 consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
746 "consensus_block_handler_txn_processed",
747 "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
748 &["outcome"],
749 registry
750 ).unwrap(),
751 consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
752 "consensus_block_handler_fastpath_executions",
753 "Number of fastpath transactions sent for execution by consensus transaction handler",
754 registry,
755 ).unwrap(),
756 consensus_timestamp_bias: register_histogram_with_registry!(
757 "consensus_timestamp_bias",
758 "Bias/delay from consensus timestamp to system time",
759 TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
760 registry
761 ).unwrap(),
762 execution_queueing_latency: LatencyObserver::new(),
763 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
764 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
765 }
766 }
767}
768
769pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
776
777#[derive(Debug, Clone)]
780pub struct ExecutionEnv {
781 pub assigned_versions: AssignedVersions,
783 pub expected_effects_digest: Option<TransactionEffectsDigest>,
786 pub funds_withdraw_status: FundsWithdrawStatus,
789 pub barrier_dependencies: Vec<TransactionDigest>,
792}
793
794impl Default for ExecutionEnv {
795 fn default() -> Self {
796 Self {
797 assigned_versions: Default::default(),
798 expected_effects_digest: None,
799 funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
800 barrier_dependencies: Default::default(),
801 }
802 }
803}
804
805impl ExecutionEnv {
806 pub fn new() -> Self {
807 Default::default()
808 }
809
810 pub fn with_expected_effects_digest(
811 mut self,
812 expected_effects_digest: TransactionEffectsDigest,
813 ) -> Self {
814 self.expected_effects_digest = Some(expected_effects_digest);
815 self
816 }
817
818 pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
819 self.assigned_versions = assigned_versions;
820 self
821 }
822
823 pub fn with_insufficient_funds(mut self) -> Self {
824 self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
825 self
826 }
827
828 pub fn with_barrier_dependencies(
829 mut self,
830 barrier_dependencies: BTreeSet<TransactionDigest>,
831 ) -> Self {
832 self.barrier_dependencies = barrier_dependencies.into_iter().collect();
833 self
834 }
835}
836
837#[derive(Debug)]
838pub struct ForkRecoveryState {
839 transaction_overrides:
841 parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
842}
843
844impl Default for ForkRecoveryState {
845 fn default() -> Self {
846 Self {
847 transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
848 }
849 }
850}
851
852impl ForkRecoveryState {
853 pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
854 let Some(config) = config else {
855 return Ok(Self::default());
856 };
857
858 let mut transaction_overrides = HashMap::new();
859 for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
860 let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
861 SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
862 })?;
863 let effects_digest =
864 TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
865 SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
866 })?;
867 transaction_overrides.insert(tx_digest, effects_digest);
868 }
869
870 Ok(Self {
871 transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
872 })
873 }
874
875 pub fn get_transaction_override(
876 &self,
877 tx_digest: &TransactionDigest,
878 ) -> Option<TransactionEffectsDigest> {
879 self.transaction_overrides.read().get(tx_digest).copied()
880 }
881}
882
/// Pair of a staged batch and index-store cache updates; this is the payload
/// type carried by the `pending_post_processing` receivers in `AuthorityState`.
pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
884
/// Central state object of an authority node: it owns the stores, caches,
/// scheduler, metrics and configuration that the transaction validation and
/// execution methods in this file operate on.
pub struct AuthorityState {
    /// Name of this authority; `is_validator` checks it against the committee.
    pub name: AuthorityName,
    /// Signer for this authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads input and receiving objects for signing and for execution.
    input_loader: TransactionInputLoader,
    /// Bundle of execution-cache handles (e.g. `account_funds_read`).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    /// Resolver passed to funds-withdrawal processing and to coin-reservation
    /// transaction rewriting.
    coin_reservation_resolver: Arc<CachingCoinReservationResolver>,

    /// Swappable handle to the per-epoch store.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// Read-write lock whose guarded value is an epoch id; execution compares
    /// the guarded epoch against the epoch store's epoch.
    execution_lock: RwLock<EpochId>,

    pub indexes: Option<Arc<IndexStore>>,
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; also used to record detected transaction forks.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Exposed through `committee_store()` and `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Scheduler used to enqueue transactions and to check execution overload.
    execution_scheduler: Arc<ExecutionScheduler>,

    // NOTE(review): presumably kept only so the sender is dropped together with
    // this struct — confirm against the constructor.
    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration (deny configs, overload config, debug-dump config, ...).
    pub config: NodeConfig,

    /// Overload flag and load-shedding percentage read by `check_authority_overload`.
    pub overload_info: AuthorityOverloadInfo,

    /// Chain identifier passed to funds-withdrawal processing.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Optional traffic controller; `reconfigure_traffic_control` fails when absent.
    pub traffic_controller: Option<Arc<TrafficController>>,

    /// Optional effects-digest overrides consulted by `try_execute_immediately`.
    fork_recovery_state: Option<ForkRecoveryState>,

    notify_epoch: tokio::sync::watch::Sender<EpochId>,

    /// Optional, swappable object-funds checker consulted during execution.
    pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
    object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,

    /// Per-transaction receivers of post-processing output, keyed by digest.
    pending_post_processing:
        Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,

    // NOTE(review): presumably bounds concurrent post-processing work — confirm.
    post_processing_semaphore: Arc<tokio::sync::Semaphore>,
}
959
960impl AuthorityState {
967 pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
968 epoch_store.committee().authority_exists(&self.name)
969 }
970
971 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
972 !self.is_validator(epoch_store)
973 }
974
975 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
976 &self.committee_store
977 }
978
979 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
980 self.committee_store.clone()
981 }
982
983 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
984 &self.config.authority_overload_config
985 }
986
987 pub fn get_epoch_state_commitments(
988 &self,
989 epoch: EpochId,
990 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
991 self.checkpoint_store.get_epoch_state_commitments(epoch)
992 }
993
994 fn pre_object_load_checks(
997 &self,
998 tx_data: &TransactionData,
999 tx_signatures: &[GenericSignature],
1000 input_object_kinds: &[InputObjectKind],
1001 receiving_objects_refs: &[ObjectRef],
1002 ) -> SuiResult<BTreeMap<AccumulatorObjId, (u64, TypeTag)>> {
1003 sui_transaction_checks::deny::check_transaction_for_signing(
1007 tx_data,
1008 tx_signatures,
1009 input_object_kinds,
1010 receiving_objects_refs,
1011 &self.config.transaction_deny_config,
1012 self.get_backing_package_store().as_ref(),
1013 )?;
1014
1015 let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1016 self.chain_identifier,
1017 self.coin_reservation_resolver.as_ref(),
1018 )?;
1019
1020 self.execution_cache_trait_pointers
1021 .account_funds_read
1022 .check_amounts_available(&declared_withdrawals)?;
1023
1024 Ok(declared_withdrawals)
1025 }
1026
1027 fn handle_transaction_deny_checks(
1028 &self,
1029 transaction: &VerifiedTransaction,
1030 epoch_store: &Arc<AuthorityPerEpochStore>,
1031 ) -> SuiResult<CheckedInputObjects> {
1032 let tx_digest = transaction.digest();
1033 let tx_data = transaction.data().transaction_data();
1034
1035 let input_object_kinds = tx_data.input_objects()?;
1036 let receiving_objects_refs = tx_data.receiving_objects();
1037
1038 self.pre_object_load_checks(
1039 tx_data,
1040 transaction.tx_signatures(),
1041 &input_object_kinds,
1042 &receiving_objects_refs,
1043 )?;
1044
1045 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1046 Some(tx_digest),
1047 &input_object_kinds,
1048 &receiving_objects_refs,
1049 epoch_store.epoch(),
1050 )?;
1051
1052 let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1053 epoch_store.protocol_config(),
1054 epoch_store.reference_gas_price(),
1055 tx_data,
1056 input_objects,
1057 &receiving_objects,
1058 &self.metrics.bytecode_verifier_metrics,
1059 &self.config.verifier_signing_config,
1060 )?;
1061
1062 self.handle_coin_deny_list_checks(
1063 tx_data,
1064 &checked_input_objects,
1065 &receiving_objects,
1066 epoch_store,
1067 )?;
1068
1069 Ok(checked_input_objects)
1070 }
1071
1072 fn handle_coin_deny_list_checks(
1073 &self,
1074 tx_data: &TransactionData,
1075 checked_input_objects: &CheckedInputObjects,
1076 receiving_objects: &ReceivingObjects,
1077 epoch_store: &Arc<AuthorityPerEpochStore>,
1078 ) -> SuiResult<()> {
1079 use sui_types::balance::Balance;
1080
1081 let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1082 self.chain_identifier,
1083 self.coin_reservation_resolver.as_ref(),
1084 )?;
1085
1086 let funds_withdraw_types = declared_withdrawals
1087 .values()
1088 .filter_map(|(_, type_tag)| {
1089 Balance::maybe_get_balance_type_param(type_tag)
1090 .map(|ty| ty.to_canonical_string(false))
1091 })
1092 .collect::<BTreeSet<_>>();
1093
1094 if epoch_store.coin_deny_list_v1_enabled() {
1095 check_coin_deny_list_v1(
1096 tx_data.sender(),
1097 checked_input_objects,
1098 receiving_objects,
1099 funds_withdraw_types.clone(),
1100 &self.get_object_store(),
1101 )?;
1102 }
1103
1104 if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1105 check_coin_deny_list_v2_during_signing(
1106 tx_data.sender(),
1107 checked_input_objects,
1108 receiving_objects,
1109 funds_withdraw_types.clone(),
1110 &self.get_object_store(),
1111 )?;
1112 }
1113
1114 Ok(())
1115 }
1116
    /// Validates a transaction for voting.
    ///
    /// Returns `Ok(())` early when the transaction was recently finalized or
    /// already executed in the current epoch. Otherwise the transaction goes
    /// through the deny/input checks and its owned input object versions are
    /// validated through the cache writer.
    ///
    /// # Errors
    ///
    /// - `ValidatorHaltedAtEpochEnd` when the reconfig state no longer accepts
    ///   user certificates.
    /// - `TransactionAlreadyExecuted` when the transaction was executed in the
    ///   last epoch.
    /// - Any error from acquiring the validation execution lock, from the
    ///   deny/input checks, or from owned-object version validation.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    pub fn handle_vote_transaction(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
    ) -> SuiResult<()> {
        debug!("handle_vote_transaction");

        // Timer guard: records the latency when it goes out of scope.
        let _metrics_guard = self
            .metrics
            .authority_state_handle_vote_transaction_latency
            .start_timer();
        self.metrics.tx_orders.inc();

        if !epoch_store
            .get_reconfig_state_read_lock_guard()
            .should_accept_user_certs()
        {
            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
        }

        let tx_digest = *transaction.digest();
        // Already finalized or executed in this epoch: nothing more to validate.
        if epoch_store.is_recently_finalized(&tx_digest)
            || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
        {
            assert_reachable!("transaction recently executed");
            return Ok(());
        }

        if self
            .get_transaction_cache_reader()
            .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
        {
            assert_reachable!("transaction executed in last epoch");
            return Err(SuiErrorKind::TransactionAlreadyExecuted {
                digest: (*transaction.digest()),
            }
            .into());
        }

        // Held until the function returns, covering the checks below.
        let _execution_lock = self.execution_lock_for_validation()?;

        let checked_input_objects =
            self.handle_transaction_deny_checks(&transaction, epoch_store)?;

        let owned_objects = checked_input_objects.inner().filter_owned_objects();

        self.get_cache_writer()
            .validate_owned_object_versions(&owned_objects)
    }
1187
1188 pub fn check_transaction_validity(
1197 &self,
1198 epoch_store: &Arc<AuthorityPerEpochStore>,
1199 transaction: &VerifiedTransaction,
1200 enforce_live_input_objects: bool,
1201 ) -> SuiResult<()> {
1202 if !epoch_store
1203 .get_reconfig_state_read_lock_guard()
1204 .should_accept_user_certs()
1205 {
1206 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1207 }
1208
1209 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1210
1211 let checked_input_objects =
1212 self.handle_transaction_deny_checks(transaction, epoch_store)?;
1213
1214 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1216 let cache_reader = self.get_object_cache_reader();
1217
1218 for obj_ref in &owned_objects {
1219 if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1220 if obj_ref.1 < live_object.version() {
1223 return Err(SuiErrorKind::UserInputError {
1224 error: UserInputError::ObjectVersionUnavailableForConsumption {
1225 provided_obj_ref: *obj_ref,
1226 current_version: live_object.version(),
1227 },
1228 }
1229 .into());
1230 }
1231
1232 if enforce_live_input_objects && live_object.version() < obj_ref.1 {
1233 return Err(SuiErrorKind::UserInputError {
1235 error: UserInputError::ObjectVersionUnavailableForConsumption {
1236 provided_obj_ref: *obj_ref,
1237 current_version: live_object.version(),
1238 },
1239 }
1240 .into());
1241 }
1242
1243 if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1245 return Err(SuiErrorKind::UserInputError {
1246 error: UserInputError::InvalidObjectDigest {
1247 object_id: obj_ref.0,
1248 expected_digest: live_object.digest(),
1249 },
1250 }
1251 .into());
1252 }
1253 }
1254 }
1255
1256 Ok(())
1257 }
1258
1259 pub fn check_system_overload_at_signing(&self) -> bool {
1260 self.config
1261 .authority_overload_config
1262 .check_system_overload_at_signing
1263 }
1264
1265 pub fn check_system_overload_at_execution(&self) -> bool {
1266 self.config
1267 .authority_overload_config
1268 .check_system_overload_at_execution
1269 }
1270
1271 pub(crate) fn check_system_overload(
1272 &self,
1273 consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
1274 tx_data: &SenderSignedData,
1275 do_authority_overload_check: bool,
1276 ) -> SuiResult {
1277 if do_authority_overload_check {
1278 self.check_authority_overload(tx_data).tap_err(|_| {
1279 self.update_overload_metrics("execution_queue");
1280 })?;
1281 }
1282 self.execution_scheduler
1283 .check_execution_overload(self.overload_config(), tx_data)
1284 .tap_err(|_| {
1285 self.update_overload_metrics("execution_pending");
1286 })?;
1287 consensus_overload_checker
1288 .check_consensus_overload()
1289 .tap_err(|_| {
1290 self.update_overload_metrics("consensus");
1291 })?;
1292
1293 let pending_tx_count = self
1294 .get_cache_commit()
1295 .approximate_pending_transaction_count();
1296 if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1297 return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1298 retry_after_secs: 10,
1299 }
1300 .into());
1301 }
1302
1303 Ok(())
1304 }
1305
1306 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1307 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1308 return Ok(());
1309 }
1310
1311 let load_shedding_percentage = self
1312 .overload_info
1313 .load_shedding_percentage
1314 .load(Ordering::Relaxed);
1315 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1316 }
1317
1318 fn update_overload_metrics(&self, source: &str) {
1319 self.metrics
1320 .transaction_overload_sources
1321 .with_label_values(&[source])
1322 .inc();
1323 }
1324
1325 pub(crate) async fn wait_for_fastpath_dependency_objects(
1329 &self,
1330 transaction: &VerifiedTransaction,
1331 epoch: EpochId,
1332 ) -> SuiResult<bool> {
1333 let txn_data = transaction.data().transaction_data();
1334 let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1335
1336 let fastpath_dependency_objects: Vec<_> = move_objects
1338 .into_iter()
1339 .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1340 .chain(
1341 packages
1342 .into_iter()
1343 .map(|package_id| InputKey::Package { id: package_id }),
1344 )
1345 .collect();
1346 let receiving_keys: HashSet<_> = receiving_objects
1347 .into_iter()
1348 .filter_map(|receiving_obj_ref| {
1349 self.should_wait_for_dependency_object(receiving_obj_ref)
1350 })
1351 .collect();
1352 if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1353 return Ok(true);
1354 }
1355
1356 let max_wait = if cfg!(msim) {
1359 Duration::from_millis(200)
1360 } else {
1361 WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1362 };
1363
1364 match timeout(
1365 max_wait,
1366 self.get_object_cache_reader().notify_read_input_objects(
1367 &fastpath_dependency_objects,
1368 &receiving_keys,
1369 epoch,
1370 ),
1371 )
1372 .await
1373 {
1374 Ok(()) => Ok(true),
1375 Err(_) => Ok(false),
1378 }
1379 }
1380
1381 fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1388 let (obj_id, cur_version, _digest) = obj_ref;
1389 let Some(latest_obj_ref) = self
1390 .get_object_cache_reader()
1391 .get_latest_object_ref_or_tombstone(obj_id)
1392 else {
1393 return Some(InputKey::VersionedObject {
1395 id: FullObjectID::new(obj_id, None),
1396 version: cur_version,
1397 });
1398 };
1399 let latest_digest = latest_obj_ref.2;
1400 if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1401 return None;
1403 }
1404 let latest_version = latest_obj_ref.1;
1405 if cur_version <= latest_version {
1406 return None;
1409 }
1410 Some(InputKey::VersionedObject {
1412 id: FullObjectID::new(obj_id, None),
1413 version: cur_version,
1414 })
1415 }
1416
1417 #[instrument(level = "trace", skip_all)]
1423 pub async fn wait_for_transaction_execution_for_testing(
1424 &self,
1425 transaction: &VerifiedExecutableTransaction,
1426 epoch_store: &Arc<AuthorityPerEpochStore>,
1427 ) -> TransactionEffects {
1428 if !transaction.is_consensus_tx()
1429 && !epoch_store.protocol_config().disable_preconsensus_locking()
1430 {
1431 self.execution_scheduler.enqueue(
1439 vec![(
1440 Schedulable::Transaction(transaction.clone()),
1441 ExecutionEnv::new(),
1442 )],
1443 epoch_store,
1444 );
1445 }
1446
1447 self.notify_read_effects_for_testing(
1448 "AuthorityState::wait_for_transaction_execution_for_testing",
1449 *transaction.digest(),
1450 )
1451 .await
1452 }
1453
    /// Executes a certificate right away and commits its outputs.
    ///
    /// Returns the transaction effects together with the execution error, if
    /// any. If effects for the transaction are already cached, they are
    /// returned without re-executing.
    ///
    /// Other outcomes:
    /// - `ExecutionOutput::EpochEnded` when the execution lock cannot be
    ///   obtained or its epoch differs from `epoch_store`'s epoch.
    /// - `ExecutionOutput::Fatal` when committing the outputs fails.
    /// - Any non-success output of `process_certificate` is propagated.
    ///
    /// # Panics
    ///
    /// - When cached effects exist and their digest differs from the expected
    ///   effects digest.
    /// - In debug builds, when the JWKs in the authenticator state object
    ///   differ from the signature verifier's after an authenticator update.
    /// - When an accumulator barrier settle transaction has no assigned
    ///   accumulator version while an object funds checker is installed.
    #[instrument(level = "trace", skip_all)]
    pub async fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        mut execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // A configured fork-recovery override replaces whatever effects digest
        // the caller expected.
        if let Some(fork_recovery) = &self.fork_recovery_state
            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
        {
            warn!(
                ?tx_digest,
                original_digest = ?execution_env.expected_effects_digest,
                override_digest = ?override_digest,
                "Applying fork recovery override for transaction effects digest"
            );
            execution_env.expected_effects_digest = Some(override_digest);
        }

        let tx_guard = epoch_store.acquire_tx_guard(certificate);

        // Already executed: verify the digest (if one is expected) and return.
        let tx_cache_reader = self.get_transaction_cache_reader();
        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest,
                    "Unexpected effects digest for transaction {:?}",
                    tx_digest
                );
            }
            tx_guard.release();
            return ExecutionOutput::Success((effects, None));
        }

        let execution_start_time = Instant::now();

        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
        else {
            tx_guard.release();
            return ExecutionOutput::EpochEnded;
        };
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return ExecutionOutput::EpochEnded;
        }

        // Read before `execution_env` is moved into `process_certificate`.
        let accumulator_version = execution_env.assigned_versions.accumulator_version;

        let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
            &tx_guard,
            &execution_guard,
            certificate,
            execution_env,
            epoch_store,
        ) {
            ExecutionOutput::Success(result) => result,
            output => return output.unwrap_err(),
        };
        let transaction_outputs = Arc::new(transaction_outputs);

        fail_point!("crash");

        let effects = transaction_outputs.effects.clone();
        debug!(
            ?tx_digest,
            fx_digest=?effects.digest(),
            "process_certificate succeeded in {:.3}ms",
            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
        );

        let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
        if let Err(err) = commit_result {
            error!(?tx_digest, "Error committing transaction: {err}");
            tx_guard.release();
            return ExecutionOutput::Fatal(err);
        }

        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // Debug-only consistency check between the on-chain authenticator
            // state and the JWKs held by the signature verifier.
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        if certificate
            .transaction_data()
            .kind()
            .is_accumulator_barrier_settle_tx()
        {
            let object_funds_checker = self.object_funds_checker.load();
            if let Some(object_funds_checker) = object_funds_checker.as_ref() {
                // Panics if no accumulator version was assigned to this transaction.
                let next_accumulator_version = accumulator_version.unwrap().next();
                object_funds_checker.settle_accumulator_version(next_accumulator_version);
            }
        }

        tx_guard.commit_tx();

        let elapsed = execution_start_time.elapsed();
        epoch_store.record_local_execution_time(
            certificate.data().transaction_data(),
            &effects,
            timings,
            elapsed,
        );

        // Guard against a zero elapsed time before dividing.
        let elapsed_us = elapsed.as_micros() as f64;
        if elapsed_us > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
        };
        ExecutionOutput::Success((effects, execution_error_opt))
    }
1624
1625 pub fn read_objects_for_execution(
1626 &self,
1627 tx_lock: &CertLockGuard,
1628 certificate: &VerifiedExecutableTransaction,
1629 assigned_shared_object_versions: &AssignedVersions,
1630 epoch_store: &Arc<AuthorityPerEpochStore>,
1631 ) -> SuiResult<InputObjects> {
1632 let _scope = monitored_scope("Execution::load_input_objects");
1633 let _metrics_guard = self
1634 .metrics
1635 .execution_load_input_objects_latency
1636 .start_timer();
1637 let input_objects = &certificate.data().transaction_data().input_objects()?;
1638 self.input_loader.read_objects_for_execution(
1639 &certificate.key(),
1640 tx_lock,
1641 input_objects,
1642 assigned_shared_object_versions,
1643 epoch_store.epoch(),
1644 )
1645 }
1646
1647 pub async fn try_execute_for_test(
1650 &self,
1651 certificate: &VerifiedCertificate,
1652 execution_env: ExecutionEnv,
1653 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1654 let epoch_store = self.epoch_store_for_testing();
1655 let (effects, execution_error_opt) = self
1656 .try_execute_immediately(
1657 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1658 execution_env,
1659 &epoch_store,
1660 )
1661 .await
1662 .unwrap();
1663 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1664 (signed_effects, execution_error_opt)
1665 }
1666
1667 pub async fn try_execute_executable_for_test(
1668 &self,
1669 executable: &VerifiedExecutableTransaction,
1670 execution_env: ExecutionEnv,
1671 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1672 let epoch_store = self.epoch_store_for_testing();
1673 let (effects, execution_error_opt) = self
1674 .try_execute_immediately(executable, execution_env, &epoch_store)
1675 .await
1676 .unwrap();
1677 self.flush_post_processing(executable.digest()).await;
1678 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1679 (signed_effects, execution_error_opt)
1680 }
1681
1682 pub async fn notify_read_effects_for_testing(
1687 &self,
1688 task_name: &'static str,
1689 digest: TransactionDigest,
1690 ) -> TransactionEffects {
1691 self.get_transaction_cache_reader()
1692 .notify_read_executed_effects(task_name, &[digest])
1693 .await
1694 .pop()
1695 .expect("must return correct number of effects")
1696 }
1697
1698 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1699 self.get_object_cache_reader()
1700 .check_owned_objects_are_live(owned_object_refs)
1701 }
1702
1703 pub(crate) fn debug_dump_transaction_state(
1708 &self,
1709 tx_digest: &TransactionDigest,
1710 effects: &TransactionEffects,
1711 expected_effects_digest: TransactionEffectsDigest,
1712 inner_temporary_store: &InnerTemporaryStore,
1713 certificate: &VerifiedExecutableTransaction,
1714 debug_dump_config: &StateDebugDumpConfig,
1715 ) -> SuiResult<PathBuf> {
1716 let dump_dir = debug_dump_config
1717 .dump_file_directory
1718 .as_ref()
1719 .cloned()
1720 .unwrap_or(std::env::temp_dir());
1721 let epoch_store = self.load_epoch_store_one_call_per_task();
1722
1723 NodeStateDump::new(
1724 tx_digest,
1725 effects,
1726 expected_effects_digest,
1727 self.get_object_store().as_ref(),
1728 &epoch_store,
1729 inner_temporary_store,
1730 certificate,
1731 )?
1732 .write_to_file(&dump_dir)
1733 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1734 }
1735
    /// Loads a certificate's inputs, resolves the effects digest to check
    /// against, and hands over to `execute_certificate`.
    ///
    /// The expected effects digest comes from `execution_env` when present and
    /// is otherwise looked up in the epoch store by transaction digest. A
    /// failure to load input objects or to look up the digest is returned as
    /// `ExecutionOutput::Fatal`.
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn process_certificate(
        &self,
        tx_guard: &CertTxGuard,
        execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(
        TransactionOutputs,
        Vec<ExecutionTiming>,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::process_certificate");
        let tx_digest = *certificate.digest();

        let input_objects = match self.read_objects_for_execution(
            tx_guard.as_lock_guard(),
            certificate,
            &execution_env.assigned_versions,
            epoch_store,
        ) {
            Ok(objects) => objects,
            Err(e) => return ExecutionOutput::Fatal(e),
        };

        // Prefer the caller-supplied digest; fall back to the epoch store.
        let expected_effects_digest = match execution_env.expected_effects_digest {
            Some(expected_effects_digest) => Some(expected_effects_digest),
            None => {
                match epoch_store.get_signed_effects_digest(&tx_digest) {
                    Ok(digest) => digest,
                    Err(e) => return ExecutionOutput::Fatal(e),
                }
            }
        };

        // Fail point: kills the current node with probability 0.01, decided
        // deterministically per transaction digest.
        fail_point_if!("correlated-crash-process-certificate", || {
            if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        self.execute_certificate(
            execution_guard,
            certificate,
            input_objects,
            expected_effects_digest,
            execution_env,
            epoch_store,
        )
    }
1795
1796 pub async fn reconfigure_traffic_control(
1797 &self,
1798 params: TrafficControlReconfigParams,
1799 ) -> Result<TrafficControlReconfigParams, SuiError> {
1800 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1801 traffic_controller.admin_reconfigure(params).await
1802 } else {
1803 Err(SuiErrorKind::InvalidAdminRequest(
1804 "Traffic controller is not configured on this node".to_string(),
1805 )
1806 .into())
1807 }
1808 }
1809
    /// Records a transaction as executed and writes its outputs to the cache.
    ///
    /// In order: marks the digest as executed in the epoch store, stores the
    /// key-to-digest mapping for non-digest transaction keys, writes the
    /// transaction outputs through the cache writer, notifies the epoch store
    /// when the transaction carries an accumulator barrier settlement key, and
    /// force-reloads the built-in framework packages after an end-of-epoch
    /// transaction.
    ///
    /// # Errors
    ///
    /// Returns the error from `insert_tx_key` if storing the key mapping fails.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        transaction_outputs: Arc<TransactionOutputs>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_digest = certificate.digest();

        epoch_store.insert_executed_in_epoch(tx_digest);
        let key = certificate.key();
        // Only non-digest keys are recorded as a key -> digest mapping.
        if !matches!(key, TransactionKey::Digest(_)) {
            epoch_store.insert_tx_key(key, *tx_digest)?;
        }

        fail_point!("crash");

        self.get_cache_writer()
            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);

        if let Some(settlement_key) = certificate
            .transaction_data()
            .kind()
            .accumulator_barrier_settlement_key()
        {
            epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
        }

        if certificate.transaction_data().is_end_of_epoch_tx() {
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        Ok(())
    }
1858
1859 fn update_metrics(
1860 &self,
1861 certificate: &VerifiedExecutableTransaction,
1862 inner_temporary_store: &InnerTemporaryStore,
1863 effects: &TransactionEffects,
1864 ) {
1865 if certificate.has_zklogin_sig() {
1867 self.metrics.zklogin_sig_count.inc();
1868 } else if certificate.has_upgraded_multisig() {
1869 self.metrics.multisig_sig_count.inc();
1870 }
1871
1872 self.metrics.total_effects.inc();
1873 self.metrics.total_certs.inc();
1874
1875 let consensus_object_count = effects.input_consensus_objects().len();
1876 if consensus_object_count > 0 {
1877 self.metrics.shared_obj_tx.inc();
1878 }
1879
1880 if certificate.is_sponsored_tx() {
1881 self.metrics.sponsored_tx.inc();
1882 }
1883
1884 let input_object_count = inner_temporary_store.input_objects.len();
1885 self.metrics
1886 .num_input_objs
1887 .observe(input_object_count as f64);
1888 self.metrics
1889 .num_shared_objects
1890 .observe(consensus_object_count as f64);
1891 self.metrics.batch_size.observe(
1892 certificate
1893 .data()
1894 .intent_message()
1895 .value
1896 .kind()
1897 .num_commands() as f64,
1898 );
1899 }
1900
1901 fn execute_transaction_to_effects(
1905 &self,
1906 executor: &dyn Executor,
1907 store: &dyn BackingStore,
1908 protocol_config: &ProtocolConfig,
1909 enable_expensive_checks: bool,
1910 execution_params: ExecutionOrEarlyError,
1911 epoch_id: &EpochId,
1912 epoch_timestamp_ms: u64,
1913 input_objects: CheckedInputObjects,
1914 gas_data: GasData,
1915 gas_status: SuiGasStatus,
1916 sender: SuiAddress,
1917 mut kind: TransactionKind,
1918 signer: SuiAddress,
1919 tx_digest: TransactionDigest,
1920 accumulator_version: Option<SequenceNumber>,
1921 ) -> (
1922 InnerTemporaryStore,
1923 SuiGasStatus,
1924 TransactionEffects,
1925 Vec<ExecutionTiming>,
1926 Result<(), ExecutionError>,
1927 ) {
1928 let rewritten_inputs = rewrite_transaction_for_coin_reservations(
1929 self.chain_identifier,
1930 &*self.coin_reservation_resolver,
1931 sender,
1932 &mut kind,
1933 accumulator_version,
1934 );
1935
1936 let (inner_temp_store, gas_status, effects, timings, execution_error) = executor
1937 .execute_transaction_to_effects_and_execution_error(
1939 store,
1940 protocol_config,
1941 self.metrics.limits_metrics.clone(),
1942 enable_expensive_checks,
1943 execution_params,
1944 epoch_id,
1945 epoch_timestamp_ms,
1946 input_objects,
1947 gas_data,
1948 gas_status,
1949 kind,
1950 rewritten_inputs,
1951 signer,
1952 tx_digest,
1953 &mut None,
1954 );
1955
1956 (
1957 inner_temp_store,
1958 gas_status,
1959 effects,
1960 timings,
1961 execution_error,
1962 )
1963 }
1964
1965 #[instrument(level = "trace", skip_all)]
1974 fn execute_certificate(
1975 &self,
1976 _execution_guard: &ExecutionLockReadGuard<'_>,
1977 certificate: &VerifiedExecutableTransaction,
1978 input_objects: InputObjects,
1979 expected_effects_digest: Option<TransactionEffectsDigest>,
1980 execution_env: ExecutionEnv,
1981 epoch_store: &Arc<AuthorityPerEpochStore>,
1982 ) -> ExecutionOutput<(
1983 TransactionOutputs,
1984 Vec<ExecutionTiming>,
1985 Option<ExecutionError>,
1986 )> {
1987 let _scope = monitored_scope("Execution::prepare_certificate");
1988 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1989 let prepare_certificate_start_time = tokio::time::Instant::now();
1990
1991 let tx_data = certificate.data().transaction_data();
1993
1994 if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
1995 return ExecutionOutput::Fatal(e);
1996 }
1997
1998 let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
2002 certificate,
2003 input_objects,
2004 epoch_store.protocol_config(),
2005 epoch_store.reference_gas_price(),
2006 ) {
2007 Ok(result) => result,
2008 Err(e) => return ExecutionOutput::Fatal(e),
2009 };
2010
2011 let owned_object_refs = input_objects.inner().filter_owned_objects();
2012 if let Err(e) = self.check_owned_locks(&owned_object_refs) {
2013 return ExecutionOutput::Fatal(e);
2014 }
2015 let tx_digest = *certificate.digest();
2016 let protocol_config = epoch_store.protocol_config();
2017 let transaction_data = &certificate.data().intent_message().value;
2018 let sender = transaction_data.sender();
2019 let (kind, signer, gas_data) = transaction_data.execution_parts();
2020 let early_execution_error = get_early_execution_error(
2021 &tx_digest,
2022 &input_objects,
2023 self.config.certificate_deny_config.certificate_deny_set(),
2024 &execution_env.funds_withdraw_status,
2025 );
2026 let execution_params = match early_execution_error {
2027 Some(error) => ExecutionOrEarlyError::Err(error),
2028 None => ExecutionOrEarlyError::Ok(()),
2029 };
2030
2031 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2032
2033 #[allow(unused_mut)]
2034 let (inner_temp_store, _, mut effects, timings, execution_error_opt) = self
2035 .execute_transaction_to_effects(
2036 &**epoch_store.executor(),
2037 &tracking_store,
2038 protocol_config,
2039 self.config
2042 .expensive_safety_check_config
2043 .enable_deep_per_tx_sui_conservation_check(),
2044 execution_params,
2045 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2046 epoch_store
2047 .epoch_start_config()
2048 .epoch_data()
2049 .epoch_start_timestamp(),
2050 input_objects,
2051 gas_data,
2052 gas_status,
2053 sender,
2054 kind,
2055 signer,
2056 tx_digest,
2057 execution_env.assigned_versions.accumulator_version,
2058 );
2059
2060 let object_funds_checker = self.object_funds_checker.load();
2061 if let Some(object_funds_checker) = object_funds_checker.as_ref()
2062 && !object_funds_checker.should_commit_object_funds_withdraws(
2063 certificate,
2064 effects.status(),
2065 &inner_temp_store.accumulator_running_max_withdraws,
2066 &execution_env,
2067 self.get_account_funds_read(),
2068 &self.execution_scheduler,
2069 epoch_store,
2070 )
2071 {
2072 assert_reachable!("retry object withdraw later");
2073 return ExecutionOutput::RetryLater;
2074 }
2075
2076 if let Some(expected_effects_digest) = expected_effects_digest
2077 && effects.digest() != expected_effects_digest
2078 {
2079 match self.debug_dump_transaction_state(
2081 &tx_digest,
2082 &effects,
2083 expected_effects_digest,
2084 &inner_temp_store,
2085 certificate,
2086 &self.config.state_debug_dump_config,
2087 ) {
2088 Ok(out_path) => {
2089 info!(
2090 "Dumped node state for transaction {} to {}",
2091 tx_digest,
2092 out_path.as_path().display().to_string()
2093 );
2094 }
2095 Err(e) => {
2096 error!("Error dumping state for transaction {}: {e}", tx_digest);
2097 }
2098 }
2099 let expected_effects = self
2100 .get_transaction_cache_reader()
2101 .get_effects(&expected_effects_digest);
2102 error!(
2103 ?tx_digest,
2104 ?expected_effects_digest,
2105 actual_effects = ?effects,
2106 expected_effects = ?expected_effects,
2107 "fork detected!"
2108 );
2109 if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2110 tx_digest,
2111 expected_effects_digest,
2112 effects.digest(),
2113 ) {
2114 error!("Failed to record transaction fork: {e}");
2115 }
2116
2117 fail_point_if!("kill_transaction_fork_node", || {
2118 #[cfg(msim)]
2119 {
2120 tracing::error!(
2121 fatal = true,
2122 "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2123 tx_digest
2124 );
2125 sui_simulator::task::shutdown_current_node();
2126 }
2127 });
2128
2129 fatal!(
2130 "Transaction {} is expected to have effects digest {}, but got {}!",
2131 tx_digest,
2132 expected_effects_digest,
2133 effects.digest()
2134 );
2135 }
2136
2137 fail_point_arg!("simulate_fork_during_execution", |(
2138 forked_validators,
2139 full_halt,
2140 effects_overrides,
2141 fork_probability,
2142 ): (
2143 std::sync::Arc<
2144 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2145 >,
2146 bool,
2147 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2148 f32,
2149 )| {
2150 #[cfg(msim)]
2151 self.simulate_fork_during_execution(
2152 certificate,
2153 epoch_store,
2154 &mut effects,
2155 forked_validators,
2156 full_halt,
2157 effects_overrides,
2158 fork_probability,
2159 );
2160 });
2161
2162 let unchanged_loaded_runtime_objects =
2163 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2164 certificate.transaction_data(),
2165 &effects,
2166 &tracking_store.into_read_objects(),
2167 );
2168
2169 let _ = self
2171 .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2172 .tap_err(|e| {
2173 self.metrics.post_processing_total_failures.inc();
2174 error!(?tx_digest, "tx post processing failed: {e}");
2175 });
2176
2177 self.update_metrics(certificate, &inner_temp_store, &effects);
2178
2179 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2180 certificate.clone().into_unsigned(),
2181 effects,
2182 inner_temp_store,
2183 unchanged_loaded_runtime_objects,
2184 );
2185
2186 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2187 if elapsed > 0.0 {
2188 self.metrics.prepare_cert_gas_latency_ratio.observe(
2189 transaction_outputs
2190 .effects
2191 .gas_cost_summary()
2192 .computation_cost as f64
2193 / elapsed,
2194 );
2195 }
2196
2197 ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2198 }
2199
2200 pub fn prepare_certificate_for_benchmark(
2201 &self,
2202 certificate: &VerifiedExecutableTransaction,
2203 input_objects: InputObjects,
2204 epoch_store: &Arc<AuthorityPerEpochStore>,
2205 ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2206 let lock = RwLock::new(epoch_store.epoch());
2207 let execution_guard = lock.try_read().unwrap();
2208
2209 let (transaction_outputs, _timings, execution_error_opt) = self
2210 .execute_certificate(
2211 &execution_guard,
2212 certificate,
2213 input_objects,
2214 None,
2215 ExecutionEnv::default(),
2216 epoch_store,
2217 )
2218 .unwrap();
2219 Ok((transaction_outputs, execution_error_opt))
2220 }
2221
2222 #[instrument(skip_all)]
2223 #[allow(clippy::type_complexity)]
2224 pub async fn dry_exec_transaction(
2225 &self,
2226 transaction: TransactionData,
2227 transaction_digest: TransactionDigest,
2228 ) -> SuiResult<(
2229 DryRunTransactionBlockResponse,
2230 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2231 TransactionEffects,
2232 Option<ObjectID>,
2233 )> {
2234 let epoch_store = self.load_epoch_store_one_call_per_task();
2235 if !self.is_fullnode(&epoch_store) {
2236 return Err(SuiErrorKind::UnsupportedFeatureError {
2237 error: "dry-exec is only supported on fullnodes".to_string(),
2238 }
2239 .into());
2240 }
2241
2242 if transaction.kind().is_system_tx() {
2243 return Err(SuiErrorKind::UnsupportedFeatureError {
2244 error: "dry-exec does not support system transactions".to_string(),
2245 }
2246 .into());
2247 }
2248
2249 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2250 }
2251
2252 #[allow(clippy::type_complexity)]
2253 pub fn dry_exec_transaction_for_benchmark(
2254 &self,
2255 transaction: TransactionData,
2256 transaction_digest: TransactionDigest,
2257 ) -> SuiResult<(
2258 DryRunTransactionBlockResponse,
2259 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2260 TransactionEffects,
2261 Option<ObjectID>,
2262 )> {
2263 let epoch_store = self.load_epoch_store_one_call_per_task();
2264 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2265 }
2266
/// Shared implementation behind the dry-run entry points.
///
/// Validates `transaction` (without gas checks), runs the signing deny
/// check, loads its input and receiving objects, executes it against the
/// backing store and packages the outcome.
///
/// When the transaction carries no gas payment, a mock gas coin owned by
/// the sender is fabricated and used as payment; its id is returned as the
/// last tuple element (`None` otherwise).
///
/// Returns `(response, written objects keyed by id with their write kind,
/// effects, mock gas id)`.
///
/// # Errors
/// Propagates failures from the validity check, the deny check, object
/// loading, the transaction input checks, and the conversions used to build
/// the response.
///
/// # Panics
/// Panics if the executor cannot be created, or if an object listed as
/// created/unwrapped/mutated in the effects is missing from the temporary
/// store's `written` set.
#[allow(clippy::type_complexity)]
fn dry_exec_transaction_impl(
    &self,
    epoch_store: &AuthorityPerEpochStore,
    transaction: TransactionData,
    transaction_digest: TransactionDigest,
) -> SuiResult<(
    DryRunTransactionBlockResponse,
    BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
    TransactionEffects,
    Option<ObjectID>,
)> {
    transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

    let input_object_kinds = transaction.input_objects()?;
    let receiving_object_refs = transaction.receiving_objects();

    // Deny-list check; an empty slice is passed for the second argument.
    sui_transaction_checks::deny::check_transaction_for_signing(
        &transaction,
        &[],
        &input_object_kinds,
        &receiving_object_refs,
        &self.config.transaction_deny_config,
        self.get_backing_package_store().as_ref(),
    )?;

    let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
        None,
        &input_object_kinds,
        &receiving_object_refs,
        epoch_store.epoch(),
    )?;

    // Work on a copy of the gas data so the payment can be replaced by the
    // mock coin below without touching `transaction`.
    let mut gas_data = transaction.gas_data().clone();
    let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
        // No gas payment supplied: fabricate a gas coin for the sender.
        let sender = transaction.sender();
        const MIST_TO_SUI: u64 = 1_000_000_000;
        const DRY_RUN_SUI: u64 = 1_000_000_000;
        // 10^9 * 10^9 = 10^18, which fits in a u64.
        let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
        let gas_object_id = ObjectID::random();
        let gas_object = Object::new_move(
            MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
            Owner::AddressOwner(sender),
            TransactionDigest::genesis_marker(),
        );
        let gas_object_ref = gas_object.compute_object_reference();
        gas_data.payment = vec![gas_object_ref];
        (
            sui_transaction_checks::check_transaction_input_with_given_gas(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                receiving_objects,
                gas_object,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?,
            Some(gas_object_id),
        )
    } else {
        // Gas payment was supplied by the caller: run the regular input check.
        (
            sui_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?,
            None,
        )
    };

    let protocol_config = epoch_store.protocol_config();
    // The gas data from the transaction is ignored here; the (possibly
    // patched) `gas_data` copy above is what gets executed.
    let (kind, signer, _) = transaction.execution_parts();

    let silent = true;
    let executor = sui_execution::executor(protocol_config, silent)
        .expect("Creating an executor should not fail here");

    let expensive_checks = false;
    // An early error (if any) is handed to the executor instead of aborting
    // here, so effects are still produced for it.
    let early_execution_error = get_early_execution_error(
        &transaction_digest,
        &checked_input_objects,
        self.config.certificate_deny_config.certificate_deny_set(),
        &FundsWithdrawStatus::MaybeSufficient,
    );
    let execution_params = match early_execution_error {
        Some(error) => ExecutionOrEarlyError::Err(error),
        None => ExecutionOrEarlyError::Ok(()),
    };

    let (inner_temp_store, _, effects, _timings, execution_error) = self
        .execute_transaction_to_effects(
            executor.as_ref(),
            self.get_backing_store().as_ref(),
            protocol_config,
            expensive_checks,
            execution_params,
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            transaction.sender(),
            kind,
            signer,
            transaction_digest,
            None,
        );

    // Digest as recorded in the effects; used when converting events below.
    let tx_digest = *effects.transaction_digest();

    let module_cache =
        TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

    // NOTE: the layout resolver is obtained from the epoch store's executor,
    // not from the local `executor` created above.
    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                &inner_temp_store,
                self.get_backing_package_store(),
            )));
    // Both change lists are returned empty from this function.
    // NOTE(review): presumably they are filled in by the caller — confirm.
    let object_changes = Vec::new();

    let balance_changes = Vec::new();

    // Pair every created / unwrapped / mutated object reference from the
    // effects with the object written to the temporary store.
    let written_with_kind = effects
        .created()
        .into_iter()
        .map(|(oref, _)| (oref, WriteKind::Create))
        .chain(
            effects
                .unwrapped()
                .into_iter()
                .map(|(oref, _)| (oref, WriteKind::Unwrap)),
        )
        .chain(
            effects
                .mutated()
                .into_iter()
                .map(|(oref, _)| (oref, WriteKind::Mutate)),
        )
        .map(|(oref, kind)| {
            // Panics if the effects reference an object that was not written.
            let obj = inner_temp_store.written.get(&oref.0).unwrap();
            (oref.0, (oref, obj.clone(), kind))
        })
        .collect();

    // String form of the underlying error source, when execution failed and
    // the error carries one.
    let execution_error_source = execution_error
        .as_ref()
        .err()
        .and_then(|e| e.source_ref().as_ref().map(|e| e.to_string()));

    Ok((
        DryRunTransactionBlockResponse {
            suggested_gas_price: self
                .congestion_tracker
                .get_suggested_gas_prices(&transaction),
            input: SuiTransactionBlockData::try_from_with_module_cache(
                transaction,
                &module_cache,
            )
            .map_err(|e| SuiErrorKind::TransactionSerializationError {
                error: format!(
                    "Failed to convert transaction to SuiTransactionBlockData: {}",
                    e
                ),
            })?,
            effects: effects.clone().try_into()?,
            events: SuiTransactionBlockEvents::try_from(
                inner_temp_store.events.clone(),
                tx_digest,
                None,
                layout_resolver.as_mut(),
            )?,
            object_changes,
            balance_changes,
            execution_error_source,
        },
        written_with_kind,
        effects,
        mock_gas,
    ))
}
2470
/// Simulates `transaction` on a fullnode and returns the resulting effects,
/// events, execution result and the set of objects involved.
///
/// * `checks` - when `checks.disabled()` is true the transaction is run in
///   dev-inspect mode (`check_dev_inspect_input` is used instead of the
///   regular input check, and the flag is forwarded to the executor).
/// * `allow_mock_gas_coin` - when true and the transaction has no gas
///   payment, a mock gas coin is fabricated and its id is reported in
///   `mock_gas_id`.
///
/// If the first execution succeeds but some accumulator withdrawal that was
/// not declared up front exceeds the latest known balance, the transaction
/// is executed a second time with an `InsufficientFundsForWithdraw` early
/// error, and that second result is what gets returned.
///
/// # Errors
/// Returns `UnsupportedFeatureError` for system transactions, on
/// non-fullnodes, or when checks are disabled while the node config has
/// `dev_inspect_disabled` set. Also propagates failures from validation,
/// pre-load checks, object loading, input checks and gas status creation.
///
/// # Panics
/// Panics if the executor cannot be created.
pub fn simulate_transaction(
    &self,
    mut transaction: TransactionData,
    checks: TransactionChecks,
    allow_mock_gas_coin: bool,
) -> SuiResult<SimulateTransactionResult> {
    if transaction.kind().is_system_tx() {
        return Err(SuiErrorKind::UnsupportedFeatureError {
            error: "simulate does not support system transactions".to_string(),
        }
        .into());
    }

    let epoch_store = self.load_epoch_store_one_call_per_task();
    if !self.is_fullnode(&epoch_store) {
        return Err(SuiErrorKind::UnsupportedFeatureError {
            error: "simulate is only supported on fullnodes".to_string(),
        }
        .into());
    }

    // Disabled checks means dev-inspect mode, which the node may forbid.
    let dev_inspect = checks.disabled();
    if dev_inspect && self.config.dev_inspect_disabled {
        return Err(SuiErrorKind::UnsupportedFeatureError {
            error: "simulate with checks disabled is not allowed on this node".to_string(),
        }
        .into());
    }

    transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

    let input_object_kinds = transaction.input_objects()?;
    let receiving_object_refs = transaction.receiving_objects();

    // Fabricate a gas coin (fixed id `ObjectID::MAX`) owned by the gas owner
    // and install it as the payment when none was provided and the caller
    // allows it. Note that the input object kinds above were computed before
    // the payment is patched.
    let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() {
        let obj = Object::new_move(
            MoveObject::new_gas_coin(
                OBJECT_START_VERSION,
                ObjectID::MAX,
                DEV_INSPECT_GAS_COIN_VALUE,
            ),
            Owner::AddressOwner(transaction.gas_data().owner),
            TransactionDigest::genesis_marker(),
        );
        transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
        Some(obj)
    } else {
        None
    };

    let declared_withdrawals = self.pre_object_load_checks(
        &transaction,
        &[],
        &input_object_kinds,
        &receiving_object_refs,
    )?;
    // Keys of the declared withdrawals; used later to skip those entries
    // when looking for under-funded withdrawals.
    let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();

    let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
        None,
        &input_object_kinds,
        &receiving_object_refs,
        epoch_store.epoch(),
    )?;

    // The mock coin does not exist in storage, so it is appended to the
    // loaded inputs by hand.
    let mock_gas_id = mock_gas_object.map(|obj| {
        let id = obj.id();
        input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
        id
    });

    let protocol_config = epoch_store.protocol_config();

    let (gas_status, checked_input_objects) = if dev_inspect {
        sui_transaction_checks::check_dev_inspect_input(
            protocol_config,
            &transaction,
            input_objects,
            receiving_objects,
            epoch_store.reference_gas_price(),
        )?
    } else {
        sui_transaction_checks::check_transaction_input(
            epoch_store.protocol_config(),
            epoch_store.reference_gas_price(),
            &transaction,
            input_objects,
            &receiving_objects,
            &self.metrics.bytecode_verifier_metrics,
            &self.config.verifier_signing_config,
        )?
    };

    // The second argument is the one `dry_exec_transaction_impl` passes as
    // its `silent` flag.
    let executor = sui_execution::executor(
        protocol_config,
        true,
    )
    .expect("Creating an executor should not fail here");

    let (kind, signer, gas_data) = transaction.execution_parts();
    let early_execution_error = get_early_execution_error(
        &transaction.digest(),
        &checked_input_objects,
        self.config.certificate_deny_config.certificate_deny_set(),
        &FundsWithdrawStatus::MaybeSufficient,
    );
    let execution_params = match early_execution_error {
        Some(error) => ExecutionOrEarlyError::Err(error),
        None => ExecutionOrEarlyError::Ok(()),
    };

    // Records the objects read during execution; consumed further below.
    let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());

    // The first execution consumes its inputs, so copies are kept in case a
    // second execution is needed.
    let cloned_input_objects = checked_input_objects.clone();
    let cloned_gas = gas_data.clone();
    let cloned_kind = kind.clone();
    let tx_digest = transaction.digest();
    let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
    let epoch_timestamp_ms = epoch_store
        .epoch_start_config()
        .epoch_data()
        .epoch_start_timestamp();
    let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
        &tracking_store,
        protocol_config,
        self.metrics.limits_metrics.clone(),
        false,
        execution_params,
        &epoch_id,
        epoch_timestamp_ms,
        checked_input_objects,
        gas_data,
        gas_status,
        kind,
        None,
        signer,
        tx_digest,
        dev_inspect,
    );

    let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
        // Look at withdrawals that were NOT declared up front and compare
        // each running maximum against the latest account balance.
        let has_insufficient_object_funds = inner_temp_store
            .accumulator_running_max_withdraws
            .iter()
            .filter(|(id, _)| !address_funds.contains(id))
            .any(|(id, max_withdraw)| {
                let (balance, _) = self.get_account_funds_read().get_latest_account_amount(id);
                balance < *max_withdraw
            });

        if has_insufficient_object_funds {
            // Re-run with an explicit early error and a fresh gas status
            // built from the cloned gas data; the same tracking store is
            // reused for this second pass.
            let retry_gas_status = SuiGasStatus::new(
                cloned_gas.budget,
                cloned_gas.price,
                epoch_store.reference_gas_price(),
                protocol_config,
            )?;
            let (store, _, effects, result) = executor.dev_inspect_transaction(
                &tracking_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                false,
                ExecutionOrEarlyError::Err(ExecutionErrorKind::InsufficientFundsForWithdraw),
                &epoch_id,
                epoch_timestamp_ms,
                cloned_input_objects,
                cloned_gas,
                retry_gas_status,
                cloned_kind,
                None,
                signer,
                tx_digest,
                dev_inspect,
            );
            (store, effects, result)
        } else {
            (inner_temp_store, effects, execution_result)
        }
    } else {
        (inner_temp_store, effects, execution_result)
    };

    let loaded_runtime_objects = tracking_store.into_read_objects();
    let unchanged_loaded_runtime_objects =
        crate::transaction_outputs::unchanged_loaded_runtime_objects(
            &transaction,
            &effects,
            &loaded_runtime_objects,
        );

    let object_set = {
        // Pool of candidate objects: everything read at runtime plus the
        // temporary store's input and written objects.
        let objects = {
            let mut objects = loaded_runtime_objects;

            for o in inner_temp_store
                .input_objects
                .into_values()
                .chain(inner_temp_store.written.into_values())
            {
                objects.insert(o);
            }

            objects
        };

        let object_keys = sui_types::storage::get_transaction_object_set(
            &transaction,
            &effects,
            &unchanged_loaded_runtime_objects,
        );

        // Keep only the objects the transaction's object set asks for; keys
        // with no matching object in the pool are silently skipped.
        let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
        for k in object_keys {
            if let Some(o) = objects.get(&k) {
                set.insert(o.clone());
            }
        }

        set
    };

    Ok(SimulateTransactionResult {
        objects: object_set,
        // Events are only reported when the effects carry an events digest.
        events: effects.events_digest().map(|_| inner_temp_store.events),
        effects,
        execution_result,
        mock_gas_id,
        unchanged_loaded_runtime_objects,
        suggested_gas_price: self
            .congestion_tracker
            .get_suggested_gas_prices(&transaction),
    })
}
2713
/// Builds a `TransactionData` from `transaction_kind` plus the optional gas
/// parameters and runs it through the executor's dev-inspect path.
///
/// Defaults applied to missing arguments:
/// * `gas_price` - the epoch's reference gas price.
/// * `gas_budget` - the protocol's `max_tx_gas`.
/// * `gas_sponsor` - `sender`.
/// * `gas_objects` - empty; a dummy gas coin is then fabricated.
/// * `show_raw_txn_data_and_effects` - `false`; when true the BCS bytes of
///   the transaction and of the effects are included in the result.
/// * `skip_checks` - `true`; selects `check_dev_inspect_input` instead of
///   the regular transaction input checks.
///
/// # Errors
/// Returns `UnsupportedFeatureError` on non-fullnodes, for system
/// transactions, or when `skip_checks` is in effect while the node config
/// has `dev_inspect_disabled` set. Also propagates serialization,
/// validation, deny-check, object-loading and input-check failures, and
/// whatever `DevInspectResults::new` returns.
///
/// # Panics
/// Panics if the executor cannot be created.
#[allow(clippy::collapsible_else_if)]
#[instrument(skip_all)]
pub async fn dev_inspect_transaction_block(
    &self,
    sender: SuiAddress,
    transaction_kind: TransactionKind,
    gas_price: Option<u64>,
    gas_budget: Option<u64>,
    gas_sponsor: Option<SuiAddress>,
    gas_objects: Option<Vec<ObjectRef>>,
    show_raw_txn_data_and_effects: Option<bool>,
    skip_checks: Option<bool>,
) -> SuiResult<DevInspectResults> {
    let epoch_store = self.load_epoch_store_one_call_per_task();

    if !self.is_fullnode(&epoch_store) {
        return Err(SuiErrorKind::UnsupportedFeatureError {
            error: "dev-inspect is only supported on fullnodes".to_string(),
        }
        .into());
    }

    if transaction_kind.is_system_tx() {
        return Err(SuiErrorKind::UnsupportedFeatureError {
            error: "system transactions are not supported".to_string(),
        }
        .into());
    }

    // Checks are skipped unless the caller explicitly asks for them.
    let skip_checks = skip_checks.unwrap_or(true);
    if skip_checks && self.config.dev_inspect_disabled {
        return Err(SuiErrorKind::UnsupportedFeatureError {
            error: "dev-inspect with skip_checks is not allowed on this node".to_string(),
        }
        .into());
    }

    let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
    let reference_gas_price = epoch_store.reference_gas_price();
    let protocol_config = epoch_store.protocol_config();
    let max_tx_gas = protocol_config.max_tx_gas();

    let price = gas_price.unwrap_or(reference_gas_price);
    let budget = gas_budget.unwrap_or(max_tx_gas);
    let owner = gas_sponsor.unwrap_or(sender);
    let payment = gas_objects.unwrap_or_default();
    let mut transaction = TransactionData::V1(TransactionDataV1 {
        kind: transaction_kind.clone(),
        sender,
        gas_data: GasData {
            payment,
            owner,
            price,
            budget,
        },
        expiration: TransactionExpiration::None,
    });

    // Serialized before any dummy gas payment is patched in below, so the
    // raw bytes reflect the transaction as built from the arguments.
    let raw_txn_data = if show_raw_txn_data_and_effects {
        bcs::to_bytes(&transaction).map_err(|_| {
            SuiErrorKind::TransactionSerializationError {
                error: "Failed to serialize transaction during dev inspect".to_string(),
            }
        })?
    } else {
        vec![]
    };

    transaction.validity_check_no_gas_check(protocol_config)?;

    let input_object_kinds = transaction.input_objects()?;
    let receiving_object_refs = transaction.receiving_objects();

    sui_transaction_checks::deny::check_transaction_for_signing(
        &transaction,
        &[],
        &input_object_kinds,
        &receiving_object_refs,
        &self.config.transaction_deny_config,
        self.get_backing_package_store().as_ref(),
    )?;

    let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
        None,
        &input_object_kinds,
        &receiving_object_refs,
        epoch_store.epoch(),
    )?;

    let (gas_status, checked_input_objects) = if skip_checks {
        if transaction.gas().is_empty() {
            // No payment given: fabricate a gas coin for the gas owner, set
            // it as the payment and add it to the loaded inputs by hand.
            let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                DEV_INSPECT_GAS_COIN_VALUE,
                transaction.gas_owner(),
            );
            let gas_object_ref = dummy_gas_object.compute_object_reference();
            transaction.gas_data_mut().payment = vec![gas_object_ref];
            input_objects.push(ObjectReadResult::new(
                InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
                dummy_gas_object.into(),
            ));
        }
        sui_transaction_checks::check_dev_inspect_input(
            protocol_config,
            &transaction,
            input_objects,
            receiving_objects,
            reference_gas_price,
        )?
    } else {
        if transaction.gas().is_empty() {
            // Checked path without payment: the fabricated coin is handed to
            // the input check directly instead of being pushed into the
            // input objects.
            let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                DEV_INSPECT_GAS_COIN_VALUE,
                transaction.gas_owner(),
            );
            let gas_object_ref = dummy_gas_object.compute_object_reference();
            transaction.gas_data_mut().payment = vec![gas_object_ref];
            sui_transaction_checks::check_transaction_input_with_given_gas(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                receiving_objects,
                dummy_gas_object,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?
        } else {
            sui_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?
        }
    };

    let executor = sui_execution::executor(protocol_config, true)
        .expect("Creating an executor should not fail here");
    // Cloned after the dummy payment (if any) has been installed.
    let gas_data = transaction.gas_data().clone();
    let intent_msg = IntentMessage::new(
        Intent {
            version: IntentVersion::V0,
            scope: IntentScope::TransactionData,
            app_id: AppId::Sui,
        },
        transaction,
    );
    // The digest is computed from the final transaction data wrapped in the
    // intent message above.
    let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
    let early_execution_error = get_early_execution_error(
        &transaction_digest,
        &checked_input_objects,
        self.config.certificate_deny_config.certificate_deny_set(),
        &FundsWithdrawStatus::MaybeSufficient,
    );
    let execution_params = match early_execution_error {
        Some(error) => ExecutionOrEarlyError::Err(error),
        None => ExecutionOrEarlyError::Ok(()),
    };

    let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
        self.get_backing_store().as_ref(),
        protocol_config,
        self.metrics.limits_metrics.clone(),
        false,
        execution_params,
        &epoch_store.epoch_start_config().epoch_data().epoch_id(),
        epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp(),
        checked_input_objects,
        gas_data,
        gas_status,
        transaction_kind,
        None,
        sender,
        transaction_digest,
        skip_checks,
    );

    let raw_effects = if show_raw_txn_data_and_effects {
        bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
            error: "Failed to serialize transaction effects during dev inspect".to_string(),
        })?
    } else {
        vec![]
    };

    // NOTE: the layout resolver is obtained from the epoch store's executor,
    // not from the local `executor` created above.
    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                &inner_temp_store,
                self.get_backing_package_store(),
            )));

    DevInspectResults::new(
        effects,
        inner_temp_store.events.clone(),
        execution_result,
        raw_txn_data,
        raw_effects,
        layout_resolver.as_mut(),
    )
}
2934
2935 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2937 let epoch_store = self.epoch_store_for_testing();
2938 Ok(epoch_store.reference_gas_price())
2939 }
2940
2941 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2942 self.get_transaction_cache_reader()
2943 .is_tx_already_executed(digest)
2944 }
2945
2946 #[instrument(level = "debug", skip_all, err(level = "debug"))]
2947 fn index_tx(
2948 sequence: u64,
2949 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2950 object_store: &Arc<dyn ObjectStore + Send + Sync>,
2951 indexes: &IndexStore,
2952 digest: &TransactionDigest,
2953 cert: &VerifiedExecutableTransaction,
2955 effects: &TransactionEffects,
2956 events: &TransactionEvents,
2957 timestamp_ms: u64,
2958 tx_coins: Option<TxCoins>,
2959 written: &WrittenObjects,
2960 inner_temporary_store: &InnerTemporaryStore,
2961 epoch_store: &Arc<AuthorityPerEpochStore>,
2962 acquire_locks: bool,
2963 ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
2964 let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
2965 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2966
2967 indexes.index_tx(
2968 sequence,
2969 cert.data().intent_message().value.sender(),
2970 cert.data()
2971 .intent_message()
2972 .value
2973 .input_objects()?
2974 .iter()
2975 .map(|o| o.object_id()),
2976 effects
2977 .all_changed_objects()
2978 .into_iter()
2979 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2980 cert.data()
2981 .intent_message()
2982 .value
2983 .move_calls()
2984 .into_iter()
2985 .map(|(_cmd_idx, package, module, function)| {
2986 (*package, module.to_owned(), function.to_owned())
2987 }),
2988 events,
2989 changes,
2990 digest,
2991 timestamp_ms,
2992 tx_coins,
2993 effects.accumulator_events(),
2994 acquire_locks,
2995 )
2996 }
2997
/// Simulator-only (`cfg(msim)`) hook that may tamper with `effects` to make
/// this validator produce a different effects digest for `certificate`.
///
/// * `forked_validators` - shared set of validator names that have been
///   selected to fork; this validator may add itself to it.
/// * `full_halt` - selects which stake comparison decides whether this
///   validator joins the forked set (see `should_fork` below).
/// * `effects_overrides` - shared map from transaction digest string to the
///   ORIGINAL effects digest string, recorded before the effects are altered.
/// * `fork_probability` - probability passed to
///   `sui_simulator::random::deterministic_probability`.
///
/// System transactions and validators with zero stake are left untouched.
/// Poisoned mutexes are tolerated: a failed `lock()` simply skips that step.
#[cfg(msim)]
fn simulate_fork_during_execution(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
    forked_validators: std::sync::Arc<
        std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
    >,
    full_halt: bool,
    effects_overrides: std::sync::Arc<
        std::sync::Mutex<std::collections::BTreeMap<String, String>>,
    >,
    fork_probability: f32,
) {
    use std::cell::RefCell;
    // Running total of the stake of validators that joined the forked set,
    // as seen from the current thread.
    thread_local! {
        static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
    }
    if !certificate.data().intent_message().value.is_system_tx() {
        let committee = epoch_store.committee();
        let cur_stake = (**committee).weight(&self.name);
        if cur_stake > 0 {
            TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
                // A poisoned lock is treated as "not forked yet".
                let already_forked = forked_validators
                    .lock()
                    .ok()
                    .map(|set| set.contains(&self.name))
                    .unwrap_or(false);

                if !already_forked {
                    // full_halt: join while the accumulated stake has not
                    // exceeded the validity threshold; otherwise join only if
                    // adding this validator stays strictly below it.
                    let should_fork = if full_halt {
                        *total_stake <= committee.validity_threshold()
                    } else {
                        *total_stake + cur_stake < committee.validity_threshold()
                    };

                    if should_fork {
                        *total_stake += cur_stake;

                        if let Ok(mut external_set) = forked_validators.lock() {
                            external_set.insert(self.name);
                            info!("forked_validators: {:?}", external_set);
                        }
                    }
                }

                if let Ok(external_set) = forked_validators.lock() {
                    if external_set.contains(&self.name) {
                        let tx_digest = certificate.digest().to_string();
                        if let Ok(mut overrides) = effects_overrides.lock() {
                            // `&&` binds tighter than `||`: fork when this
                            // digest is already recorded, OR when nothing is
                            // recorded yet and the deterministic draw says so.
                            if overrides.contains_key(&tx_digest)
                                || overrides.is_empty()
                                    && sui_simulator::random::deterministic_probability(
                                        &tx_digest,
                                        fork_probability,
                                    )
                            {
                                // Remember the untampered digest, then change
                                // the effects so their digest differs.
                                let original_effects_digest = effects.digest().to_string();
                                overrides
                                    .insert(tx_digest.clone(), original_effects_digest.clone());
                                info!(
                                    ?tx_digest,
                                    ?original_effects_digest,
                                    "Captured forked effects digest for transaction"
                                );
                                effects.gas_cost_summary_mut_for_testing().computation_cost +=
                                    1;
                            }
                        }
                    }
                }
            });
        }
    }
}
3081
/// Derives the owner-index and dynamic-field-index changes implied by
/// `effects`.
///
/// The result lists, for address-like owners and for object owners
/// separately, which `(owner, object)` entries must be removed and which
/// must be added. Objects whose old or new owner is any other `Owner`
/// variant contribute nothing on that side.
///
/// # Panics
/// Panics when an invariant between `effects`, `written` and the object
/// store is broken: a deleted/wrapped object has no modified-at version or
/// no owner at that version, a mutated object has no modified-at version or
/// no previous object in the store, or an address-/object-owned changed
/// object is missing from `written` or has a mismatched version there.
fn process_object_index(
    backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
    object_store: &Arc<dyn ObjectStore + Send + Sync>,
    effects: &TransactionEffects,
    written: &WrittenObjects,
    inner_temporary_store: &InnerTemporaryStore,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<ObjectIndexChanges> {
    // Resolver used for dynamic field layouts; packages are looked up in the
    // temporary store first, with the backing package store as fallback.
    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                inner_temporary_store,
                backing_package_store,
            )));

    // Object id -> version the object had before this transaction.
    let modified_at_version = effects
        .modified_at_versions()
        .into_iter()
        .collect::<HashMap<_, _>>();

    let tx_digest = effects.transaction_digest();
    let mut deleted_owners = vec![];
    let mut deleted_dynamic_fields = vec![];
    // Deleted and wrapped objects: drop the index entry of their old owner.
    for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
        let old_version = modified_at_version.get(&id).unwrap();
        match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
            |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
        ) {
            Owner::AddressOwner(addr)
            | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
            Owner::ObjectOwner(object_id) => {
                deleted_dynamic_fields.push((ObjectID::from(object_id), id))
            }
            _ => {}
        }
    }

    let mut new_owners = vec![];
    let mut new_dynamic_fields = vec![];

    for (oref, owner, kind) in effects.all_changed_objects() {
        let id = &oref.0;
        // For mutated objects, remove the old owner's entry only when the
        // owner actually changed.
        if let WriteKind::Mutate = kind {
            let Some(old_version) = modified_at_version.get(id) else {
                panic!(
                    "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
                    tx_digest
                );
            };
            let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
                panic!(
                    "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
                    tx_digest, id, old_version
                );
            };
            if old_object.owner != owner {
                match old_object.owner {
                    Owner::AddressOwner(addr)
                    | Owner::ConsensusAddressOwner { owner: addr, .. } => {
                        deleted_owners.push((addr, *id));
                    }
                    Owner::ObjectOwner(object_id) => {
                        deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                    }
                    _ => {}
                }
            }
        }

        // Record the entry for the new owner. This runs for every changed
        // object, including mutated ones whose owner stayed the same.
        match owner {
            Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
                let new_object = written.get(id).unwrap_or_else(
                    || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
                );
                assert_eq!(
                    new_object.version(),
                    oref.1,
                    "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                    tx_digest,
                    id,
                    new_object.version(),
                    oref.1
                );

                // Objects without a struct type are recorded as packages.
                let type_ = new_object
                    .type_()
                    .map(|type_| ObjectType::Struct(type_.clone()))
                    .unwrap_or(ObjectType::Package);

                new_owners.push((
                    (addr, *id),
                    ObjectInfo {
                        object_id: *id,
                        version: oref.1,
                        digest: oref.2,
                        type_,
                        owner,
                        previous_transaction: *effects.transaction_digest(),
                    },
                ));
            }
            Owner::ObjectOwner(owner) => {
                let new_object = written.get(id).unwrap_or_else(
                    || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
                );
                assert_eq!(
                    new_object.version(),
                    oref.1,
                    "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                    tx_digest,
                    id,
                    new_object.version(),
                    oref.1
                );

                // A failure to build the dynamic field info is logged and
                // treated like "not a dynamic field": the object is skipped
                // rather than failing the whole indexing step.
                let Some(df_info) = Self::try_create_dynamic_field_info(
                    object_store,
                    new_object,
                    written,
                    layout_resolver.as_mut(),
                )
                .unwrap_or_else(|e| {
                    error!(
                        "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
                        e, new_object
                    );
                    None
                }) else {
                    continue;
                };
                new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
            }
            _ => {}
        }
    }

    Ok(ObjectIndexChanges {
        deleted_owners,
        deleted_dynamic_fields,
        new_owners,
        new_dynamic_fields,
    })
}
3233
3234 fn try_create_dynamic_field_info(
3235 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3236 o: &Object,
3237 written: &WrittenObjects,
3238 resolver: &mut dyn LayoutResolver,
3239 ) -> SuiResult<Option<DynamicFieldInfo>> {
3240 let Some(move_object) = o.data.try_as_move().cloned() else {
3242 return Ok(None);
3243 };
3244
3245 if !move_object.type_().is_dynamic_field() {
3247 return Ok(None);
3248 }
3249
3250 let layout = resolver
3251 .get_annotated_layout(&move_object.type_().clone().into())?
3252 .into_layout();
3253
3254 let field =
3255 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3256 SuiErrorKind::ObjectDeserializationError {
3257 error: e.to_string(),
3258 }
3259 })?;
3260
3261 let type_ = field.kind;
3262 let name_type: TypeTag = field.name_layout.into();
3263 let bcs_name = field.name_bytes.to_owned();
3264
3265 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3266 .map_err(|e| {
3267 warn!("{e}");
3268 SuiErrorKind::ObjectDeserializationError {
3269 error: e.to_string(),
3270 }
3271 })?;
3272
3273 let name = DynamicFieldName {
3274 type_: name_type,
3275 value: SuiMoveValue::from(name_value).to_json_value(),
3276 };
3277
3278 let value_metadata = field.value_metadata().map_err(|e| {
3279 warn!("{e}");
3280 SuiErrorKind::ObjectDeserializationError {
3281 error: e.to_string(),
3282 }
3283 })?;
3284
3285 Ok(Some(match value_metadata {
3286 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3287 name,
3288 bcs_name,
3289 type_,
3290 object_type: object_type.to_canonical_string(true),
3291 object_id: o.id(),
3292 version: o.version(),
3293 digest: o.digest(),
3294 },
3295
3296 DFV::ValueMetadata::DynamicObjectField(object_id) => {
3297 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3301 let version = object.version();
3302 let digest = object.digest();
3303 let object_type = object.data.type_().unwrap().clone();
3304 (version, digest, object_type)
3305 } else {
3306 let object = object_store
3308 .get_object_by_key(&object_id, o.version())
3309 .ok_or_else(|| UserInputError::ObjectNotFound {
3310 object_id,
3311 version: Some(o.version()),
3312 })?;
3313 let version = object.version();
3314 let digest = object.digest();
3315 let object_type = object.data.type_().unwrap().clone();
3316 (version, digest, object_type)
3317 };
3318
3319 DynamicFieldInfo {
3320 name,
3321 bcs_name,
3322 type_,
3323 object_type: object_type.to_string(),
3324 object_id,
3325 version,
3326 digest,
3327 }
3328 }
3329 }))
3330 }
3331
3332 #[instrument(level = "trace", skip_all, err(level = "debug"))]
3333 fn post_process_one_tx(
3334 &self,
3335 certificate: &VerifiedExecutableTransaction,
3336 effects: &TransactionEffects,
3337 inner_temporary_store: &InnerTemporaryStore,
3338 epoch_store: &Arc<AuthorityPerEpochStore>,
3339 ) -> SuiResult {
3340 let Some(indexes) = &self.indexes else {
3341 return Ok(());
3342 };
3343
3344 let tx_digest = *certificate.digest();
3345
3346 let sequence = indexes.allocate_sequence_number();
3348
3349 if self.config.sync_post_process_one_tx {
3350 let result = Self::post_process_one_tx_impl(
3355 sequence,
3356 indexes,
3357 &self.subscription_handler,
3358 &self.metrics,
3359 self.name,
3360 self.get_backing_package_store(),
3361 self.get_object_store(),
3362 certificate,
3363 effects,
3364 inner_temporary_store,
3365 epoch_store,
3366 true, );
3368
3369 match result {
3370 Ok((raw_batch, cache_updates_with_locks)) => {
3371 let mut db_batch = indexes.new_db_batch();
3372 db_batch
3373 .concat(vec![raw_batch])
3374 .expect("failed to absorb raw index batch");
3375 let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
3377 cache_updates_with_locks;
3378 indexes
3379 .commit_index_batch(db_batch, vec![inner])
3380 .expect("failed to commit index batch");
3381 }
3382 Err(e) => {
3383 self.metrics.post_processing_total_failures.inc();
3384 error!(?tx_digest, "tx post processing failed: {e}");
3385 return Err(e);
3386 }
3387 }
3388
3389 return Ok(());
3390 }
3391
3392 let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
3393 self.pending_post_processing.insert(tx_digest, done_rx);
3394
3395 let indexes = indexes.clone();
3396 let subscription_handler = self.subscription_handler.clone();
3397 let metrics = self.metrics.clone();
3398 let name = self.name;
3399 let backing_package_store = self.get_backing_package_store().clone();
3400 let object_store = self.get_object_store().clone();
3401 let semaphore = self.post_processing_semaphore.clone();
3402
3403 let certificate = certificate.clone();
3404 let effects = effects.clone();
3405 let inner_temporary_store = inner_temporary_store.clone();
3406 let epoch_store = epoch_store.clone();
3407
3408 tokio::spawn(async move {
3410 let permit = {
3411 let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
3412 semaphore
3413 .acquire_owned()
3414 .await
3415 .expect("post-processing semaphore should not be closed")
3416 };
3417
3418 let _ = tokio::task::spawn_blocking(move || {
3419 let _permit = permit;
3420
3421 let result = Self::post_process_one_tx_impl(
3422 sequence,
3423 &indexes,
3424 &subscription_handler,
3425 &metrics,
3426 name,
3427 &backing_package_store,
3428 &object_store,
3429 &certificate,
3430 &effects,
3431 &inner_temporary_store,
3432 &epoch_store,
3433 false, );
3435
3436 match result {
3437 Ok((raw_batch, cache_updates_with_locks)) => {
3438 fail_point!("crash-after-post-process-one-tx");
3439 let output = (raw_batch, cache_updates_with_locks.into_inner());
3440 let _ = done_tx.send(output);
3441 }
3442 Err(e) => {
3443 metrics.post_processing_total_failures.inc();
3444 error!(?tx_digest, "tx post processing failed: {e}");
3445 }
3446 }
3447 })
3448 .await;
3449 });
3450
3451 Ok(())
3452 }
3453
3454 fn post_process_one_tx_impl(
3455 sequence: u64,
3456 indexes: &Arc<IndexStore>,
3457 subscription_handler: &Arc<SubscriptionHandler>,
3458 metrics: &Arc<AuthorityMetrics>,
3459 name: AuthorityName,
3460 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3461 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3462 certificate: &VerifiedExecutableTransaction,
3463 effects: &TransactionEffects,
3464 inner_temporary_store: &InnerTemporaryStore,
3465 epoch_store: &Arc<AuthorityPerEpochStore>,
3466 acquire_locks: bool,
3467 ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
3468 let _scope = monitored_scope("Execution::post_process_one_tx");
3469
3470 let tx_digest = certificate.digest();
3471 let timestamp_ms = Self::unixtime_now_ms();
3472 let events = &inner_temporary_store.events;
3473 let written = &inner_temporary_store.written;
3474 let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
3475 name,
3476 object_store,
3477 effects,
3478 inner_temporary_store,
3479 epoch_store,
3480 );
3481
3482 let (raw_batch, cache_updates) = Self::index_tx(
3483 sequence,
3484 backing_package_store,
3485 object_store,
3486 indexes,
3487 tx_digest,
3488 certificate,
3489 effects,
3490 events,
3491 timestamp_ms,
3492 tx_coins,
3493 written,
3494 inner_temporary_store,
3495 epoch_store,
3496 acquire_locks,
3497 )
3498 .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
3499 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3500 .expect("Indexing tx should not fail");
3501
3502 let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3503 let events = Self::make_transaction_block_events(
3504 backing_package_store,
3505 events.clone(),
3506 *tx_digest,
3507 timestamp_ms,
3508 epoch_store,
3509 inner_temporary_store,
3510 )?;
3511 subscription_handler
3513 .process_tx(certificate.data().transaction_data(), &effects, &events)
3514 .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
3515 .tap_err(|e| {
3516 warn!(
3517 ?tx_digest,
3518 "Post processing - Couldn't process events for tx: {}", e
3519 )
3520 })?;
3521
3522 metrics
3523 .post_processing_total_events_emitted
3524 .inc_by(events.data.len() as u64);
3525
3526 Ok((raw_batch, cache_updates))
3527 }
3528
3529 fn make_transaction_block_events(
3530 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3531 transaction_events: TransactionEvents,
3532 digest: TransactionDigest,
3533 timestamp_ms: u64,
3534 epoch_store: &Arc<AuthorityPerEpochStore>,
3535 inner_temporary_store: &InnerTemporaryStore,
3536 ) -> SuiResult<SuiTransactionBlockEvents> {
3537 let mut layout_resolver =
3538 epoch_store
3539 .executor()
3540 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3541 inner_temporary_store,
3542 backing_package_store,
3543 )));
3544 SuiTransactionBlockEvents::try_from(
3545 transaction_events,
3546 digest,
3547 Some(timestamp_ms),
3548 layout_resolver.as_mut(),
3549 )
3550 }
3551
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock is before the Unix epoch, or if the millisecond
/// count does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis: u128 = elapsed.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
3559
3560 #[instrument(level = "trace", skip_all)]
3564 pub async fn handle_transaction_info_request(
3565 &self,
3566 request: TransactionInfoRequest,
3567 ) -> SuiResult<TransactionInfoResponse> {
3568 let epoch_store = self.load_epoch_store_one_call_per_task();
3569 let (transaction, status) = self
3570 .get_transaction_status(&request.transaction_digest, &epoch_store)?
3571 .ok_or(SuiErrorKind::TransactionNotFound {
3572 digest: request.transaction_digest,
3573 })?;
3574 Ok(TransactionInfoResponse {
3575 transaction,
3576 status,
3577 })
3578 }
3579
3580 #[instrument(level = "trace", skip_all)]
3581 pub async fn handle_object_info_request(
3582 &self,
3583 request: ObjectInfoRequest,
3584 ) -> SuiResult<ObjectInfoResponse> {
3585 let epoch_store = self.load_epoch_store_one_call_per_task();
3586
3587 let requested_object_seq = match request.request_kind {
3588 ObjectInfoRequestKind::LatestObjectInfo => {
3589 let (_, seq, _) = self
3590 .get_object_or_tombstone(request.object_id)
3591 .await
3592 .ok_or_else(|| {
3593 SuiError::from(UserInputError::ObjectNotFound {
3594 object_id: request.object_id,
3595 version: None,
3596 })
3597 })?;
3598 seq
3599 }
3600 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3601 };
3602
3603 let object = self
3604 .get_object_store()
3605 .get_object_by_key(&request.object_id, requested_object_seq)
3606 .ok_or_else(|| {
3607 SuiError::from(UserInputError::ObjectNotFound {
3608 object_id: request.object_id,
3609 version: Some(requested_object_seq),
3610 })
3611 })?;
3612
3613 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3614 (request.generate_layout, object.data.try_as_move())
3615 {
3616 Some(into_struct_layout(
3617 self.load_epoch_store_one_call_per_task()
3618 .executor()
3619 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3620 .get_annotated_layout(&move_obj.type_().clone().into())?,
3621 )?)
3622 } else {
3623 None
3624 };
3625
3626 let lock = if !object.is_address_owned() {
3627 None
3629 } else {
3630 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3631 .await?
3632 .map(|s| s.into_inner())
3633 };
3634
3635 Ok(ObjectInfoResponse {
3636 object,
3637 layout,
3638 lock_for_debugging: lock,
3639 })
3640 }
3641
3642 #[instrument(level = "trace", skip_all)]
3643 pub fn handle_checkpoint_request(
3644 &self,
3645 request: &CheckpointRequest,
3646 ) -> SuiResult<CheckpointResponse> {
3647 let summary = match request.sequence_number {
3648 Some(seq) => self
3649 .checkpoint_store
3650 .get_checkpoint_by_sequence_number(seq)?,
3651 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3652 }
3653 .map(|v| v.into_inner());
3654 let contents = match &summary {
3655 Some(s) => self
3656 .checkpoint_store
3657 .get_checkpoint_contents(&s.content_digest)?,
3658 None => None,
3659 };
3660 Ok(CheckpointResponse {
3661 checkpoint: summary,
3662 contents,
3663 })
3664 }
3665
3666 #[instrument(level = "trace", skip_all)]
3667 pub fn handle_checkpoint_request_v2(
3668 &self,
3669 request: &CheckpointRequestV2,
3670 ) -> SuiResult<CheckpointResponseV2> {
3671 let summary = if request.certified {
3672 let summary = match request.sequence_number {
3673 Some(seq) => self
3674 .checkpoint_store
3675 .get_checkpoint_by_sequence_number(seq)?,
3676 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3677 }
3678 .map(|v| v.into_inner());
3679 summary.map(CheckpointSummaryResponse::Certified)
3680 } else {
3681 let summary = match request.sequence_number {
3682 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3683 None => self
3684 .checkpoint_store
3685 .get_latest_locally_computed_checkpoint()?,
3686 };
3687 summary.map(CheckpointSummaryResponse::Pending)
3688 };
3689 let contents = match &summary {
3690 Some(s) => self
3691 .checkpoint_store
3692 .get_checkpoint_contents(&s.content_digest())?,
3693 None => None,
3694 };
3695 Ok(CheckpointResponseV2 {
3696 checkpoint: summary,
3697 contents,
3698 })
3699 }
3700
3701 fn check_protocol_version(
3702 supported_protocol_versions: SupportedProtocolVersions,
3703 current_version: ProtocolVersion,
3704 ) {
3705 info!("current protocol version is now {:?}", current_version);
3706 info!("supported versions are: {:?}", supported_protocol_versions);
3707 if !supported_protocol_versions.is_version_supported(current_version) {
3708 let msg = format!(
3709 "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3710 current_version, supported_protocol_versions,
3711 );
3712
3713 error!("{}", msg);
3714 eprintln!("{}", msg);
3715
3716 #[cfg(not(msim))]
3717 std::process::exit(1);
3718
3719 #[cfg(msim)]
3720 sui_simulator::task::shutdown_current_node();
3721 }
3722 }
3723
3724 #[allow(clippy::disallowed_methods)] #[allow(clippy::too_many_arguments)]
3726 pub async fn new(
3727 name: AuthorityName,
3728 secret: StableSyncAuthoritySigner,
3729 supported_protocol_versions: SupportedProtocolVersions,
3730 store: Arc<AuthorityStore>,
3731 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3732 epoch_store: Arc<AuthorityPerEpochStore>,
3733 committee_store: Arc<CommitteeStore>,
3734 indexes: Option<Arc<IndexStore>>,
3735 rpc_index: Option<Arc<RpcIndexStore>>,
3736 checkpoint_store: Arc<CheckpointStore>,
3737 prometheus_registry: &Registry,
3738 genesis_objects: &[Object],
3739 db_checkpoint_config: &DBCheckpointConfig,
3740 config: NodeConfig,
3741 chain_identifier: ChainIdentifier,
3742 policy_config: Option<PolicyConfig>,
3743 firewall_config: Option<RemoteFirewallConfig>,
3744 pruner_watermarks: Arc<PrunerWatermarks>,
3745 ) -> Arc<Self> {
3746 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3747
3748 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3749 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3750 let execution_scheduler = Arc::new(ExecutionScheduler::new(
3751 execution_cache_trait_pointers.object_cache_reader.clone(),
3752 execution_cache_trait_pointers.account_funds_read.clone(),
3753 execution_cache_trait_pointers
3754 .transaction_cache_reader
3755 .clone(),
3756 tx_ready_certificates,
3757 &epoch_store,
3758 config.funds_withdraw_scheduler_type,
3759 metrics.clone(),
3760 prometheus_registry,
3761 ));
3762 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3763
3764 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3765 epoch_store.get_parent_path(),
3766 &config.authority_store_pruning_config,
3767 );
3768 let _pruner = AuthorityStorePruner::new(
3769 store.perpetual_tables.clone(),
3770 checkpoint_store.clone(),
3771 rpc_index.clone(),
3772 indexes.clone(),
3773 config.authority_store_pruning_config.clone(),
3774 epoch_store.committee().authority_exists(&name),
3775 epoch_store.epoch_start_state().epoch_duration_ms(),
3776 prometheus_registry,
3777 pruner_watermarks,
3778 );
3779 let input_loader =
3780 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3781 let epoch = epoch_store.epoch();
3782 let traffic_controller_metrics =
3783 Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3784 let traffic_controller = if let Some(policy_config) = policy_config {
3785 Some(Arc::new(
3786 TrafficController::init(
3787 policy_config,
3788 traffic_controller_metrics,
3789 firewall_config.clone(),
3790 )
3791 .await,
3792 ))
3793 } else {
3794 None
3795 };
3796
3797 let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3798 ForkRecoveryState::new(Some(fork_config))
3799 .expect("Failed to initialize fork recovery state")
3800 });
3801
3802 let coin_reservation_resolver = Arc::new(CachingCoinReservationResolver::new(
3803 execution_cache_trait_pointers.child_object_resolver.clone(),
3804 ));
3805
3806 let object_funds_checker_metrics =
3807 Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
3808 let state = Arc::new(AuthorityState {
3809 name,
3810 secret,
3811 execution_lock: RwLock::new(epoch),
3812 epoch_store: ArcSwap::new(epoch_store.clone()),
3813 input_loader,
3814 execution_cache_trait_pointers,
3815 coin_reservation_resolver,
3816 indexes,
3817 rpc_index,
3818 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3819 checkpoint_store,
3820 committee_store,
3821 execution_scheduler,
3822 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3823 metrics,
3824 _pruner,
3825 _authority_per_epoch_pruner,
3826 db_checkpoint_config: db_checkpoint_config.clone(),
3827 config,
3828 overload_info: AuthorityOverloadInfo::default(),
3829 chain_identifier,
3830 congestion_tracker: Arc::new(CongestionTracker::new()),
3831 traffic_controller,
3832 fork_recovery_state,
3833 notify_epoch: tokio::sync::watch::channel(epoch).0,
3834 object_funds_checker: ArcSwapOption::empty(),
3835 object_funds_checker_metrics,
3836 pending_post_processing: Arc::new(DashMap::new()),
3837 post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
3838 });
3839 state.init_object_funds_checker().await;
3840
3841 let state_clone = Arc::downgrade(&state);
3842 spawn_monitored_task!(fix_indexes(state_clone));
3843 let authority_state = Arc::downgrade(&state);
3845 spawn_monitored_task!(execution_process(
3846 authority_state,
3847 rx_ready_certificates,
3848 rx_execution_shutdown,
3849 ));
3850 state
3852 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3853 .expect("Error indexing genesis objects.");
3854
3855 if epoch_store
3856 .protocol_config()
3857 .enable_multi_epoch_transaction_expiration()
3858 && epoch_store.epoch() > 0
3859 {
3860 use typed_store::Map;
3861 let previous_epoch = epoch_store.epoch() - 1;
3862 let start_key = (previous_epoch, TransactionDigest::ZERO);
3863 let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
3864 let has_previous_epoch_data = store
3865 .perpetual_tables
3866 .executed_transaction_digests
3867 .safe_range_iter(start_key..end_key)
3868 .next()
3869 .is_some();
3870
3871 if !has_previous_epoch_data {
3872 panic!(
3873 "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
3874 This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
3875 Please restore from a snapshot using the latest version of sui-tool.",
3876 previous_epoch
3877 );
3878 }
3879 }
3880
3881 state
3882 }
3883
3884 async fn init_object_funds_checker(&self) {
3885 let epoch_store = self.epoch_store.load();
3886 if self.is_validator(&epoch_store)
3887 && epoch_store.protocol_config().enable_object_funds_withdraw()
3888 {
3889 if self.object_funds_checker.load().is_none() {
3890 let inner = self
3891 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
3892 .await
3893 .map(|o| {
3894 Arc::new(ObjectFundsChecker::new(
3895 o.version(),
3896 self.object_funds_checker_metrics.clone(),
3897 ))
3898 });
3899 self.object_funds_checker.store(inner);
3900 }
3901 } else {
3902 self.object_funds_checker.store(None);
3903 }
3904 }
3905
3906 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3908 &self.execution_cache_trait_pointers.object_cache_reader
3909 }
3910
3911 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3912 &self.execution_cache_trait_pointers.transaction_cache_reader
3913 }
3914
3915 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3916 &self.execution_cache_trait_pointers.cache_writer
3917 }
3918
3919 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3920 &self.execution_cache_trait_pointers.backing_store
3921 }
3922
3923 pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3924 &self.execution_cache_trait_pointers.child_object_resolver
3925 }
3926
3927 pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
3928 &self.execution_cache_trait_pointers.account_funds_read
3929 }
3930
3931 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3932 &self.execution_cache_trait_pointers.backing_package_store
3933 }
3934
3935 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3936 &self.execution_cache_trait_pointers.object_store
3937 }
3938
3939 pub fn pending_post_processing(
3940 &self,
3941 ) -> &Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>> {
3942 &self.pending_post_processing
3943 }
3944
3945 pub async fn await_post_processing(
3946 &self,
3947 tx_digest: &TransactionDigest,
3948 ) -> Option<PostProcessingOutput> {
3949 if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
3950 rx.await.ok()
3952 } else {
3953 None
3955 }
3956 }
3957
3958 pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
3962 if let Some(indexes) = &self.indexes
3963 && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
3964 {
3965 let mut db_batch = indexes.new_db_batch();
3966 db_batch
3967 .concat(vec![raw_batch])
3968 .expect("failed to build index batch");
3969 indexes
3970 .commit_index_batch(db_batch, vec![cache_updates])
3971 .expect("failed to commit index batch");
3972 }
3973 }
3974
3975 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3976 &self.execution_cache_trait_pointers.reconfig_api
3977 }
3978
3979 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3980 &self.execution_cache_trait_pointers.global_state_hash_store
3981 }
3982
3983 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3984 &self.execution_cache_trait_pointers.checkpoint_cache
3985 }
3986
3987 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3988 &self.execution_cache_trait_pointers.state_sync_store
3989 }
3990
3991 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3992 &self.execution_cache_trait_pointers.cache_commit
3993 }
3994
3995 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3996 self.execution_cache_trait_pointers
3997 .testing_api
3998 .database_for_testing()
3999 }
4000
4001 pub fn cache_for_testing(&self) -> &WritebackCache {
4002 self.execution_cache_trait_pointers
4003 .testing_api
4004 .cache_for_testing()
4005 }
4006
4007 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
4008 &self,
4009 config: NodeConfig,
4010 metrics: Arc<AuthorityStorePruningMetrics>,
4011 ) -> anyhow::Result<()> {
4012 use crate::authority::authority_store_pruner::PrunerWatermarks;
4013 let watermarks = Arc::new(PrunerWatermarks::default());
4014 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
4015 &self.database_for_testing().perpetual_tables,
4016 &self.checkpoint_store,
4017 self.rpc_index.as_deref(),
4018 config.authority_store_pruning_config,
4019 metrics,
4020 EPOCH_DURATION_MS_FOR_TESTING,
4021 &watermarks,
4022 )
4023 .await
4024 }
4025
4026 pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
4027 &self.execution_scheduler
4028 }
4029
4030 fn create_owner_index_if_empty(
4031 &self,
4032 genesis_objects: &[Object],
4033 epoch_store: &Arc<AuthorityPerEpochStore>,
4034 ) -> SuiResult {
4035 let Some(index_store) = &self.indexes else {
4036 return Ok(());
4037 };
4038 if !index_store.is_empty() {
4039 return Ok(());
4040 }
4041
4042 let mut new_owners = vec![];
4043 let mut new_dynamic_fields = vec![];
4044 let mut layout_resolver = epoch_store
4045 .executor()
4046 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
4047 for o in genesis_objects.iter() {
4048 match o.owner {
4049 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
4050 new_owners.push((
4051 (addr, o.id()),
4052 ObjectInfo::new(&o.compute_object_reference(), o),
4053 ))
4054 }
4055 Owner::ObjectOwner(object_id) => {
4056 let id = o.id();
4057 let Some(info) = Self::try_create_dynamic_field_info(
4058 self.get_object_store(),
4059 o,
4060 &BTreeMap::new(),
4061 layout_resolver.as_mut(),
4062 )?
4063 else {
4064 continue;
4065 };
4066 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
4067 }
4068 _ => {}
4069 }
4070 }
4071
4072 index_store.insert_genesis_objects(ObjectIndexChanges {
4073 deleted_owners: vec![],
4074 deleted_dynamic_fields: vec![],
4075 new_owners,
4076 new_dynamic_fields,
4077 })
4078 }
4079
4080 pub fn execution_lock_for_executable_transaction(
4084 &self,
4085 transaction: &VerifiedExecutableTransaction,
4086 ) -> Option<ExecutionLockReadGuard<'_>> {
4087 let lock = self.execution_lock.try_read().ok()?;
4088 if *lock == transaction.auth_sig().epoch() {
4089 Some(lock)
4090 } else {
4091 None
4093 }
4094 }
4095
4096 fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
4099 self.execution_lock
4100 .try_read()
4101 .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
4102 }
4103
4104 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
4105 self.execution_lock.write().await
4106 }
4107
4108 #[instrument(level = "error", skip_all)]
4109 pub async fn reconfigure(
4110 &self,
4111 cur_epoch_store: &AuthorityPerEpochStore,
4112 supported_protocol_versions: SupportedProtocolVersions,
4113 new_committee: Committee,
4114 epoch_start_configuration: EpochStartConfiguration,
4115 state_hasher: Arc<GlobalStateHasher>,
4116 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4117 epoch_last_checkpoint: CheckpointSequenceNumber,
4118 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
4119 Self::check_protocol_version(
4120 supported_protocol_versions,
4121 epoch_start_configuration
4122 .epoch_start_state()
4123 .protocol_version(),
4124 );
4125
4126 self.committee_store.insert_new_committee(&new_committee)?;
4127
4128 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
4130
4131 cur_epoch_store.epoch_terminated().await;
4133
4134 {
4138 let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
4139 if state.should_accept_user_certs() {
4140 cur_epoch_store.close_user_certs(state);
4147 }
4148 }
4150
4151 self.get_reconfig_api()
4152 .clear_state_end_of_epoch(&execution_lock);
4153 self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
4154 self.maybe_reaccumulate_state_hash(
4155 cur_epoch_store,
4156 epoch_start_configuration
4157 .epoch_start_state()
4158 .protocol_version(),
4159 );
4160 self.get_reconfig_api()
4161 .set_epoch_start_configuration(&epoch_start_configuration);
4162 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
4163 && self
4164 .db_checkpoint_config
4165 .perform_db_checkpoints_at_epoch_end
4166 {
4167 let checkpoint_indexes = self
4168 .db_checkpoint_config
4169 .perform_index_db_checkpoints_at_epoch_end
4170 .unwrap_or(false);
4171 let current_epoch = cur_epoch_store.epoch();
4172 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
4173 self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
4174 }
4175
4176 self.get_reconfig_api()
4177 .reconfigure_cache(&epoch_start_configuration)
4178 .await;
4179
4180 let new_epoch = new_committee.epoch;
4181 let new_epoch_store = self
4182 .reopen_epoch_db(
4183 cur_epoch_store,
4184 new_committee,
4185 epoch_start_configuration,
4186 expensive_safety_check_config,
4187 epoch_last_checkpoint,
4188 )
4189 .await?;
4190 assert_eq!(new_epoch_store.epoch(), new_epoch);
4191 self.execution_scheduler
4192 .reconfigure(&new_epoch_store, self.get_account_funds_read());
4193 self.init_object_funds_checker().await;
4194 *execution_lock = new_epoch;
4195
4196 self.notify_epoch(new_epoch);
4197 Ok(new_epoch_store)
4201 }
4202
4203 fn notify_epoch(&self, new_epoch: EpochId) {
4204 self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
4205 }
4206
4207 pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4208 let mut rx = self.notify_epoch.subscribe();
4209 loop {
4210 let epoch = *rx.borrow();
4211 if epoch >= target_epoch {
4212 return Ok(epoch);
4213 }
4214 rx.changed().await?;
4215 }
4216 }
4217
4218 pub async fn settle_accumulator_for_testing(
4222 &self,
4223 effects: &[TransactionEffects],
4224 checkpoint_seq: Option<u64>,
4225 ) -> Vec<(VerifiedExecutableTransaction, ExecutionEnv)> {
4226 let accumulator_version = self
4227 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
4228 .await
4229 .unwrap()
4230 .version();
4231 let ckpt_seq = checkpoint_seq.unwrap_or_else(|| accumulator_version.value());
4233 let builder = AccumulatorSettlementTxBuilder::new(
4234 Some(self.get_transaction_cache_reader().as_ref()),
4235 effects,
4236 ckpt_seq,
4237 0,
4238 );
4239 let balance_changes = builder.collect_funds_changes();
4240 let epoch_store = self.epoch_store_for_testing();
4241 let epoch = epoch_store.epoch();
4242 let accumulator_root_obj_initial_shared_version = epoch_store
4243 .epoch_start_config()
4244 .accumulator_root_obj_initial_shared_version()
4245 .unwrap();
4246 let settlements = builder.build_tx(
4247 epoch_store.protocol_config(),
4248 epoch,
4249 accumulator_root_obj_initial_shared_version,
4250 ckpt_seq,
4251 ckpt_seq,
4252 );
4253
4254 let settlements: Vec<_> = settlements
4255 .into_iter()
4256 .map(|tx| {
4257 VerifiedExecutableTransaction::new_system(
4258 VerifiedTransaction::new_system_transaction(tx),
4259 epoch,
4260 )
4261 })
4262 .collect();
4263
4264 let assigned_versions = epoch_store
4265 .assign_shared_object_versions_for_tests(
4266 self.get_object_cache_reader().as_ref(),
4267 &settlements,
4268 )
4269 .unwrap();
4270 let version_map = assigned_versions.into_map();
4271
4272 let mut replay_txns = Vec::new();
4273 let mut settlement_effects = Vec::with_capacity(settlements.len());
4274 for tx in settlements {
4275 let assigned = version_map.get(&tx.key()).unwrap().clone();
4276 let env = ExecutionEnv::new().with_assigned_versions(assigned);
4277 let (effects, _) = self
4278 .try_execute_immediately(&tx.clone(), env.clone(), &epoch_store)
4279 .await
4280 .unwrap();
4281 assert!(effects.status().is_ok());
4282 replay_txns.push((tx, env));
4283 settlement_effects.push(effects);
4284 }
4285
4286 let barrier = accumulators::build_accumulator_barrier_tx(
4287 epoch,
4288 accumulator_root_obj_initial_shared_version,
4289 ckpt_seq,
4290 &settlement_effects,
4291 );
4292 let barrier = VerifiedExecutableTransaction::new_system(
4293 VerifiedTransaction::new_system_transaction(barrier),
4294 epoch,
4295 );
4296
4297 let assigned_versions = epoch_store
4298 .assign_shared_object_versions_for_tests(
4299 self.get_object_cache_reader().as_ref(),
4300 std::slice::from_ref(&barrier),
4301 )
4302 .unwrap();
4303 let version_map = assigned_versions.into_map();
4304
4305 let barrier_assigned = version_map.get(&barrier.key()).unwrap().clone();
4306 let env = ExecutionEnv::new().with_assigned_versions(barrier_assigned);
4307 let (effects, _) = self
4308 .try_execute_immediately(&barrier.clone(), env.clone(), &epoch_store)
4309 .await
4310 .unwrap();
4311 assert!(effects.status().is_ok());
4312 replay_txns.push((barrier, env));
4313
4314 let next_accumulator_version = accumulator_version.next();
4315 self.execution_scheduler
4316 .settle_address_funds(FundsSettlement {
4317 funds_changes: balance_changes,
4318 next_accumulator_version,
4319 });
4320 replay_txns
4323 }
4324
4325 pub async fn replay_settlement_for_testing(
4328 &self,
4329 txns: &[(VerifiedExecutableTransaction, ExecutionEnv)],
4330 ) {
4331 let epoch_store = self.epoch_store_for_testing();
4332 for (tx, env) in txns {
4333 let (effects, _) = self
4334 .try_execute_immediately(tx, env.clone(), &epoch_store)
4335 .await
4336 .unwrap();
4337 assert!(effects.status().is_ok());
4338 }
4339 }
4340
4341 pub async fn reconfigure_for_testing(&self) {
4345 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
4346 let epoch_store = self.epoch_store_for_testing().clone();
4347 let protocol_config = epoch_store.protocol_config().clone();
4348 let _guard =
4355 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
4356 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
4357 self.get_backing_package_store().clone(),
4358 self.get_object_store().clone(),
4359 &self.config.expensive_safety_check_config,
4360 self.checkpoint_store
4361 .get_epoch_last_checkpoint(epoch_store.epoch())
4362 .unwrap()
4363 .map(|c| *c.sequence_number())
4364 .unwrap_or_default(),
4365 );
4366 self.execution_scheduler
4367 .reconfigure(&new_epoch_store, self.get_account_funds_read());
4368 let new_epoch = new_epoch_store.epoch();
4369 self.epoch_store.store(new_epoch_store);
4370 epoch_store.epoch_terminated().await;
4371
4372 *execution_lock = new_epoch;
4373 }
4374
4375 #[instrument(level = "error", skip_all)]
4378 fn maybe_reaccumulate_state_hash(
4379 &self,
4380 cur_epoch_store: &AuthorityPerEpochStore,
4381 new_protocol_version: ProtocolVersion,
4382 ) {
4383 self.get_reconfig_api()
4384 .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4385 }
4386
4387 #[instrument(level = "error", skip_all)]
4388 fn check_system_consistency(
4389 &self,
4390 cur_epoch_store: &AuthorityPerEpochStore,
4391 state_hasher: Arc<GlobalStateHasher>,
4392 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4393 ) {
4394 info!(
4395 "Performing sui conservation consistency check for epoch {}",
4396 cur_epoch_store.epoch()
4397 );
4398
4399 if let Err(err) = self
4400 .get_reconfig_api()
4401 .expensive_check_sui_conservation(cur_epoch_store)
4402 {
4403 if cfg!(debug_assertions) {
4404 panic!("{}", err);
4405 } else {
4406 warn!("Sui conservation consistency check failed: {}", err);
4409 }
4410 } else {
4411 info!("Sui conservation consistency check passed");
4412 }
4413
4414 if expensive_safety_check_config.enable_state_consistency_check() {
4416 info!(
4417 "Performing state consistency check for epoch {}",
4418 cur_epoch_store.epoch()
4419 );
4420 self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4421 }
4422
4423 if expensive_safety_check_config.enable_secondary_index_checks()
4424 && let Some(indexes) = self.indexes.clone()
4425 {
4426 verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
4427 .expect("secondary indexes are inconsistent");
4428 }
4429
4430 if expensive_safety_check_config.enable_secondary_index_checks()
4434 && let Some(indexes) = self.indexes.clone()
4435 {
4436 let epoch = cur_epoch_store.epoch();
4437 let first_checkpoint = if epoch == 0 {
4441 0
4442 } else {
4443 self.checkpoint_store
4444 .get_epoch_last_checkpoint_seq_number(epoch - 1)
4445 .expect("Failed to get previous epoch's last checkpoint")
4446 .expect("Previous epoch's last checkpoint missing")
4447 + 1
4448 };
4449 let highest_executed = self
4450 .checkpoint_store
4451 .get_highest_executed_checkpoint_seq_number()
4452 .expect("Failed to get highest executed checkpoint")
4453 .expect("No executed checkpoints");
4454
4455 info!(
4456 "Verifying checkpointed transactions are in transactions_seq \
4457 (checkpoints {first_checkpoint}..={highest_executed})"
4458 );
4459 for seq in first_checkpoint..=highest_executed {
4460 let checkpoint = self
4461 .checkpoint_store
4462 .get_checkpoint_by_sequence_number(seq)
4463 .expect("Failed to get checkpoint")
4464 .expect("Checkpoint missing");
4465 let contents = self
4466 .checkpoint_store
4467 .get_checkpoint_contents(&checkpoint.content_digest)
4468 .expect("Failed to get checkpoint contents")
4469 .expect("Checkpoint contents missing");
4470 for digests in contents.iter() {
4471 let tx_digest = digests.transaction;
4472 assert!(
4473 indexes
4474 .get_transaction_seq(&tx_digest)
4475 .expect("Failed to read transactions_seq")
4476 .is_some(),
4477 "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
4478 );
4479 }
4480 }
4481 info!("All checkpointed transactions verified in transactions_seq");
4482 }
4483 }
4484
4485 fn expensive_check_is_consistent_state(
4486 &self,
4487 state_hasher: Arc<GlobalStateHasher>,
4488 cur_epoch_store: &AuthorityPerEpochStore,
4489 ) {
4490 let live_object_set_hash = state_hasher.digest_live_object_set(
4491 !cur_epoch_store
4492 .protocol_config()
4493 .simplified_unwrap_then_delete(),
4494 );
4495
4496 let root_state_hash: ECMHLiveObjectSetDigest = self
4497 .get_global_state_hash_store()
4498 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4499 .expect("Retrieving root state hash cannot fail")
4500 .expect("Root state hash for epoch must exist")
4501 .1
4502 .digest()
4503 .into();
4504
4505 let is_inconsistent = root_state_hash != live_object_set_hash;
4506 if is_inconsistent {
4507 debug_fatal!(
4508 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4509 root_state_hash,
4510 live_object_set_hash
4511 );
4512 } else {
4513 info!("State consistency check passed");
4514 }
4515
4516 state_hasher.set_inconsistent_state(is_inconsistent);
4517 }
4518
4519 pub fn current_epoch_for_testing(&self) -> EpochId {
4520 self.epoch_store_for_testing().epoch()
4521 }
4522
4523 #[instrument(level = "error", skip_all)]
4524 pub fn checkpoint_all_dbs(
4525 &self,
4526 checkpoint_path: &Path,
4527 cur_epoch_store: &AuthorityPerEpochStore,
4528 checkpoint_indexes: bool,
4529 ) -> SuiResult {
4530 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4531 let current_epoch = cur_epoch_store.epoch();
4532
4533 if checkpoint_path.exists() {
4534 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4535 return Ok(());
4536 }
4537
4538 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4539 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4540
4541 if checkpoint_path_tmp.exists() {
4542 fs::remove_dir_all(&checkpoint_path_tmp)
4543 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4544 }
4545
4546 fs::create_dir_all(&checkpoint_path_tmp)
4547 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4548 fs::create_dir(&store_checkpoint_path_tmp)
4549 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4550
4551 self.checkpoint_store
4554 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4555
4556 self.get_reconfig_api()
4557 .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4558
4559 self.committee_store
4560 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4561
4562 if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4563 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4564 }
4565
4566 fs::rename(checkpoint_path_tmp, checkpoint_path)
4567 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4568 Ok(())
4569 }
4570
4571 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4577 self.epoch_store.load()
4578 }
4579
4580 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
4582 self.load_epoch_store_one_call_per_task()
4583 }
4584
4585 pub fn clone_committee_for_testing(&self) -> Committee {
4586 Committee::clone(self.epoch_store_for_testing().committee())
4587 }
4588
4589 #[instrument(level = "trace", skip_all)]
4590 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4591 self.get_object_store().get_object(object_id)
4592 }
4593
4594 pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4595 Ok(self
4596 .get_object(&SUI_SYSTEM_ADDRESS.into())
4597 .await
4598 .expect("framework object should always exist")
4599 .compute_object_reference())
4600 }
4601
4602 pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4604 self.get_object_cache_reader()
4605 .get_sui_system_state_object_unsafe()
4606 }
4607
4608 #[instrument(level = "trace", skip_all)]
4609 fn get_transaction_checkpoint_sequence(
4610 &self,
4611 digest: &TransactionDigest,
4612 epoch_store: &AuthorityPerEpochStore,
4613 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
4614 epoch_store.get_transaction_checkpoint(digest)
4615 }
4616
4617 #[instrument(level = "trace", skip_all)]
4618 pub fn get_checkpoint_by_sequence_number(
4619 &self,
4620 sequence_number: CheckpointSequenceNumber,
4621 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4622 Ok(self
4623 .checkpoint_store
4624 .get_checkpoint_by_sequence_number(sequence_number)?)
4625 }
4626
4627 #[instrument(level = "trace", skip_all)]
4628 pub fn get_transaction_checkpoint_for_tests(
4629 &self,
4630 digest: &TransactionDigest,
4631 epoch_store: &AuthorityPerEpochStore,
4632 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4633 let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4634 let Some(checkpoint) = checkpoint else {
4635 return Ok(None);
4636 };
4637 let checkpoint = self
4638 .checkpoint_store
4639 .get_checkpoint_by_sequence_number(checkpoint)?;
4640 Ok(checkpoint)
4641 }
4642
4643 #[instrument(level = "trace", skip_all)]
4644 pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4645 Ok(
4646 match self
4647 .get_object_cache_reader()
4648 .get_latest_object_or_tombstone(*object_id)
4649 {
4650 Some((_, ObjectOrTombstone::Object(object))) => {
4651 let layout = self.get_object_layout(&object)?;
4652 ObjectRead::Exists(object.compute_object_reference(), object, layout)
4653 }
4654 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4655 None => ObjectRead::NotExists(*object_id),
4656 },
4657 )
4658 }
4659
4660 pub fn get_chain_identifier(&self) -> ChainIdentifier {
4662 self.chain_identifier
4663 }
4664
4665 pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
4666 self.fork_recovery_state.as_ref()
4667 }
4668
4669 #[instrument(level = "trace", skip_all)]
4670 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4671 where
4672 T: DeserializeOwned,
4673 {
4674 let o = self.get_object_read(object_id)?.into_object()?;
4675 if let Some(move_object) = o.data.try_as_move() {
4676 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4677 SuiErrorKind::ObjectDeserializationError {
4678 error: format!("{e}"),
4679 }
4680 })?)
4681 } else {
4682 Err(SuiErrorKind::ObjectDeserializationError {
4683 error: format!("Provided object : [{object_id}] is not a Move object."),
4684 }
4685 .into())
4686 }
4687 }
4688
4689 #[instrument(level = "trace", skip_all)]
4695 pub fn get_past_object_read(
4696 &self,
4697 object_id: &ObjectID,
4698 version: SequenceNumber,
4699 ) -> SuiResult<PastObjectRead> {
4700 let Some(obj_ref) = self
4702 .get_object_cache_reader()
4703 .get_latest_object_ref_or_tombstone(*object_id)
4704 else {
4705 return Ok(PastObjectRead::ObjectNotExists(*object_id));
4706 };
4707
4708 if version > obj_ref.1 {
4709 return Ok(PastObjectRead::VersionTooHigh {
4710 object_id: *object_id,
4711 asked_version: version,
4712 latest_version: obj_ref.1,
4713 });
4714 }
4715
4716 if version < obj_ref.1 {
4717 return Ok(match self.read_object_at_version(object_id, version)? {
4719 Some((object, layout)) => {
4720 let obj_ref = object.compute_object_reference();
4721 PastObjectRead::VersionFound(obj_ref, object, layout)
4722 }
4723
4724 None => PastObjectRead::VersionNotFound(*object_id, version),
4725 });
4726 }
4727
4728 if !obj_ref.2.is_alive() {
4729 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4730 }
4731
4732 match self.read_object_at_version(object_id, obj_ref.1)? {
4733 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4734 None => {
4735 debug_fatal!(
4736 "Object with in parent_entry is missing from object store, datastore is \
4737 inconsistent"
4738 );
4739 Err(UserInputError::ObjectNotFound {
4740 object_id: *object_id,
4741 version: Some(obj_ref.1),
4742 }
4743 .into())
4744 }
4745 }
4746 }
4747
4748 #[instrument(level = "trace", skip_all)]
4749 fn read_object_at_version(
4750 &self,
4751 object_id: &ObjectID,
4752 version: SequenceNumber,
4753 ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4754 let Some(object) = self
4755 .get_object_cache_reader()
4756 .get_object_by_key(object_id, version)
4757 else {
4758 return Ok(None);
4759 };
4760
4761 let layout = self.get_object_layout(&object)?;
4762 Ok(Some((object, layout)))
4763 }
4764
4765 pub fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4766 let layout = object
4767 .data
4768 .try_as_move()
4769 .map(|object| {
4770 into_struct_layout(
4771 self.load_epoch_store_one_call_per_task()
4772 .executor()
4773 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
4775 .get_annotated_layout(&object.type_().clone().into())?,
4776 )
4777 })
4778 .transpose()?;
4779 Ok(layout)
4780 }
4781
4782 #[instrument(level = "trace", skip_all)]
4786 pub fn get_address_balance_coin_info(
4787 &self,
4788 owner: SuiAddress,
4789 balance_type: TypeTag,
4790 ) -> SuiResult<Option<(ObjectRef, u64, TransactionDigest)>> {
4791 let accumulator_id = AccumulatorValue::get_field_id(owner, &balance_type)?;
4792 let accumulator_obj = AccumulatorValue::load_object_by_id(
4793 self.get_child_object_resolver().as_ref(),
4794 None,
4795 *accumulator_id.inner(),
4796 )?;
4797
4798 let Some(accumulator_obj) = accumulator_obj else {
4799 return Ok(None);
4800 };
4801
4802 let currency_type =
4805 Balance::maybe_get_balance_type_param(&balance_type).unwrap_or(balance_type);
4806
4807 let balance = crate::accumulators::balances::get_balance(
4808 owner,
4809 self.get_child_object_resolver().as_ref(),
4810 currency_type,
4811 )?;
4812
4813 if balance == 0 {
4814 return Ok(None);
4815 };
4816
4817 let object_ref = coin_reservation::encode_object_ref(
4818 accumulator_obj.id(),
4819 accumulator_obj.version(),
4820 self.load_epoch_store_one_call_per_task().epoch(),
4821 balance,
4822 self.get_chain_identifier(),
4823 );
4824
4825 Ok(Some((
4826 object_ref,
4827 balance,
4828 accumulator_obj.previous_transaction,
4829 )))
4830 }
4831
4832 #[instrument(level = "trace", skip_all)]
4835 pub fn get_all_address_balance_coin_infos(
4836 &self,
4837 owner: SuiAddress,
4838 ) -> SuiResult<std::collections::HashMap<String, (ObjectRef, u64, TransactionDigest)>> {
4839 let indexes = self
4840 .indexes
4841 .as_ref()
4842 .ok_or(SuiErrorKind::IndexStoreNotAvailable)?;
4843
4844 let mut result = std::collections::HashMap::new();
4845 for currency_type in indexes.get_address_balance_coin_types_iter(owner) {
4846 let balance_type = sui_types::balance::Balance::type_tag(currency_type.clone());
4847 if let Some((obj_ref, balance, prev_tx)) =
4848 self.get_address_balance_coin_info(owner, balance_type)?
4849 {
4850 result.insert(currency_type.to_string(), (obj_ref, balance, prev_tx));
4853 }
4854 }
4855 Ok(result)
4856 }
4857
4858 fn get_owner_at_version(
4859 object_store: &Arc<dyn ObjectStore + Send + Sync>,
4860 object_id: &ObjectID,
4861 version: SequenceNumber,
4862 ) -> SuiResult<Owner> {
4863 object_store
4864 .get_object_by_key(object_id, version)
4865 .ok_or_else(|| {
4866 SuiError::from(UserInputError::ObjectNotFound {
4867 object_id: *object_id,
4868 version: Some(version),
4869 })
4870 })
4871 .map(|o| o.owner.clone())
4872 }
4873
4874 #[instrument(level = "trace", skip_all)]
4875 pub fn get_owner_objects(
4876 &self,
4877 owner: SuiAddress,
4878 cursor: Option<ObjectID>,
4880 limit: usize,
4881 filter: Option<SuiObjectDataFilter>,
4882 ) -> SuiResult<Vec<ObjectInfo>> {
4883 if let Some(indexes) = &self.indexes {
4884 indexes.get_owner_objects(owner, cursor, limit, filter)
4885 } else {
4886 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4887 }
4888 }
4889
4890 #[instrument(level = "trace", skip_all)]
4891 pub fn get_owned_coins_iterator_with_cursor(
4892 &self,
4893 owner: SuiAddress,
4894 cursor: (String, u64, ObjectID),
4896 limit: usize,
4897 one_coin_type_only: bool,
4898 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4899 if let Some(indexes) = &self.indexes {
4900 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4901 } else {
4902 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4903 }
4904 }
4905
4906 #[instrument(level = "trace", skip_all)]
4907 pub fn get_owner_objects_iterator(
4908 &self,
4909 owner: SuiAddress,
4910 cursor: Option<ObjectID>,
4912 filter: Option<SuiObjectDataFilter>,
4913 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4914 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4915 if let Some(indexes) = &self.indexes {
4916 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4917 } else {
4918 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4919 }
4920 }
4921
4922 #[instrument(level = "trace", skip_all)]
4923 pub async fn get_move_objects<T>(
4924 &self,
4925 owner: SuiAddress,
4926 type_: MoveObjectType,
4927 ) -> SuiResult<Vec<T>>
4928 where
4929 T: DeserializeOwned,
4930 {
4931 let object_ids = self
4932 .get_owner_objects_iterator(owner, None, None)?
4933 .filter(|o| match &o.type_ {
4934 ObjectType::Struct(s) => &type_ == s,
4935 ObjectType::Package => false,
4936 })
4937 .map(|info| ObjectKey(info.object_id, info.version))
4938 .collect::<Vec<_>>();
4939 let mut move_objects = vec![];
4940
4941 let objects = self
4942 .get_object_store()
4943 .multi_get_objects_by_key(&object_ids);
4944
4945 for (o, id) in objects.into_iter().zip(object_ids) {
4946 let object = o.ok_or_else(|| {
4947 SuiError::from(UserInputError::ObjectNotFound {
4948 object_id: id.0,
4949 version: Some(id.1),
4950 })
4951 })?;
4952 let move_object = object.data.try_as_move().ok_or_else(|| {
4953 SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4954 })?;
4955 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4956 SuiErrorKind::ObjectDeserializationError {
4957 error: format!("{e}"),
4958 }
4959 })?);
4960 }
4961 Ok(move_objects)
4962 }
4963
4964 #[instrument(level = "trace", skip_all)]
4965 pub fn get_dynamic_fields(
4966 &self,
4967 owner: ObjectID,
4968 cursor: Option<ObjectID>,
4970 limit: usize,
4971 ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4972 Ok(self
4973 .get_dynamic_fields_iterator(owner, cursor)?
4974 .take(limit)
4975 .collect::<Result<Vec<_>, _>>()?)
4976 }
4977
4978 fn get_dynamic_fields_iterator(
4979 &self,
4980 owner: ObjectID,
4981 cursor: Option<ObjectID>,
4983 ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4984 {
4985 if let Some(indexes) = &self.indexes {
4986 indexes.get_dynamic_fields_iterator(owner, cursor)
4987 } else {
4988 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4989 }
4990 }
4991
4992 #[instrument(level = "trace", skip_all)]
4993 pub fn get_dynamic_field_object_id(
4994 &self,
4995 owner: ObjectID,
4996 name_type: TypeTag,
4997 name_bcs_bytes: &[u8],
4998 ) -> SuiResult<Option<ObjectID>> {
4999 if let Some(indexes) = &self.indexes {
5000 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
5001 } else {
5002 Err(SuiErrorKind::IndexStoreNotAvailable.into())
5003 }
5004 }
5005
5006 #[instrument(level = "trace", skip_all)]
5007 pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
5008 Ok(self.get_indexes()?.next_sequence_number())
5009 }
5010
5011 #[instrument(level = "trace", skip_all)]
5012 pub async fn get_executed_transaction_and_effects(
5013 &self,
5014 digest: TransactionDigest,
5015 kv_store: Arc<TransactionKeyValueStore>,
5016 ) -> SuiResult<(Transaction, TransactionEffects)> {
5017 let transaction = kv_store.get_tx(digest).await?;
5018 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
5019 Ok((transaction, effects))
5020 }
5021
5022 #[instrument(level = "trace", skip_all)]
5023 pub fn multi_get_checkpoint_by_sequence_number(
5024 &self,
5025 sequence_numbers: &[CheckpointSequenceNumber],
5026 ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
5027 Ok(self
5028 .checkpoint_store
5029 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
5030 }
5031
5032 #[instrument(level = "trace", skip_all)]
5033 pub fn get_transaction_events(
5034 &self,
5035 digest: &TransactionDigest,
5036 ) -> SuiResult<TransactionEvents> {
5037 self.get_transaction_cache_reader()
5038 .get_events(digest)
5039 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
5040 }
5041
5042 pub fn get_transaction_input_objects(
5043 &self,
5044 effects: &TransactionEffects,
5045 ) -> SuiResult<Vec<Object>> {
5046 sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
5047 .map_err(Into::into)
5048 }
5049
5050 pub fn get_transaction_output_objects(
5051 &self,
5052 effects: &TransactionEffects,
5053 ) -> SuiResult<Vec<Object>> {
5054 sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
5055 .map_err(Into::into)
5056 }
5057
5058 fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
5059 match &self.indexes {
5060 Some(i) => Ok(i.clone()),
5061 None => Err(SuiErrorKind::UnsupportedFeatureError {
5062 error: "extended object indexing is not enabled on this server".into(),
5063 }
5064 .into()),
5065 }
5066 }
5067
5068 pub async fn get_transactions_for_tests(
5069 self: &Arc<Self>,
5070 filter: Option<TransactionFilter>,
5071 cursor: Option<TransactionDigest>,
5072 limit: Option<usize>,
5073 reverse: bool,
5074 ) -> SuiResult<Vec<TransactionDigest>> {
5075 let metrics = KeyValueStoreMetrics::new_for_tests();
5076 let kv_store = Arc::new(TransactionKeyValueStore::new(
5077 "rocksdb",
5078 metrics,
5079 self.clone(),
5080 ));
5081 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
5082 .await
5083 }
5084
5085 #[instrument(level = "trace", skip_all)]
5086 pub async fn get_transactions(
5087 &self,
5088 kv_store: &Arc<TransactionKeyValueStore>,
5089 filter: Option<TransactionFilter>,
5090 cursor: Option<TransactionDigest>,
5092 limit: Option<usize>,
5093 reverse: bool,
5094 ) -> SuiResult<Vec<TransactionDigest>> {
5095 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
5096 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
5097 let iter = checkpoint_contents.iter().map(|c| c.transaction);
5098 if reverse {
5099 let iter = iter
5100 .rev()
5101 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
5102 .skip(usize::from(cursor.is_some()));
5103 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
5104 } else {
5105 let iter = iter
5106 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
5107 .skip(usize::from(cursor.is_some()));
5108 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
5109 }
5110 }
5111 self.get_indexes()?
5112 .get_transactions(filter, cursor, limit, reverse)
5113 }
5114
5115 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
5116 &self.checkpoint_store
5117 }
5118
5119 pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
5120 self.get_checkpoint_store()
5121 .get_highest_executed_checkpoint_seq_number()?
5122 .ok_or(
5123 SuiErrorKind::UserInputError {
5124 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
5125 }
5126 .into(),
5127 )
5128 }
5129
/// Test helper (only compiled under `cfg(msim)`) returning the highest pruned
/// checkpoint recorded in the perpetual tables, or `0` when none is recorded.
///
/// # Errors
/// Converts and returns the error of the underlying table read.
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
    match self
        .database_for_testing()
        .perpetual_tables
        .get_highest_pruned_checkpoint()
    {
        Ok(highest_pruned) => Ok(highest_pruned.unwrap_or(0)),
        Err(err) => Err(err.into()),
    }
}
5138
5139 #[instrument(level = "trace", skip_all)]
5140 pub fn get_checkpoint_summary_by_sequence_number(
5141 &self,
5142 sequence_number: CheckpointSequenceNumber,
5143 ) -> SuiResult<CheckpointSummary> {
5144 let verified_checkpoint = self
5145 .get_checkpoint_store()
5146 .get_checkpoint_by_sequence_number(sequence_number)?;
5147 match verified_checkpoint {
5148 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5149 None => Err(SuiErrorKind::UserInputError {
5150 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5151 }
5152 .into()),
5153 }
5154 }
5155
5156 #[instrument(level = "trace", skip_all)]
5157 pub fn get_checkpoint_summary_by_digest(
5158 &self,
5159 digest: CheckpointDigest,
5160 ) -> SuiResult<CheckpointSummary> {
5161 let verified_checkpoint = self
5162 .get_checkpoint_store()
5163 .get_checkpoint_by_digest(&digest)?;
5164 match verified_checkpoint {
5165 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5166 None => Err(SuiErrorKind::UserInputError {
5167 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5168 }
5169 .into()),
5170 }
5171 }
5172
5173 #[instrument(level = "trace", skip_all)]
5174 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
5175 if is_system_package(package_id) {
5176 return self.find_genesis_txn_digest();
5177 }
5178 Ok(self
5179 .get_object_read(&package_id)?
5180 .into_object()?
5181 .previous_transaction)
5182 }
5183
5184 #[instrument(level = "trace", skip_all)]
5185 pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
5186 let summary = self
5187 .get_verified_checkpoint_by_sequence_number(0)?
5188 .into_message();
5189 let content = self.get_checkpoint_contents(summary.content_digest)?;
5190 let genesis_transaction = content.enumerate_transactions(&summary).next();
5191 Ok(genesis_transaction
5192 .ok_or(SuiErrorKind::UserInputError {
5193 error: UserInputError::GenesisTransactionNotFound,
5194 })?
5195 .1
5196 .transaction)
5197 }
5198
5199 #[instrument(level = "trace", skip_all)]
5200 pub fn get_verified_checkpoint_by_sequence_number(
5201 &self,
5202 sequence_number: CheckpointSequenceNumber,
5203 ) -> SuiResult<VerifiedCheckpoint> {
5204 let verified_checkpoint = self
5205 .get_checkpoint_store()
5206 .get_checkpoint_by_sequence_number(sequence_number)?;
5207 match verified_checkpoint {
5208 Some(verified_checkpoint) => Ok(verified_checkpoint),
5209 None => Err(SuiErrorKind::UserInputError {
5210 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5211 }
5212 .into()),
5213 }
5214 }
5215
5216 #[instrument(level = "trace", skip_all)]
5217 pub fn get_verified_checkpoint_summary_by_digest(
5218 &self,
5219 digest: CheckpointDigest,
5220 ) -> SuiResult<VerifiedCheckpoint> {
5221 let verified_checkpoint = self
5222 .get_checkpoint_store()
5223 .get_checkpoint_by_digest(&digest)?;
5224 match verified_checkpoint {
5225 Some(verified_checkpoint) => Ok(verified_checkpoint),
5226 None => Err(SuiErrorKind::UserInputError {
5227 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5228 }
5229 .into()),
5230 }
5231 }
5232
5233 #[instrument(level = "trace", skip_all)]
5234 pub fn get_checkpoint_contents(
5235 &self,
5236 digest: CheckpointContentsDigest,
5237 ) -> SuiResult<CheckpointContents> {
5238 self.get_checkpoint_store()
5239 .get_checkpoint_contents(&digest)?
5240 .ok_or(
5241 SuiErrorKind::UserInputError {
5242 error: UserInputError::CheckpointContentsNotFound(digest),
5243 }
5244 .into(),
5245 )
5246 }
5247
5248 #[instrument(level = "trace", skip_all)]
5249 pub fn get_checkpoint_contents_by_sequence_number(
5250 &self,
5251 sequence_number: CheckpointSequenceNumber,
5252 ) -> SuiResult<CheckpointContents> {
5253 let verified_checkpoint = self
5254 .get_checkpoint_store()
5255 .get_checkpoint_by_sequence_number(sequence_number)?;
5256 match verified_checkpoint {
5257 Some(verified_checkpoint) => {
5258 let content_digest = verified_checkpoint.into_inner().content_digest;
5259 self.get_checkpoint_contents(content_digest)
5260 }
5261 None => Err(SuiErrorKind::UserInputError {
5262 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5263 }
5264 .into()),
5265 }
5266 }
5267
5268 #[instrument(level = "trace", skip_all)]
5269 pub async fn query_events(
5270 &self,
5271 kv_store: &Arc<TransactionKeyValueStore>,
5272 query: EventFilter,
5273 cursor: Option<EventID>,
5275 limit: usize,
5276 descending: bool,
5277 ) -> SuiResult<Vec<SuiEvent>> {
5278 let index_store = self.get_indexes()?;
5279
5280 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
5282 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
5283 SuiErrorKind::TransactionNotFound {
5284 digest: cursor.tx_digest,
5285 },
5286 )?;
5287 (tx_seq, cursor.event_seq as usize)
5288 } else if descending {
5289 (u64::MAX, usize::MAX)
5290 } else {
5291 (0, 0)
5292 };
5293
5294 let limit = limit + 1;
5295 let mut event_keys = match query {
5296 EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
5297 EventFilter::Transaction(digest) => {
5298 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
5299 }
5300 EventFilter::MoveModule { package, module } => {
5301 let module_id = ModuleId::new(package.into(), module);
5302 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
5303 }
5304 EventFilter::MoveEventType(struct_name) => index_store
5305 .events_by_move_event_struct_name(
5306 &struct_name,
5307 tx_num,
5308 event_num,
5309 limit,
5310 descending,
5311 )?,
5312 EventFilter::Sender(sender) => {
5313 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
5314 }
5315 EventFilter::TimeRange {
5316 start_time,
5317 end_time,
5318 } => index_store
5319 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
5320 EventFilter::MoveEventModule { package, module } => index_store
5321 .events_by_move_event_module(
5322 &ModuleId::new(package.into(), module),
5323 tx_num,
5324 event_num,
5325 limit,
5326 descending,
5327 )?,
5328 EventFilter::Any(_) => {
5330 return Err(SuiErrorKind::UserInputError {
5331 error: UserInputError::Unsupported(
5332 "'Any' queries are not supported by the fullnode.".to_string(),
5333 ),
5334 }
5335 .into());
5336 }
5337 };
5338
5339 if cursor.is_some() {
5342 if !event_keys.is_empty() {
5343 event_keys.remove(0);
5344 }
5345 } else {
5346 event_keys.truncate(limit - 1);
5347 }
5348
5349 let transaction_digests = event_keys
5351 .iter()
5352 .map(|(_, digest, _, _)| *digest)
5353 .collect::<HashSet<_>>()
5354 .into_iter()
5355 .collect::<Vec<_>>();
5356
5357 let events = kv_store
5358 .multi_get_events_by_tx_digests(&transaction_digests)
5359 .await?;
5360
5361 let events_map: HashMap<_, _> =
5362 transaction_digests.iter().zip(events.into_iter()).collect();
5363
5364 let stored_events = event_keys
5365 .into_iter()
5366 .map(|k| {
5367 (
5368 k,
5369 events_map
5370 .get(&k.1)
5371 .expect("fetched digest is missing")
5372 .clone()
5373 .and_then(|e| e.data.get(k.2).cloned()),
5374 )
5375 })
5376 .map(
5377 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
5378 event
5379 .map(|e| (e, tx_digest, event_seq, timestamp))
5380 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
5381 },
5382 )
5383 .collect::<Result<Vec<_>, _>>()?;
5384
5385 let epoch_store = self.load_epoch_store_one_call_per_task();
5386 let backing_store = self.get_backing_package_store().as_ref();
5387 let mut layout_resolver = epoch_store
5388 .executor()
5389 .type_layout_resolver(Box::new(backing_store));
5390 let mut events = vec![];
5391 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
5392 events.push(SuiEvent::try_from(
5393 e.clone(),
5394 tx_digest,
5395 event_seq as u64,
5396 Some(timestamp),
5397 layout_resolver.get_annotated_layout(&e.type_)?,
5398 )?)
5399 }
5400 Ok(events)
5401 }
5402
5403 pub async fn insert_genesis_object(&self, object: Object) {
5404 self.get_reconfig_api().insert_genesis_object(object);
5405 }
5406
5407 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
5408 futures::future::join_all(
5409 objects
5410 .iter()
5411 .map(|o| self.insert_genesis_object(o.clone())),
5412 )
5413 .await;
5414 }
5415
5416 #[instrument(level = "trace", skip_all)]
5418 pub fn get_transaction_status(
5419 &self,
5420 transaction_digest: &TransactionDigest,
5421 epoch_store: &Arc<AuthorityPerEpochStore>,
5422 ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
5423 if let Some(effects) =
5425 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
5426 {
5427 if let Some(transaction) = self
5428 .get_transaction_cache_reader()
5429 .get_transaction_block(transaction_digest)
5430 {
5431 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
5432 let events = if effects.events_digest().is_some() {
5433 self.get_transaction_events(effects.transaction_digest())?
5434 } else {
5435 TransactionEvents::default()
5436 };
5437 return Ok(Some((
5438 (*transaction).clone().into_message(),
5439 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
5440 )));
5441 } else {
5442 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
5446 }
5447 }
5448 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
5449 self.metrics.tx_already_processed.inc();
5450 let (transaction, sig) = signed.into_inner().into_data_and_sig();
5451 Ok(Some((transaction, TransactionStatus::Signed(sig))))
5452 } else {
5453 Ok(None)
5454 }
5455 }
5456
5457 #[instrument(level = "trace", skip_all)]
5461 pub fn get_signed_effects_and_maybe_resign(
5462 &self,
5463 transaction_digest: &TransactionDigest,
5464 epoch_store: &Arc<AuthorityPerEpochStore>,
5465 ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
5466 let effects = self
5467 .get_transaction_cache_reader()
5468 .get_executed_effects(transaction_digest);
5469 match effects {
5470 Some(effects) => {
5471 if effects.executed_epoch() != epoch_store.epoch() {
5491 debug!(
5492 tx_digest=?transaction_digest,
5493 effects_epoch=?effects.executed_epoch(),
5494 epoch=?epoch_store.epoch(),
5495 "Re-signing the effects with the current epoch"
5496 );
5497 }
5498 Ok(Some(self.sign_effects(effects, epoch_store)?))
5499 }
5500 None => Ok(None),
5501 }
5502 }
5503
5504 #[instrument(level = "trace", skip_all)]
5505 pub(crate) fn sign_effects(
5506 &self,
5507 effects: TransactionEffects,
5508 epoch_store: &Arc<AuthorityPerEpochStore>,
5509 ) -> SuiResult<VerifiedSignedTransactionEffects> {
5510 let tx_digest = *effects.transaction_digest();
5511 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5512 Some(sig) => {
5513 debug_assert!(sig.epoch == epoch_store.epoch());
5514 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5515 }
5516 _ => {
5517 let sig = AuthoritySignInfo::new(
5518 epoch_store.epoch(),
5519 &effects,
5520 Intent::sui_app(IntentScope::TransactionEffects),
5521 self.name,
5522 &*self.secret,
5523 );
5524
5525 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5526
5527 epoch_store.insert_effects_digest_and_signature(
5528 &tx_digest,
5529 effects.digest(),
5530 &sig,
5531 )?;
5532
5533 effects
5534 }
5535 };
5536
5537 Ok(VerifiedSignedTransactionEffects::new_unchecked(
5538 signed_effects,
5539 ))
5540 }
5541
5542 #[instrument(level = "trace", skip_all)]
5544 fn fullnode_only_get_tx_coins_for_indexing(
5545 name: AuthorityName,
5546 object_store: &Arc<dyn ObjectStore + Send + Sync>,
5547 effects: &TransactionEffects,
5548 inner_temporary_store: &InnerTemporaryStore,
5549 epoch_store: &Arc<AuthorityPerEpochStore>,
5550 ) -> Option<TxCoins> {
5551 if epoch_store.committee().authority_exists(&name) {
5552 return None;
5553 }
5554 let written_coin_objects = inner_temporary_store
5555 .written
5556 .iter()
5557 .filter_map(|(k, v)| {
5558 if v.is_coin() {
5559 Some((*k, v.clone()))
5560 } else {
5561 None
5562 }
5563 })
5564 .collect();
5565 let mut input_coin_objects = inner_temporary_store
5566 .input_objects
5567 .iter()
5568 .filter_map(|(k, v)| {
5569 if v.is_coin() {
5570 Some((*k, v.clone()))
5571 } else {
5572 None
5573 }
5574 })
5575 .collect::<ObjectMap>();
5576
5577 for (object_id, version) in effects.modified_at_versions() {
5581 if inner_temporary_store
5582 .loaded_runtime_objects
5583 .contains_key(&object_id)
5584 && let Some(object) = object_store.get_object_by_key(&object_id, version)
5585 && object.is_coin()
5586 {
5587 input_coin_objects.insert(object_id, object);
5588 }
5589 }
5590
5591 Some((input_coin_objects, written_coin_objects))
5592 }
5593
5594 #[instrument(level = "trace", skip_all)]
5602 pub async fn get_transaction_lock(
5603 &self,
5604 object_ref: &ObjectRef,
5605 epoch_store: &AuthorityPerEpochStore,
5606 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5607 let lock_info = self
5608 .get_object_cache_reader()
5609 .get_lock(*object_ref, epoch_store)?;
5610 let lock_info = match lock_info {
5611 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5612 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5613 provided_obj_ref: *object_ref,
5614 current_version: locked_ref.1,
5615 }
5616 .into());
5617 }
5618 ObjectLockStatus::Initialized => {
5619 return Ok(None);
5620 }
5621 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5622 };
5623
5624 epoch_store.get_signed_transaction(&lock_info.tx_digest)
5625 }
5626
5627 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5628 self.get_object_cache_reader().get_objects(objects)
5629 }
5630
5631 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5632 self.get_object_cache_reader()
5633 .get_latest_object_ref_or_tombstone(object_id)
5634 }
5635
5636 pub fn set_override_protocol_upgrade_buffer_stake(
5645 &self,
5646 expected_epoch: EpochId,
5647 buffer_stake_bps: u64,
5648 ) -> SuiResult {
5649 let epoch_store = self.load_epoch_store_one_call_per_task();
5650 let actual_epoch = epoch_store.epoch();
5651 if actual_epoch != expected_epoch {
5652 return Err(SuiErrorKind::WrongEpoch {
5653 expected_epoch,
5654 actual_epoch,
5655 }
5656 .into());
5657 }
5658
5659 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5660 }
5661
5662 pub fn clear_override_protocol_upgrade_buffer_stake(
5663 &self,
5664 expected_epoch: EpochId,
5665 ) -> SuiResult {
5666 let epoch_store = self.load_epoch_store_one_call_per_task();
5667 let actual_epoch = epoch_store.epoch();
5668 if actual_epoch != expected_epoch {
5669 return Err(SuiErrorKind::WrongEpoch {
5670 expected_epoch,
5671 actual_epoch,
5672 }
5673 .into());
5674 }
5675
5676 epoch_store.clear_override_protocol_upgrade_buffer_stake()
5677 }
5678
/// Builds the list of object references for the system packages known to this
/// binary, as reported by `sui_framework::compare_system_package` against the
/// local object store.
///
/// Returns an empty vector as soon as the comparison yields `None` for any
/// package, so the result is either a reference for every considered package or
/// nothing at all.
///
/// Under `cfg(msim)` the package list is extended with injected extra packages,
/// and a package's modules may be replaced by injected overrides; an empty
/// override causes that package to be skipped.
pub async fn get_available_system_packages(
    &self,
    binary_config: &BinaryConfig,
) -> Vec<ObjectRef> {
    let mut results = vec![];

    let system_packages = BuiltInFramework::iter_system_packages();

    // Simulator builds only: append packages injected for this authority.
    #[cfg(msim)]
    let extra_packages = framework_injection::get_extra_packages(self.name);
    #[cfg(msim)]
    let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

    for system_package in system_packages {
        let modules = system_package.modules().to_vec();
        // Simulator builds only: shadow `modules` with an injected override, or skip
        // the package entirely when the override is empty.
        #[cfg(msim)]
        let modules =
            match framework_injection::get_override_modules(&system_package.id, self.name) {
                Some(overrides) if overrides.is_empty() => continue,
                Some(overrides) => overrides,
                None => modules,
            };

        // A `None` comparison result for any single package discards everything
        // collected so far.
        let Some(obj_ref) = sui_framework::compare_system_package(
            &self.get_object_store(),
            &system_package.id,
            &modules,
            system_package.dependencies.to_vec(),
            binary_config,
        )
        .await
        else {
            return vec![];
        };
        results.push(obj_ref);
    }

    results
}
5722
/// Returns the version, module bytes and dependencies of every system package in
/// `system_packages` whose reference differs from the object currently stored
/// under the same id.
///
/// Packages whose stored object already has the requested reference are skipped.
/// For the rest, the package is rebuilt from the bytes bundled in this binary (or
/// an injected override under `cfg(msim)`); if the rebuilt object's reference does
/// not equal the requested one, the mismatch is logged and `None` is returned for
/// the whole call.
///
/// # Panics
///
/// Panics if any bundled module fails to deserialize with `binary_config`.
async fn get_system_package_bytes(
    &self,
    system_packages: Vec<ObjectRef>,
    binary_config: &BinaryConfig,
) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
    let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
    let objects = self.get_objects(&ids).await;

    let mut res = Vec::with_capacity(system_packages.len());
    for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
        // Determine the previous transaction to stamp on the rebuilt package, or skip
        // the package when the stored object already matches the requested ref.
        let prev_transaction = match object {
            Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                info!("Framework {} does not need updating", system_package_ref.0);
                continue;
            }

            Some(cur_object) => cur_object.previous_transaction,
            // No stored object for this id: use the genesis marker digest.
            None => TransactionDigest::genesis_marker(),
        };

        // Simulator builds: prefer an injected override, falling back to the built-in
        // package with this id.
        #[cfg(msim)]
        let SystemPackage {
            id: _,
            bytes,
            dependencies,
        } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
            .unwrap_or_else(|| {
                BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
            });

        #[cfg(not(msim))]
        let SystemPackage {
            id: _,
            bytes,
            dependencies,
        } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

        let modules: Vec<_> = bytes
            .iter()
            .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
            .collect();

        // Rebuild the package object at the requested version so its reference can be
        // compared with the one that was asked for.
        let new_object = Object::new_system_package(
            &modules,
            system_package_ref.1,
            dependencies.clone(),
            prev_transaction,
        );

        let new_ref = new_object.compute_object_reference();
        if new_ref != system_package_ref {
            // Simulator builds report the mismatch through `debug_fatal!`, other builds
            // through `error!`; both then abandon the whole request.
            if cfg!(msim) {
                debug_fatal!(
                    "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
                );
            } else {
                error!(
                    "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
                );
            }
            return None;
        }

        res.push((system_package_ref.1, bytes, dependencies));
    }

    Some(res)
}
5806
/// Decides whether `proposed_protocol_version` has enough validator support to be
/// adopted, and if so with which set of system packages.
///
/// Validators are grouped by the pair (protocol config digest for the proposed
/// version, sorted list of available system packages). The first group whose
/// stake reaches `quorum_threshold + ceil(f * buffer_stake_bps / 10000)` wins,
/// where `f = total_votes - quorum_threshold`.
///
/// Returns `None` when no group reaches the threshold, or when the proposal skips
/// more than one version ahead of `current_protocol_version` and the protocol
/// config does not allow advancing to the highest supported version.
///
/// `buffer_stake_bps` values above 10000 are clamped to 10000 (with a warning).
fn is_protocol_version_supported_v2(
    current_protocol_version: ProtocolVersion,
    proposed_protocol_version: ProtocolVersion,
    protocol_config: &ProtocolConfig,
    committee: &Committee,
    capabilities: Vec<AuthorityCapabilitiesV2>,
    mut buffer_stake_bps: u64,
) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
    // Jumping more than one version at a time is only permitted when the protocol
    // config opts in.
    if proposed_protocol_version > current_protocol_version + 1
        && !protocol_config.advance_to_highest_supported_protocol_version()
    {
        return None;
    }

    if buffer_stake_bps > 10000 {
        warn!("clamping buffer_stake_bps to 10000");
        buffer_stake_bps = 10000;
    }

    // Collect one (digest, packages, authority) vote per validator that both lists
    // system packages and has a config digest for the proposed version.
    let mut desired_upgrades: Vec<_> = capabilities
        .into_iter()
        .filter_map(|mut cap| {
            // Validators advertising no system packages contribute no vote.
            if cap.available_system_packages.is_empty() {
                return None;
            }

            // Sort so that equal package sets compare equal regardless of the order
            // in which a validator listed them.
            cap.available_system_packages.sort();

            info!(
                "validator {:?} supports {:?} with system packages: {:?}",
                cap.authority.concise(),
                cap.supported_protocol_versions,
                cap.available_system_packages,
            );

            // `None` here (no digest for the proposed version) drops the validator.
            cap.supported_protocol_versions
                .get_version_digest(proposed_protocol_version)
                .map(|digest| (digest, cap.available_system_packages, cap.authority))
        })
        .collect();

    // Sorting makes identical (digest, packages) votes adjacent, which `chunk_by`
    // relies on to form one group per distinct vote.
    desired_upgrades.sort();
    desired_upgrades
        .into_iter()
        .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
        .into_iter()
        .find_map(|((digest, packages), group)| {
            // Empty package lists were filtered out when the votes were collected.
            assert!(!packages.is_empty());

            let mut stake_aggregator: StakeAggregator<(), true> =
                StakeAggregator::new(Arc::new(committee.clone()));

            for (_, _, authority) in group {
                stake_aggregator.insert_generic(authority, ());
            }

            let total_votes = stake_aggregator.total_votes();
            let quorum_threshold = committee.quorum_threshold();
            let f = committee.total_votes() - committee.quorum_threshold();

            // f * buffer_stake_bps / 10000, rounded up.
            let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
            let effective_threshold = quorum_threshold + buffer_stake;

            info!(
                protocol_config_digest = ?digest,
                ?total_votes,
                ?quorum_threshold,
                ?buffer_stake_bps,
                ?effective_threshold,
                ?proposed_protocol_version,
                ?packages,
                "support for upgrade"
            );

            let has_support = total_votes >= effective_threshold;
            has_support.then_some((proposed_protocol_version, packages))
        })
}
5894
5895 fn choose_protocol_version_and_system_packages_v2(
5896 current_protocol_version: ProtocolVersion,
5897 protocol_config: &ProtocolConfig,
5898 committee: &Committee,
5899 capabilities: Vec<AuthorityCapabilitiesV2>,
5900 buffer_stake_bps: u64,
5901 ) -> (ProtocolVersion, Vec<ObjectRef>) {
5902 assert!(protocol_config.authority_capabilities_v2());
5903 let mut next_protocol_version = current_protocol_version;
5904 let mut system_packages = vec![];
5905
5906 while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5907 current_protocol_version,
5908 next_protocol_version + 1,
5909 protocol_config,
5910 committee,
5911 capabilities.clone(),
5912 buffer_stake_bps,
5913 ) {
5914 next_protocol_version = version;
5915 system_packages = packages;
5916 }
5917
5918 (next_protocol_version, system_packages)
5919 }
5920
5921 #[instrument(level = "debug", skip_all)]
5922 fn create_authenticator_state_tx(
5923 &self,
5924 epoch_store: &Arc<AuthorityPerEpochStore>,
5925 ) -> Option<EndOfEpochTransactionKind> {
5926 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5927 info!("authenticator state transactions not enabled");
5928 return None;
5929 }
5930
5931 let authenticator_state_exists = epoch_store.authenticator_state_exists();
5932 let tx = if authenticator_state_exists {
5933 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5934 let min_epoch =
5935 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5936 let authenticator_obj_initial_shared_version = epoch_store
5937 .epoch_start_config()
5938 .authenticator_obj_initial_shared_version()
5939 .expect("initial version must exist");
5940
5941 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5942 min_epoch,
5943 authenticator_obj_initial_shared_version,
5944 );
5945
5946 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5947
5948 tx
5949 } else {
5950 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5951 info!("Creating AuthenticatorStateCreate tx");
5952 tx
5953 };
5954 Some(tx)
5955 }
5956
5957 #[instrument(level = "debug", skip_all)]
5958 fn create_randomness_state_tx(
5959 &self,
5960 epoch_store: &Arc<AuthorityPerEpochStore>,
5961 ) -> Option<EndOfEpochTransactionKind> {
5962 if !epoch_store.protocol_config().random_beacon() {
5963 info!("randomness state transactions not enabled");
5964 return None;
5965 }
5966
5967 if epoch_store.randomness_state_exists() {
5968 return None;
5969 }
5970
5971 let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5972 info!("Creating RandomnessStateCreate tx");
5973 Some(tx)
5974 }
5975
5976 #[instrument(level = "debug", skip_all)]
5977 fn create_accumulator_root_tx(
5978 &self,
5979 epoch_store: &Arc<AuthorityPerEpochStore>,
5980 ) -> Option<EndOfEpochTransactionKind> {
5981 if !epoch_store
5982 .protocol_config()
5983 .create_root_accumulator_object()
5984 {
5985 info!("accumulator root creation not enabled");
5986 return None;
5987 }
5988
5989 if epoch_store.accumulator_root_exists() {
5990 return None;
5991 }
5992
5993 let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5994 info!("Creating AccumulatorRootCreate tx");
5995 Some(tx)
5996 }
5997
5998 #[instrument(level = "debug", skip_all)]
5999 fn create_write_accumulator_storage_cost_tx(
6000 &self,
6001 epoch_store: &Arc<AuthorityPerEpochStore>,
6002 ) -> Option<EndOfEpochTransactionKind> {
6003 if !epoch_store.accumulator_root_exists() {
6004 info!("accumulator root does not exist yet");
6005 return None;
6006 }
6007 if !epoch_store.protocol_config().enable_accumulators() {
6008 info!("accumulators not enabled");
6009 return None;
6010 }
6011
6012 let object_store = self.get_object_store();
6013 let object_count =
6014 match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
6015 Ok(Some(count)) => count,
6016 Ok(None) => return None,
6017 Err(e) => {
6018 fatal!("failed to read accumulator object count: {e}");
6019 }
6020 };
6021
6022 let storage_cost = object_count.saturating_mul(
6023 epoch_store
6024 .protocol_config()
6025 .accumulator_object_storage_cost(),
6026 );
6027
6028 let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
6029 info!(
6030 object_count,
6031 storage_cost, "Creating WriteAccumulatorStorageCost tx"
6032 );
6033 Some(tx)
6034 }
6035
6036 #[instrument(level = "debug", skip_all)]
6037 fn create_coin_registry_tx(
6038 &self,
6039 epoch_store: &Arc<AuthorityPerEpochStore>,
6040 ) -> Option<EndOfEpochTransactionKind> {
6041 if !epoch_store.protocol_config().enable_coin_registry() {
6042 info!("coin registry not enabled");
6043 return None;
6044 }
6045
6046 if epoch_store.coin_registry_exists() {
6047 return None;
6048 }
6049
6050 let tx = EndOfEpochTransactionKind::new_coin_registry_create();
6051 info!("Creating CoinRegistryCreate tx");
6052 Some(tx)
6053 }
6054
6055 #[instrument(level = "debug", skip_all)]
6056 fn create_display_registry_tx(
6057 &self,
6058 epoch_store: &Arc<AuthorityPerEpochStore>,
6059 ) -> Option<EndOfEpochTransactionKind> {
6060 if !epoch_store.protocol_config().enable_display_registry() {
6061 info!("display registry not enabled");
6062 return None;
6063 }
6064
6065 if epoch_store.display_registry_exists() {
6066 return None;
6067 }
6068
6069 let tx = EndOfEpochTransactionKind::new_display_registry_create();
6070 info!("Creating DisplayRegistryCreate tx");
6071 Some(tx)
6072 }
6073
6074 #[instrument(level = "debug", skip_all)]
6075 fn create_bridge_tx(
6076 &self,
6077 epoch_store: &Arc<AuthorityPerEpochStore>,
6078 ) -> Option<EndOfEpochTransactionKind> {
6079 if !epoch_store.protocol_config().enable_bridge() {
6080 info!("bridge not enabled");
6081 return None;
6082 }
6083 if epoch_store.bridge_exists() {
6084 return None;
6085 }
6086 let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
6087 info!("Creating Bridge Create tx");
6088 Some(tx)
6089 }
6090
6091 #[instrument(level = "debug", skip_all)]
6092 fn init_bridge_committee_tx(
6093 &self,
6094 epoch_store: &Arc<AuthorityPerEpochStore>,
6095 ) -> Option<EndOfEpochTransactionKind> {
6096 if !epoch_store.protocol_config().enable_bridge() {
6097 info!("bridge not enabled");
6098 return None;
6099 }
6100 if !epoch_store
6101 .protocol_config()
6102 .should_try_to_finalize_bridge_committee()
6103 {
6104 info!("should not try to finalize bridge committee yet");
6105 return None;
6106 }
6107 if !epoch_store.bridge_exists() {
6109 return None;
6110 }
6111
6112 if epoch_store.bridge_committee_initiated() {
6113 return None;
6114 }
6115
6116 let bridge_initial_shared_version = epoch_store
6117 .epoch_start_config()
6118 .bridge_obj_initial_shared_version()
6119 .expect("initial version must exist");
6120 let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
6121 info!("Init Bridge committee tx");
6122 Some(tx)
6123 }
6124
6125 #[instrument(level = "debug", skip_all)]
6126 fn create_deny_list_state_tx(
6127 &self,
6128 epoch_store: &Arc<AuthorityPerEpochStore>,
6129 ) -> Option<EndOfEpochTransactionKind> {
6130 if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
6131 return None;
6132 }
6133
6134 if epoch_store.coin_deny_list_state_exists() {
6135 return None;
6136 }
6137
6138 let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
6139 info!("Creating DenyListStateCreate tx");
6140 Some(tx)
6141 }
6142
6143 #[instrument(level = "debug", skip_all)]
6144 fn create_address_alias_state_tx(
6145 &self,
6146 epoch_store: &Arc<AuthorityPerEpochStore>,
6147 ) -> Option<EndOfEpochTransactionKind> {
6148 if !epoch_store.protocol_config().address_aliases() {
6149 info!("address aliases not enabled");
6150 return None;
6151 }
6152
6153 if epoch_store.address_alias_state_exists() {
6154 return None;
6155 }
6156
6157 let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
6158 info!("Creating AddressAliasStateCreate tx");
6159 Some(tx)
6160 }
6161
/// Produces the transaction that stores execution time observations at the end of
/// the epoch.
///
/// Returns `None` unless the per-object congestion control mode is
/// `ExecutionTimeEstimate`. Otherwise it:
///
/// 1. selects a range of checkpoints ending at
///    `last_checkpoint_before_end_of_epoch`,
/// 2. gathers the observation keys of every command in the programmable
///    transactions of those checkpoints, plus `end_of_epoch_observation_keys`,
/// 3. keeps only the epoch's observations whose key is in that set, limited to
///    `params.stored_observations_limit` entries.
///
/// # Panics
///
/// Panics (via `expect`, `unwrap` or `fatal!`) if a checkpoint store read fails,
/// if a checkpoint, its contents or one of its transactions in the selected range
/// is missing, or if `stored_observations_limit` does not fit the target integer
/// type.
#[instrument(level = "debug", skip_all)]
fn create_execution_time_observations_tx(
    &self,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
    last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
) -> Option<EndOfEpochTransactionKind> {
    let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
        .protocol_config()
        .per_object_congestion_control_mode()
    else {
        return None;
    };

    // The range starts at whichever is later: N-1 checkpoints before the last one
    // (N = `stored_observations_num_included_checkpoints`), or the checkpoint right
    // after the previous epoch's last checkpoint.
    let start_checkpoint = std::cmp::max(
        last_checkpoint_before_end_of_epoch
            .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
        // For epoch 0 there is no previous epoch, so the lower bound is checkpoint 1.
        epoch_store
            .epoch()
            .checked_sub(1)
            .map(|prev_epoch| {
                self.checkpoint_store
                    .get_epoch_last_checkpoint_seq_number(prev_epoch)
                    .expect("typed store must not fail")
                    .expect(
                        "sequence number of last checkpoint of preceding epoch must be saved",
                    )
                    + 1
            })
            .unwrap_or(1),
    );
    info!(
        "reading checkpoint range {:?}..={:?}",
        start_checkpoint, last_checkpoint_before_end_of_epoch
    );
    let sequence_numbers =
        (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
    // Resolve each sequence number to a contents digest, preferring the locally
    // computed checkpoint and falling back to the checkpoint stored by sequence
    // number.
    let contents_digests: Vec<_> = self
        .checkpoint_store
        .multi_get_locally_computed_checkpoints(&sequence_numbers)
        .expect("typed store must not fail")
        .into_iter()
        .zip(sequence_numbers)
        .map(|(maybe_checkpoint, sequence_number)| {
            if let Some(checkpoint) = maybe_checkpoint {
                checkpoint.content_digest
            } else {
                self.checkpoint_store
                    .get_checkpoint_by_sequence_number(sequence_number)
                    .expect("typed store must not fail")
                    .unwrap_or_else(|| {
                        fatal!("preceding checkpoints must exist by end of epoch")
                    })
                    .data()
                    .content_digest
            }
        })
        .collect();
    // Flatten the checkpoint contents into the digests of all included transactions.
    let tx_digests: Vec<_> = self
        .checkpoint_store
        .multi_get_checkpoint_content(&contents_digests)
        .expect("typed store must not fail")
        .into_iter()
        .flat_map(|maybe_contents| {
            maybe_contents
                .expect("preceding checkpoint contents must exist by end of epoch")
                .into_inner()
                .into_iter()
                .map(|digests| digests.transaction)
        })
        .collect();
    // Build the set of observation keys to keep: one per command of every
    // programmable transaction, plus the explicitly supplied end-of-epoch keys.
    // Non-programmable transactions contribute nothing.
    let included_execution_time_observations: HashSet<_> = self
        .get_transaction_cache_reader()
        .multi_get_transaction_blocks(&tx_digests)
        .into_iter()
        .flat_map(|maybe_tx| {
            if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
                .expect("preceding transaction must exist by end of epoch")
                .transaction_data()
                .kind()
            {
                #[allow(clippy::unnecessary_to_owned)]
                itertools::Either::Left(
                    ptb.commands
                        .to_owned()
                        .into_iter()
                        .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
                )
            } else {
                itertools::Either::Right(std::iter::empty())
            }
        })
        .chain(end_of_epoch_observation_keys.into_iter())
        .collect();

    // Keep only observations whose key was seen above, limited to the configured
    // number of entries.
    let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
        epoch_store
            .get_end_of_epoch_execution_time_observations()
            .filter_and_sort_v1(
                |(key, _)| included_execution_time_observations.contains(key),
                params.stored_observations_limit.try_into().unwrap(),
            ),
    );
    info!("Creating StoreExecutionTimeObservations tx");
    Some(tx)
}
6273
/// Builds the end-of-epoch (advance epoch) transaction and executes it locally.
///
/// The transaction bundles whichever optional end-of-epoch transactions apply
/// (authenticator state, randomness state, bridge, deny list, execution time
/// observations, accumulator root, coin registry, display registry, address
/// aliases, accumulator storage cost) followed by the change-epoch transaction
/// itself. The next protocol version and system packages are chosen from the
/// validator capabilities recorded in `epoch_store`.
///
/// On success returns the system state object written by the transaction together
/// with its effects. The transaction and effects are also inserted into the state
/// sync store.
///
/// # Errors
///
/// * `SystemPackagesMissing` when the bytes of the chosen system packages cannot
///   be produced locally.
/// * `ChangeEpochTxAlreadyExecuted` when the transaction is already marked as
///   executed, or when no execution lock can be obtained for it.
/// * Any error from assigning shared object versions or reading input objects.
///
/// # Panics
///
/// Panics if reading capabilities fails, if executing the certificate fails, if
/// the outputs do not contain the system state object, or if the resulting
/// effects status is not OK.
#[instrument(level = "error", skip_all)]
pub async fn create_and_execute_advance_epoch_tx(
    &self,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    gas_cost_summary: &GasCostSummary,
    checkpoint: CheckpointSequenceNumber,
    epoch_start_timestamp_ms: CheckpointTimestamp,
    end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
    last_checkpoint: CheckpointSequenceNumber,
) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
    // Collect the optional end-of-epoch transactions. The push order determines the
    // order inside the final end-of-epoch transaction.
    let mut txns = Vec::new();

    if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
        txns.push(tx);
    }
    if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
        txns.push(tx);
    }
    if let Some(tx) = self.create_bridge_tx(epoch_store) {
        txns.push(tx);
    }
    if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
        txns.push(tx);
    }
    if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
        txns.push(tx);
    }
    if let Some(tx) = self.create_execution_time_observations_tx(
        epoch_store,
        end_of_epoch_observation_keys,
        last_checkpoint,
    ) {
        txns.push(tx);
    }
    if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
        txns.push(tx);
    }

    if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
        txns.push(tx);
    }
    if let Some(tx) = self.create_display_registry_tx(epoch_store) {
        txns.push(tx);
    }
    if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
        txns.push(tx);
    }
    if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
        txns.push(tx);
    }

    let next_epoch = epoch_store.epoch() + 1;

    let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();

    // Choose the next protocol version and system packages from validator votes.
    let (next_epoch_protocol_version, next_epoch_system_packages) =
        Self::choose_protocol_version_and_system_packages_v2(
            epoch_store.protocol_version(),
            epoch_store.protocol_config(),
            epoch_store.committee(),
            epoch_store
                .get_capabilities_v2()
                .expect("read capabilities from db cannot fail"),
            buffer_stake_bps,
        );

    let config = epoch_store.protocol_config();
    let binary_config = config.binary_config(None);
    let Some(next_epoch_system_package_bytes) = self
        .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
        .await
    else {
        // The chosen packages could not be reproduced locally. The log differs
        // depending on whether the chosen version is within `ProtocolVersion::MAX`
        // (presumably the highest version this binary supports - confirm).
        if next_epoch_protocol_version <= ProtocolVersion::MAX {
            debug_fatal!(
                "upgraded system packages {:?} are not locally available, cannot create \
                ChangeEpochTx. validator binary must be upgraded to the correct version!",
                next_epoch_system_packages
            );
        } else {
            error!(
                "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
                next_epoch_protocol_version
            );
        }
        return Err(CheckpointBuilderError::SystemPackagesMissing);
    };

    // Depending on protocol support, either append the change-epoch transaction to
    // the collected end-of-epoch transactions, or emit a standalone change-epoch
    // transaction (in which case `txns` is not used).
    let tx = if epoch_store
        .protocol_config()
        .end_of_epoch_transaction_supported()
    {
        txns.push(EndOfEpochTransactionKind::new_change_epoch(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));

        VerifiedTransaction::new_end_of_epoch_transaction(txns)
    } else {
        VerifiedTransaction::new_change_epoch(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        )
    };

    let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        tx.clone(),
        epoch_store.epoch(),
        checkpoint,
    );

    let tx_digest = executable_tx.digest();

    info!(
        ?next_epoch,
        ?next_epoch_protocol_version,
        ?next_epoch_system_packages,
        computation_cost=?gas_cost_summary.computation_cost,
        storage_cost=?gas_cost_summary.storage_cost,
        storage_rebate=?gas_cost_summary.storage_rebate,
        non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
        ?tx_digest,
        "Creating advance epoch transaction"
    );

    fail_point_async!("change_epoch_tx_delay");
    // Take the per-transaction lock before checking whether the transaction has
    // already been executed.
    let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

    if self
        .get_transaction_cache_reader()
        .is_tx_already_executed(tx_digest)
    {
        warn!("change epoch tx has already been executed via state sync");
        return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
    }

    // Failing to obtain the execution lock is reported the same way as an
    // already-executed transaction.
    let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
    else {
        return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
    };

    let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
        self.get_object_cache_reader().as_ref(),
        std::iter::once(&Schedulable::Transaction(&executable_tx)),
    )?;

    // Exactly one transaction was submitted, so exactly one assignment is expected.
    assert_eq!(assigned_versions.0.len(), 1);
    let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;

    let input_objects = self.read_objects_for_execution(
        &tx_lock,
        &executable_tx,
        &assigned_versions,
        epoch_store,
    )?;

    let (transaction_outputs, _timings, _execution_error_opt) = self
        .execute_certificate(
            &execution_guard,
            &executable_tx,
            input_objects,
            None,
            ExecutionEnv::default(),
            epoch_store,
        )
        .unwrap();
    let system_obj = get_sui_system_state(&transaction_outputs.written)
        .expect("change epoch tx must write to system object");

    let effects = transaction_outputs.effects;
    // Record the transaction and its effects in the state sync store.
    self.get_state_sync_store()
        .insert_transaction_and_effects(&tx, &effects);

    info!(
        "Effects summary of the change epoch transaction: {:?}",
        effects.summary_for_debug()
    );
    epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
    // A failed change-epoch transaction is treated as an invariant violation.
    assert!(effects.status().is_ok());
    Ok((system_obj, effects))
}
6499
6500 #[instrument(level = "error", skip_all)]
6501 async fn reopen_epoch_db(
6502 &self,
6503 cur_epoch_store: &AuthorityPerEpochStore,
6504 new_committee: Committee,
6505 epoch_start_configuration: EpochStartConfiguration,
6506 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6507 epoch_last_checkpoint: CheckpointSequenceNumber,
6508 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6509 let new_epoch = new_committee.epoch;
6510 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6511 assert_eq!(
6512 epoch_start_configuration.epoch_start_state().epoch(),
6513 new_committee.epoch
6514 );
6515 fail_point!("before-open-new-epoch-store");
6516 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6517 self.name,
6518 new_committee,
6519 epoch_start_configuration,
6520 self.get_backing_package_store().clone(),
6521 self.get_object_store().clone(),
6522 expensive_safety_check_config,
6523 epoch_last_checkpoint,
6524 )?;
6525 self.epoch_store.store(new_epoch_store.clone());
6526 Ok(new_epoch_store)
6527 }
6528
6529 #[cfg(test)]
6530 pub(crate) fn iter_live_object_set_for_testing(
6531 &self,
6532 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6533 let include_wrapped_object = !self
6534 .epoch_store_for_testing()
6535 .protocol_config()
6536 .simplified_unwrap_then_delete();
6537 self.get_global_state_hash_store()
6538 .iter_cached_live_object_set_for_testing(include_wrapped_object)
6539 }
6540
6541 #[cfg(test)]
6542 pub(crate) fn shutdown_execution_for_test(&self) {
6543 self.tx_execution_shutdown
6544 .lock()
6545 .take()
6546 .unwrap()
6547 .send(())
6548 .unwrap();
6549 }
6550
6551 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6553 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6554 self.get_object_cache_reader()
6555 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6556 self.get_reconfig_api()
6557 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6558 Ok(())
6559 }
6560}
6561
6562pub struct RandomnessRoundReceiver {
6563 authority_state: Arc<AuthorityState>,
6564 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6565}
6566
6567impl RandomnessRoundReceiver {
6568 pub fn spawn(
6569 authority_state: Arc<AuthorityState>,
6570 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6571 ) -> JoinHandle<()> {
6572 let rrr = RandomnessRoundReceiver {
6573 authority_state,
6574 randomness_rx,
6575 };
6576 spawn_monitored_task!(rrr.run())
6577 }
6578
6579 async fn run(mut self) {
6580 info!("RandomnessRoundReceiver event loop started");
6581
6582 loop {
6583 tokio::select! {
6584 maybe_recv = self.randomness_rx.recv() => {
6585 if let Some((epoch, round, bytes)) = maybe_recv {
6586 self.handle_new_randomness(epoch, round, bytes).await;
6587 } else {
6588 break;
6589 }
6590 },
6591 }
6592 }
6593
6594 info!("RandomnessRoundReceiver event loop ended");
6595 }
6596
6597 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
6598 async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
6599 fail_point_async!("randomness-delay");
6600
6601 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
6602 if epoch_store.epoch() != epoch {
6603 warn!(
6604 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
6605 epoch_store.epoch()
6606 );
6607 return;
6608 }
6609 let key = TransactionKey::RandomnessRound(epoch, round);
6610 let transaction = VerifiedTransaction::new_randomness_state_update(
6611 epoch,
6612 round,
6613 bytes,
6614 epoch_store
6615 .epoch_start_config()
6616 .randomness_obj_initial_shared_version()
6617 .expect("randomness state obj must exist"),
6618 );
6619 debug!(
6620 "created randomness state update transaction with digest: {:?}",
6621 transaction.digest()
6622 );
6623 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
6624 let digest = *transaction.digest();
6625
6626 self.authority_state
6631 .get_cache_commit()
6632 .persist_transaction(&transaction);
6633
6634 if epoch_store.insert_tx_key(key, digest).is_err() {
6636 warn!("epoch ended while handling new randomness");
6637 }
6638
6639 let authority_state = self.authority_state.clone();
6640 spawn_monitored_task!(async move {
6641 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
6648 let result = tokio::time::timeout(
6649 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
6650 authority_state
6651 .get_transaction_cache_reader()
6652 .notify_read_executed_effects(
6653 "RandomnessRoundReceiver::notify_read_executed_effects_first",
6654 &[digest],
6655 ),
6656 )
6657 .await;
6658 let mut effects = match result {
6659 Ok(result) => result,
6660 Err(_) => {
6661 debug_fatal_no_invariant!(
6663 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6664 );
6665 authority_state
6667 .get_transaction_cache_reader()
6668 .notify_read_executed_effects(
6669 "RandomnessRoundReceiver::notify_read_executed_effects_second",
6670 &[digest],
6671 )
6672 .await
6673 }
6674 };
6675
6676 let effects = effects.pop().expect("should return effects");
6677 if *effects.status() != ExecutionStatus::Success {
6678 fatal!(
6679 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
6680 );
6681 }
6682 debug!(
6683 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
6684 );
6685 });
6686 }
6687}
6688
6689#[async_trait]
6690impl TransactionKeyValueStoreTrait for AuthorityState {
6691 #[instrument(skip(self))]
6692 async fn multi_get(
6693 &self,
6694 transactions: &[TransactionDigest],
6695 effects: &[TransactionDigest],
6696 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6697 let txns = if !transactions.is_empty() {
6698 self.get_transaction_cache_reader()
6699 .multi_get_transaction_blocks(transactions)
6700 .into_iter()
6701 .map(|t| t.map(|t| (*t).clone().into_inner()))
6702 .collect()
6703 } else {
6704 vec![]
6705 };
6706
6707 let fx = if !effects.is_empty() {
6708 self.get_transaction_cache_reader()
6709 .multi_get_executed_effects(effects)
6710 } else {
6711 vec![]
6712 };
6713
6714 Ok((txns, fx))
6715 }
6716
6717 #[instrument(skip(self))]
6718 async fn multi_get_checkpoints(
6719 &self,
6720 checkpoint_summaries: &[CheckpointSequenceNumber],
6721 checkpoint_contents: &[CheckpointSequenceNumber],
6722 checkpoint_summaries_by_digest: &[CheckpointDigest],
6723 ) -> SuiResult<(
6724 Vec<Option<CertifiedCheckpointSummary>>,
6725 Vec<Option<CheckpointContents>>,
6726 Vec<Option<CertifiedCheckpointSummary>>,
6727 )> {
6728 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6730 let store = self.get_checkpoint_store();
6731 for seq in checkpoint_summaries {
6732 let checkpoint = store
6733 .get_checkpoint_by_sequence_number(*seq)?
6734 .map(|c| c.into_inner());
6735
6736 summaries.push(checkpoint);
6737 }
6738
6739 let mut contents = Vec::with_capacity(checkpoint_contents.len());
6740 for seq in checkpoint_contents {
6741 let checkpoint = store
6742 .get_checkpoint_by_sequence_number(*seq)?
6743 .and_then(|summary| {
6744 store
6745 .get_checkpoint_contents(&summary.content_digest)
6746 .expect("db read cannot fail")
6747 });
6748 contents.push(checkpoint);
6749 }
6750
6751 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6752 for digest in checkpoint_summaries_by_digest {
6753 let checkpoint = store
6754 .get_checkpoint_by_digest(digest)?
6755 .map(|c| c.into_inner());
6756 summaries_by_digest.push(checkpoint);
6757 }
6758 Ok((summaries, contents, summaries_by_digest))
6759 }
6760
6761 #[instrument(skip(self))]
6762 async fn deprecated_get_transaction_checkpoint(
6763 &self,
6764 digest: TransactionDigest,
6765 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6766 Ok(self
6767 .get_checkpoint_cache()
6768 .deprecated_get_transaction_checkpoint(&digest)
6769 .map(|(_epoch, checkpoint)| checkpoint))
6770 }
6771
6772 #[instrument(skip(self))]
6773 async fn get_object(
6774 &self,
6775 object_id: ObjectID,
6776 version: VersionNumber,
6777 ) -> SuiResult<Option<Object>> {
6778 Ok(self
6779 .get_object_cache_reader()
6780 .get_object_by_key(&object_id, version))
6781 }
6782
6783 #[instrument(skip_all)]
6784 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6785 Ok(self
6786 .get_object_cache_reader()
6787 .multi_get_objects_by_key(object_keys))
6788 }
6789
6790 #[instrument(skip(self))]
6791 async fn multi_get_transaction_checkpoint(
6792 &self,
6793 digests: &[TransactionDigest],
6794 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6795 let res = self
6796 .get_checkpoint_cache()
6797 .deprecated_multi_get_transaction_checkpoint(digests);
6798
6799 Ok(res
6800 .into_iter()
6801 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6802 .collect())
6803 }
6804
6805 #[instrument(skip(self))]
6806 async fn multi_get_events_by_tx_digests(
6807 &self,
6808 digests: &[TransactionDigest],
6809 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6810 if digests.is_empty() {
6811 return Ok(vec![]);
6812 }
6813
6814 Ok(self
6815 .get_transaction_cache_reader()
6816 .multi_get_events(digests))
6817 }
6818}
6819
/// Simulator-only (`cfg(msim)`) registry of framework package overrides,
/// kept in thread-local storage and keyed by package id.
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::cell::RefCell;
    use std::collections::{BTreeMap, BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    /// Per-package override configuration.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    thread_local! {
        /// Overrides registered on the current thread.
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(BTreeMap::new());
    }

    /// The compiled modules that make up one package.
    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) override a package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package override is resolved.
    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback from the validator's name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module at its own `version`.
    ///
    /// Panics if a module fails to serialize.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Registers `modules` as the override of `package_id` for every
    /// validator, replacing any previous entry for that id.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        let config = PackageOverrideConfig::Global(modules);
        OVERRIDE.with(|overrides| {
            overrides.borrow_mut().insert(package_id, config);
        });
    }

    /// Registers a per-validator callback as the override of `package_id`,
    /// replacing any previous entry for that id.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        let config = PackageOverrideConfig::PerValidator(func);
        OVERRIDE.with(|overrides| {
            overrides.borrow_mut().insert(package_id, config);
        });
    }

    /// Registers a global override for each package in `packages`, and an
    /// empty-module global override for every built-in package id that is not
    /// present in `packages`.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        OVERRIDE.with(|overrides| {
            // Start from all built-in ids and strike out the ones being supplied.
            let mut absent_built_ins: BTreeSet<_> =
                BuiltInFramework::all_package_ids().into_iter().collect();
            for pkg in &packages {
                absent_built_ins.remove(&pkg.id);
            }

            let mut overrides = overrides.borrow_mut();
            for pkg in packages {
                let modules = pkg.modules();
                overrides.insert(pkg.id, PackageOverrideConfig::Global(modules));
            }
            for absent_id in absent_built_ins {
                overrides.insert(absent_id, PackageOverrideConfig::Global(vec![]));
            }
        });
    }

    /// Returns the serialized override modules of `package_id` for validator
    /// `name`.
    ///
    /// `None` means no override is registered, or the per-validator callback
    /// returned `None` for this validator.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|overrides| match overrides.borrow().get(package_id) {
            Some(PackageOverrideConfig::Global(framework)) => {
                Some(compiled_modules_to_bytes(framework))
            }
            Some(PackageOverrideConfig::PerValidator(func)) => {
                func(name).map(|framework| compiled_modules_to_bytes(&framework))
            }
            None => None,
        })
    }

    /// Same lookup as [`get_override_bytes`], but returns the compiled
    /// modules themselves instead of their serialized bytes.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|overrides| match overrides.borrow().get(package_id) {
            Some(PackageOverrideConfig::Global(framework)) => Some(framework.clone()),
            Some(PackageOverrideConfig::PerValidator(func)) => func(name),
            None => None,
        })
    }

    /// Builds a [`SystemPackage`] from the override of `package_id`, if one
    /// resolves for validator `name`.
    ///
    /// Dependencies are copied from the built-in package when `package_id` is
    /// a system package; otherwise every built-in package id is listed.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;

        let dependencies = if !is_system_package(*package_id) {
            BuiltInFramework::all_package_ids()
        } else {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        };

        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns a [`SystemPackage`] for every registered override whose id is
    /// not a built-in package id; each depends on all built-in packages.
    ///
    /// # Panics
    ///
    /// Panics if an override for such a package does not resolve for `name`
    /// (for example, a per-validator callback that returns `None`).
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();

        // Collect the ids first so the registry borrow is released before
        // `get_override_bytes` borrows it again.
        let extra_ids: Vec<ObjectID> = OVERRIDE.with(|overrides| {
            overrides
                .borrow()
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        let mut extra_packages = Vec::with_capacity(extra_ids.len());
        for id in extra_ids {
            extra_packages.push(SystemPackage {
                id,
                bytes: get_override_bytes(&id, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        extra_packages
    }
}
6953
6954#[derive(Debug, Serialize, Deserialize, Clone)]
6955pub struct ObjDumpFormat {
6956 pub id: ObjectID,
6957 pub version: VersionNumber,
6958 pub digest: ObjectDigest,
6959 pub object: Object,
6960}
6961
6962impl ObjDumpFormat {
6963 fn new(object: Object) -> Self {
6964 let oref = object.compute_object_reference();
6965 Self {
6966 id: oref.0,
6967 version: oref.1,
6968 digest: oref.2,
6969 object,
6970 }
6971 }
6972}
6973
6974#[derive(Debug, Serialize, Deserialize, Clone)]
6975pub struct NodeStateDump {
6976 pub tx_digest: TransactionDigest,
6977 pub sender_signed_data: SenderSignedData,
6978 pub executed_epoch: u64,
6979 pub reference_gas_price: u64,
6980 pub protocol_version: u64,
6981 pub epoch_start_timestamp_ms: u64,
6982 pub computed_effects: TransactionEffects,
6983 pub expected_effects_digest: TransactionEffectsDigest,
6984 pub relevant_system_packages: Vec<ObjDumpFormat>,
6985 pub shared_objects: Vec<ObjDumpFormat>,
6986 pub loaded_child_objects: Vec<ObjDumpFormat>,
6987 pub modified_at_versions: Vec<ObjDumpFormat>,
6988 pub runtime_reads: Vec<ObjDumpFormat>,
6989 pub input_objects: Vec<ObjDumpFormat>,
6990}
6991
6992impl NodeStateDump {
6993 pub fn new(
6994 tx_digest: &TransactionDigest,
6995 effects: &TransactionEffects,
6996 expected_effects_digest: TransactionEffectsDigest,
6997 object_store: &dyn ObjectStore,
6998 epoch_store: &Arc<AuthorityPerEpochStore>,
6999 inner_temporary_store: &InnerTemporaryStore,
7000 certificate: &VerifiedExecutableTransaction,
7001 ) -> SuiResult<Self> {
7002 let executed_epoch = epoch_store.epoch();
7004 let reference_gas_price = epoch_store.reference_gas_price();
7005 let epoch_start_config = epoch_store.epoch_start_config();
7006 let protocol_version = epoch_store.protocol_version().as_u64();
7007 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
7008
7009 let mut relevant_system_packages = Vec::new();
7011 for sys_package_id in BuiltInFramework::all_package_ids() {
7012 if let Some(w) = object_store.get_object(&sys_package_id) {
7013 relevant_system_packages.push(ObjDumpFormat::new(w))
7014 }
7015 }
7016
7017 let mut shared_objects = Vec::new();
7019 for kind in effects.input_consensus_objects() {
7020 match kind {
7021 InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
7022 if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
7023 shared_objects.push(ObjDumpFormat::new(w))
7024 }
7025 }
7026 InputConsensusObject::ReadConsensusStreamEnded(..)
7027 | InputConsensusObject::MutateConsensusStreamEnded(..)
7028 | InputConsensusObject::Cancelled(..) => (), }
7030 }
7031
7032 let mut loaded_child_objects = Vec::new();
7035 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
7036 if let Some(w) = object_store.get_object_by_key(id, meta.version) {
7037 loaded_child_objects.push(ObjDumpFormat::new(w))
7038 }
7039 }
7040
7041 let mut modified_at_versions = Vec::new();
7043 for (id, ver) in effects.modified_at_versions() {
7044 if let Some(w) = object_store.get_object_by_key(&id, ver) {
7045 modified_at_versions.push(ObjDumpFormat::new(w))
7046 }
7047 }
7048
7049 let mut runtime_reads = Vec::new();
7052 for obj in inner_temporary_store
7053 .runtime_packages_loaded_from_db
7054 .values()
7055 {
7056 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
7057 }
7058
7059 Ok(Self {
7062 tx_digest: *tx_digest,
7063 executed_epoch,
7064 reference_gas_price,
7065 epoch_start_timestamp_ms,
7066 protocol_version,
7067 relevant_system_packages,
7068 shared_objects,
7069 loaded_child_objects,
7070 modified_at_versions,
7071 runtime_reads,
7072 sender_signed_data: certificate.clone().into_message(),
7073 input_objects: inner_temporary_store
7074 .input_objects
7075 .values()
7076 .map(|o| ObjDumpFormat::new(o.clone()))
7077 .collect(),
7078 computed_effects: effects.clone(),
7079 expected_effects_digest,
7080 })
7081 }
7082
7083 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
7084 let mut objects = Vec::new();
7085 objects.extend(self.relevant_system_packages.clone());
7086 objects.extend(self.shared_objects.clone());
7087 objects.extend(self.loaded_child_objects.clone());
7088 objects.extend(self.modified_at_versions.clone());
7089 objects.extend(self.runtime_reads.clone());
7090 objects.extend(self.input_objects.clone());
7091 objects
7092 }
7093
7094 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
7095 let file_name = format!(
7096 "{}_{}_NODE_DUMP.json",
7097 self.tx_digest,
7098 AuthorityState::unixtime_now_ms()
7099 );
7100 let mut path = path.to_path_buf();
7101 path.push(&file_name);
7102 let mut file = File::create(path.clone())?;
7103 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
7104 Ok(path)
7105 }
7106
7107 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
7108 let file = File::open(path)?;
7109 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
7110 }
7111}