1use crate::accumulators::coin_reservations::CoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::checkpoints::CheckpointBuilderError;
11use crate::checkpoints::CheckpointBuilderResult;
12use crate::congestion_tracker::CongestionTracker;
13use crate::consensus_adapter::ConsensusOverloadChecker;
14use crate::execution_cache::ExecutionCacheTraitPointers;
15use crate::execution_cache::TransactionCacheRead;
16use crate::execution_cache::writeback_cache::WritebackCache;
17use crate::execution_scheduler::ExecutionScheduler;
18use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
19use crate::jsonrpc_index::CoinIndexKey2;
20use crate::rpc_index::RpcIndexStore;
21use crate::traffic_controller::TrafficController;
22use crate::traffic_controller::metrics::TrafficControllerMetrics;
23use crate::transaction_outputs::TransactionOutputs;
24use crate::verify_indexes::{fix_indexes, verify_indexes};
25use arc_swap::{ArcSwap, ArcSwapOption, Guard};
26use async_trait::async_trait;
27use authority_per_epoch_store::CertLockGuard;
28use dashmap::DashMap;
29use fastcrypto::encoding::Base58;
30use fastcrypto::encoding::Encoding;
31use fastcrypto::hash::MultisetHash;
32use itertools::Itertools;
33use move_binary_format::CompiledModule;
34use move_binary_format::binary_config::BinaryConfig;
35use move_core_types::annotated_value::MoveStructLayout;
36use move_core_types::language_storage::ModuleId;
37use mysten_common::{assert_reachable, fatal};
38use parking_lot::Mutex;
39use prometheus::{
40 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
41 register_histogram_vec_with_registry, register_histogram_with_registry,
42 register_int_counter_vec_with_registry, register_int_counter_with_registry,
43 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
44};
45use serde::de::DeserializeOwned;
46use serde::{Deserialize, Serialize};
47use shared_object_version_manager::AssignedVersions;
48use shared_object_version_manager::Schedulable;
49use std::collections::BTreeMap;
50use std::collections::BTreeSet;
51use std::fs::File;
52use std::io::Write;
53use std::path::{Path, PathBuf};
54use std::sync::atomic::Ordering;
55use std::time::Duration;
56use std::time::Instant;
57use std::time::SystemTime;
58use std::time::UNIX_EPOCH;
59use std::{
60 collections::{HashMap, HashSet},
61 fs,
62 pin::Pin,
63 str::FromStr,
64 sync::Arc,
65 vec,
66};
67use sui_config::NodeConfig;
68use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
69use sui_protocol_config::PerObjectCongestionControlMode;
70use sui_types::accumulator_root::AccumulatorObjId;
71use sui_types::crypto::RandomnessRound;
72use sui_types::dynamic_field::visitor as DFV;
73use sui_types::execution::ExecutionOutput;
74use sui_types::execution::ExecutionTimeObservationKey;
75use sui_types::execution::ExecutionTiming;
76use sui_types::execution_params::ExecutionOrEarlyError;
77use sui_types::execution_params::FundsWithdrawStatus;
78use sui_types::execution_params::get_early_execution_error;
79use sui_types::execution_status::ExecutionStatus;
80use sui_types::inner_temporary_store::PackageStoreWithFallback;
81use sui_types::layout_resolver::LayoutResolver;
82use sui_types::layout_resolver::into_struct_layout;
83use sui_types::messages_consensus::AuthorityCapabilitiesV2;
84use sui_types::object::bounded_visitor::BoundedVisitor;
85use sui_types::storage::ChildObjectResolver;
86use sui_types::storage::InputKey;
87use sui_types::storage::TrackingBackingStore;
88use sui_types::traffic_control::{
89 PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
90};
91use sui_types::transaction_executor::SimulateTransactionResult;
92use sui_types::transaction_executor::TransactionChecks;
93use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
94use tap::TapFallible;
95use tokio::sync::RwLock;
96use tokio::sync::mpsc::unbounded_channel;
97use tokio::sync::watch::error::RecvError;
98use tokio::sync::{mpsc, oneshot};
99use tokio::task::JoinHandle;
100use tokio::time::timeout;
101use tracing::{debug, error, info, instrument, warn};
102
103use self::authority_store::ExecutionLockWriteGuard;
104use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
105pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
106use mysten_metrics::{monitored_scope, spawn_monitored_task};
107
108use crate::jsonrpc_index::IndexStore;
109use crate::jsonrpc_index::{
110 CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
111};
112use mysten_common::debug_fatal;
113use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
114use sui_config::genesis::Genesis;
115use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
116use sui_framework::{BuiltInFramework, SystemPackage};
117use sui_json_rpc_types::{
118 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
119 SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
120 SuiTransactionBlockEvents, TransactionFilter,
121};
122use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
123use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
124use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
125use sui_types::authenticator_state::get_authenticator_state;
126use sui_types::committee::{EpochId, ProtocolVersion};
127use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
128use sui_types::deny_list_v1::check_coin_deny_list_v1;
129use sui_types::digests::ChainIdentifier;
130use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
131use sui_types::effects::{
132 InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
133 TransactionEvents, VerifiedSignedTransactionEffects,
134};
135use sui_types::error::{ExecutionError, ExecutionErrorKind, SuiErrorKind, UserInputError};
136use sui_types::event::{Event, EventID};
137use sui_types::executable_transaction::VerifiedExecutableTransaction;
138use sui_types::gas::{GasCostSummary, SuiGasStatus};
139use sui_types::inner_temporary_store::{
140 InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
141};
142use sui_types::message_envelope::Message;
143use sui_types::messages_checkpoint::{
144 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
145 CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
146 CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
147 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
148};
149use sui_types::messages_grpc::{
150 LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
151 TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
152};
153use sui_types::metrics::{BytecodeVerifierMetrics, LimitsMetrics};
154use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
155use sui_types::signature::GenericSignature;
156use sui_types::storage::{
157 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
158};
159use sui_types::sui_system_state::SuiSystemStateTrait;
160use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
161use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
162use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
163use sui_types::{
164 SUI_SYSTEM_ADDRESS,
165 base_types::*,
166 committee::Committee,
167 crypto::AuthoritySignature,
168 error::{SuiError, SuiResult},
169 object::{Object, ObjectRead},
170 transaction::*,
171};
172use sui_types::{TypeTag, is_system_package};
173use typed_store::TypedStoreError;
174use typed_store::rocks::StagedBatch;
175
176use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
177use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
178use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
179use crate::authority::authority_store_pruner::{
180 AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
181};
182use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
183use crate::authority::epoch_start_configuration::EpochStartConfiguration;
184use crate::checkpoints::CheckpointStore;
185use crate::epoch::committee_store::CommitteeStore;
186use crate::execution_cache::{
187 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
188 ObjectCacheRead, StateSyncAPI,
189};
190use crate::execution_driver::execution_process;
191use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
192use crate::metrics::LatencyObserver;
193use crate::metrics::RateTracker;
194use crate::module_cache_metrics::ResolverMetrics;
195use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
196use crate::stake_aggregator::StakeAggregator;
197use crate::subscription_handler::SubscriptionHandler;
198use crate::transaction_input_loader::TransactionInputLoader;
199
200#[cfg(msim)]
201pub use crate::checkpoints::checkpoint_executor::utils::{
202 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
203};
204
205#[cfg(msim)]
206use sui_types::committee::CommitteeTrait;
207use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
208
209#[cfg(test)]
210#[path = "unit_tests/authority_tests.rs"]
211pub mod authority_tests;
212
213#[cfg(test)]
214#[path = "unit_tests/transaction_tests.rs"]
215pub mod transaction_tests;
216
217#[cfg(test)]
218#[path = "unit_tests/batch_transaction_tests.rs"]
219mod batch_transaction_tests;
220
221#[cfg(test)]
222#[path = "unit_tests/move_integration_tests.rs"]
223pub mod move_integration_tests;
224
225#[cfg(test)]
226#[path = "unit_tests/gas_tests.rs"]
227mod gas_tests;
228
229#[cfg(test)]
230#[path = "unit_tests/batch_verification_tests.rs"]
231mod batch_verification_tests;
232
233#[cfg(test)]
234#[path = "unit_tests/coin_deny_list_tests.rs"]
235mod coin_deny_list_tests;
236
237#[cfg(test)]
238#[path = "unit_tests/auth_unit_test_utils.rs"]
239pub mod auth_unit_test_utils;
240
241pub mod authority_test_utils;
242
243pub mod authority_per_epoch_store;
244pub mod authority_per_epoch_store_pruner;
245
246pub mod authority_store_pruner;
247pub mod authority_store_tables;
248pub mod authority_store_types;
249pub mod congestion_log;
250pub mod consensus_tx_status_cache;
251pub mod epoch_start_configuration;
252pub mod execution_time_estimator;
253pub mod shared_object_congestion_tracker;
254pub mod shared_object_version_manager;
255pub mod submitted_transaction_cache;
256pub mod test_authority_builder;
257pub mod transaction_deferral;
258pub mod transaction_reject_reason_cache;
259mod weighted_moving_average;
260
261pub(crate) mod authority_store;
262pub mod backpressure;
263
/// Prometheus metrics for the authority: transaction flow counters, execution
/// latency histograms, scheduler/execution-driver gauges, consensus-handler
/// counters, and a few in-process trackers. All prometheus metrics are
/// registered in `AuthorityMetrics::new`.
pub struct AuthorityMetrics {
    // Transaction flow counters.
    tx_orders: IntCounter,
    total_certs: IntCounter,
    total_effects: IntCounter,
    pub shared_obj_tx: IntCounter,
    sponsored_tx: IntCounter,
    tx_already_processed: IntCounter,
    // Per-transaction input-size distributions.
    num_input_objs: Histogram,
    num_shared_objects: Histogram,
    batch_size: Histogram,

    // Latency of voting on a transaction without signing.
    authority_state_handle_vote_transaction_latency: Histogram,

    // Certificate execution latency breakdown.
    internal_execution_latency: Histogram,
    execution_load_input_objects_latency: Histogram,
    prepare_certificate_latency: Histogram,
    commit_certificate_latency: Histogram,
    db_checkpoint_latency: Histogram,

    // Execution-scheduler state (metric names retain the legacy
    // "transaction_manager" prefix; help strings reference ExecutionScheduler).
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    // Execution-driver throughput and queueing.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    pub(crate) execution_driver_paused_transactions: IntCounter,
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    pub(crate) execution_queueing_delay_s: Histogram,
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    pub(crate) execution_gas_latency_ratio: Histogram,

    // Consensus transactions skipped (e.g. already processed).
    pub(crate) skipped_consensus_txns: IntCounter,
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
    pub(crate) consensus_handler_duplicate_tx_count: Histogram,

    // Overload / load-shedding state.
    pub(crate) authority_overload_status: IntGauge,
    pub(crate) authority_load_shedding_percentage: IntGauge,

    pub(crate) transaction_overload_sources: IntCounterVec,

    // Post-processing (indexing/event) counters.
    post_processing_total_events_emitted: IntCounter,
    post_processing_total_tx_indexed: IntCounter,
    post_processing_total_tx_had_event_processed: IntCounter,
    post_processing_total_failures: IntCounter,

    // Consensus handler / block handler metrics.
    pub consensus_handler_processed: IntCounterVec,
    pub consensus_handler_transaction_sizes: HistogramVec,
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    pub consensus_handler_scores: IntGaugeVec,
    pub consensus_handler_deferred_transactions: IntCounter,
    pub consensus_handler_congested_transactions: IntCounter,
    pub consensus_handler_unpaid_amplification_deferrals: IntCounter,
    pub consensus_handler_cancelled_transactions: IntCounter,
    pub consensus_handler_max_object_costs: IntGaugeVec,
    pub consensus_committed_subdags: IntCounterVec,
    pub consensus_committed_messages: IntGaugeVec,
    pub consensus_committed_user_transactions: IntGaugeVec,
    pub consensus_finalized_user_transactions: IntGaugeVec,
    pub consensus_rejected_user_transactions: IntGaugeVec,
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,
    pub consensus_block_handler_block_processed: IntCounter,
    pub consensus_block_handler_txn_processed: IntCounterVec,
    pub consensus_block_handler_fastpath_executions: IntCounter,
    pub consensus_timestamp_bias: Histogram,

    // Shared sub-metrics (also used by the VM / bytecode verifier).
    pub limits_metrics: Arc<LimitsMetrics>,

    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    // Signature-scheme usage counters.
    pub zklogin_sig_count: IntCounter,
    pub multisig_sig_count: IntCounter,

    // In-process (non-prometheus) trackers.
    pub execution_queueing_latency: LatencyObserver,

    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
362
/// Histogram buckets for positive integer counts, roughly log-spaced from 1 to 10M.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];

/// General-purpose latency buckets in seconds (0.5ms to 90s).
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];

/// Finer-grained latency buckets for fast operations (10µs to 100s).
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];

/// Buckets for consensus timestamp bias in seconds. The first bucket is
/// negative so a consensus timestamp slightly ahead of local time is still
/// captured; resolution is densest around the 0.2-0.5s range.
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    -2.0, 0.0, 0.2, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40, 0.45, 0.50, 0.80,
    2.0, 10.0, 30.0,
];

/// Buckets for the ratio of computation gas to execution latency.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];

/// Synthetic gas coin value (10^18) used for dev-inspect executions.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;

/// Maximum time to wait for fastpath input objects before giving up.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_secs(2);
399
400impl AuthorityMetrics {
401 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
402 Self {
403 tx_orders: register_int_counter_with_registry!(
404 "total_transaction_orders",
405 "Total number of transaction orders",
406 registry,
407 )
408 .unwrap(),
409 total_certs: register_int_counter_with_registry!(
410 "total_transaction_certificates",
411 "Total number of transaction certificates handled",
412 registry,
413 )
414 .unwrap(),
415 total_effects: register_int_counter_with_registry!(
417 "total_transaction_effects",
418 "Total number of transaction effects produced",
419 registry,
420 )
421 .unwrap(),
422
423 shared_obj_tx: register_int_counter_with_registry!(
424 "num_shared_obj_tx",
425 "Number of transactions involving shared objects",
426 registry,
427 )
428 .unwrap(),
429
430 sponsored_tx: register_int_counter_with_registry!(
431 "num_sponsored_tx",
432 "Number of sponsored transactions",
433 registry,
434 )
435 .unwrap(),
436
437 tx_already_processed: register_int_counter_with_registry!(
438 "num_tx_already_processed",
439 "Number of transaction orders already processed previously",
440 registry,
441 )
442 .unwrap(),
443 num_input_objs: register_histogram_with_registry!(
444 "num_input_objects",
445 "Distribution of number of input TX objects per TX",
446 POSITIVE_INT_BUCKETS.to_vec(),
447 registry,
448 )
449 .unwrap(),
450 num_shared_objects: register_histogram_with_registry!(
451 "num_shared_objects",
452 "Number of shared input objects per TX",
453 POSITIVE_INT_BUCKETS.to_vec(),
454 registry,
455 )
456 .unwrap(),
457 batch_size: register_histogram_with_registry!(
458 "batch_size",
459 "Distribution of size of transaction batch",
460 POSITIVE_INT_BUCKETS.to_vec(),
461 registry,
462 )
463 .unwrap(),
464 authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
465 "authority_state_handle_vote_transaction_latency",
466 "Latency of voting on transactions without signing",
467 LATENCY_SEC_BUCKETS.to_vec(),
468 registry,
469 )
470 .unwrap(),
471 internal_execution_latency: register_histogram_with_registry!(
472 "authority_state_internal_execution_latency",
473 "Latency of actual certificate executions",
474 LATENCY_SEC_BUCKETS.to_vec(),
475 registry,
476 )
477 .unwrap(),
478 execution_load_input_objects_latency: register_histogram_with_registry!(
479 "authority_state_execution_load_input_objects_latency",
480 "Latency of loading input objects for execution",
481 LOW_LATENCY_SEC_BUCKETS.to_vec(),
482 registry,
483 )
484 .unwrap(),
485 prepare_certificate_latency: register_histogram_with_registry!(
486 "authority_state_prepare_certificate_latency",
487 "Latency of executing certificates, before committing the results",
488 LATENCY_SEC_BUCKETS.to_vec(),
489 registry,
490 )
491 .unwrap(),
492 commit_certificate_latency: register_histogram_with_registry!(
493 "authority_state_commit_certificate_latency",
494 "Latency of committing certificate execution results",
495 LATENCY_SEC_BUCKETS.to_vec(),
496 registry,
497 )
498 .unwrap(),
499 db_checkpoint_latency: register_histogram_with_registry!(
500 "db_checkpoint_latency",
501 "Latency of checkpointing dbs",
502 LATENCY_SEC_BUCKETS.to_vec(),
503 registry,
504 ).unwrap(),
505 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
506 "transaction_manager_num_enqueued_certificates",
507 "Current number of certificates enqueued to ExecutionScheduler",
508 &["result"],
509 registry,
510 )
511 .unwrap(),
512 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
513 "transaction_manager_num_pending_certificates",
514 "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
515 registry,
516 )
517 .unwrap(),
518 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
519 "transaction_manager_num_executing_certificates",
520 "Number of executing certificates, including queued and actually running certificates",
521 registry,
522 )
523 .unwrap(),
524 authority_overload_status: register_int_gauge_with_registry!(
525 "authority_overload_status",
526 "Whether authority is current experiencing overload and enters load shedding mode.",
527 registry)
528 .unwrap(),
529 authority_load_shedding_percentage: register_int_gauge_with_registry!(
530 "authority_load_shedding_percentage",
531 "The percentage of transactions is shed when the authority is in load shedding mode.",
532 registry)
533 .unwrap(),
534 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
535 "transaction_manager_transaction_queue_age_s",
536 "Time spent in waiting for transaction in the queue",
537 LATENCY_SEC_BUCKETS.to_vec(),
538 registry,
539 )
540 .unwrap(),
541 transaction_overload_sources: register_int_counter_vec_with_registry!(
542 "transaction_overload_sources",
543 "Number of times each source indicates transaction overload.",
544 &["source"],
545 registry)
546 .unwrap(),
547 execution_driver_executed_transactions: register_int_counter_with_registry!(
548 "execution_driver_executed_transactions",
549 "Cumulative number of transaction executed by execution driver",
550 registry,
551 )
552 .unwrap(),
553 execution_driver_paused_transactions: register_int_counter_with_registry!(
554 "execution_driver_paused_transactions",
555 "Cumulative number of transactions paused by execution driver",
556 registry,
557 )
558 .unwrap(),
559 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
560 "execution_driver_dispatch_queue",
561 "Number of transaction pending in execution driver dispatch queue",
562 registry,
563 )
564 .unwrap(),
565 execution_queueing_delay_s: register_histogram_with_registry!(
566 "execution_queueing_delay_s",
567 "Queueing delay between a transaction is ready for execution until it starts executing.",
568 LATENCY_SEC_BUCKETS.to_vec(),
569 registry
570 )
571 .unwrap(),
572 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
573 "prepare_cert_gas_latency_ratio",
574 "The ratio of computation gas divided by VM execution latency.",
575 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
576 registry
577 )
578 .unwrap(),
579 execution_gas_latency_ratio: register_histogram_with_registry!(
580 "execution_gas_latency_ratio",
581 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
582 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
583 registry
584 )
585 .unwrap(),
586 skipped_consensus_txns: register_int_counter_with_registry!(
587 "skipped_consensus_txns",
588 "Total number of consensus transactions skipped",
589 registry,
590 )
591 .unwrap(),
592 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
593 "skipped_consensus_txns_cache_hit",
594 "Total number of consensus transactions skipped because of local cache hit",
595 registry,
596 )
597 .unwrap(),
598 consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
599 "consensus_handler_duplicate_tx_count",
600 "Number of times each transaction appears in its first consensus commit",
601 POSITIVE_INT_BUCKETS.to_vec(),
602 registry,
603 )
604 .unwrap(),
605 post_processing_total_events_emitted: register_int_counter_with_registry!(
606 "post_processing_total_events_emitted",
607 "Total number of events emitted in post processing",
608 registry,
609 )
610 .unwrap(),
611 post_processing_total_tx_indexed: register_int_counter_with_registry!(
612 "post_processing_total_tx_indexed",
613 "Total number of txes indexed in post processing",
614 registry,
615 )
616 .unwrap(),
617 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
618 "post_processing_total_tx_had_event_processed",
619 "Total number of txes finished event processing in post processing",
620 registry,
621 )
622 .unwrap(),
623 post_processing_total_failures: register_int_counter_with_registry!(
624 "post_processing_total_failures",
625 "Total number of failure in post processing",
626 registry,
627 )
628 .unwrap(),
629 consensus_handler_processed: register_int_counter_vec_with_registry!(
630 "consensus_handler_processed",
631 "Number of transactions processed by consensus handler",
632 &["class"],
633 registry
634 ).unwrap(),
635 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
636 "consensus_handler_transaction_sizes",
637 "Sizes of each type of transactions processed by consensus handler",
638 &["class"],
639 POSITIVE_INT_BUCKETS.to_vec(),
640 registry
641 ).unwrap(),
642 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
643 "consensus_handler_num_low_scoring_authorities",
644 "Number of low scoring authorities based on reputation scores from consensus",
645 registry
646 ).unwrap(),
647 consensus_handler_scores: register_int_gauge_vec_with_registry!(
648 "consensus_handler_scores",
649 "scores from consensus for each authority",
650 &["authority"],
651 registry,
652 ).unwrap(),
653 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
654 "consensus_handler_deferred_transactions",
655 "Number of transactions deferred by consensus handler",
656 registry,
657 ).unwrap(),
658 consensus_handler_congested_transactions: register_int_counter_with_registry!(
659 "consensus_handler_congested_transactions",
660 "Number of transactions deferred by consensus handler due to congestion",
661 registry,
662 ).unwrap(),
663 consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
664 "consensus_handler_unpaid_amplification_deferrals",
665 "Number of transactions deferred due to unpaid consensus amplification",
666 registry,
667 ).unwrap(),
668 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
669 "consensus_handler_cancelled_transactions",
670 "Number of transactions cancelled by consensus handler",
671 registry,
672 ).unwrap(),
673 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
674 "consensus_handler_max_congestion_control_object_costs",
675 "Max object costs for congestion control in the current consensus commit",
676 &["commit_type"],
677 registry,
678 ).unwrap(),
679 consensus_committed_subdags: register_int_counter_vec_with_registry!(
680 "consensus_committed_subdags",
681 "Number of committed subdags, sliced by author",
682 &["authority"],
683 registry,
684 ).unwrap(),
685 consensus_committed_messages: register_int_gauge_vec_with_registry!(
686 "consensus_committed_messages",
687 "Total number of committed consensus messages, sliced by author",
688 &["authority"],
689 registry,
690 ).unwrap(),
691 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
692 "consensus_committed_user_transactions",
693 "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
694 &["authority"],
695 registry,
696 ).unwrap(),
697 consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
698 "consensus_finalized_user_transactions",
699 "Number of user transactions finalized, sliced by submitter",
700 &["authority"],
701 registry,
702 ).unwrap(),
703 consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
704 "consensus_rejected_user_transactions",
705 "Number of user transactions rejected, sliced by submitter",
706 &["authority"],
707 registry,
708 ).unwrap(),
709 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
710 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
711 zklogin_sig_count: register_int_counter_with_registry!(
712 "zklogin_sig_count",
713 "Count of zkLogin signatures",
714 registry,
715 )
716 .unwrap(),
717 multisig_sig_count: register_int_counter_with_registry!(
718 "multisig_sig_count",
719 "Count of zkLogin signatures",
720 registry,
721 )
722 .unwrap(),
723 consensus_calculated_throughput: register_int_gauge_with_registry!(
724 "consensus_calculated_throughput",
725 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
726 registry,
727 ).unwrap(),
728 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
729 "consensus_calculated_throughput_profile",
730 "The current active calculated throughput profile",
731 registry
732 ).unwrap(),
733 consensus_block_handler_block_processed: register_int_counter_with_registry!(
734 "consensus_block_handler_block_processed",
735 "Number of blocks processed by consensus block handler.",
736 registry
737 ).unwrap(),
738 consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
739 "consensus_block_handler_txn_processed",
740 "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
741 &["outcome"],
742 registry
743 ).unwrap(),
744 consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
745 "consensus_block_handler_fastpath_executions",
746 "Number of fastpath transactions sent for execution by consensus transaction handler",
747 registry,
748 ).unwrap(),
749 consensus_timestamp_bias: register_histogram_with_registry!(
750 "consensus_timestamp_bias",
751 "Bias/delay from consensus timestamp to system time",
752 TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
753 registry
754 ).unwrap(),
755 execution_queueing_latency: LatencyObserver::new(),
756 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
758 }
759 }
760}
761
762pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
769
/// Per-transaction execution parameters supplied to the execution path,
/// built via the `with_*` builder methods below.
#[derive(Debug, Clone)]
pub struct ExecutionEnv {
    // Consensus-assigned versions for this transaction's shared objects.
    pub assigned_versions: AssignedVersions,
    // If set, the effects digest this execution is expected to reproduce
    // (e.g. replay/verification); `None` otherwise.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    // Funds-withdrawal status; defaults to `MaybeSufficient` (see `Default`).
    pub funds_withdraw_status: FundsWithdrawStatus,
    // Digests of transactions this one must be ordered after.
    // NOTE(review): exact barrier semantics are enforced elsewhere — confirm
    // against the scheduler before relying on this comment.
    pub barrier_dependencies: Vec<TransactionDigest>,
}
786
787impl Default for ExecutionEnv {
788 fn default() -> Self {
789 Self {
790 assigned_versions: Default::default(),
791 expected_effects_digest: None,
792 funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
793 barrier_dependencies: Default::default(),
794 }
795 }
796}
797
798impl ExecutionEnv {
799 pub fn new() -> Self {
800 Default::default()
801 }
802
803 pub fn with_expected_effects_digest(
804 mut self,
805 expected_effects_digest: TransactionEffectsDigest,
806 ) -> Self {
807 self.expected_effects_digest = Some(expected_effects_digest);
808 self
809 }
810
811 pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
812 self.assigned_versions = assigned_versions;
813 self
814 }
815
816 pub fn with_insufficient_funds(mut self) -> Self {
817 self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
818 self
819 }
820
821 pub fn with_barrier_dependencies(
822 mut self,
823 barrier_dependencies: BTreeSet<TransactionDigest>,
824 ) -> Self {
825 self.barrier_dependencies = barrier_dependencies.into_iter().collect();
826 self
827 }
828}
829
/// Map of per-transaction effects-digest overrides, parsed from the node's
/// fork-recovery config (see `ForkRecoveryState::new`). Guarded by a
/// `parking_lot::RwLock`.
#[derive(Debug)]
pub struct ForkRecoveryState {
    transaction_overrides:
        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
}
836
837impl Default for ForkRecoveryState {
838 fn default() -> Self {
839 Self {
840 transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
841 }
842 }
843}
844
845impl ForkRecoveryState {
846 pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
847 let Some(config) = config else {
848 return Ok(Self::default());
849 };
850
851 let mut transaction_overrides = HashMap::new();
852 for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
853 let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
854 SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
855 })?;
856 let effects_digest =
857 TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
858 SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
859 })?;
860 transaction_overrides.insert(tx_digest, effects_digest);
861 }
862
863 Ok(Self {
864 transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
865 })
866 }
867
868 pub fn get_transaction_override(
869 &self,
870 tx_digest: &TransactionDigest,
871 ) -> Option<TransactionEffectsDigest> {
872 self.transaction_overrides.read().get(tx_digest).copied()
873 }
874}
875
876pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
877
/// The top-level state of a Sui authority node.
///
/// Holds the node identity/signing key, the per-epoch store, execution
/// caches and scheduler, index/checkpoint stores, and various operational
/// subsystems (overload tracking, traffic control, pruners, metrics).
pub struct AuthorityState {
    // This authority's public identity.
    pub name: AuthorityName,
    // Signing key used to produce this authority's signatures.
    pub secret: StableSyncAuthoritySigner,

    // Loads input/receiving objects for signing and execution.
    input_loader: TransactionInputLoader,
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    coin_reservation_resolver: Arc<CoinReservationResolver>,

    // Current epoch store; swapped atomically at reconfiguration.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    // Read-held during execution, write-held across epoch change; the guarded
    // EpochId is compared against the epoch store before committing.
    execution_lock: RwLock<EpochId>,

    pub indexes: Option<Arc<IndexStore>>,
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    pub checkpoint_store: Arc<CheckpointStore>,

    committee_store: Arc<CommitteeStore>,

    // Schedules transactions for execution and tracks execution backlog.
    execution_scheduler: Arc<ExecutionScheduler>,

    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    // Held for their background effects; never read directly.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    db_checkpoint_config: DBCheckpointConfig,

    pub config: NodeConfig,

    // Overload flag and load-shedding percentage read by admission checks.
    pub overload_info: AuthorityOverloadInfo,

    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    // Present only when traffic control is configured for this node.
    pub traffic_controller: Option<Arc<TrafficController>>,

    // Optional per-transaction effects-digest overrides for fork recovery.
    fork_recovery_state: Option<ForkRecoveryState>,

    notify_epoch: tokio::sync::watch::Sender<EpochId>,

    pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
    object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,

    // In-flight post-processing results keyed by transaction digest.
    pending_post_processing:
        Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,

    // Bounds concurrent post-processing tasks.
    post_processing_semaphore: Arc<tokio::sync::Semaphore>,
}
952
953impl AuthorityState {
    /// True when this authority is a member of the epoch's committee.
    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
        epoch_store.committee().authority_exists(&self.name)
    }
963
    /// True when this node is not in the committee (i.e. it is a fullnode).
    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
        !self.is_validator(epoch_store)
    }
967
    /// Borrows the committee store.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
971
    /// Returns a new `Arc` handle to the committee store.
    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.committee_store.clone()
    }
975
    /// Borrows the authority overload configuration from the node config.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
979
    /// Fetches the checkpoint state commitments recorded for `epoch`, if any,
    /// by delegating to the checkpoint store.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
986
    /// Signing-time checks that run before any input objects are loaded.
    ///
    /// Applies the node's transaction deny rules, resolves the funds
    /// withdrawals declared by the transaction, and verifies the declared
    /// amounts are currently available.
    ///
    /// Returns the declared withdrawal amounts keyed by accumulator object id.
    fn pre_object_load_checks(
        &self,
        tx_data: &TransactionData,
        tx_signatures: &[GenericSignature],
        input_object_kinds: &[InputObjectKind],
        receiving_objects_refs: &[ObjectRef],
    ) -> SuiResult<BTreeMap<AccumulatorObjId, u64>> {
        // Deny rules must pass before doing any further work.
        sui_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            tx_signatures,
            input_object_kinds,
            receiving_objects_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
            self.chain_identifier,
            self.coin_reservation_resolver.as_ref(),
        )?;

        // Reject early if the declared withdrawal amounts are not available.
        self.execution_cache_trait_pointers
            .account_funds_read
            .check_amounts_available(&declared_withdrawals)?;

        Ok(declared_withdrawals)
    }
1019
    /// Runs the full signing-time validation pipeline for a transaction:
    /// pre-object-load checks, object loading, input checks (including gas
    /// and bytecode verification), and coin deny-list checks.
    ///
    /// Returns the checked input objects on success.
    fn handle_transaction_deny_checks(
        &self,
        transaction: &VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<CheckedInputObjects> {
        let tx_digest = transaction.digest();
        let tx_data = transaction.data().transaction_data();

        let input_object_kinds = tx_data.input_objects()?;
        let receiving_objects_refs = tx_data.receiving_objects();

        // Deny rules and withdrawal checks run before any objects are loaded.
        self.pre_object_load_checks(
            tx_data,
            transaction.tx_signatures(),
            &input_object_kinds,
            &receiving_objects_refs,
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            Some(tx_digest),
            &input_object_kinds,
            &receiving_objects_refs,
            epoch_store.epoch(),
        )?;

        // Full input validation; gas status is not needed here.
        let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
            epoch_store.protocol_config(),
            epoch_store.reference_gas_price(),
            tx_data,
            input_objects,
            &receiving_objects,
            &self.metrics.bytecode_verifier_metrics,
            &self.config.verifier_signing_config,
        )?;

        self.handle_coin_deny_list_checks(
            tx_data,
            &checked_input_objects,
            &receiving_objects,
            epoch_store,
        )?;

        Ok(checked_input_objects)
    }
1064
1065 fn handle_coin_deny_list_checks(
1066 &self,
1067 tx_data: &TransactionData,
1068 checked_input_objects: &CheckedInputObjects,
1069 receiving_objects: &ReceivingObjects,
1070 epoch_store: &Arc<AuthorityPerEpochStore>,
1071 ) -> SuiResult<()> {
1072 let funds_withdraw_types = tx_data
1073 .get_funds_withdrawals()
1074 .into_iter()
1075 .filter_map(|withdraw| {
1076 withdraw
1077 .type_arg
1078 .get_balance_type_param()
1079 .map(|ty| ty.to_canonical_string(false))
1080 })
1081 .collect::<BTreeSet<_>>();
1082
1083 if epoch_store.coin_deny_list_v1_enabled() {
1084 check_coin_deny_list_v1(
1085 tx_data.sender(),
1086 checked_input_objects,
1087 receiving_objects,
1088 funds_withdraw_types.clone(),
1089 &self.get_object_store(),
1090 )?;
1091 }
1092
1093 if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1094 check_coin_deny_list_v2_during_signing(
1095 tx_data.sender(),
1096 checked_input_objects,
1097 receiving_objects,
1098 funds_withdraw_types.clone(),
1099 &self.get_object_store(),
1100 )?;
1101 }
1102
1103 Ok(())
1104 }
1105
    /// Votes on a user transaction: validates it and, on success, locks the
    /// owned input objects at their declared versions.
    ///
    /// Returns `Ok(())` without re-locking when the transaction was already
    /// finalized/executed this epoch; errors if the validator is halted at
    /// epoch end or the transaction executed in the previous epoch.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    pub fn handle_vote_transaction(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
    ) -> SuiResult<()> {
        debug!("handle_vote_transaction");

        let _metrics_guard = self
            .metrics
            .authority_state_handle_vote_transaction_latency
            .start_timer();
        self.metrics.tx_orders.inc();

        // Stop accepting user transactions once reconfiguration has begun.
        if !epoch_store
            .get_reconfig_state_read_lock_guard()
            .should_accept_user_certs()
        {
            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
        }

        let tx_digest = *transaction.digest();
        // Already finalized or executed this epoch: nothing more to do.
        if epoch_store.is_recently_finalized(&tx_digest)
            || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
        {
            assert_reachable!("transaction recently executed");
            return Ok(());
        }

        // Executed in the previous epoch: report as already executed.
        if self
            .get_transaction_cache_reader()
            .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
        {
            assert_reachable!("transaction executed in last epoch");
            return Err(SuiErrorKind::TransactionAlreadyExecuted {
                digest: (*transaction.digest()),
            }
            .into());
        }

        // Held across validation and lock acquisition.
        let _execution_lock = self.execution_lock_for_validation()?;

        let checked_input_objects =
            self.handle_transaction_deny_checks(&transaction, epoch_store)?;

        let owned_objects = checked_input_objects.inner().filter_owned_objects();

        self.get_cache_writer()
            .validate_owned_object_versions(&owned_objects)
    }
1176
    /// Validates a transaction without locking its objects: runs the deny
    /// checks and verifies the provided owned-object refs against the
    /// currently live versions.
    ///
    /// When `enforce_live_input_objects` is set, owned refs that are ahead of
    /// the locally live version are also rejected (otherwise only stale refs
    /// and digest mismatches are).
    pub fn check_transaction_validity(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: &VerifiedTransaction,
        enforce_live_input_objects: bool,
    ) -> SuiResult<()> {
        if !epoch_store
            .get_reconfig_state_read_lock_guard()
            .should_accept_user_certs()
        {
            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
        }

        transaction.validity_check(&epoch_store.tx_validity_check_context())?;

        let checked_input_objects =
            self.handle_transaction_deny_checks(transaction, epoch_store)?;

        let owned_objects = checked_input_objects.inner().filter_owned_objects();
        let cache_reader = self.get_object_cache_reader();

        for obj_ref in &owned_objects {
            if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
                // Provided version is older than the live version: stale ref.
                if obj_ref.1 < live_object.version() {
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: live_object.version(),
                        },
                    }
                    .into());
                }

                // Provided version is ahead of the live version: rejected
                // only when liveness enforcement is requested.
                if enforce_live_input_objects && live_object.version() < obj_ref.1 {
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: live_object.version(),
                        },
                    }
                    .into());
                }

                // Same version but a different digest: invalid ref.
                if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
                    return Err(SuiErrorKind::UserInputError {
                        error: UserInputError::InvalidObjectDigest {
                            object_id: obj_ref.0,
                            expected_digest: live_object.digest(),
                        },
                    }
                    .into());
                }
            }
        }

        Ok(())
    }
1247
    /// Whether system overload should be checked at transaction signing time
    /// (from the node's overload configuration).
    pub fn check_system_overload_at_signing(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_signing
    }
1253
    /// Whether system overload should be checked at execution time (from the
    /// node's overload configuration).
    pub fn check_system_overload_at_execution(&self) -> bool {
        self.config
            .authority_overload_config
            .check_system_overload_at_execution
    }
1259
    /// Runs the system overload checks in order, recording the first failing
    /// source in metrics: optional authority load shedding, execution
    /// scheduler backlog, consensus, and finally commit backpressure.
    pub(crate) fn check_system_overload(
        &self,
        consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
        tx_data: &SenderSignedData,
        do_authority_overload_check: bool,
    ) -> SuiResult {
        if do_authority_overload_check {
            self.check_authority_overload(tx_data).tap_err(|_| {
                self.update_overload_metrics("execution_queue");
            })?;
        }
        self.execution_scheduler
            .check_execution_overload(self.overload_config(), tx_data)
            .tap_err(|_| {
                self.update_overload_metrics("execution_pending");
            })?;
        consensus_overload_checker
            .check_consensus_overload()
            .tap_err(|_| {
                self.update_overload_metrics("consensus");
            })?;

        // Backpressure: too many transactions awaiting commit.
        let pending_tx_count = self
            .get_cache_commit()
            .approximate_pending_transaction_count();
        if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
            return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
                retry_after_secs: 10,
            }
            .into());
        }

        Ok(())
    }
1294
1295 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1296 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1297 return Ok(());
1298 }
1299
1300 let load_shedding_percentage = self
1301 .overload_info
1302 .load_shedding_percentage
1303 .load(Ordering::Relaxed);
1304 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1305 }
1306
1307 fn update_overload_metrics(&self, source: &str) {
1308 self.metrics
1309 .transaction_overload_sources
1310 .with_label_values(&[source])
1311 .inc();
1312 }
1313
    /// Waits (with a timeout) for the transaction's fastpath dependency
    /// objects — move objects, packages, and receiving objects — to become
    /// available locally.
    ///
    /// Returns `Ok(true)` when nothing needs waiting or everything arrived in
    /// time, and `Ok(false)` when the wait timed out.
    pub(crate) async fn wait_for_fastpath_dependency_objects(
        &self,
        transaction: &VerifiedTransaction,
        epoch: EpochId,
    ) -> SuiResult<bool> {
        let txn_data = transaction.data().transaction_data();
        let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;

        // Only objects that are not yet available locally need waiting;
        // packages are always waited on by id.
        let fastpath_dependency_objects: Vec<_> = move_objects
            .into_iter()
            .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
            .chain(
                packages
                    .into_iter()
                    .map(|package_id| InputKey::Package { id: package_id }),
            )
            .collect();
        let receiving_keys: HashSet<_> = receiving_objects
            .into_iter()
            .filter_map(|receiving_obj_ref| {
                self.should_wait_for_dependency_object(receiving_obj_ref)
            })
            .collect();
        if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
            return Ok(true);
        }

        // Shorter wait under simulation to keep tests fast.
        let max_wait = if cfg!(msim) {
            Duration::from_millis(200)
        } else {
            WAIT_FOR_FASTPATH_INPUT_TIMEOUT
        };

        match timeout(
            max_wait,
            self.get_object_cache_reader().notify_read_input_objects(
                &fastpath_dependency_objects,
                &receiving_keys,
                epoch,
            ),
        )
        .await
        {
            Ok(()) => Ok(true),
            // Timed out waiting for dependencies.
            Err(_) => Ok(false),
        }
    }
1369
1370 fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1377 let (obj_id, cur_version, _digest) = obj_ref;
1378 let Some(latest_obj_ref) = self
1379 .get_object_cache_reader()
1380 .get_latest_object_ref_or_tombstone(obj_id)
1381 else {
1382 return Some(InputKey::VersionedObject {
1384 id: FullObjectID::new(obj_id, None),
1385 version: cur_version,
1386 });
1387 };
1388 let latest_digest = latest_obj_ref.2;
1389 if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1390 return None;
1392 }
1393 let latest_version = latest_obj_ref.1;
1394 if cur_version <= latest_version {
1395 return None;
1398 }
1399 Some(InputKey::VersionedObject {
1401 id: FullObjectID::new(obj_id, None),
1402 version: cur_version,
1403 })
1404 }
1405
    /// Test helper: enqueues the transaction for execution when appropriate
    /// (non-consensus tx with pre-consensus locking enabled) and waits for
    /// its executed effects.
    #[instrument(level = "trace", skip_all)]
    pub async fn wait_for_transaction_execution_for_testing(
        &self,
        transaction: &VerifiedExecutableTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> TransactionEffects {
        if !transaction.is_consensus_tx()
            && !epoch_store.protocol_config().disable_preconsensus_locking()
        {
            self.execution_scheduler.enqueue(
                vec![(
                    Schedulable::Transaction(transaction.clone()),
                    ExecutionEnv::new(),
                )],
                epoch_store,
            );
        }

        self.notify_read_effects_for_testing(
            "AuthorityState::wait_for_transaction_execution_for_testing",
            *transaction.digest(),
        )
        .await
    }
1442
    /// Executes a certificate immediately on the current task, committing its
    /// outputs on success.
    ///
    /// Applies any fork-recovery effects-digest override, short-circuits when
    /// effects already exist (asserting they match any expected digest),
    /// verifies the execution lock still belongs to this epoch, executes via
    /// `process_certificate`, commits, and records timing metrics. Also
    /// performs post-commit bookkeeping for authenticator-state updates and
    /// accumulator barrier settlements.
    #[instrument(level = "trace", skip_all)]
    pub async fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        mut execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // Fork recovery: a configured override replaces the expected digest.
        if let Some(fork_recovery) = &self.fork_recovery_state
            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
        {
            warn!(
                ?tx_digest,
                original_digest = ?execution_env.expected_effects_digest,
                override_digest = ?override_digest,
                "Applying fork recovery override for transaction effects digest"
            );
            execution_env.expected_effects_digest = Some(override_digest);
        }

        let tx_guard = epoch_store.acquire_tx_guard(certificate);

        // Already executed: return the existing effects without re-executing.
        let tx_cache_reader = self.get_transaction_cache_reader();
        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest,
                    "Unexpected effects digest for transaction {:?}",
                    tx_digest
                );
            }
            tx_guard.release();
            return ExecutionOutput::Success((effects, None));
        }

        let execution_start_time = Instant::now();

        // The read half of the execution lock guards against epoch change
        // racing with this execution.
        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
        else {
            tx_guard.release();
            return ExecutionOutput::EpochEnded;
        };
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return ExecutionOutput::EpochEnded;
        }

        // Captured before execution_env is moved below.
        let accumulator_version = execution_env.assigned_versions.accumulator_version;

        let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
            &tx_guard,
            &execution_guard,
            certificate,
            execution_env,
            epoch_store,
        ) {
            ExecutionOutput::Success(result) => result,
            output => return output.unwrap_err(),
        };
        let transaction_outputs = Arc::new(transaction_outputs);

        fail_point!("crash");

        let effects = transaction_outputs.effects.clone();
        debug!(
            ?tx_digest,
            fx_digest=?effects.digest(),
            "process_certificate succeeded in {:.3}ms",
            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
        );

        let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
        if let Err(err) = commit_result {
            error!(?tx_digest, "Error committing transaction: {err}");
            tx_guard.release();
            return ExecutionOutput::Fatal(err);
        }

        // Keep the in-memory authenticator state in sync with the update tx.
        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // Debug-only consistency check: on-chain JWKs must match the
            // signature verifier's active JWKs.
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        // Barrier settlement: advance the funds checker's settled version.
        if certificate
            .transaction_data()
            .kind()
            .is_accumulator_barrier_settle_tx()
        {
            let object_funds_checker = self.object_funds_checker.load();
            if let Some(object_funds_checker) = object_funds_checker.as_ref() {
                let next_accumulator_version = accumulator_version.unwrap().next();
                object_funds_checker.settle_accumulator_version(next_accumulator_version);
            }
        }

        tx_guard.commit_tx();

        let elapsed = execution_start_time.elapsed();
        epoch_store.record_local_execution_time(
            certificate.data().transaction_data(),
            &effects,
            timings,
            elapsed,
        );

        let elapsed_us = elapsed.as_micros() as f64;
        if elapsed_us > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
        };
        ExecutionOutput::Success((effects, execution_error_opt))
    }
1613
    /// Loads the input objects needed to execute `certificate`, applying the
    /// assigned shared-object versions, via the transaction input loader.
    pub fn read_objects_for_execution(
        &self,
        tx_lock: &CertLockGuard,
        certificate: &VerifiedExecutableTransaction,
        assigned_shared_object_versions: &AssignedVersions,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<InputObjects> {
        let _scope = monitored_scope("Execution::load_input_objects");
        let _metrics_guard = self
            .metrics
            .execution_load_input_objects_latency
            .start_timer();
        let input_objects = &certificate.data().transaction_data().input_objects()?;
        self.input_loader.read_objects_for_execution(
            &certificate.key(),
            tx_lock,
            input_objects,
            assigned_shared_object_versions,
            epoch_store.epoch(),
        )
    }
1635
    /// Test helper: executes a certificate immediately and returns its signed
    /// effects plus any execution error. Panics on non-success outputs.
    pub async fn try_execute_for_test(
        &self,
        certificate: &VerifiedCertificate,
        execution_env: ExecutionEnv,
    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
        let epoch_store = self.epoch_store_for_testing();
        let (effects, execution_error_opt) = self
            .try_execute_immediately(
                &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
                execution_env,
                &epoch_store,
            )
            .await
            .unwrap();
        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
        (signed_effects, execution_error_opt)
    }
1655
    /// Test helper: executes an executable transaction immediately, flushes
    /// its post-processing, and returns its signed effects plus any execution
    /// error. Panics on non-success outputs.
    pub async fn try_execute_executable_for_test(
        &self,
        executable: &VerifiedExecutableTransaction,
        execution_env: ExecutionEnv,
    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
        let epoch_store = self.epoch_store_for_testing();
        let (effects, execution_error_opt) = self
            .try_execute_immediately(executable, execution_env, &epoch_store)
            .await
            .unwrap();
        // Ensure index/cache post-processing has completed before returning.
        self.flush_post_processing(executable.digest()).await;
        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
        (signed_effects, execution_error_opt)
    }
1670
1671 pub async fn notify_read_effects_for_testing(
1676 &self,
1677 task_name: &'static str,
1678 digest: TransactionDigest,
1679 ) -> TransactionEffects {
1680 self.get_transaction_cache_reader()
1681 .notify_read_executed_effects(task_name, &[digest])
1682 .await
1683 .pop()
1684 .expect("must return correct number of effects")
1685 }
1686
    /// Verifies the given owned-object refs are live, delegating to the
    /// object cache reader.
    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
        self.get_object_cache_reader()
            .check_owned_objects_are_live(owned_object_refs)
    }
1691
    /// Writes a node-state dump for a transaction (used when an effects
    /// mismatch is detected) and returns the path of the written file.
    ///
    /// The dump directory comes from the debug-dump config, falling back to
    /// the system temp directory.
    pub(crate) fn debug_dump_transaction_state(
        &self,
        tx_digest: &TransactionDigest,
        effects: &TransactionEffects,
        expected_effects_digest: TransactionEffectsDigest,
        inner_temporary_store: &InnerTemporaryStore,
        certificate: &VerifiedExecutableTransaction,
        debug_dump_config: &StateDebugDumpConfig,
    ) -> SuiResult<PathBuf> {
        let dump_dir = debug_dump_config
            .dump_file_directory
            .as_ref()
            .cloned()
            .unwrap_or(std::env::temp_dir());
        let epoch_store = self.load_epoch_store_one_call_per_task();

        NodeStateDump::new(
            tx_digest,
            effects,
            expected_effects_digest,
            self.get_object_store().as_ref(),
            &epoch_store,
            inner_temporary_store,
            certificate,
        )?
        .write_to_file(&dump_dir)
        .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
    }
1724
    /// Prepares and executes a certificate: loads its input objects, resolves
    /// the expected effects digest (from the execution env, falling back to a
    /// previously signed digest), and delegates to `execute_certificate`.
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn process_certificate(
        &self,
        tx_guard: &CertTxGuard,
        execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(
        TransactionOutputs,
        Vec<ExecutionTiming>,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::process_certificate");
        let tx_digest = *certificate.digest();

        let input_objects = match self.read_objects_for_execution(
            tx_guard.as_lock_guard(),
            certificate,
            &execution_env.assigned_versions,
            epoch_store,
        ) {
            Ok(objects) => objects,
            Err(e) => return ExecutionOutput::Fatal(e),
        };

        // Prefer the digest pinned in the env; otherwise use any effects
        // digest this validator previously signed for the transaction.
        let expected_effects_digest = match execution_env.expected_effects_digest {
            Some(expected_effects_digest) => Some(expected_effects_digest),
            None => {
                match epoch_store.get_signed_effects_digest(&tx_digest) {
                    Ok(digest) => digest,
                    Err(e) => return ExecutionOutput::Fatal(e),
                }
            }
        };

        // Simulation-only: occasionally crash here to test crash recovery.
        fail_point_if!("correlated-crash-process-certificate", || {
            if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        self.execute_certificate(
            execution_guard,
            certificate,
            input_objects,
            expected_effects_digest,
            execution_env,
            epoch_store,
        )
    }
1784
1785 pub async fn reconfigure_traffic_control(
1786 &self,
1787 params: TrafficControlReconfigParams,
1788 ) -> Result<TrafficControlReconfigParams, SuiError> {
1789 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1790 traffic_controller.admin_reconfigure(params).await
1791 } else {
1792 Err(SuiErrorKind::InvalidAdminRequest(
1793 "Traffic controller is not configured on this node".to_string(),
1794 )
1795 .into())
1796 }
1797 }
1798
    /// Commits executed transaction outputs: records the execution in the
    /// epoch store, writes the outputs to the cache, notifies any barrier
    /// waiters, and reloads system packages after an end-of-epoch tx.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        transaction_outputs: Arc<TransactionOutputs>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_digest = certificate.digest();

        epoch_store.insert_executed_in_epoch(tx_digest);
        // Non-digest keys (e.g. synthetic keys) also need a digest mapping.
        let key = certificate.key();
        if !matches!(key, TransactionKey::Digest(_)) {
            epoch_store.insert_tx_key(key, *tx_digest)?;
        }

        // Crash injection point between epoch-store bookkeeping and the
        // cache write, exercising recovery of partially committed state.
        fail_point!("crash");

        self.get_cache_writer()
            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);

        // Wake anyone waiting on this accumulator barrier settlement.
        if let Some(settlement_key) = certificate
            .transaction_data()
            .kind()
            .accumulator_barrier_settlement_key()
        {
            epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
        }

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // End-of-epoch txs can upgrade system packages; drop cached copies.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        Ok(())
    }
1847
1848 fn update_metrics(
1849 &self,
1850 certificate: &VerifiedExecutableTransaction,
1851 inner_temporary_store: &InnerTemporaryStore,
1852 effects: &TransactionEffects,
1853 ) {
1854 if certificate.has_zklogin_sig() {
1856 self.metrics.zklogin_sig_count.inc();
1857 } else if certificate.has_upgraded_multisig() {
1858 self.metrics.multisig_sig_count.inc();
1859 }
1860
1861 self.metrics.total_effects.inc();
1862 self.metrics.total_certs.inc();
1863
1864 let consensus_object_count = effects.input_consensus_objects().len();
1865 if consensus_object_count > 0 {
1866 self.metrics.shared_obj_tx.inc();
1867 }
1868
1869 if certificate.is_sponsored_tx() {
1870 self.metrics.sponsored_tx.inc();
1871 }
1872
1873 let input_object_count = inner_temporary_store.input_objects.len();
1874 self.metrics
1875 .num_input_objs
1876 .observe(input_object_count as f64);
1877 self.metrics
1878 .num_shared_objects
1879 .observe(consensus_object_count as f64);
1880 self.metrics.batch_size.observe(
1881 certificate
1882 .data()
1883 .intent_message()
1884 .value
1885 .kind()
1886 .num_commands() as f64,
1887 );
1888 }
1889
    /// Executes a checked certificate to effects and assembles the resulting
    /// `TransactionOutputs`.
    ///
    /// Re-validates the certificate's input, checks owned-object liveness,
    /// runs the Move executor, then: retries later when the object-funds
    /// checker rejects the withdrawals; treats a mismatch against
    /// `expected_effects_digest` as a fork (dumps state, records it, and
    /// aborts the process via `fatal!`); otherwise runs post-processing,
    /// updates metrics, and returns the outputs with timings and any
    /// execution error.
    #[instrument(level = "trace", skip_all)]
    fn execute_certificate(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        input_objects: InputObjects,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(
        TransactionOutputs,
        Vec<ExecutionTiming>,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::prepare_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        let prepare_certificate_start_time = tokio::time::Instant::now();

        let tx_data = certificate.data().transaction_data();

        if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
            return ExecutionOutput::Fatal(e);
        }

        let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
            certificate,
            input_objects,
            epoch_store.protocol_config(),
            epoch_store.reference_gas_price(),
        ) {
            Ok(result) => result,
            Err(e) => return ExecutionOutput::Fatal(e),
        };

        let owned_object_refs = input_objects.inner().filter_owned_objects();
        if let Err(e) = self.check_owned_locks(&owned_object_refs) {
            return ExecutionOutput::Fatal(e);
        }
        let tx_digest = *certificate.digest();
        let protocol_config = epoch_store.protocol_config();
        let transaction_data = &certificate.data().intent_message().value;
        let (kind, signer, gas_data) = transaction_data.execution_parts();
        // Denied certificates and insufficient-funds cases are turned into a
        // pre-determined execution error rather than executed normally.
        let early_execution_error = get_early_execution_error(
            &tx_digest,
            &input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            &execution_env.funds_withdraw_status,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        // Wraps the backing store so the set of objects read during
        // execution can be recovered afterwards.
        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());

        #[allow(unused_mut)]
        let (inner_temp_store, _, mut effects, timings, execution_error_opt) =
            epoch_store.executor().execute_transaction_to_effects(
                &tracking_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_sui_conservation_check(),
                execution_params,
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                input_objects,
                gas_data,
                gas_status,
                kind,
                signer,
                tx_digest,
                &mut None,
            );

        // The funds checker may refuse to commit the withdrawals now; in
        // that case execution is retried later rather than committed.
        let object_funds_checker = self.object_funds_checker.load();
        if let Some(object_funds_checker) = object_funds_checker.as_ref()
            && !object_funds_checker.should_commit_object_funds_withdraws(
                certificate,
                effects.status(),
                &inner_temp_store.accumulator_running_max_withdraws,
                &execution_env,
                self.get_account_funds_read(),
                &self.execution_scheduler,
                epoch_store,
            )
        {
            assert_reachable!("retry object withdraw later");
            return ExecutionOutput::RetryLater;
        }

        // Fork detection: produced effects must match the expected digest.
        if let Some(expected_effects_digest) = expected_effects_digest
            && effects.digest() != expected_effects_digest
        {
            match self.debug_dump_transaction_state(
                &tx_digest,
                &effects,
                expected_effects_digest,
                &inner_temp_store,
                certificate,
                &self.config.state_debug_dump_config,
            ) {
                Ok(out_path) => {
                    info!(
                        "Dumped node state for transaction {} to {}",
                        tx_digest,
                        out_path.as_path().display().to_string()
                    );
                }
                Err(e) => {
                    error!("Error dumping state for transaction {}: {e}", tx_digest);
                }
            }
            let expected_effects = self
                .get_transaction_cache_reader()
                .get_effects(&expected_effects_digest);
            error!(
                ?tx_digest,
                ?expected_effects_digest,
                actual_effects = ?effects,
                expected_effects = ?expected_effects,
                "fork detected!"
            );
            if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
                tx_digest,
                expected_effects_digest,
                effects.digest(),
            ) {
                error!("Failed to record transaction fork: {e}");
            }

            // Simulation-only: shut down the node instead of aborting, to
            // exercise fork-recovery paths in tests.
            fail_point_if!("kill_transaction_fork_node", || {
                #[cfg(msim)]
                {
                    tracing::error!(
                        fatal = true,
                        "Fork recovery test: killing node due to transaction effects fork for digest: {}",
                        tx_digest
                    );
                    sui_simulator::task::shutdown_current_node();
                }
            });

            // A fork is unrecoverable here: abort the process.
            fatal!(
                "Transaction {} is expected to have effects digest {}, but got {}!",
                tx_digest,
                expected_effects_digest,
                effects.digest()
            );
        }

        // Simulation-only: optionally mutate effects to fabricate a fork.
        fail_point_arg!("simulate_fork_during_execution", |(
            forked_validators,
            full_halt,
            effects_overrides,
            fork_probability,
        ): (
            std::sync::Arc<
                std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
            >,
            bool,
            std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
            f32,
        )| {
            #[cfg(msim)]
            self.simulate_fork_during_execution(
                certificate,
                epoch_store,
                &mut effects,
                forked_validators,
                full_halt,
                effects_overrides,
                fork_probability,
            );
        });

        let unchanged_loaded_runtime_objects =
            crate::transaction_outputs::unchanged_loaded_runtime_objects(
                certificate.transaction_data(),
                &effects,
                &tracking_store.into_read_objects(),
            );

        // Post-processing is best-effort: failures are counted and logged
        // but do not fail the execution.
        let _ = self
            .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        self.update_metrics(certificate, &inner_temp_store, &effects);

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects,
            inner_temp_store,
            unchanged_loaded_runtime_objects,
        );

        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics.prepare_cert_gas_latency_ratio.observe(
                transaction_outputs
                    .effects
                    .gas_cost_summary()
                    .computation_cost as f64
                    / elapsed,
            );
        }

        ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
    }
2122
2123 pub fn prepare_certificate_for_benchmark(
2124 &self,
2125 certificate: &VerifiedExecutableTransaction,
2126 input_objects: InputObjects,
2127 epoch_store: &Arc<AuthorityPerEpochStore>,
2128 ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2129 let lock = RwLock::new(epoch_store.epoch());
2130 let execution_guard = lock.try_read().unwrap();
2131
2132 let (transaction_outputs, _timings, execution_error_opt) = self
2133 .execute_certificate(
2134 &execution_guard,
2135 certificate,
2136 input_objects,
2137 None,
2138 ExecutionEnv::default(),
2139 epoch_store,
2140 )
2141 .unwrap();
2142 Ok((transaction_outputs, execution_error_opt))
2143 }
2144
2145 #[instrument(skip_all)]
2146 #[allow(clippy::type_complexity)]
2147 pub async fn dry_exec_transaction(
2148 &self,
2149 transaction: TransactionData,
2150 transaction_digest: TransactionDigest,
2151 ) -> SuiResult<(
2152 DryRunTransactionBlockResponse,
2153 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2154 TransactionEffects,
2155 Option<ObjectID>,
2156 )> {
2157 let epoch_store = self.load_epoch_store_one_call_per_task();
2158 if !self.is_fullnode(&epoch_store) {
2159 return Err(SuiErrorKind::UnsupportedFeatureError {
2160 error: "dry-exec is only supported on fullnodes".to_string(),
2161 }
2162 .into());
2163 }
2164
2165 if transaction.kind().is_system_tx() {
2166 return Err(SuiErrorKind::UnsupportedFeatureError {
2167 error: "dry-exec does not support system transactions".to_string(),
2168 }
2169 .into());
2170 }
2171
2172 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2173 }
2174
2175 #[allow(clippy::type_complexity)]
2176 pub fn dry_exec_transaction_for_benchmark(
2177 &self,
2178 transaction: TransactionData,
2179 transaction_digest: TransactionDigest,
2180 ) -> SuiResult<(
2181 DryRunTransactionBlockResponse,
2182 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2183 TransactionEffects,
2184 Option<ObjectID>,
2185 )> {
2186 let epoch_store = self.load_epoch_store_one_call_per_task();
2187 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2188 }
2189
    /// Shared core of dry execution.
    ///
    /// Validates the transaction (no gas check), applies the signing-time deny
    /// checks, loads input/receiving objects, then runs the transaction through
    /// the executor without committing anything.
    ///
    /// Gas handling: if the transaction carries no gas payment, a synthetic gas
    /// coin worth `MIST_TO_SUI * DRY_RUN_SUI` owned by the sender is fabricated
    /// and its `ObjectID` is returned as the fourth tuple element (`mock_gas`);
    /// otherwise the real gas objects are checked and `mock_gas` is `None`.
    ///
    /// Returns: the dry-run RPC response, the written objects keyed by id with
    /// their `WriteKind`, the raw effects, and the mock gas id (if any).
    /// Note: `object_changes` and `balance_changes` in the response are left
    /// empty here.
    #[allow(clippy::type_complexity)]
    fn dry_exec_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> SuiResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )> {
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Apply the same deny-list checks used when signing real transactions.
        sui_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        let mut gas_data = transaction.gas_data().clone();
        // No gas payment supplied: fabricate a large gas coin owned by the
        // sender so the dry run can proceed, and report its id as `mock_gas`.
        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
            let sender = transaction.sender();
            const MIST_TO_SUI: u64 = 1_000_000_000;
            const DRY_RUN_SUI: u64 = 1_000_000_000;
            let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
            let gas_object_id = ObjectID::random();
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            gas_data.payment = vec![gas_object_ref];
            (
                sui_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            (
                sui_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        let (kind, signer, _) = transaction.execution_parts();

        let silent = true;
        let executor = sui_execution::executor(protocol_config, silent)
            .expect("Creating an executor should not fail here");

        let expensive_checks = false;
        // Withdraw status is assumed MaybeSufficient for dry runs — funds are
        // not reserved or scheduled here.
        let early_execution_error = get_early_execution_error(
            &transaction_digest,
            &checked_input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };
        let (inner_temp_store, _, effects, _timings, execution_error) = executor
            .execute_transaction_to_effects(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                expensive_checks,
                execution_params,
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_data,
                gas_status,
                kind,
                signer,
                transaction_digest,
                &mut None,
            );
        let tx_digest = *effects.transaction_digest();

        // Resolve modules/layouts against the temp store first, falling back
        // to committed state, so freshly published packages are visible.
        let module_cache =
            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));
        // Intentionally left empty in this code path.
        let object_changes = Vec::new();

        // Intentionally left empty in this code path.
        let balance_changes = Vec::new();

        // Map every created/unwrapped/mutated object ref to the written object
        // plus its write kind, keyed by ObjectID.
        let written_with_kind = effects
            .created()
            .into_iter()
            .map(|(oref, _)| (oref, WriteKind::Create))
            .chain(
                effects
                    .unwrapped()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
            )
            .chain(
                effects
                    .mutated()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
            )
            .map(|(oref, kind)| {
                // All changed objects must be present in the temp store's
                // written set; a miss would be an executor invariant violation.
                let obj = inner_temp_store.written.get(&oref.0).unwrap();
                (oref.0, (oref, obj.clone(), kind))
            })
            .collect();

        let execution_error_source = execution_error
            .as_ref()
            .err()
            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));

        Ok((
            DryRunTransactionBlockResponse {
                suggested_gas_price: self
                    .congestion_tracker
                    .get_suggested_gas_prices(&transaction),
                input: SuiTransactionBlockData::try_from_with_module_cache(
                    transaction,
                    &module_cache,
                )
                .map_err(|e| SuiErrorKind::TransactionSerializationError {
                    error: format!(
                        "Failed to convert transaction to SuiTransactionBlockData: {}",
                        e
                    ),
                })?, effects: effects.clone().try_into()?,
                events: SuiTransactionBlockEvents::try_from(
                    inner_temp_store.events.clone(),
                    tx_digest,
                    None,
                    layout_resolver.as_mut(),
                )?,
                object_changes,
                balance_changes,
                execution_error_source,
            },
            written_with_kind,
            effects,
            mock_gas,
        ))
    }
2389
    /// Simulate execution of `transaction` on this fullnode without committing
    /// anything.
    ///
    /// `checks.disabled()` selects dev-inspect mode (lenient input checking);
    /// that mode can be disallowed via node config. If `allow_mock_gas_coin`
    /// is set and the transaction has no gas payment, a synthetic gas coin of
    /// `DEV_INSPECT_GAS_COIN_VALUE` is injected and its id is reported in the
    /// result as `mock_gas_id`.
    ///
    /// If the first execution succeeds but some non-declared accumulator
    /// withdraw exceeds the account's latest balance, the transaction is
    /// re-executed with an `InsufficientFundsForWithdraw` early error so the
    /// returned effects reflect the failure a real execution would hit.
    pub fn simulate_transaction(
        &self,
        mut transaction: TransactionData,
        checks: TransactionChecks,
        allow_mock_gas_coin: bool,
    ) -> SuiResult<SimulateTransactionResult> {
        // System transactions cannot be simulated.
        if transaction.kind().is_system_tx() {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate does not support system transactions".to_string(),
            }
            .into());
        }

        let epoch_store = self.load_epoch_store_one_call_per_task();
        if !self.is_fullnode(&epoch_store) {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate is only supported on fullnodes".to_string(),
            }
            .into());
        }

        // Checks disabled == dev-inspect semantics; may be forbidden by config.
        let dev_inspect = checks.disabled();
        if dev_inspect && self.config.dev_inspect_disabled {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "simulate with checks disabled is not allowed on this node".to_string(),
            }
            .into());
        }

        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Optionally fabricate a gas coin (ObjectID::MAX) for gasless input;
        // the payment vector is rewritten to reference it.
        let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() {
            let obj = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    ObjectID::MAX,
                    DEV_INSPECT_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(transaction.gas_data().owner),
                TransactionDigest::genesis_marker(),
            );
            transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
            Some(obj)
        } else {
            None
        };

        // Collect the withdrawals the transaction declares up front; those
        // accounts are exempt from the post-execution balance re-check below.
        let declared_withdrawals = self.pre_object_load_checks(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
        )?;
        let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Make the mock gas coin visible to input checking, and remember its id
        // for the result.
        let mock_gas_id = mock_gas_object.map(|obj| {
            let id = obj.id();
            input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
            id
        });

        let protocol_config = epoch_store.protocol_config();

        let (gas_status, checked_input_objects) = if dev_inspect {
            sui_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction,
                input_objects,
                receiving_objects,
                epoch_store.reference_gas_price(),
            )?
        } else {
            sui_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?
        };

        let executor = sui_execution::executor(
            protocol_config,
            true, ) // silent
        .expect("Creating an executor should not fail here");

        let (kind, signer, gas_data) = transaction.execution_parts();
        let early_execution_error = get_early_execution_error(
            &transaction.digest(),
            &checked_input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        // Track every object read during execution so the result can include
        // loaded-at-runtime objects.
        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());

        // Keep copies of the inputs in case a second execution is needed below.
        let cloned_input_objects = checked_input_objects.clone();
        let cloned_gas = gas_data.clone();
        let cloned_kind = kind.clone();
        let tx_digest = transaction.digest();
        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_timestamp_ms = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            &tracking_store,
            protocol_config,
            self.metrics.limits_metrics.clone(),
            false, execution_params, // expensive_checks = false
            &epoch_id,
            epoch_timestamp_ms,
            checked_input_objects,
            gas_data,
            gas_status,
            kind,
            signer,
            tx_digest,
            dev_inspect,
        );

        // On success, re-check accumulator withdraws that were not declared up
        // front; if any exceeds the latest known balance, re-execute with an
        // InsufficientFundsForWithdraw early error.
        let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
            let has_insufficient_object_funds = inner_temp_store
                .accumulator_running_max_withdraws
                .iter()
                .filter(|(id, _)| !address_funds.contains(id))
                .any(|(id, max_withdraw)| {
                    let (balance, _) = self.get_account_funds_read().get_latest_account_amount(id);
                    balance < *max_withdraw
                });

            if has_insufficient_object_funds {
                // Fresh gas status: the first run consumed the original one.
                let retry_gas_status = SuiGasStatus::new(
                    cloned_gas.budget,
                    cloned_gas.price,
                    epoch_store.reference_gas_price(),
                    protocol_config,
                )?;
                let (store, _, effects, result) = executor.dev_inspect_transaction(
                    &tracking_store,
                    protocol_config,
                    self.metrics.limits_metrics.clone(),
                    false,
                    ExecutionOrEarlyError::Err(ExecutionErrorKind::InsufficientFundsForWithdraw),
                    &epoch_id,
                    epoch_timestamp_ms,
                    cloned_input_objects,
                    cloned_gas,
                    retry_gas_status,
                    cloned_kind,
                    signer,
                    tx_digest,
                    dev_inspect,
                );
                (store, effects, result)
            } else {
                (inner_temp_store, effects, execution_result)
            }
        } else {
            (inner_temp_store, effects, execution_result)
        };

        let loaded_runtime_objects = tracking_store.into_read_objects();
        let unchanged_loaded_runtime_objects =
            crate::transaction_outputs::unchanged_loaded_runtime_objects(
                &transaction,
                &effects,
                &loaded_runtime_objects,
            );

        // Assemble the full object set referenced by the transaction: runtime
        // reads, inputs, and writes, filtered down to the keys the effects and
        // transaction actually reference.
        let object_set = {
            let objects = {
                let mut objects = loaded_runtime_objects;

                for o in inner_temp_store
                    .input_objects
                    .into_values()
                    .chain(inner_temp_store.written.into_values())
                {
                    objects.insert(o);
                }

                objects
            };

            let object_keys = sui_types::storage::get_transaction_object_set(
                &transaction,
                &effects,
                &unchanged_loaded_runtime_objects,
            );

            let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
            for k in object_keys {
                if let Some(o) = objects.get(&k) {
                    set.insert(o.clone());
                }
            }

            set
        };

        Ok(SimulateTransactionResult {
            objects: object_set,
            // Only surface events when the effects actually reference them.
            events: effects.events_digest().map(|_| inner_temp_store.events),
            effects,
            execution_result,
            mock_gas_id,
            unchanged_loaded_runtime_objects,
            suggested_gas_price: self
                .congestion_tracker
                .get_suggested_gas_prices(&transaction),
        })
    }
2630
    /// Dev-inspect a transaction kind on this fullnode: build a full
    /// `TransactionData` around `transaction_kind` with defaulted gas fields,
    /// execute it via the executor's dev-inspect path, and return the results
    /// without committing anything.
    ///
    /// Defaults: `gas_price` → reference gas price, `gas_budget` → protocol
    /// max, `gas_sponsor` → sender, `gas_objects` → empty, `skip_checks` →
    /// true, `show_raw_txn_data_and_effects` → false. When the gas payment is
    /// empty, a dummy gas coin of `DEV_INSPECT_GAS_COIN_VALUE` is injected.
    /// `skip_checks = true` may be disallowed by node config.
    #[allow(clippy::collapsible_else_if)]
    #[instrument(skip_all)]
    pub async fn dev_inspect_transaction_block(
        &self,
        sender: SuiAddress,
        transaction_kind: TransactionKind,
        gas_price: Option<u64>,
        gas_budget: Option<u64>,
        gas_sponsor: Option<SuiAddress>,
        gas_objects: Option<Vec<ObjectRef>>,
        show_raw_txn_data_and_effects: Option<bool>,
        skip_checks: Option<bool>,
    ) -> SuiResult<DevInspectResults> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        if !self.is_fullnode(&epoch_store) {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "dev-inspect is only supported on fullnodes".to_string(),
            }
            .into());
        }

        if transaction_kind.is_system_tx() {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "system transactions are not supported".to_string(),
            }
            .into());
        }

        let skip_checks = skip_checks.unwrap_or(true);
        if skip_checks && self.config.dev_inspect_disabled {
            return Err(SuiErrorKind::UnsupportedFeatureError {
                error: "dev-inspect with skip_checks is not allowed on this node".to_string(),
            }
            .into());
        }

        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
        let reference_gas_price = epoch_store.reference_gas_price();
        let protocol_config = epoch_store.protocol_config();
        let max_tx_gas = protocol_config.max_tx_gas();

        // Fill unspecified gas parameters with permissive defaults.
        let price = gas_price.unwrap_or(reference_gas_price);
        let budget = gas_budget.unwrap_or(max_tx_gas);
        let owner = gas_sponsor.unwrap_or(sender);
        let payment = gas_objects.unwrap_or_default();
        let mut transaction = TransactionData::V1(TransactionDataV1 {
            kind: transaction_kind.clone(),
            sender,
            gas_data: GasData {
                payment,
                owner,
                price,
                budget,
            },
            expiration: TransactionExpiration::None,
        });

        // Serialize before the payment vector may be rewritten below, so the
        // raw bytes reflect the caller-visible transaction.
        let raw_txn_data = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&transaction).map_err(|_| {
                SuiErrorKind::TransactionSerializationError {
                    error: "Failed to serialize transaction during dev inspect".to_string(),
                }
            })?
        } else {
            vec![]
        };

        transaction.validity_check_no_gas_check(protocol_config)?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        sui_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        let (gas_status, checked_input_objects) = if skip_checks {
            // Lenient path: inject a dummy gas coin as a regular input object
            // if no payment was given, then run the dev-inspect input checks.
            if transaction.gas().is_empty() {
                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                    DEV_INSPECT_GAS_COIN_VALUE,
                    transaction.gas_owner(),
                );
                let gas_object_ref = dummy_gas_object.compute_object_reference();
                transaction.gas_data_mut().payment = vec![gas_object_ref];
                input_objects.push(ObjectReadResult::new(
                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
                    dummy_gas_object.into(),
                ));
            }
            sui_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction,
                input_objects,
                receiving_objects,
                reference_gas_price,
            )?
        } else {
            // Strict path: full transaction input checking, either with the
            // dummy gas coin supplied explicitly or with the real gas objects.
            if transaction.gas().is_empty() {
                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                    DEV_INSPECT_GAS_COIN_VALUE,
                    transaction.gas_owner(),
                );
                let gas_object_ref = dummy_gas_object.compute_object_reference();
                transaction.gas_data_mut().payment = vec![gas_object_ref];
                sui_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    receiving_objects,
                    dummy_gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            } else {
                sui_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            }
        };

        let executor = sui_execution::executor(protocol_config, true)
            .expect("Creating an executor should not fail here");
        let gas_data = transaction.gas_data().clone();
        // Recompute the digest over the (possibly rewritten) transaction via
        // the standard signing intent.
        let intent_msg = IntentMessage::new(
            Intent {
                version: IntentVersion::V0,
                scope: IntentScope::TransactionData,
                app_id: AppId::Sui,
            },
            transaction,
        );
        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
        let early_execution_error = get_early_execution_error(
            &transaction_digest,
            &checked_input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            false,
            execution_params,
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            transaction_kind,
            sender,
            transaction_digest,
            skip_checks,
        );

        let raw_effects = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
                error: "Failed to serialize transaction effects during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));

        DevInspectResults::new(
            effects,
            inner_temp_store.events.clone(),
            execution_result,
            raw_txn_data,
            raw_effects,
            layout_resolver.as_mut(),
        )
    }
2850
2851 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2853 let epoch_store = self.epoch_store_for_testing();
2854 Ok(epoch_store.reference_gas_price())
2855 }
2856
2857 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2858 self.get_transaction_cache_reader()
2859 .is_tx_already_executed(digest)
2860 }
2861
    /// Index one executed transaction into the JSON-RPC `IndexStore`.
    ///
    /// First computes the object ownership index deltas via
    /// `process_object_index` (failures there are logged and abort indexing of
    /// this transaction), then feeds the transaction's sender, input object
    /// ids, changed objects with owners, Move calls, events, accumulator
    /// events and timing/coin metadata to `IndexStore::index_tx`.
    ///
    /// Returns the staged write batch and cache updates (with locks, when
    /// `acquire_locks` is set) produced by the index store.
    #[instrument(level = "debug", skip_all, err(level = "debug"))]
    fn index_tx(
        sequence: u64,
        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        indexes: &IndexStore,
        digest: &TransactionDigest,
        cert: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        events: &TransactionEvents,
        timestamp_ms: u64,
        tx_coins: Option<TxCoins>,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        acquire_locks: bool,
    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
        // Ownership/dynamic-field deltas derived from the effects; on error we
        // log and skip indexing this transaction entirely.
        let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;

        indexes.index_tx(
            sequence,
            cert.data().intent_message().value.sender(),
            // Input object ids as declared by the transaction.
            cert.data()
                .intent_message()
                .value
                .input_objects()?
                .iter()
                .map(|o| o.object_id()),
            // Changed object refs paired with their (new) owners.
            effects
                .all_changed_objects()
                .into_iter()
                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
            // (package, module, function) for every Move call in the tx.
            cert.data()
                .intent_message()
                .value
                .move_calls()
                .into_iter()
                .map(|(_cmd_idx, package, module, function)| {
                    (*package, module.to_owned(), function.to_owned())
                }),
            events,
            changes,
            digest,
            timestamp_ms,
            tx_coins,
            effects.accumulator_events(),
            acquire_locks,
        )
    }
2913
    /// Simulation-only (msim) fault injection: make this validator "fork" by
    /// perturbing the transaction effects it produces.
    ///
    /// Validators elect themselves into `forked_validators` until the combined
    /// forked stake reaches the committee's validity threshold (strictly below
    /// it unless `full_halt` is set, which allows crossing it). A per-thread
    /// `TOTAL_FAILING_STAKE` counter tracks how much stake has been diverted.
    ///
    /// Once this validator is in the forked set, it bumps the computation cost
    /// of the effects by 1 — changing the effects digest — either for digests
    /// already present in `effects_overrides`, or (when the override map is
    /// empty) with probability `fork_probability` derived deterministically
    /// from the tx digest. The original digest is recorded in the override map
    /// so the test harness can verify fork detection/recovery.
    ///
    /// System transactions and zero-stake nodes are never forked.
    #[cfg(msim)]
    fn simulate_fork_during_execution(
        &self,
        certificate: &VerifiedExecutableTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        effects: &mut TransactionEffects,
        forked_validators: std::sync::Arc<
            std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
        >,
        full_halt: bool,
        effects_overrides: std::sync::Arc<
            std::sync::Mutex<std::collections::BTreeMap<String, String>>,
        >,
        fork_probability: f32,
    ) {
        use std::cell::RefCell;
        // Stake already committed to forking, tracked per simulator thread.
        thread_local! {
            static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
        }
        if !certificate.data().intent_message().value.is_system_tx() {
            let committee = epoch_store.committee();
            let cur_stake = (**committee).weight(&self.name);
            if cur_stake > 0 {
                TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
                    let already_forked = forked_validators
                        .lock()
                        .ok()
                        .map(|set| set.contains(&self.name))
                        .unwrap_or(false);

                    if !already_forked {
                        // With full_halt, allow the forked stake to reach/cross
                        // the validity threshold; otherwise stay strictly below.
                        let should_fork = if full_halt {
                            *total_stake <= committee.validity_threshold()
                        } else {
                            *total_stake + cur_stake < committee.validity_threshold()
                        };

                        if should_fork {
                            *total_stake += cur_stake;

                            if let Ok(mut external_set) = forked_validators.lock() {
                                external_set.insert(self.name);
                                info!("forked_validators: {:?}", external_set);
                            }
                        }
                    }

                    // If we are (now) a forked validator, decide whether to
                    // perturb this particular transaction's effects.
                    if let Ok(external_set) = forked_validators.lock() {
                        if external_set.contains(&self.name) {
                            let tx_digest = certificate.digest().to_string();
                            if let Ok(mut overrides) = effects_overrides.lock() {
                                if overrides.contains_key(&tx_digest)
                                    || overrides.is_empty()
                                    && sui_simulator::random::deterministic_probability(
                                        &tx_digest,
                                        fork_probability,
                                    )
                                {
                                    // Remember the honest digest, then mutate
                                    // the effects so our digest diverges.
                                    let original_effects_digest = effects.digest().to_string();
                                    overrides
                                        .insert(tx_digest.clone(), original_effects_digest.clone());
                                    info!(
                                        ?tx_digest,
                                        ?original_effects_digest,
                                        "Captured forked effects digest for transaction"
                                    );
                                    effects.gas_cost_summary_mut_for_testing().computation_cost +=
                                        1;
                                }
                            }
                        }
                    }
                });
            }
        }
    }
2997
    /// Compute the owner-index deltas produced by one transaction's effects.
    ///
    /// Walks deleted/wrapped and all changed objects to produce, for the
    /// JSON-RPC index: owners removed, dynamic fields removed, owners added
    /// (with `ObjectInfo`), and dynamic fields added (with
    /// `DynamicFieldInfo`).
    ///
    /// Panics if internal invariants are violated (a modified object without a
    /// prior version, a prior version missing from the object store, or a
    /// changed object missing from `written`) — these indicate corrupted
    /// effects rather than recoverable errors.
    fn process_object_index(
        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        effects: &TransactionEffects,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<ObjectIndexChanges> {
        // Resolve layouts against the temp store first so types from freshly
        // published packages are visible.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    inner_temporary_store,
                    backing_package_store,
                )));

        // id -> version the object had before this transaction touched it.
        let modified_at_version = effects
            .modified_at_versions()
            .into_iter()
            .collect::<HashMap<_, _>>();

        let tx_digest = effects.transaction_digest();
        let mut deleted_owners = vec![];
        let mut deleted_dynamic_fields = vec![];
        // Deleted and wrapped objects lose their index entry; classify by the
        // owner the object had at its pre-transaction version.
        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
            let old_version = modified_at_version.get(&id).unwrap();
            match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
                |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
            ) {
                Owner::AddressOwner(addr)
                | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
                Owner::ObjectOwner(object_id) => {
                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
                }
                _ => {}
            }
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];

        for (oref, owner, kind) in effects.all_changed_objects() {
            let id = &oref.0;
            // A mutation may also be an ownership transfer: if the old owner
            // differs, retire the old index entry before adding the new one.
            if let WriteKind::Mutate = kind {
                let Some(old_version) = modified_at_version.get(id) else {
                    panic!(
                        "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
                        tx_digest
                    );
                };
                let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
                    panic!(
                        "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
                        tx_digest, id, old_version
                    );
                };
                if old_object.owner != owner {
                    match old_object.owner {
                        Owner::AddressOwner(addr)
                        | Owner::ConsensusAddressOwner { owner: addr, .. } => {
                            deleted_owners.push((addr, *id));
                        }
                        Owner::ObjectOwner(object_id) => {
                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                        }
                        _ => {}
                    }
                }
            }

            // Record the new index entry for the object's (possibly new) owner.
            match owner {
                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let type_ = new_object
                        .type_()
                        .map(|type_| ObjectType::Struct(type_.clone()))
                        .unwrap_or(ObjectType::Package);

                    new_owners.push((
                        (addr, *id),
                        ObjectInfo {
                            object_id: *id,
                            version: oref.1,
                            digest: oref.2,
                            type_,
                            owner,
                            previous_transaction: *effects.transaction_digest(),
                        },
                    ));
                }
                Owner::ObjectOwner(owner) => {
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    // Object-owned objects are indexed as dynamic fields of the
                    // parent; if field info cannot be derived, log and skip
                    // rather than failing the whole transaction's indexing.
                    let Some(df_info) = Self::try_create_dynamic_field_info(
                        object_store,
                        new_object,
                        written,
                        layout_resolver.as_mut(),
                    )
                    .unwrap_or_else(|e| {
                        error!(
                            "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
                            e, new_object
                        );
                        None
                    }) else {
                        continue;
                    };
                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
                }
                _ => {}
            }
        }

        Ok(ObjectIndexChanges {
            deleted_owners,
            deleted_dynamic_fields,
            new_owners,
            new_dynamic_fields,
        })
    }
3149
    /// Builds the `DynamicFieldInfo` index entry for `o`, or `Ok(None)` when
    /// `o` is not a dynamic-field wrapper (e.g. a package or a plain Move
    /// object).
    ///
    /// `written` (objects written by the same transaction) is consulted before
    /// `object_store` when resolving a dynamic *object* field's target;
    /// `resolver` supplies the annotated Move layout needed to deserialize the
    /// field name bytes.
    fn try_create_dynamic_field_info(
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        o: &Object,
        written: &WrittenObjects,
        resolver: &mut dyn LayoutResolver,
    ) -> SuiResult<Option<DynamicFieldInfo>> {
        // Packages and other non-Move data carry no dynamic-field payload.
        let Some(move_object) = o.data.try_as_move().cloned() else {
            return Ok(None);
        };

        // Only dynamic-field wrapper types are indexed.
        if !move_object.type_().is_dynamic_field() {
            return Ok(None);
        }

        let layout = resolver
            .get_annotated_layout(&move_object.type_().clone().into())?
            .into_layout();

        // Visit the raw BCS contents to pull out the field's kind, name, and
        // value metadata without fully materializing the value.
        let field =
            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
                SuiErrorKind::ObjectDeserializationError {
                    error: e.to_string(),
                }
            })?;

        let type_ = field.kind;
        let name_type: TypeTag = field.name_layout.into();
        let bcs_name = field.name_bytes.to_owned();

        // Decode the field name into a JSON-friendly value for the index.
        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
            .map_err(|e| {
                warn!("{e}");
                SuiErrorKind::ObjectDeserializationError {
                    error: e.to_string(),
                }
            })?;

        let name = DynamicFieldName {
            type_: name_type,
            value: SuiMoveValue::from(name_value).to_json_value(),
        };

        let value_metadata = field.value_metadata().map_err(|e| {
            warn!("{e}");
            SuiErrorKind::ObjectDeserializationError {
                error: e.to_string(),
            }
        })?;

        Ok(Some(match value_metadata {
            // Plain dynamic field: the value lives inside the wrapper itself.
            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
                name,
                bcs_name,
                type_,
                object_type: object_type.to_canonical_string(true),
                object_id: o.id(),
                version: o.version(),
                digest: o.digest(),
            },

            // Dynamic object field: the value is a separate object that must
            // be looked up to record its version/digest/type.
            DFV::ValueMetadata::DynamicObjectField(object_id) => {
                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
                    let version = object.version();
                    let digest = object.digest();
                    let object_type = object.data.type_().unwrap().clone();
                    (version, digest, object_type)
                } else {
                    // Fallback lookup uses the wrapper's version as the key —
                    // NOTE(review): assumes wrapper and target were last
                    // written by the same transaction (shared lamport
                    // version); confirm against object versioning rules.
                    let object = object_store
                        .get_object_by_key(&object_id, o.version())
                        .ok_or_else(|| UserInputError::ObjectNotFound {
                            object_id,
                            version: Some(o.version()),
                        })?;
                    let version = object.version();
                    let digest = object.digest();
                    let object_type = object.data.type_().unwrap().clone();
                    (version, digest, object_type)
                };

                DynamicFieldInfo {
                    name,
                    bcs_name,
                    type_,
                    object_type: object_type.to_string(),
                    object_id,
                    version,
                    digest,
                }
            }
        }))
    }
3247
    /// Runs index post-processing for one executed transaction.
    ///
    /// When `config.sync_post_process_one_tx` is set, indexing runs inline and
    /// the resulting batch is committed before returning. Otherwise the work
    /// is offloaded to a background blocking task; its staged output is
    /// delivered through a oneshot channel registered in
    /// `pending_post_processing` (consumed later by `flush_post_processing`).
    /// Returns `Ok(())` immediately when indexing is disabled.
    #[instrument(level = "trace", skip_all, err(level = "debug"))]
    fn post_process_one_tx(
        &self,
        certificate: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        // No index store configured -> nothing to do.
        let Some(indexes) = &self.indexes else {
            return Ok(());
        };

        let tx_digest = *certificate.digest();

        // Sequence number orders this transaction within the index store.
        let sequence = indexes.allocate_sequence_number();

        if self.config.sync_post_process_one_tx {
            // Synchronous path: index, then commit the batch in-place.
            // `true` = acquire index cache locks inline.
            let result = Self::post_process_one_tx_impl(
                sequence,
                indexes,
                &self.subscription_handler,
                &self.metrics,
                self.name,
                self.get_backing_package_store(),
                self.get_object_store(),
                certificate,
                effects,
                inner_temporary_store,
                epoch_store,
                true, );

            match result {
                Ok((raw_batch, cache_updates_with_locks)) => {
                    let mut db_batch = indexes.new_db_batch();
                    db_batch
                        .concat(vec![raw_batch])
                        .expect("failed to absorb raw index batch");
                    // Destructure to drop the lock guards only after commit.
                    let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
                        cache_updates_with_locks;
                    indexes
                        .commit_index_batch(db_batch, vec![inner])
                        .expect("failed to commit index batch");
                }
                Err(e) => {
                    self.metrics.post_processing_total_failures.inc();
                    error!(?tx_digest, "tx post processing failed: {e}");
                    return Err(e);
                }
            }

            return Ok(());
        }

        // Async path: register the receiver first so a waiter can always find
        // the pending entry, then hand all inputs to a background task.
        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
        self.pending_post_processing.insert(tx_digest, done_rx);

        let indexes = indexes.clone();
        let subscription_handler = self.subscription_handler.clone();
        let metrics = self.metrics.clone();
        let name = self.name;
        let backing_package_store = self.get_backing_package_store().clone();
        let object_store = self.get_object_store().clone();
        let semaphore = self.post_processing_semaphore.clone();

        // Owned copies for the 'static spawn below.
        let certificate = certificate.clone();
        let effects = effects.clone();
        let inner_temporary_store = inner_temporary_store.clone();
        let epoch_store = epoch_store.clone();

        tokio::spawn(async move {
            // Bound concurrent post-processing via the semaphore; the permit
            // is moved into the blocking task and released when it finishes.
            let permit = {
                let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
                semaphore
                    .acquire_owned()
                    .await
                    .expect("post-processing semaphore should not be closed")
            };

            // Indexing is CPU/disk heavy, so run it off the async executor.
            let _ = tokio::task::spawn_blocking(move || {
                let _permit = permit;

                // `false` = defer lock acquisition to the eventual committer.
                let result = Self::post_process_one_tx_impl(
                    sequence,
                    &indexes,
                    &subscription_handler,
                    &metrics,
                    name,
                    &backing_package_store,
                    &object_store,
                    &certificate,
                    &effects,
                    &inner_temporary_store,
                    &epoch_store,
                    false, );

                match result {
                    Ok((raw_batch, cache_updates_with_locks)) => {
                        fail_point!("crash-after-post-process-one-tx");
                        let output = (raw_batch, cache_updates_with_locks.into_inner());
                        // Receiver may already be gone; that's fine.
                        let _ = done_tx.send(output);
                    }
                    Err(e) => {
                        // On failure the sender is dropped, so any waiter on
                        // the oneshot observes a RecvError (-> None).
                        metrics.post_processing_total_failures.inc();
                        error!(?tx_digest, "tx post processing failed: {e}");
                    }
                }
            })
            .await;
        });

        Ok(())
    }
3369
    /// Core of transaction post-processing, shared by the sync and async
    /// paths of `post_process_one_tx`.
    ///
    /// Indexes the transaction (staging a DB batch plus index-cache updates)
    /// and publishes its effects/events to the subscription handler. Returns
    /// the staged batch and cache updates for the caller to commit; the
    /// indexing step itself is treated as infallible (`expect`).
    fn post_process_one_tx_impl(
        sequence: u64,
        indexes: &Arc<IndexStore>,
        subscription_handler: &Arc<SubscriptionHandler>,
        metrics: &Arc<AuthorityMetrics>,
        name: AuthorityName,
        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
        object_store: &Arc<dyn ObjectStore + Send + Sync>,
        certificate: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        acquire_locks: bool,
    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
        let _scope = monitored_scope("Execution::post_process_one_tx");

        let tx_digest = certificate.digest();
        // One timestamp is used for both the index rows and the event stream.
        let timestamp_ms = Self::unixtime_now_ms();
        let events = &inner_temporary_store.events;
        let written = &inner_temporary_store.written;
        // Coin balance bookkeeping is only gathered on fullnodes (per helper).
        let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
            name,
            object_store,
            effects,
            inner_temporary_store,
            epoch_store,
        );

        let (raw_batch, cache_updates) = Self::index_tx(
            sequence,
            backing_package_store,
            object_store,
            indexes,
            tx_digest,
            certificate,
            effects,
            events,
            timestamp_ms,
            tx_coins,
            written,
            inner_temporary_store,
            epoch_store,
            acquire_locks,
        )
        .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
        .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
        // Indexing failure would leave the index permanently behind; crash
        // instead of silently diverging.
        .expect("Indexing tx should not fail");

        // Convert to the RPC-facing representations and fan out to
        // subscribers (event websocket streams etc.).
        let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
        let events = Self::make_transaction_block_events(
            backing_package_store,
            events.clone(),
            *tx_digest,
            timestamp_ms,
            epoch_store,
            inner_temporary_store,
        )?;
        subscription_handler
            .process_tx(certificate.data().transaction_data(), &effects, &events)
            .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
            .tap_err(|e| {
                warn!(
                    ?tx_digest,
                    "Post processing - Couldn't process events for tx: {}", e
                )
            })?;

        metrics
            .post_processing_total_events_emitted
            .inc_by(events.data.len() as u64);

        Ok((raw_batch, cache_updates))
    }
3444
3445 fn make_transaction_block_events(
3446 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3447 transaction_events: TransactionEvents,
3448 digest: TransactionDigest,
3449 timestamp_ms: u64,
3450 epoch_store: &Arc<AuthorityPerEpochStore>,
3451 inner_temporary_store: &InnerTemporaryStore,
3452 ) -> SuiResult<SuiTransactionBlockEvents> {
3453 let mut layout_resolver =
3454 epoch_store
3455 .executor()
3456 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3457 inner_temporary_store,
3458 backing_package_store,
3459 )));
3460 SuiTransactionBlockEvents::try_from(
3461 transaction_events,
3462 digest,
3463 Some(timestamp_ms),
3464 layout_resolver.as_mut(),
3465 )
3466 }
3467
3468 pub fn unixtime_now_ms() -> u64 {
3469 let now = SystemTime::now()
3470 .duration_since(UNIX_EPOCH)
3471 .expect("Time went backwards")
3472 .as_millis();
3473 u64::try_from(now).expect("Travelling in time machine")
3474 }
3475
3476 #[instrument(level = "trace", skip_all)]
3480 pub async fn handle_transaction_info_request(
3481 &self,
3482 request: TransactionInfoRequest,
3483 ) -> SuiResult<TransactionInfoResponse> {
3484 let epoch_store = self.load_epoch_store_one_call_per_task();
3485 let (transaction, status) = self
3486 .get_transaction_status(&request.transaction_digest, &epoch_store)?
3487 .ok_or(SuiErrorKind::TransactionNotFound {
3488 digest: request.transaction_digest,
3489 })?;
3490 Ok(TransactionInfoResponse {
3491 transaction,
3492 status,
3493 })
3494 }
3495
3496 #[instrument(level = "trace", skip_all)]
3497 pub async fn handle_object_info_request(
3498 &self,
3499 request: ObjectInfoRequest,
3500 ) -> SuiResult<ObjectInfoResponse> {
3501 let epoch_store = self.load_epoch_store_one_call_per_task();
3502
3503 let requested_object_seq = match request.request_kind {
3504 ObjectInfoRequestKind::LatestObjectInfo => {
3505 let (_, seq, _) = self
3506 .get_object_or_tombstone(request.object_id)
3507 .await
3508 .ok_or_else(|| {
3509 SuiError::from(UserInputError::ObjectNotFound {
3510 object_id: request.object_id,
3511 version: None,
3512 })
3513 })?;
3514 seq
3515 }
3516 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3517 };
3518
3519 let object = self
3520 .get_object_store()
3521 .get_object_by_key(&request.object_id, requested_object_seq)
3522 .ok_or_else(|| {
3523 SuiError::from(UserInputError::ObjectNotFound {
3524 object_id: request.object_id,
3525 version: Some(requested_object_seq),
3526 })
3527 })?;
3528
3529 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3530 (request.generate_layout, object.data.try_as_move())
3531 {
3532 Some(into_struct_layout(
3533 self.load_epoch_store_one_call_per_task()
3534 .executor()
3535 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3536 .get_annotated_layout(&move_obj.type_().clone().into())?,
3537 )?)
3538 } else {
3539 None
3540 };
3541
3542 let lock = if !object.is_address_owned() {
3543 None
3545 } else {
3546 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3547 .await?
3548 .map(|s| s.into_inner())
3549 };
3550
3551 Ok(ObjectInfoResponse {
3552 object,
3553 layout,
3554 lock_for_debugging: lock,
3555 })
3556 }
3557
3558 #[instrument(level = "trace", skip_all)]
3559 pub fn handle_checkpoint_request(
3560 &self,
3561 request: &CheckpointRequest,
3562 ) -> SuiResult<CheckpointResponse> {
3563 let summary = match request.sequence_number {
3564 Some(seq) => self
3565 .checkpoint_store
3566 .get_checkpoint_by_sequence_number(seq)?,
3567 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3568 }
3569 .map(|v| v.into_inner());
3570 let contents = match &summary {
3571 Some(s) => self
3572 .checkpoint_store
3573 .get_checkpoint_contents(&s.content_digest)?,
3574 None => None,
3575 };
3576 Ok(CheckpointResponse {
3577 checkpoint: summary,
3578 contents,
3579 })
3580 }
3581
3582 #[instrument(level = "trace", skip_all)]
3583 pub fn handle_checkpoint_request_v2(
3584 &self,
3585 request: &CheckpointRequestV2,
3586 ) -> SuiResult<CheckpointResponseV2> {
3587 let summary = if request.certified {
3588 let summary = match request.sequence_number {
3589 Some(seq) => self
3590 .checkpoint_store
3591 .get_checkpoint_by_sequence_number(seq)?,
3592 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3593 }
3594 .map(|v| v.into_inner());
3595 summary.map(CheckpointSummaryResponse::Certified)
3596 } else {
3597 let summary = match request.sequence_number {
3598 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3599 None => self
3600 .checkpoint_store
3601 .get_latest_locally_computed_checkpoint()?,
3602 };
3603 summary.map(CheckpointSummaryResponse::Pending)
3604 };
3605 let contents = match &summary {
3606 Some(s) => self
3607 .checkpoint_store
3608 .get_checkpoint_contents(&s.content_digest())?,
3609 None => None,
3610 };
3611 Ok(CheckpointResponseV2 {
3612 checkpoint: summary,
3613 contents,
3614 })
3615 }
3616
3617 fn check_protocol_version(
3618 supported_protocol_versions: SupportedProtocolVersions,
3619 current_version: ProtocolVersion,
3620 ) {
3621 info!("current protocol version is now {:?}", current_version);
3622 info!("supported versions are: {:?}", supported_protocol_versions);
3623 if !supported_protocol_versions.is_version_supported(current_version) {
3624 let msg = format!(
3625 "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3626 current_version, supported_protocol_versions,
3627 );
3628
3629 error!("{}", msg);
3630 eprintln!("{}", msg);
3631
3632 #[cfg(not(msim))]
3633 std::process::exit(1);
3634
3635 #[cfg(msim)]
3636 sui_simulator::task::shutdown_current_node();
3637 }
3638 }
3639
    /// Constructs a fully wired `AuthorityState`.
    ///
    /// Aborts the process if the network's protocol version is unsupported,
    /// builds the execution scheduler, store pruners, traffic controller and
    /// metrics, spawns the background index-fixing and execution tasks, and
    /// seeds the owner index from `genesis_objects` when the index store is
    /// empty. Returns the state wrapped in an `Arc` (background tasks hold
    /// only weak references to it).
    #[allow(clippy::disallowed_methods)] #[allow(clippy::too_many_arguments)]
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        chain_identifier: ChainIdentifier,
        policy_config: Option<PolicyConfig>,
        firewall_config: Option<RemoteFirewallConfig>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Arc<Self> {
        // Fail fast (process exit) if this binary cannot speak the network's
        // current protocol version.
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Channel feeding ready-to-execute certificates into the execution
        // process spawned below.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let execution_scheduler = Arc::new(ExecutionScheduler::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers.account_funds_read.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            tx_ready_certificates,
            &epoch_store,
            config.funds_withdraw_scheduler_type,
            metrics.clone(),
            prometheus_registry,
        ));
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // Pruners run in the background; they stay alive because they are
        // stored on the state struct below.
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            rpc_index.clone(),
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            pruner_watermarks,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let traffic_controller_metrics =
            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
        // Traffic control is optional and only active when a policy is given.
        let traffic_controller = if let Some(policy_config) = policy_config {
            Some(Arc::new(
                TrafficController::init(
                    policy_config,
                    traffic_controller_metrics,
                    firewall_config.clone(),
                )
                .await,
            ))
        } else {
            None
        };

        let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
            ForkRecoveryState::new(Some(fork_config))
                .expect("Failed to initialize fork recovery state")
        });

        let coin_reservation_resolver = Arc::new(CoinReservationResolver::new(
            execution_cache_trait_pointers.child_object_resolver.clone(),
        ));

        let object_funds_checker_metrics =
            Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
        let state = Arc::new(AuthorityState {
            name,
            secret,
            // The execution lock guards the current epoch; writers take it
            // during reconfiguration.
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            coin_reservation_resolver,
            indexes,
            rpc_index,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            execution_scheduler,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new()),
            traffic_controller,
            fork_recovery_state,
            notify_epoch: tokio::sync::watch::channel(epoch).0,
            object_funds_checker: ArcSwapOption::empty(),
            object_funds_checker_metrics,
            pending_post_processing: Arc::new(DashMap::new()),
            // Bounds concurrent background index post-processing tasks.
            post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
        });
        state.init_object_funds_checker().await;

        // Background tasks hold weak references so dropping the state stops
        // them rather than leaking it.
        let state_clone = Arc::downgrade(&state);
        spawn_monitored_task!(fix_indexes(state_clone));
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        // Sanity check: multi-epoch expiration requires the previous epoch's
        // executed-transaction table to be populated (restores done with an
        // old sui-tool leave it empty).
        if epoch_store
            .protocol_config()
            .enable_multi_epoch_transaction_expiration()
            && epoch_store.epoch() > 0
        {
            use typed_store::Map;
            let previous_epoch = epoch_store.epoch() - 1;
            let start_key = (previous_epoch, TransactionDigest::ZERO);
            let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
            let has_previous_epoch_data = store
                .perpetual_tables
                .executed_transaction_digests
                .safe_range_iter(start_key..end_key)
                .next()
                .is_some();

            if !has_previous_epoch_data {
                panic!(
                    "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
                    This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
                    Please restore from a snapshot using the latest version of sui-tool.",
                    previous_epoch
                );
            }
        }

        state
    }
3799
3800 async fn init_object_funds_checker(&self) {
3801 let epoch_store = self.epoch_store.load();
3802 if self.is_validator(&epoch_store)
3803 && epoch_store.protocol_config().enable_object_funds_withdraw()
3804 {
3805 if self.object_funds_checker.load().is_none() {
3806 let inner = self
3807 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
3808 .await
3809 .map(|o| {
3810 Arc::new(ObjectFundsChecker::new(
3811 o.version(),
3812 self.object_funds_checker_metrics.clone(),
3813 ))
3814 });
3815 self.object_funds_checker.store(inner);
3816 }
3817 } else {
3818 self.object_funds_checker.store(None);
3819 }
3820 }
3821
    /// Read access to the object cache.
    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
        &self.execution_cache_trait_pointers.object_cache_reader
    }

    /// Read access to the transaction cache.
    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
        &self.execution_cache_trait_pointers.transaction_cache_reader
    }

    /// Write access to the execution cache.
    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
        &self.execution_cache_trait_pointers.cache_writer
    }

    /// The backing store used by transaction execution.
    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_store
    }

    /// Resolver for child (dynamic-field) objects.
    pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
        &self.execution_cache_trait_pointers.child_object_resolver
    }

    /// Read access to account funds (accumulator balances).
    pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
        &self.execution_cache_trait_pointers.account_funds_read
    }

    /// Store for resolving Move packages.
    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_package_store
    }

    /// Generic object store view of the execution cache.
    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
        &self.execution_cache_trait_pointers.object_store
    }
3854
3855 pub fn pending_post_processing(
3856 &self,
3857 ) -> &Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>> {
3858 &self.pending_post_processing
3859 }
3860
3861 pub async fn await_post_processing(
3862 &self,
3863 tx_digest: &TransactionDigest,
3864 ) -> Option<PostProcessingOutput> {
3865 if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
3866 rx.await.ok()
3868 } else {
3869 None
3871 }
3872 }
3873
3874 pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
3878 if let Some(indexes) = &self.indexes
3879 && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
3880 {
3881 let mut db_batch = indexes.new_db_batch();
3882 db_batch
3883 .concat(vec![raw_batch])
3884 .expect("failed to build index batch");
3885 indexes
3886 .commit_index_batch(db_batch, vec![cache_updates])
3887 .expect("failed to commit index batch");
3888 }
3889 }
3890
    /// API used during end-of-epoch reconfiguration of the cache.
    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
        &self.execution_cache_trait_pointers.reconfig_api
    }

    /// Store backing the global state hash.
    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
        &self.execution_cache_trait_pointers.global_state_hash_store
    }

    /// Cache of checkpoint-related execution data.
    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
        &self.execution_cache_trait_pointers.checkpoint_cache
    }

    /// Store used by state sync.
    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
        &self.execution_cache_trait_pointers.state_sync_store
    }

    /// API for committing executed transactions from the cache.
    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
        &self.execution_cache_trait_pointers.cache_commit
    }

    /// Direct handle to the underlying `AuthorityStore`. Test-only escape
    /// hatch via the cache's testing API.
    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
        self.execution_cache_trait_pointers
            .testing_api
            .database_for_testing()
    }

    /// Direct handle to the writeback cache. Test-only escape hatch.
    pub fn cache_for_testing(&self) -> &WritebackCache {
        self.execution_cache_trait_pointers
            .testing_api
            .cache_for_testing()
    }
3922
    /// Test helper: runs checkpoint pruning for all eligible epochs against
    /// this authority's database, using fresh default pruner watermarks and a
    /// fixed test epoch duration.
    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
        &self,
        config: NodeConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        use crate::authority::authority_store_pruner::PrunerWatermarks;
        let watermarks = Arc::new(PrunerWatermarks::default());
        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
            &self.database_for_testing().perpetual_tables,
            &self.checkpoint_store,
            self.rpc_index.as_deref(),
            config.authority_store_pruning_config,
            metrics,
            EPOCH_DURATION_MS_FOR_TESTING,
            &watermarks,
        )
        .await
    }

    /// The scheduler that orders transactions for execution.
    pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
        &self.execution_scheduler
    }
3945
    /// Seeds the owner and dynamic-field indexes from `genesis_objects`, but
    /// only when an index store exists and is still empty (i.e. first boot).
    fn create_owner_index_if_empty(
        &self,
        genesis_objects: &[Object],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let Some(index_store) = &self.indexes else {
            return Ok(());
        };
        // A non-empty index was already populated; don't re-seed.
        if !index_store.is_empty() {
            return Ok(());
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
        for o in genesis_objects.iter() {
            match o.owner {
                // Address-owned (including consensus address-owned) objects
                // go into the owner index.
                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
                    new_owners.push((
                        (addr, o.id()),
                        ObjectInfo::new(&o.compute_object_reference(), o),
                    ))
                }
                // Object-owned objects are dynamic fields of their parent.
                Owner::ObjectOwner(object_id) => {
                    let id = o.id();
                    // Empty `written` map: at genesis there is no in-flight
                    // transaction output to consult.
                    let Some(info) = Self::try_create_dynamic_field_info(
                        self.get_object_store(),
                        o,
                        &BTreeMap::new(),
                        layout_resolver.as_mut(),
                    )?
                    else {
                        continue;
                    };
                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
                }
                // Shared/immutable objects are not indexed by owner.
                _ => {}
            }
        }

        index_store.insert_genesis_objects(ObjectIndexChanges {
            deleted_owners: vec![],
            deleted_dynamic_fields: vec![],
            new_owners,
            new_dynamic_fields,
        })
    }
3995
3996 pub fn execution_lock_for_executable_transaction(
4000 &self,
4001 transaction: &VerifiedExecutableTransaction,
4002 ) -> Option<ExecutionLockReadGuard<'_>> {
4003 let lock = self.execution_lock.try_read().ok()?;
4004 if *lock == transaction.auth_sig().epoch() {
4005 Some(lock)
4006 } else {
4007 None
4009 }
4010 }
4011
    /// Tries to take the execution read lock for validation work; fails with
    /// `ValidatorHaltedAtEpochEnd` when the lock is write-held (i.e. the node
    /// is in the middle of reconfiguration).
    fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
        self.execution_lock
            .try_read()
            .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
    }

    /// Takes the execution write lock, blocking new transaction execution for
    /// the duration of reconfiguration.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
4023
    /// Transitions this authority from the current epoch to the next.
    ///
    /// The sequence is strictly ordered: verify protocol-version support,
    /// persist the new committee, take the execution write lock (halting
    /// execution), terminate the old epoch store, close user certs, clear
    /// end-of-epoch state, run consistency checks, optionally snapshot the
    /// DBs, reconfigure the cache, open the new epoch store, and finally
    /// bump the epoch under the lock and notify waiters. Returns the new
    /// epoch store.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        // Shut down (process exit) if we cannot run the next epoch's protocol.
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );

        self.committee_store.insert_new_committee(&new_committee)?;

        // Holding the write lock stops all transaction execution until the
        // epoch bump at the end of this function.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        cur_epoch_store.epoch_terminated().await;

        // Stop accepting user certificates for the old epoch, if not already.
        {
            let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                cur_epoch_store.close_user_certs(state);
            }
        }

        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
        self.maybe_reaccumulate_state_hash(
            cur_epoch_store,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.get_reconfig_api()
            .set_epoch_start_configuration(&epoch_start_configuration);
        // Optional end-of-epoch DB snapshot for backup/debugging.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
            && self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
        {
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
            self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        self.init_object_funds_checker().await;
        // Bumping the guarded epoch re-enables execution for the new epoch.
        *execution_lock = new_epoch;

        self.notify_epoch(new_epoch);
        Ok(new_epoch_store)
    }
4118
    /// Publishes `new_epoch` on the epoch watch channel, waking
    /// `wait_for_epoch` callers. `send_modify` updates the value even when no
    /// receivers are currently subscribed.
    fn notify_epoch(&self, new_epoch: EpochId) {
        self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
    }
4122
4123 pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4124 let mut rx = self.notify_epoch.subscribe();
4125 loop {
4126 let epoch = *rx.borrow();
4127 if epoch >= target_epoch {
4128 return Ok(epoch);
4129 }
4130 rx.changed().await?;
4131 }
4132 }
4133
    /// Test helper: builds, executes, and settles the accumulator settlement
    /// transactions implied by `effects`, followed by the accumulator barrier
    /// transaction, then notifies the execution scheduler of the settled
    /// funds. Panics (asserts/unwraps) on any failure — test use only.
    pub async fn settle_accumulator_for_testing(&self, effects: &[TransactionEffects]) {
        let accumulator_version = self
            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
            .await
            .unwrap()
            .version();
        // The root object's version doubles as the checkpoint sequence number
        // for the purposes of this test-only settlement.
        let ckpt_seq = accumulator_version.value();
        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.get_transaction_cache_reader().as_ref()),
            effects,
            ckpt_seq,
            0,
        );
        let balance_changes = builder.collect_funds_changes();
        let epoch_store = self.epoch_store_for_testing();
        let epoch = epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .unwrap();
        let settlements = builder.build_tx(
            epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            ckpt_seq,
        );

        // Wrap the raw settlement transactions as executable system txs.
        let settlements: Vec<_> = settlements
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    epoch,
                )
            })
            .collect();

        // Shared-object versions must be assigned before direct execution.
        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                &settlements,
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        // Execute each settlement immediately and collect its effects for the
        // barrier transaction below.
        let mut settlement_effects = Vec::with_capacity(settlements.len());
        for tx in settlements {
            let env = ExecutionEnv::new()
                .with_assigned_versions(version_map.get(&tx.key()).unwrap().clone());
            let (effects, _) = self
                .try_execute_immediately(&tx, env, &epoch_store)
                .await
                .unwrap();
            assert!(effects.status().is_ok());
            settlement_effects.push(effects);
        }

        // The barrier transaction depends on all settlement effects.
        let barrier = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            &settlement_effects,
        );
        let barrier = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier),
            epoch,
        );

        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                std::slice::from_ref(&barrier),
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        let env = ExecutionEnv::new()
            .with_assigned_versions(version_map.get(&barrier.key()).unwrap().clone());
        let (effects, _) = self
            .try_execute_immediately(&barrier, env, &epoch_store)
            .await
            .unwrap();
        assert!(effects.status().is_ok());

        // Tell the withdraw scheduler the funds changes are settled as of the
        // accumulator root's next version.
        let next_accumulator_version = accumulator_version.next();
        self.execution_scheduler
            .settle_address_funds(FundsSettlement {
                funds_changes: balance_changes,
                next_accumulator_version,
            });
    }
4229
    /// Test helper: advances this authority to the next epoch in place,
    /// reusing the current protocol config (pinned via a testing override)
    /// and skipping the full `reconfigure` protocol.
    pub async fn reconfigure_for_testing(&self) {
        // Halt execution while swapping epoch stores, same as `reconfigure`.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // Pin the current protocol config so the next-epoch store does not
        // pick up a different version; guard restores overrides on drop.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        let new_epoch = new_epoch_store.epoch();
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;

        // Re-enable execution for the new epoch.
        *execution_lock = new_epoch;
    }
4263
    /// Delegates to the reconfig API, which decides whether the global state
    /// hash must be re-accumulated for the given new protocol version.
    #[instrument(level = "error", skip_all)]
    fn maybe_reaccumulate_state_hash(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_protocol_version: ProtocolVersion,
    ) {
        self.get_reconfig_api()
            .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
    }
4275
    /// Runs end-of-epoch sanity checks over local state.
    ///
    /// Performs, in order:
    /// 1. A SUI conservation check (panics in debug builds on failure, warns
    ///    in release builds).
    /// 2. An optional root-state-hash vs. live-object-set consistency check.
    /// 3. An optional secondary-index verification.
    /// 4. When secondary indexes exist, asserts that every transaction in
    ///    every checkpoint of the current epoch appears in `transactions_seq`.
    #[instrument(level = "error", skip_all)]
    fn check_system_consistency(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
    ) {
        info!(
            "Performing sui conservation consistency check for epoch {}",
            cur_epoch_store.epoch()
        );

        if let Err(err) = self
            .get_reconfig_api()
            .expensive_check_sui_conservation(cur_epoch_store)
        {
            // Fail fast in debug builds; in production only log so a failed
            // conservation check does not bring the node down.
            if cfg!(debug_assertions) {
                panic!("{}", err);
            } else {
                warn!("Sui conservation consistency check failed: {}", err);
            }
        } else {
            info!("Sui conservation consistency check passed");
        }

        if expensive_safety_check_config.enable_state_consistency_check() {
            info!(
                "Performing state consistency check for epoch {}",
                cur_epoch_store.epoch()
            );
            self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
        }

        if expensive_safety_check_config.enable_secondary_index_checks()
            && let Some(indexes) = self.indexes.clone()
        {
            verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
                .expect("secondary indexes are inconsistent");
        }

        if let Some(indexes) = self.indexes.clone() {
            let epoch = cur_epoch_store.epoch();
            // The current epoch starts right after the previous epoch's last
            // checkpoint; epoch 0 starts at checkpoint 0.
            let first_checkpoint = if epoch == 0 {
                0
            } else {
                self.checkpoint_store
                    .get_epoch_last_checkpoint_seq_number(epoch - 1)
                    .expect("Failed to get previous epoch's last checkpoint")
                    .expect("Previous epoch's last checkpoint missing")
                    + 1
            };
            let highest_executed = self
                .checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Failed to get highest executed checkpoint")
                .expect("No executed checkpoints");

            info!(
                "Verifying checkpointed transactions are in transactions_seq \
                (checkpoints {first_checkpoint}..={highest_executed})"
            );
            for seq in first_checkpoint..=highest_executed {
                let checkpoint = self
                    .checkpoint_store
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("Failed to get checkpoint")
                    .expect("Checkpoint missing");
                let contents = self
                    .checkpoint_store
                    .get_checkpoint_contents(&checkpoint.content_digest)
                    .expect("Failed to get checkpoint contents")
                    .expect("Checkpoint contents missing");
                for digests in contents.iter() {
                    let tx_digest = digests.transaction;
                    assert!(
                        indexes
                            .get_transaction_seq(&tx_digest)
                            .expect("Failed to read transactions_seq")
                            .is_some(),
                        "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
                    );
                }
            }
            info!("All checkpointed transactions verified in transactions_seq");
        }
    }
4373
    /// Compares the epoch's persisted root state hash against a freshly
    /// computed digest of the live object set, recording the result on the
    /// hasher. An inconsistency triggers `debug_fatal!` (fatal in debug
    /// builds) rather than an immediate production crash.
    fn expensive_check_is_consistent_state(
        &self,
        state_hasher: Arc<GlobalStateHasher>,
        cur_epoch_store: &AuthorityPerEpochStore,
    ) {
        // The digest flavor depends on the protocol config flag: when
        // simplified unwrap-then-delete is off, the legacy (inverted) mode is
        // used.
        let live_object_set_hash = state_hasher.digest_live_object_set(
            !cur_epoch_store
                .protocol_config()
                .simplified_unwrap_then_delete(),
        );

        let root_state_hash: ECMHLiveObjectSetDigest = self
            .get_global_state_hash_store()
            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
            .expect("Retrieving root state hash cannot fail")
            .expect("Root state hash for epoch must exist")
            .1
            .digest()
            .into();

        let is_inconsistent = root_state_hash != live_object_set_hash;
        if is_inconsistent {
            debug_fatal!(
                "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
                root_state_hash,
                live_object_set_hash
            );
        } else {
            info!("State consistency check passed");
        }

        // Persist the verdict so other components (e.g. pruning) can react.
        state_hasher.set_inconsistent_state(is_inconsistent);
    }
4407
    /// Test-only helper returning the current epoch id.
    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.epoch_store_for_testing().epoch()
    }
4411
    /// Snapshots all node databases into `checkpoint_path`.
    ///
    /// Writes into a `.tmp` sibling directory first and atomically renames it
    /// into place at the end, so a partially written snapshot is never
    /// observable under the final path. A pre-existing `checkpoint_path` is
    /// treated as an already-completed snapshot and skipped.
    ///
    /// # Errors
    /// Returns `FileIOError` for any filesystem failure, or propagates errors
    /// from the individual store checkpointing calls.
    #[instrument(level = "error", skip_all)]
    pub fn checkpoint_all_dbs(
        &self,
        checkpoint_path: &Path,
        cur_epoch_store: &AuthorityPerEpochStore,
        checkpoint_indexes: bool,
    ) -> SuiResult {
        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
        let current_epoch = cur_epoch_store.epoch();

        if checkpoint_path.exists() {
            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
            return Ok(());
        }

        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");

        // Clear any leftover temp directory from a previously interrupted
        // snapshot attempt.
        if checkpoint_path_tmp.exists() {
            fs::remove_dir_all(&checkpoint_path_tmp)
                .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
        }

        fs::create_dir_all(&checkpoint_path_tmp)
            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
        fs::create_dir(&store_checkpoint_path_tmp)
            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;

        self.checkpoint_store
            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;

        self.get_reconfig_api()
            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;

        self.committee_store
            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;

        if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
        }

        // Atomic publish: only a fully written snapshot becomes visible.
        fs::rename(checkpoint_path_tmp, checkpoint_path)
            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
        Ok(())
    }
4459
    /// Loads the current epoch store. As the name says, call this at most
    /// once per task and reuse the guard, rather than re-loading repeatedly.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
4468
    /// Test-only accessor for the current epoch store.
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
4473
    /// Test-only helper returning a deep copy of the current committee.
    pub fn clone_committee_for_testing(&self) -> Committee {
        Committee::clone(self.epoch_store_for_testing().committee())
    }
4477
    /// Returns the latest live version of the object, or `None` if it does
    /// not exist (or is deleted/wrapped).
    #[instrument(level = "trace", skip_all)]
    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.get_object_store().get_object(object_id)
    }
4482
    /// Returns the object reference of the Sui system framework package.
    ///
    /// # Panics
    /// Panics if the framework object is missing, which would indicate a
    /// corrupted store (it is created at genesis).
    pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
        Ok(self
            .get_object(&SUI_SYSTEM_ADDRESS.into())
            .await
            .expect("framework object should always exist")
            .compute_object_reference())
    }
4490
    /// Test-only read of the Sui system state object via the unsafe (not
    /// version-pinned) cache accessor.
    pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
        self.get_object_cache_reader()
            .get_sui_system_state_object_unsafe()
    }
4496
    /// Returns the sequence number of the checkpoint that included the given
    /// transaction, if the epoch store has recorded one.
    #[instrument(level = "trace", skip_all)]
    fn get_transaction_checkpoint_sequence(
        &self,
        digest: &TransactionDigest,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
        epoch_store.get_transaction_checkpoint(digest)
    }
4505
4506 #[instrument(level = "trace", skip_all)]
4507 pub fn get_checkpoint_by_sequence_number(
4508 &self,
4509 sequence_number: CheckpointSequenceNumber,
4510 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4511 Ok(self
4512 .checkpoint_store
4513 .get_checkpoint_by_sequence_number(sequence_number)?)
4514 }
4515
4516 #[instrument(level = "trace", skip_all)]
4517 pub fn get_transaction_checkpoint_for_tests(
4518 &self,
4519 digest: &TransactionDigest,
4520 epoch_store: &AuthorityPerEpochStore,
4521 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4522 let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4523 let Some(checkpoint) = checkpoint else {
4524 return Ok(None);
4525 };
4526 let checkpoint = self
4527 .checkpoint_store
4528 .get_checkpoint_by_sequence_number(checkpoint)?;
4529 Ok(checkpoint)
4530 }
4531
4532 #[instrument(level = "trace", skip_all)]
4533 pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4534 Ok(
4535 match self
4536 .get_object_cache_reader()
4537 .get_latest_object_or_tombstone(*object_id)
4538 {
4539 Some((_, ObjectOrTombstone::Object(object))) => {
4540 let layout = self.get_object_layout(&object)?;
4541 ObjectRead::Exists(object.compute_object_reference(), object, layout)
4542 }
4543 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4544 None => ObjectRead::NotExists(*object_id),
4545 },
4546 )
4547 }
4548
    /// Returns the identifier of the chain this node is running on.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
4553
    /// Returns the fork recovery state, if one is configured for this node.
    pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
        self.fork_recovery_state.as_ref()
    }
4557
4558 #[instrument(level = "trace", skip_all)]
4559 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4560 where
4561 T: DeserializeOwned,
4562 {
4563 let o = self.get_object_read(object_id)?.into_object()?;
4564 if let Some(move_object) = o.data.try_as_move() {
4565 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4566 SuiErrorKind::ObjectDeserializationError {
4567 error: format!("{e}"),
4568 }
4569 })?)
4570 } else {
4571 Err(SuiErrorKind::ObjectDeserializationError {
4572 error: format!("Provided object : [{object_id}] is not a Move object."),
4573 }
4574 .into())
4575 }
4576 }
4577
    /// Resolves a historical read of `object_id` at a specific `version`.
    ///
    /// Resolution ladder, based on the latest known object ref:
    /// - no record at all          -> `ObjectNotExists`
    /// - asked version > latest    -> `VersionTooHigh`
    /// - asked version < latest    -> `VersionFound` / `VersionNotFound`
    /// - asked version == latest   -> `ObjectDeleted` if the latest ref is a
    ///   tombstone, otherwise `VersionFound` (a miss here means the store is
    ///   inconsistent and is reported via `debug_fatal!`).
    #[instrument(level = "trace", skip_all)]
    pub fn get_past_object_read(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<PastObjectRead> {
        let Some(obj_ref) = self
            .get_object_cache_reader()
            .get_latest_object_ref_or_tombstone(*object_id)
        else {
            return Ok(PastObjectRead::ObjectNotExists(*object_id));
        };

        if version > obj_ref.1 {
            return Ok(PastObjectRead::VersionTooHigh {
                object_id: *object_id,
                asked_version: version,
                latest_version: obj_ref.1,
            });
        }

        if version < obj_ref.1 {
            return Ok(match self.read_object_at_version(object_id, version)? {
                Some((object, layout)) => {
                    // Recompute the ref from the object itself, since obj_ref
                    // points at the latest version, not the requested one.
                    let obj_ref = object.compute_object_reference();
                    PastObjectRead::VersionFound(obj_ref, object, layout)
                }

                None => PastObjectRead::VersionNotFound(*object_id, version),
            });
        }

        // version == latest: a non-alive digest means deleted or wrapped.
        if !obj_ref.2.is_alive() {
            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
        }

        match self.read_object_at_version(object_id, obj_ref.1)? {
            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
            None => {
                debug_fatal!(
                    "Object with in parent_entry is missing from object store, datastore is \
                    inconsistent"
                );
                Err(UserInputError::ObjectNotFound {
                    object_id: *object_id,
                    version: Some(obj_ref.1),
                }
                .into())
            }
        }
    }
4636
4637 #[instrument(level = "trace", skip_all)]
4638 fn read_object_at_version(
4639 &self,
4640 object_id: &ObjectID,
4641 version: SequenceNumber,
4642 ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4643 let Some(object) = self
4644 .get_object_cache_reader()
4645 .get_object_by_key(object_id, version)
4646 else {
4647 return Ok(None);
4648 };
4649
4650 let layout = self.get_object_layout(&object)?;
4651 Ok(Some((object, layout)))
4652 }
4653
    /// Resolves the annotated Move struct layout for a Move object; returns
    /// `Ok(None)` for non-Move objects (packages). Layout resolution goes
    /// through the current epoch's executor with the backing package store.
    fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
        let layout = object
            .data
            .try_as_move()
            .map(|object| {
                into_struct_layout(
                    self.load_epoch_store_one_call_per_task()
                        .executor()
                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                        .get_annotated_layout(&object.type_().clone().into())?,
                )
            })
            // Option<Result<_>> -> Result<Option<_>>, propagating any
            // resolution error.
            .transpose()?;
        Ok(layout)
    }
4670
4671 fn get_owner_at_version(
4672 object_store: &Arc<dyn ObjectStore + Send + Sync>,
4673 object_id: &ObjectID,
4674 version: SequenceNumber,
4675 ) -> SuiResult<Owner> {
4676 object_store
4677 .get_object_by_key(object_id, version)
4678 .ok_or_else(|| {
4679 SuiError::from(UserInputError::ObjectNotFound {
4680 object_id: *object_id,
4681 version: Some(version),
4682 })
4683 })
4684 .map(|o| o.owner.clone())
4685 }
4686
4687 #[instrument(level = "trace", skip_all)]
4688 pub fn get_owner_objects(
4689 &self,
4690 owner: SuiAddress,
4691 cursor: Option<ObjectID>,
4693 limit: usize,
4694 filter: Option<SuiObjectDataFilter>,
4695 ) -> SuiResult<Vec<ObjectInfo>> {
4696 if let Some(indexes) = &self.indexes {
4697 indexes.get_owner_objects(owner, cursor, limit, filter)
4698 } else {
4699 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4700 }
4701 }
4702
4703 #[instrument(level = "trace", skip_all)]
4704 pub fn get_owned_coins_iterator_with_cursor(
4705 &self,
4706 owner: SuiAddress,
4707 cursor: (String, u64, ObjectID),
4709 limit: usize,
4710 one_coin_type_only: bool,
4711 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4712 if let Some(indexes) = &self.indexes {
4713 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4714 } else {
4715 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4716 }
4717 }
4718
4719 #[instrument(level = "trace", skip_all)]
4720 pub fn get_owner_objects_iterator(
4721 &self,
4722 owner: SuiAddress,
4723 cursor: Option<ObjectID>,
4725 filter: Option<SuiObjectDataFilter>,
4726 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4727 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4728 if let Some(indexes) = &self.indexes {
4729 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4730 } else {
4731 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4732 }
4733 }
4734
    /// Loads and BCS-deserializes all objects owned by `owner` whose Move
    /// type matches `type_`.
    ///
    /// Uses the owner index to discover candidate (id, version) keys, then
    /// bulk-reads the objects from the object store.
    ///
    /// # Errors
    /// `ObjectNotFound` if an indexed object is missing from the store,
    /// `MovePackageAsObject` if one is unexpectedly a package, or
    /// `ObjectDeserializationError` if contents do not decode as `T`.
    #[instrument(level = "trace", skip_all)]
    pub async fn get_move_objects<T>(
        &self,
        owner: SuiAddress,
        type_: MoveObjectType,
    ) -> SuiResult<Vec<T>>
    where
        T: DeserializeOwned,
    {
        let object_ids = self
            .get_owner_objects_iterator(owner, None, None)?
            .filter(|o| match &o.type_ {
                ObjectType::Struct(s) => &type_ == s,
                ObjectType::Package => false,
            })
            .map(|info| ObjectKey(info.object_id, info.version))
            .collect::<Vec<_>>();
        let mut move_objects = vec![];

        let objects = self
            .get_object_store()
            .multi_get_objects_by_key(&object_ids);

        // multi_get preserves key order, so zipping restores each object's key
        // for error reporting.
        for (o, id) in objects.into_iter().zip(object_ids) {
            let object = o.ok_or_else(|| {
                SuiError::from(UserInputError::ObjectNotFound {
                    object_id: id.0,
                    version: Some(id.1),
                })
            })?;
            let move_object = object.data.try_as_move().ok_or_else(|| {
                SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
            })?;
            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
                SuiErrorKind::ObjectDeserializationError {
                    error: format!("{e}"),
                }
            })?);
        }
        Ok(move_objects)
    }
4776
4777 #[instrument(level = "trace", skip_all)]
4778 pub fn get_dynamic_fields(
4779 &self,
4780 owner: ObjectID,
4781 cursor: Option<ObjectID>,
4783 limit: usize,
4784 ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4785 Ok(self
4786 .get_dynamic_fields_iterator(owner, cursor)?
4787 .take(limit)
4788 .collect::<Result<Vec<_>, _>>()?)
4789 }
4790
4791 fn get_dynamic_fields_iterator(
4792 &self,
4793 owner: ObjectID,
4794 cursor: Option<ObjectID>,
4796 ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4797 {
4798 if let Some(indexes) = &self.indexes {
4799 indexes.get_dynamic_fields_iterator(owner, cursor)
4800 } else {
4801 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4802 }
4803 }
4804
4805 #[instrument(level = "trace", skip_all)]
4806 pub fn get_dynamic_field_object_id(
4807 &self,
4808 owner: ObjectID,
4809 name_type: TypeTag,
4810 name_bcs_bytes: &[u8],
4811 ) -> SuiResult<Option<ObjectID>> {
4812 if let Some(indexes) = &self.indexes {
4813 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4814 } else {
4815 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4816 }
4817 }
4818
    /// Returns the total number of transaction blocks indexed so far (the
    /// next index sequence number).
    #[instrument(level = "trace", skip_all)]
    pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
        Ok(self.get_indexes()?.next_sequence_number())
    }
4823
    /// Fetches an executed transaction and its effects from the key-value
    /// store (transaction first, then effects).
    #[instrument(level = "trace", skip_all)]
    pub async fn get_executed_transaction_and_effects(
        &self,
        digest: TransactionDigest,
        kv_store: Arc<TransactionKeyValueStore>,
    ) -> SuiResult<(Transaction, TransactionEffects)> {
        let transaction = kv_store.get_tx(digest).await?;
        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
        Ok((transaction, effects))
    }
4834
4835 #[instrument(level = "trace", skip_all)]
4836 pub fn multi_get_checkpoint_by_sequence_number(
4837 &self,
4838 sequence_numbers: &[CheckpointSequenceNumber],
4839 ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
4840 Ok(self
4841 .checkpoint_store
4842 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4843 }
4844
4845 #[instrument(level = "trace", skip_all)]
4846 pub fn get_transaction_events(
4847 &self,
4848 digest: &TransactionDigest,
4849 ) -> SuiResult<TransactionEvents> {
4850 self.get_transaction_cache_reader()
4851 .get_events(digest)
4852 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4853 }
4854
    /// Resolves the input objects of an executed transaction from its
    /// effects, reading each one from the object store.
    pub fn get_transaction_input_objects(
        &self,
        effects: &TransactionEffects,
    ) -> SuiResult<Vec<Object>> {
        sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
            .map_err(Into::into)
    }
4862
    /// Resolves the output objects of an executed transaction from its
    /// effects, reading each one from the object store.
    pub fn get_transaction_output_objects(
        &self,
        effects: &TransactionEffects,
    ) -> SuiResult<Vec<Object>> {
        sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
            .map_err(Into::into)
    }
4870
4871 fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4872 match &self.indexes {
4873 Some(i) => Ok(i.clone()),
4874 None => Err(SuiErrorKind::UnsupportedFeatureError {
4875 error: "extended object indexing is not enabled on this server".into(),
4876 }
4877 .into()),
4878 }
4879 }
4880
    /// Test-only wrapper around `get_transactions` that constructs a
    /// throwaway key-value store backed by this node.
    pub async fn get_transactions_for_tests(
        self: &Arc<Self>,
        filter: Option<TransactionFilter>,
        cursor: Option<TransactionDigest>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        let metrics = KeyValueStoreMetrics::new_for_tests();
        let kv_store = Arc::new(TransactionKeyValueStore::new(
            "rocksdb",
            metrics,
            self.clone(),
        ));
        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
            .await
    }
4897
    /// Lists transaction digests matching `filter`, paged by `cursor` and
    /// `limit`, in forward or reverse order.
    ///
    /// The `Checkpoint` filter is served directly from checkpoint contents in
    /// the key-value store; all other filters go through the secondary index
    /// store. Cursor semantics: skip everything up to and including the
    /// cursor digest (the extra `skip(1)` drops the cursor itself).
    #[instrument(level = "trace", skip_all)]
    pub async fn get_transactions(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        filter: Option<TransactionFilter>,
        cursor: Option<TransactionDigest>,
        limit: Option<usize>,
        reverse: bool,
    ) -> SuiResult<Vec<TransactionDigest>> {
        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
            let iter = checkpoint_contents.iter().map(|c| c.transaction);
            if reverse {
                let iter = iter
                    .rev()
                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
                    .skip(usize::from(cursor.is_some()));
                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
            } else {
                let iter = iter
                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
                    .skip(usize::from(cursor.is_some()));
                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
            }
        }
        self.get_indexes()?
            .get_transactions(filter, cursor, limit, reverse)
    }
4927
    /// Returns a reference to the checkpoint store.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
4931
4932 pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4933 self.get_checkpoint_store()
4934 .get_highest_executed_checkpoint_seq_number()?
4935 .ok_or(
4936 SuiErrorKind::UserInputError {
4937 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4938 }
4939 .into(),
4940 )
4941 }
4942
    /// Simulation-test-only accessor for the highest pruned checkpoint
    /// sequence number; defaults to 0 when nothing has been pruned.
    #[cfg(msim)]
    pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
        self.database_for_testing()
            .perpetual_tables
            .get_highest_pruned_checkpoint()
            .map(|c| c.unwrap_or(0))
            .map_err(Into::into)
    }
4951
4952 #[instrument(level = "trace", skip_all)]
4953 pub fn get_checkpoint_summary_by_sequence_number(
4954 &self,
4955 sequence_number: CheckpointSequenceNumber,
4956 ) -> SuiResult<CheckpointSummary> {
4957 let verified_checkpoint = self
4958 .get_checkpoint_store()
4959 .get_checkpoint_by_sequence_number(sequence_number)?;
4960 match verified_checkpoint {
4961 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4962 None => Err(SuiErrorKind::UserInputError {
4963 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4964 }
4965 .into()),
4966 }
4967 }
4968
4969 #[instrument(level = "trace", skip_all)]
4970 pub fn get_checkpoint_summary_by_digest(
4971 &self,
4972 digest: CheckpointDigest,
4973 ) -> SuiResult<CheckpointSummary> {
4974 let verified_checkpoint = self
4975 .get_checkpoint_store()
4976 .get_checkpoint_by_digest(&digest)?;
4977 match verified_checkpoint {
4978 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4979 None => Err(SuiErrorKind::UserInputError {
4980 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4981 }
4982 .into()),
4983 }
4984 }
4985
    /// Finds the digest of the transaction that published `package_id`.
    /// System packages are created at genesis, so they resolve to the genesis
    /// transaction; other packages use the object's `previous_transaction`.
    #[instrument(level = "trace", skip_all)]
    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
        if is_system_package(package_id) {
            return self.find_genesis_txn_digest();
        }
        Ok(self
            .get_object_read(&package_id)?
            .into_object()?
            .previous_transaction)
    }
4996
    /// Returns the digest of the genesis transaction, found as the first
    /// transaction of checkpoint 0.
    ///
    /// # Errors
    /// `GenesisTransactionNotFound` if checkpoint 0's contents contain no
    /// transactions; lookup errors for checkpoint 0 are propagated.
    #[instrument(level = "trace", skip_all)]
    pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
        let summary = self
            .get_verified_checkpoint_by_sequence_number(0)?
            .into_message();
        let content = self.get_checkpoint_contents(summary.content_digest)?;
        let genesis_transaction = content.enumerate_transactions(&summary).next();
        Ok(genesis_transaction
            .ok_or(SuiErrorKind::UserInputError {
                error: UserInputError::GenesisTransactionNotFound,
            })?
            .1
            .transaction)
    }
5011
5012 #[instrument(level = "trace", skip_all)]
5013 pub fn get_verified_checkpoint_by_sequence_number(
5014 &self,
5015 sequence_number: CheckpointSequenceNumber,
5016 ) -> SuiResult<VerifiedCheckpoint> {
5017 let verified_checkpoint = self
5018 .get_checkpoint_store()
5019 .get_checkpoint_by_sequence_number(sequence_number)?;
5020 match verified_checkpoint {
5021 Some(verified_checkpoint) => Ok(verified_checkpoint),
5022 None => Err(SuiErrorKind::UserInputError {
5023 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5024 }
5025 .into()),
5026 }
5027 }
5028
5029 #[instrument(level = "trace", skip_all)]
5030 pub fn get_verified_checkpoint_summary_by_digest(
5031 &self,
5032 digest: CheckpointDigest,
5033 ) -> SuiResult<VerifiedCheckpoint> {
5034 let verified_checkpoint = self
5035 .get_checkpoint_store()
5036 .get_checkpoint_by_digest(&digest)?;
5037 match verified_checkpoint {
5038 Some(verified_checkpoint) => Ok(verified_checkpoint),
5039 None => Err(SuiErrorKind::UserInputError {
5040 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5041 }
5042 .into()),
5043 }
5044 }
5045
5046 #[instrument(level = "trace", skip_all)]
5047 pub fn get_checkpoint_contents(
5048 &self,
5049 digest: CheckpointContentsDigest,
5050 ) -> SuiResult<CheckpointContents> {
5051 self.get_checkpoint_store()
5052 .get_checkpoint_contents(&digest)?
5053 .ok_or(
5054 SuiErrorKind::UserInputError {
5055 error: UserInputError::CheckpointContentsNotFound(digest),
5056 }
5057 .into(),
5058 )
5059 }
5060
5061 #[instrument(level = "trace", skip_all)]
5062 pub fn get_checkpoint_contents_by_sequence_number(
5063 &self,
5064 sequence_number: CheckpointSequenceNumber,
5065 ) -> SuiResult<CheckpointContents> {
5066 let verified_checkpoint = self
5067 .get_checkpoint_store()
5068 .get_checkpoint_by_sequence_number(sequence_number)?;
5069 match verified_checkpoint {
5070 Some(verified_checkpoint) => {
5071 let content_digest = verified_checkpoint.into_inner().content_digest;
5072 self.get_checkpoint_contents(content_digest)
5073 }
5074 None => Err(SuiErrorKind::UserInputError {
5075 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5076 }
5077 .into()),
5078 }
5079 }
5080
    /// Queries events matching `query`, paged by `cursor` and `limit`, in
    /// ascending or descending order.
    ///
    /// Flow:
    /// 1. Translate the cursor into a (transaction seq, event seq) start
    ///    position; without a cursor, start from the beginning (or the end
    ///    when descending).
    /// 2. Fetch `limit + 1` event keys from the index matching the filter
    ///    (the extra key accounts for cursor trimming below).
    /// 3. If a cursor was given, drop the first key (it is the cursor entry
    ///    itself); otherwise truncate back to the requested limit.
    /// 4. Bulk-fetch events by transaction digest from the key-value store
    ///    and resolve each key to its concrete event.
    /// 5. Resolve Move layouts and convert to `SuiEvent`.
    #[instrument(level = "trace", skip_all)]
    pub async fn query_events(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        query: EventFilter,
        cursor: Option<EventID>,
        limit: usize,
        descending: bool,
    ) -> SuiResult<Vec<SuiEvent>> {
        let index_store = self.get_indexes()?;

        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
                SuiErrorKind::TransactionNotFound {
                    digest: cursor.tx_digest,
                },
            )?;
            (tx_seq, cursor.event_seq as usize)
        } else if descending {
            (u64::MAX, usize::MAX)
        } else {
            (0, 0)
        };

        // Over-fetch by one so the cursor entry can be dropped while still
        // returning up to `limit` results.
        let limit = limit + 1;
        let mut event_keys = match query {
            EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
            EventFilter::Transaction(digest) => {
                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveModule { package, module } => {
                let module_id = ModuleId::new(package.into(), module);
                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveEventType(struct_name) => index_store
                .events_by_move_event_struct_name(
                    &struct_name,
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            EventFilter::Sender(sender) => {
                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
            }
            EventFilter::TimeRange {
                start_time,
                end_time,
            } => index_store
                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
            EventFilter::MoveEventModule { package, module } => index_store
                .events_by_move_event_module(
                    &ModuleId::new(package.into(), module),
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            EventFilter::Any(_) => {
                return Err(SuiErrorKind::UserInputError {
                    error: UserInputError::Unsupported(
                        "'Any' queries are not supported by the fullnode.".to_string(),
                    ),
                }
                .into());
            }
        };

        // With a cursor, the first returned key is the cursor entry itself —
        // drop it; without one, cut back to the originally requested limit.
        if cursor.is_some() {
            if !event_keys.is_empty() {
                event_keys.remove(0);
            }
        } else {
            event_keys.truncate(limit - 1);
        }

        // Deduplicate digests before the bulk fetch.
        let transaction_digests = event_keys
            .iter()
            .map(|(_, digest, _, _)| *digest)
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();

        let events = kv_store
            .multi_get_events_by_tx_digests(&transaction_digests)
            .await?;

        let events_map: HashMap<_, _> =
            transaction_digests.iter().zip(events.into_iter()).collect();

        // Resolve each key (event digest, tx digest, event index, timestamp)
        // to the concrete event; a missing event is an error.
        let stored_events = event_keys
            .into_iter()
            .map(|k| {
                (
                    k,
                    events_map
                        .get(&k.1)
                        .expect("fetched digest is missing")
                        .clone()
                        .and_then(|e| e.data.get(k.2).cloned()),
                )
            })
            .map(
                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
                    event
                        .map(|e| (e, tx_digest, event_seq, timestamp))
                        .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
                },
            )
            .collect::<Result<Vec<_>, _>>()?;

        let epoch_store = self.load_epoch_store_one_call_per_task();
        let backing_store = self.get_backing_package_store().as_ref();
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(backing_store));
        let mut events = vec![];
        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
            events.push(SuiEvent::try_from(
                e.clone(),
                tx_digest,
                event_seq as u64,
                Some(timestamp),
                layout_resolver.get_annotated_layout(&e.type_)?,
            )?)
        }
        Ok(events)
    }
5215
    /// Inserts a single genesis object via the reconfig API.
    pub async fn insert_genesis_object(&self, object: Object) {
        self.get_reconfig_api().insert_genesis_object(object);
    }
5219
5220 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
5221 futures::future::join_all(
5222 objects
5223 .iter()
5224 .map(|o| self.insert_genesis_object(o.clone())),
5225 )
5226 .await;
5227 }
5228
    /// Returns the local status of a transaction, if this authority knows it.
    ///
    /// Priority: executed effects (possibly re-signed for the current epoch)
    /// win over a merely signed-but-unexecuted transaction; unknown digests
    /// yield `Ok(None)`.
    #[instrument(level = "trace", skip_all)]
    pub fn get_transaction_status(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
        if let Some(effects) =
            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
        {
            if let Some(transaction) = self
                .get_transaction_cache_reader()
                .get_transaction_block(transaction_digest)
            {
                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
                // Only fetch events when the effects actually reference some;
                // otherwise return an empty set.
                let events = if effects.events_digest().is_some() {
                    self.get_transaction_events(effects.transaction_digest())?
                } else {
                    TransactionEvents::default()
                };
                return Ok(Some((
                    (*transaction).clone().into_message(),
                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
                )));
            } else {
                // Effects without the transaction body: fall through to the
                // signed-transaction path below.
                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
            }
        }
        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
            self.metrics.tx_already_processed.inc();
            let (transaction, sig) = signed.into_inner().into_data_and_sig();
            Ok(Some((transaction, TransactionStatus::Signed(sig))))
        } else {
            Ok(None)
        }
    }
5269
    /// Returns this authority's signed effects for an executed transaction,
    /// signing them (via `sign_effects`) under the current epoch. Effects
    /// executed in a previous epoch are logged before being re-signed.
    /// Returns `Ok(None)` if the transaction has not been executed locally.
    #[instrument(level = "trace", skip_all)]
    pub fn get_signed_effects_and_maybe_resign(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
        let effects = self
            .get_transaction_cache_reader()
            .get_executed_effects(transaction_digest);
        match effects {
            Some(effects) => {
                // Effects from an earlier epoch are still served, but the
                // signature attached below is from the current epoch.
                if effects.executed_epoch() != epoch_store.epoch() {
                    debug!(
                        tx_digest=?transaction_digest,
                        effects_epoch=?effects.executed_epoch(),
                        epoch=?epoch_store.epoch(),
                        "Re-signing the effects with the current epoch"
                    );
                }
                Ok(Some(self.sign_effects(effects, epoch_store)?))
            }
            None => Ok(None),
        }
    }
5316
    /// Produces this authority's signed form of `effects`.
    ///
    /// Reuses a cached effects signature from the epoch store when one
    /// exists; otherwise signs the effects with this authority's key under
    /// the current epoch and persists the (digest, signature) pair before
    /// returning.
    #[instrument(level = "trace", skip_all)]
    pub(crate) fn sign_effects(
        &self,
        effects: TransactionEffects,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<VerifiedSignedTransactionEffects> {
        let tx_digest = *effects.transaction_digest();
        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
            Some(sig) => {
                // Cached signatures are expected to be from the current epoch.
                debug_assert!(sig.epoch == epoch_store.epoch());
                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
            }
            _ => {
                let sig = AuthoritySignInfo::new(
                    epoch_store.epoch(),
                    &effects,
                    Intent::sui_app(IntentScope::TransactionEffects),
                    self.name,
                    &*self.secret,
                );

                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());

                // Persist so subsequent calls reuse this signature instead of
                // re-signing.
                epoch_store.insert_effects_digest_and_signature(
                    &tx_digest,
                    effects.digest(),
                    &sig,
                )?;

                effects
            }
        };

        Ok(VerifiedSignedTransactionEffects::new_unchecked(
            signed_effects,
        ))
    }
5354
5355 #[instrument(level = "trace", skip_all)]
5357 fn fullnode_only_get_tx_coins_for_indexing(
5358 name: AuthorityName,
5359 object_store: &Arc<dyn ObjectStore + Send + Sync>,
5360 effects: &TransactionEffects,
5361 inner_temporary_store: &InnerTemporaryStore,
5362 epoch_store: &Arc<AuthorityPerEpochStore>,
5363 ) -> Option<TxCoins> {
5364 if epoch_store.committee().authority_exists(&name) {
5365 return None;
5366 }
5367 let written_coin_objects = inner_temporary_store
5368 .written
5369 .iter()
5370 .filter_map(|(k, v)| {
5371 if v.is_coin() {
5372 Some((*k, v.clone()))
5373 } else {
5374 None
5375 }
5376 })
5377 .collect();
5378 let mut input_coin_objects = inner_temporary_store
5379 .input_objects
5380 .iter()
5381 .filter_map(|(k, v)| {
5382 if v.is_coin() {
5383 Some((*k, v.clone()))
5384 } else {
5385 None
5386 }
5387 })
5388 .collect::<ObjectMap>();
5389
5390 for (object_id, version) in effects.modified_at_versions() {
5394 if inner_temporary_store
5395 .loaded_runtime_objects
5396 .contains_key(&object_id)
5397 && let Some(object) = object_store.get_object_by_key(&object_id, version)
5398 && object.is_coin()
5399 {
5400 input_coin_objects.insert(object_id, object);
5401 }
5402 }
5403
5404 Some((input_coin_objects, written_coin_objects))
5405 }
5406
5407 #[instrument(level = "trace", skip_all)]
5415 pub async fn get_transaction_lock(
5416 &self,
5417 object_ref: &ObjectRef,
5418 epoch_store: &AuthorityPerEpochStore,
5419 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5420 let lock_info = self
5421 .get_object_cache_reader()
5422 .get_lock(*object_ref, epoch_store)?;
5423 let lock_info = match lock_info {
5424 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5425 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5426 provided_obj_ref: *object_ref,
5427 current_version: locked_ref.1,
5428 }
5429 .into());
5430 }
5431 ObjectLockStatus::Initialized => {
5432 return Ok(None);
5433 }
5434 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5435 };
5436
5437 epoch_store.get_signed_transaction(&lock_info.tx_digest)
5438 }
5439
    /// Batch-fetch objects by id from the object cache; each slot is `None`
    /// when the corresponding object is not found.
    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
        self.get_object_cache_reader().get_objects(objects)
    }
5443
    /// Return the latest object reference for `object_id`, which may be a
    /// tombstone reference if the object was deleted/wrapped; `None` when the
    /// object is unknown.
    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
        self.get_object_cache_reader()
            .get_latest_object_ref_or_tombstone(object_id)
    }
5448
5449 pub fn set_override_protocol_upgrade_buffer_stake(
5458 &self,
5459 expected_epoch: EpochId,
5460 buffer_stake_bps: u64,
5461 ) -> SuiResult {
5462 let epoch_store = self.load_epoch_store_one_call_per_task();
5463 let actual_epoch = epoch_store.epoch();
5464 if actual_epoch != expected_epoch {
5465 return Err(SuiErrorKind::WrongEpoch {
5466 expected_epoch,
5467 actual_epoch,
5468 }
5469 .into());
5470 }
5471
5472 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5473 }
5474
5475 pub fn clear_override_protocol_upgrade_buffer_stake(
5476 &self,
5477 expected_epoch: EpochId,
5478 ) -> SuiResult {
5479 let epoch_store = self.load_epoch_store_one_call_per_task();
5480 let actual_epoch = epoch_store.epoch();
5481 if actual_epoch != expected_epoch {
5482 return Err(SuiErrorKind::WrongEpoch {
5483 expected_epoch,
5484 actual_epoch,
5485 }
5486 .into());
5487 }
5488
5489 epoch_store.clear_override_protocol_upgrade_buffer_stake()
5490 }
5491
    /// Compare each built-in system package against what is currently stored
    /// on-chain and return the object refs this node would vote to upgrade to.
    /// Returns an empty vec as soon as any package comparison fails, meaning
    /// this node cannot support the upgrade.
    pub async fn get_available_system_packages(
        &self,
        binary_config: &BinaryConfig,
    ) -> Vec<ObjectRef> {
        let mut results = vec![];

        let system_packages = BuiltInFramework::iter_system_packages();

        // Under the simulator, tests may inject extra packages and per-package
        // module overrides via `framework_injection`.
        #[cfg(msim)]
        let extra_packages = framework_injection::get_extra_packages(self.name);
        #[cfg(msim)]
        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

        for system_package in system_packages {
            let modules = system_package.modules().to_vec();
            // An empty override list means "drop this package from the vote".
            #[cfg(msim)]
            let modules =
                match framework_injection::get_override_modules(&system_package.id, self.name) {
                    Some(overrides) if overrides.is_empty() => continue,
                    Some(overrides) => overrides,
                    None => modules,
                };

            // A `None` from the comparison aborts the whole vote: an empty
            // result signals no supported upgrade set.
            let Some(obj_ref) = sui_framework::compare_system_package(
                &self.get_object_store(),
                &system_package.id,
                &modules,
                system_package.dependencies.to_vec(),
                binary_config,
            )
            .await
            else {
                return vec![];
            };
            results.push(obj_ref);
        }

        results
    }
5535
    /// For each system package ref the committee voted on, produce the
    /// (version, module bytes, dependencies) triple from this binary's
    /// built-in framework. Packages already at the voted ref are skipped.
    /// Returns `None` when a locally-built package does not hash to the voted
    /// object ref — i.e. this binary cannot produce the voted upgrade.
    async fn get_system_package_bytes(
        &self,
        system_packages: Vec<ObjectRef>,
        binary_config: &BinaryConfig,
    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
        let objects = self.get_objects(&ids).await;

        let mut res = Vec::with_capacity(system_packages.len());
        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
            // Determine the previous transaction for the new package object;
            // skip packages whose on-chain ref already matches the vote.
            let prev_transaction = match object {
                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                    info!("Framework {} does not need updating", system_package_ref.0);
                    continue;
                }

                Some(cur_object) => cur_object.previous_transaction,
                None => TransactionDigest::genesis_marker(),
            };

            // Under the simulator, tests may substitute the package contents.
            #[cfg(msim)]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
                .unwrap_or_else(|| {
                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
                });

            #[cfg(not(msim))]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

            let modules: Vec<_> = bytes
                .iter()
                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
                .collect();

            // Rebuild the package object locally to verify it reproduces the
            // exact object ref the committee voted for.
            let new_object = Object::new_system_package(
                &modules,
                system_package_ref.1,
                dependencies.clone(),
                prev_transaction,
            );

            let new_ref = new_object.compute_object_reference();
            if new_ref != system_package_ref {
                if cfg!(msim) {
                    debug_fatal!(
                        "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
                    );
                } else {
                    error!(
                        "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
                    );
                }
                return None;
            }

            res.push((system_package_ref.1, bytes, dependencies));
        }

        Some(res)
    }
5619
    /// Check whether `proposed_protocol_version` has enough committee support
    /// to upgrade to. Validators are grouped by (protocol digest, package set)
    /// they advertised; the first group whose stake reaches quorum plus the
    /// configured buffer wins. Returns the version and its package set, or
    /// `None` when no group has sufficient support.
    fn is_protocol_version_supported_v2(
        current_protocol_version: ProtocolVersion,
        proposed_protocol_version: ProtocolVersion,
        protocol_config: &ProtocolConfig,
        committee: &Committee,
        capabilities: Vec<AuthorityCapabilitiesV2>,
        mut buffer_stake_bps: u64,
    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
        // Only single-step upgrades are allowed unless the config permits
        // jumping straight to the highest supported version.
        if proposed_protocol_version > current_protocol_version + 1
            && !protocol_config.advance_to_highest_supported_protocol_version()
        {
            return None;
        }

        if buffer_stake_bps > 10000 {
            warn!("clamping buffer_stake_bps to 10000");
            buffer_stake_bps = 10000;
        }

        // One entry per validator that advertised a digest for the proposed
        // version: (digest, sorted package set, authority).
        let mut desired_upgrades: Vec<_> = capabilities
            .into_iter()
            .filter_map(|mut cap| {
                // A validator with no packages cannot participate in a vote.
                if cap.available_system_packages.is_empty() {
                    return None;
                }

                // Sort so identical package sets compare equal in chunk_by.
                cap.available_system_packages.sort();

                info!(
                    "validator {:?} supports {:?} with system packages: {:?}",
                    cap.authority.concise(),
                    cap.supported_protocol_versions,
                    cap.available_system_packages,
                );

                cap.supported_protocol_versions
                    .get_version_digest(proposed_protocol_version)
                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
            })
            .collect();

        // Sort first so chunk_by groups all identical (digest, packages)
        // entries together.
        desired_upgrades.sort();
        desired_upgrades
            .into_iter()
            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
            .into_iter()
            .find_map(|((digest, packages), group)| {
                assert!(!packages.is_empty());

                // Aggregate the stake of every authority in this group.
                let mut stake_aggregator: StakeAggregator<(), true> =
                    StakeAggregator::new(Arc::new(committee.clone()));

                for (_, _, authority) in group {
                    stake_aggregator.insert_generic(authority, ());
                }

                let total_votes = stake_aggregator.total_votes();
                let quorum_threshold = committee.quorum_threshold();
                // f = stake above quorum, i.e. the tolerated faulty stake; the
                // buffer is a fraction (in bps) of f added on top of quorum.
                let f = committee.total_votes() - committee.quorum_threshold();

                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
                let effective_threshold = quorum_threshold + buffer_stake;

                info!(
                    protocol_config_digest = ?digest,
                    ?total_votes,
                    ?quorum_threshold,
                    ?buffer_stake_bps,
                    ?effective_threshold,
                    ?proposed_protocol_version,
                    ?packages,
                    "support for upgrade"
                );

                let has_support = total_votes >= effective_threshold;
                has_support.then_some((proposed_protocol_version, packages))
            })
    }
5707
5708 fn choose_protocol_version_and_system_packages_v2(
5709 current_protocol_version: ProtocolVersion,
5710 protocol_config: &ProtocolConfig,
5711 committee: &Committee,
5712 capabilities: Vec<AuthorityCapabilitiesV2>,
5713 buffer_stake_bps: u64,
5714 ) -> (ProtocolVersion, Vec<ObjectRef>) {
5715 assert!(protocol_config.authority_capabilities_v2());
5716 let mut next_protocol_version = current_protocol_version;
5717 let mut system_packages = vec![];
5718
5719 while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5720 current_protocol_version,
5721 next_protocol_version + 1,
5722 protocol_config,
5723 committee,
5724 capabilities.clone(),
5725 buffer_stake_bps,
5726 ) {
5727 next_protocol_version = version;
5728 system_packages = packages;
5729 }
5730
5731 (next_protocol_version, system_packages)
5732 }
5733
5734 #[instrument(level = "debug", skip_all)]
5735 fn create_authenticator_state_tx(
5736 &self,
5737 epoch_store: &Arc<AuthorityPerEpochStore>,
5738 ) -> Option<EndOfEpochTransactionKind> {
5739 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5740 info!("authenticator state transactions not enabled");
5741 return None;
5742 }
5743
5744 let authenticator_state_exists = epoch_store.authenticator_state_exists();
5745 let tx = if authenticator_state_exists {
5746 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5747 let min_epoch =
5748 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5749 let authenticator_obj_initial_shared_version = epoch_store
5750 .epoch_start_config()
5751 .authenticator_obj_initial_shared_version()
5752 .expect("initial version must exist");
5753
5754 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5755 min_epoch,
5756 authenticator_obj_initial_shared_version,
5757 );
5758
5759 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5760
5761 tx
5762 } else {
5763 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5764 info!("Creating AuthenticatorStateCreate tx");
5765 tx
5766 };
5767 Some(tx)
5768 }
5769
5770 #[instrument(level = "debug", skip_all)]
5771 fn create_randomness_state_tx(
5772 &self,
5773 epoch_store: &Arc<AuthorityPerEpochStore>,
5774 ) -> Option<EndOfEpochTransactionKind> {
5775 if !epoch_store.protocol_config().random_beacon() {
5776 info!("randomness state transactions not enabled");
5777 return None;
5778 }
5779
5780 if epoch_store.randomness_state_exists() {
5781 return None;
5782 }
5783
5784 let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5785 info!("Creating RandomnessStateCreate tx");
5786 Some(tx)
5787 }
5788
5789 #[instrument(level = "debug", skip_all)]
5790 fn create_accumulator_root_tx(
5791 &self,
5792 epoch_store: &Arc<AuthorityPerEpochStore>,
5793 ) -> Option<EndOfEpochTransactionKind> {
5794 if !epoch_store
5795 .protocol_config()
5796 .create_root_accumulator_object()
5797 {
5798 info!("accumulator root creation not enabled");
5799 return None;
5800 }
5801
5802 if epoch_store.accumulator_root_exists() {
5803 return None;
5804 }
5805
5806 let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5807 info!("Creating AccumulatorRootCreate tx");
5808 Some(tx)
5809 }
5810
5811 #[instrument(level = "debug", skip_all)]
5812 fn create_write_accumulator_storage_cost_tx(
5813 &self,
5814 epoch_store: &Arc<AuthorityPerEpochStore>,
5815 ) -> Option<EndOfEpochTransactionKind> {
5816 if !epoch_store.accumulator_root_exists() {
5817 info!("accumulator root does not exist yet");
5818 return None;
5819 }
5820 if !epoch_store.protocol_config().enable_accumulators() {
5821 info!("accumulators not enabled");
5822 return None;
5823 }
5824
5825 let object_store = self.get_object_store();
5826 let object_count =
5827 match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
5828 Ok(Some(count)) => count,
5829 Ok(None) => return None,
5830 Err(e) => {
5831 fatal!("failed to read accumulator object count: {e}");
5832 }
5833 };
5834
5835 let storage_cost = object_count.saturating_mul(
5836 epoch_store
5837 .protocol_config()
5838 .accumulator_object_storage_cost(),
5839 );
5840
5841 let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
5842 info!(
5843 object_count,
5844 storage_cost, "Creating WriteAccumulatorStorageCost tx"
5845 );
5846 Some(tx)
5847 }
5848
5849 #[instrument(level = "debug", skip_all)]
5850 fn create_coin_registry_tx(
5851 &self,
5852 epoch_store: &Arc<AuthorityPerEpochStore>,
5853 ) -> Option<EndOfEpochTransactionKind> {
5854 if !epoch_store.protocol_config().enable_coin_registry() {
5855 info!("coin registry not enabled");
5856 return None;
5857 }
5858
5859 if epoch_store.coin_registry_exists() {
5860 return None;
5861 }
5862
5863 let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5864 info!("Creating CoinRegistryCreate tx");
5865 Some(tx)
5866 }
5867
5868 #[instrument(level = "debug", skip_all)]
5869 fn create_display_registry_tx(
5870 &self,
5871 epoch_store: &Arc<AuthorityPerEpochStore>,
5872 ) -> Option<EndOfEpochTransactionKind> {
5873 if !epoch_store.protocol_config().enable_display_registry() {
5874 info!("display registry not enabled");
5875 return None;
5876 }
5877
5878 if epoch_store.display_registry_exists() {
5879 return None;
5880 }
5881
5882 let tx = EndOfEpochTransactionKind::new_display_registry_create();
5883 info!("Creating DisplayRegistryCreate tx");
5884 Some(tx)
5885 }
5886
5887 #[instrument(level = "debug", skip_all)]
5888 fn create_bridge_tx(
5889 &self,
5890 epoch_store: &Arc<AuthorityPerEpochStore>,
5891 ) -> Option<EndOfEpochTransactionKind> {
5892 if !epoch_store.protocol_config().enable_bridge() {
5893 info!("bridge not enabled");
5894 return None;
5895 }
5896 if epoch_store.bridge_exists() {
5897 return None;
5898 }
5899 let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5900 info!("Creating Bridge Create tx");
5901 Some(tx)
5902 }
5903
5904 #[instrument(level = "debug", skip_all)]
5905 fn init_bridge_committee_tx(
5906 &self,
5907 epoch_store: &Arc<AuthorityPerEpochStore>,
5908 ) -> Option<EndOfEpochTransactionKind> {
5909 if !epoch_store.protocol_config().enable_bridge() {
5910 info!("bridge not enabled");
5911 return None;
5912 }
5913 if !epoch_store
5914 .protocol_config()
5915 .should_try_to_finalize_bridge_committee()
5916 {
5917 info!("should not try to finalize bridge committee yet");
5918 return None;
5919 }
5920 if !epoch_store.bridge_exists() {
5922 return None;
5923 }
5924
5925 if epoch_store.bridge_committee_initiated() {
5926 return None;
5927 }
5928
5929 let bridge_initial_shared_version = epoch_store
5930 .epoch_start_config()
5931 .bridge_obj_initial_shared_version()
5932 .expect("initial version must exist");
5933 let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5934 info!("Init Bridge committee tx");
5935 Some(tx)
5936 }
5937
5938 #[instrument(level = "debug", skip_all)]
5939 fn create_deny_list_state_tx(
5940 &self,
5941 epoch_store: &Arc<AuthorityPerEpochStore>,
5942 ) -> Option<EndOfEpochTransactionKind> {
5943 if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5944 return None;
5945 }
5946
5947 if epoch_store.coin_deny_list_state_exists() {
5948 return None;
5949 }
5950
5951 let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5952 info!("Creating DenyListStateCreate tx");
5953 Some(tx)
5954 }
5955
5956 #[instrument(level = "debug", skip_all)]
5957 fn create_address_alias_state_tx(
5958 &self,
5959 epoch_store: &Arc<AuthorityPerEpochStore>,
5960 ) -> Option<EndOfEpochTransactionKind> {
5961 if !epoch_store.protocol_config().address_aliases() {
5962 info!("address aliases not enabled");
5963 return None;
5964 }
5965
5966 if epoch_store.address_alias_state_exists() {
5967 return None;
5968 }
5969
5970 let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
5971 info!("Creating AddressAliasStateCreate tx");
5972 Some(tx)
5973 }
5974
    /// Build a StoreExecutionTimeObservations end-of-epoch transaction.
    ///
    /// Observations are filtered down to commands that actually appeared in
    /// the tail of this epoch's checkpoints (the last
    /// `stored_observations_num_included_checkpoints` checkpoints, clipped to
    /// this epoch) plus the keys passed in from consensus, then capped at the
    /// configured limit. Returns `None` when congestion control is not in
    /// execution-time-estimate mode.
    #[instrument(level = "debug", skip_all)]
    fn create_execution_time_observations_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
    ) -> Option<EndOfEpochTransactionKind> {
        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
            .protocol_config()
            .per_object_congestion_control_mode()
        else {
            return None;
        };

        // First checkpoint of the window: N checkpoints back from the last
        // one, but never earlier than the first checkpoint of this epoch
        // (last checkpoint of the previous epoch + 1; 1 for the first epoch).
        let start_checkpoint = std::cmp::max(
            last_checkpoint_before_end_of_epoch
                .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
            epoch_store
                .epoch()
                .checked_sub(1)
                .map(|prev_epoch| {
                    self.checkpoint_store
                        .get_epoch_last_checkpoint_seq_number(prev_epoch)
                        .expect("typed store must not fail")
                        .expect(
                            "sequence number of last checkpoint of preceding epoch must be saved",
                        )
                        + 1
                })
                .unwrap_or(1),
        );
        info!(
            "reading checkpoint range {:?}..={:?}",
            start_checkpoint, last_checkpoint_before_end_of_epoch
        );
        let sequence_numbers =
            (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
        // Resolve each sequence number to its contents digest, preferring the
        // locally computed checkpoint and falling back to the certified one.
        let contents_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_locally_computed_checkpoints(&sequence_numbers)
            .expect("typed store must not fail")
            .into_iter()
            .zip(sequence_numbers)
            .map(|(maybe_checkpoint, sequence_number)| {
                if let Some(checkpoint) = maybe_checkpoint {
                    checkpoint.content_digest
                } else {
                    self.checkpoint_store
                        .get_checkpoint_by_sequence_number(sequence_number)
                        .expect("typed store must not fail")
                        .unwrap_or_else(|| {
                            fatal!("preceding checkpoints must exist by end of epoch")
                        })
                        .data()
                        .content_digest
                }
            })
            .collect();
        // Flatten the checkpoint contents into the digests of all executed
        // transactions in the window.
        let tx_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_checkpoint_content(&contents_digests)
            .expect("typed store must not fail")
            .into_iter()
            .flat_map(|maybe_contents| {
                maybe_contents
                    .expect("preceding checkpoint contents must exist by end of epoch")
                    .into_inner()
                    .into_iter()
                    .map(|digests| digests.transaction)
            })
            .collect();
        // Build the set of observation keys to keep: one per PTB command in
        // the window, plus the keys supplied by the caller.
        let included_execution_time_observations: HashSet<_> = self
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(&tx_digests)
            .into_iter()
            .flat_map(|maybe_tx| {
                if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
                    .expect("preceding transaction must exist by end of epoch")
                    .transaction_data()
                    .kind()
                {
                    #[allow(clippy::unnecessary_to_owned)]
                    itertools::Either::Left(
                        ptb.commands
                            .to_owned()
                            .into_iter()
                            .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
                    )
                } else {
                    itertools::Either::Right(std::iter::empty())
                }
            })
            .chain(end_of_epoch_observation_keys.into_iter())
            .collect();

        let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
            epoch_store
                .get_end_of_epoch_execution_time_observations()
                .filter_and_sort_v1(
                    |(key, _)| included_execution_time_observations.contains(key),
                    params.stored_observations_limit.try_into().unwrap(),
                ),
        );
        info!("Creating StoreExecutionTimeObservations tx");
        Some(tx)
    }
6086
    /// Build the end-of-epoch transaction (or legacy ChangeEpoch transaction),
    /// execute it locally, and return the resulting `SuiSystemState` together
    /// with the transaction effects.
    ///
    /// The bundled kinds are: optional state-object creation transactions
    /// (authenticator, randomness, bridge, deny list, registries, accumulator
    /// root/storage cost), stored execution-time observations, and finally the
    /// ChangeEpoch kind carrying gas totals, the quorum-chosen next protocol
    /// version, and any system package upgrade bytes.
    ///
    /// Errors with `SystemPackagesMissing` when the voted system packages
    /// cannot be produced by this binary, or `ChangeEpochTxAlreadyExecuted`
    /// when the transaction was already executed (e.g. via state sync).
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        last_checkpoint: CheckpointSequenceNumber,
    ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
        // Collect the optional end-of-epoch kinds; each helper returns None
        // when its feature is disabled or its state object already exists.
        let mut txns = Vec::new();

        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_bridge_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_execution_time_observations_tx(
            epoch_store,
            end_of_epoch_observation_keys,
            last_checkpoint,
        ) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
            txns.push(tx);
        }

        if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_display_registry_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
            txns.push(tx);
        }
        if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();

        // Pick the next protocol version (and any system package upgrade)
        // based on committee capability votes.
        let (next_epoch_protocol_version, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v2(
                epoch_store.protocol_version(),
                epoch_store.protocol_config(),
                epoch_store.committee(),
                epoch_store
                    .get_capabilities_v2()
                    .expect("read capabilities from db cannot fail"),
                buffer_stake_bps,
            );

        let config = epoch_store.protocol_config();
        let binary_config = config.binary_config(None);
        // Verify this binary can reproduce the voted package bytes; if not,
        // we cannot construct the change-epoch transaction.
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            if next_epoch_protocol_version <= ProtocolVersion::MAX {
                debug_fatal!(
                    "upgraded system packages {:?} are not locally available, cannot create \
                    ChangeEpochTx. validator binary must be upgraded to the correct version!",
                    next_epoch_system_packages
                );
            } else {
                error!(
                    "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
                    next_epoch_protocol_version
                );
            }
            return Err(CheckpointBuilderError::SystemPackagesMissing);
        };

        // Prefer the bundled end-of-epoch transaction when supported; fall
        // back to the standalone legacy ChangeEpoch transaction otherwise.
        let tx = if epoch_store
            .protocol_config()
            .end_of_epoch_transaction_supported()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));

            VerifiedTransaction::new_end_of_epoch_transaction(txns)
        } else {
            VerifiedTransaction::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            )
        };

        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        // Acquire the per-digest lock before checking/driving execution.
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        if self
            .get_transaction_cache_reader()
            .is_tx_already_executed(tx_digest)
        {
            warn!("change epoch tx has already been executed via state sync");
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        }

        let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
        else {
            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
        };

        // Assign shared object versions for this single transaction.
        let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::iter::once(&Schedulable::Transaction(&executable_tx)),
        )?;

        assert_eq!(assigned_versions.0.len(), 1);
        let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;

        let input_objects = self.read_objects_for_execution(
            &tx_lock,
            &executable_tx,
            &assigned_versions,
            epoch_store,
        )?;

        let (transaction_outputs, _timings, _execution_error_opt) = self
            .execute_certificate(
                &execution_guard,
                &executable_tx,
                input_objects,
                None,
                ExecutionEnv::default(),
                epoch_store,
            )
            .unwrap();
        let system_obj = get_sui_system_state(&transaction_outputs.written)
            .expect("change epoch tx must write to system object");

        let effects = transaction_outputs.effects;
        // Make the transaction and effects available to state sync.
        self.get_state_sync_store()
            .insert_transaction_and_effects(&tx, &effects);

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change-epoch transaction must never fail execution.
        assert!(effects.status().is_ok());
        Ok((system_obj, effects))
    }
6312
    /// Open the per-epoch store for the next epoch, derived from the current
    /// one, and install it as the active epoch store. Asserts that the epoch
    /// start configuration agrees with the new committee's epoch.
    #[instrument(level = "error", skip_all)]
    async fn reopen_epoch_db(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        let new_epoch = new_committee.epoch;
        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
        assert_eq!(
            epoch_start_configuration.epoch_start_state().epoch(),
            new_committee.epoch
        );
        fail_point!("before-open-new-epoch-store");
        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
            self.name,
            new_committee,
            epoch_start_configuration,
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            expensive_safety_check_config,
            epoch_last_checkpoint,
        )?;
        // Swap the active epoch store; readers pick up the new epoch from here.
        self.epoch_store.store(new_epoch_store.clone());
        Ok(new_epoch_store)
    }
6341
    /// Test-only: iterate the cached live object set, including wrapped
    /// objects unless the protocol config enables simplified unwrap-then-
    /// delete semantics.
    #[cfg(test)]
    pub(crate) fn iter_live_object_set_for_testing(
        &self,
    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
        let include_wrapped_object = !self
            .epoch_store_for_testing()
            .protocol_config()
            .simplified_unwrap_then_delete();
        self.get_global_state_hash_store()
            .iter_cached_live_object_set_for_testing(include_wrapped_object)
    }
6353
6354 #[cfg(test)]
6355 pub(crate) fn shutdown_execution_for_test(&self) {
6356 self.tx_execution_shutdown
6357 .lock()
6358 .take()
6359 .unwrap()
6360 .send(())
6361 .unwrap();
6362 }
6363
6364 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6366 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6367 self.get_object_cache_reader()
6368 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6369 self.get_reconfig_api()
6370 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6371 Ok(())
6372 }
6373}
6374
/// Receives completed randomness rounds from a channel and drives execution
/// of the corresponding randomness state update transactions.
pub struct RandomnessRoundReceiver {
    authority_state: Arc<AuthorityState>,
    // Delivers (epoch, round, randomness bytes) tuples from the producer.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
6379
impl RandomnessRoundReceiver {
    /// Construct the receiver and spawn its event loop as a monitored task,
    /// returning the task's join handle.
    pub fn spawn(
        authority_state: Arc<AuthorityState>,
        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    ) -> JoinHandle<()> {
        let rrr = RandomnessRoundReceiver {
            authority_state,
            randomness_rx,
        };
        spawn_monitored_task!(rrr.run())
    }

    /// Event loop: process randomness rounds until the channel closes.
    async fn run(mut self) {
        info!("RandomnessRoundReceiver event loop started");

        loop {
            tokio::select! {
                maybe_recv = self.randomness_rx.recv() => {
                    if let Some((epoch, round, bytes)) = maybe_recv {
                        self.handle_new_randomness(epoch, round, bytes).await;
                    } else {
                        // Channel closed — shut the loop down.
                        break;
                    }
                },
            }
        }

        info!("RandomnessRoundReceiver event loop ended");
    }

    /// Build and persist the randomness state update transaction for
    /// (`epoch`, `round`), then spawn a task that waits (with a timeout) for
    /// its executed effects and fatals if execution fails. Randomness arriving
    /// for a stale epoch is dropped.
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
        fail_point_async!("randomness-delay");

        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
        if epoch_store.epoch() != epoch {
            warn!(
                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
                epoch_store.epoch()
            );
            return;
        }
        let key = TransactionKey::RandomnessRound(epoch, round);
        let transaction = VerifiedTransaction::new_randomness_state_update(
            epoch,
            round,
            bytes,
            epoch_store
                .epoch_start_config()
                .randomness_obj_initial_shared_version()
                .expect("randomness state obj must exist"),
        );
        debug!(
            "created randomness state update transaction with digest: {:?}",
            transaction.digest()
        );
        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
        let digest = *transaction.digest();

        // Persist the transaction before publishing its key so it can be
        // found once scheduled.
        self.authority_state
            .get_cache_commit()
            .persist_transaction(&transaction);

        if epoch_store.insert_tx_key(key, digest).is_err() {
            warn!("epoch ended while handling new randomness");
        }

        // Wait for executed effects on a separate task so the event loop can
        // keep receiving rounds.
        let authority_state = self.authority_state.clone();
        spawn_monitored_task!(async move {
            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
            let result = tokio::time::timeout(
                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
                authority_state
                    .get_transaction_cache_reader()
                    .notify_read_executed_effects(
                        "RandomnessRoundReceiver::notify_read_executed_effects_first",
                        &[digest],
                    ),
            )
            .await;
            let mut effects = match result {
                Ok(result) => result,
                Err(_) => {
                    // Timed out: report, then keep waiting without a timeout.
                    debug_fatal!(
                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                    );
                    authority_state
                        .get_transaction_cache_reader()
                        .notify_read_executed_effects(
                            "RandomnessRoundReceiver::notify_read_executed_effects_second",
                            &[digest],
                        )
                        .await
                }
            };

            let effects = effects.pop().expect("should return effects");
            if *effects.status() != ExecutionStatus::Success {
                fatal!(
                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
                );
            }
            debug!(
                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
            );
        });
    }
}
6501
6502#[async_trait]
6503impl TransactionKeyValueStoreTrait for AuthorityState {
6504 #[instrument(skip(self))]
6505 async fn multi_get(
6506 &self,
6507 transactions: &[TransactionDigest],
6508 effects: &[TransactionDigest],
6509 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6510 let txns = if !transactions.is_empty() {
6511 self.get_transaction_cache_reader()
6512 .multi_get_transaction_blocks(transactions)
6513 .into_iter()
6514 .map(|t| t.map(|t| (*t).clone().into_inner()))
6515 .collect()
6516 } else {
6517 vec![]
6518 };
6519
6520 let fx = if !effects.is_empty() {
6521 self.get_transaction_cache_reader()
6522 .multi_get_executed_effects(effects)
6523 } else {
6524 vec![]
6525 };
6526
6527 Ok((txns, fx))
6528 }
6529
6530 #[instrument(skip(self))]
6531 async fn multi_get_checkpoints(
6532 &self,
6533 checkpoint_summaries: &[CheckpointSequenceNumber],
6534 checkpoint_contents: &[CheckpointSequenceNumber],
6535 checkpoint_summaries_by_digest: &[CheckpointDigest],
6536 ) -> SuiResult<(
6537 Vec<Option<CertifiedCheckpointSummary>>,
6538 Vec<Option<CheckpointContents>>,
6539 Vec<Option<CertifiedCheckpointSummary>>,
6540 )> {
6541 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6543 let store = self.get_checkpoint_store();
6544 for seq in checkpoint_summaries {
6545 let checkpoint = store
6546 .get_checkpoint_by_sequence_number(*seq)?
6547 .map(|c| c.into_inner());
6548
6549 summaries.push(checkpoint);
6550 }
6551
6552 let mut contents = Vec::with_capacity(checkpoint_contents.len());
6553 for seq in checkpoint_contents {
6554 let checkpoint = store
6555 .get_checkpoint_by_sequence_number(*seq)?
6556 .and_then(|summary| {
6557 store
6558 .get_checkpoint_contents(&summary.content_digest)
6559 .expect("db read cannot fail")
6560 });
6561 contents.push(checkpoint);
6562 }
6563
6564 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6565 for digest in checkpoint_summaries_by_digest {
6566 let checkpoint = store
6567 .get_checkpoint_by_digest(digest)?
6568 .map(|c| c.into_inner());
6569 summaries_by_digest.push(checkpoint);
6570 }
6571 Ok((summaries, contents, summaries_by_digest))
6572 }
6573
6574 #[instrument(skip(self))]
6575 async fn deprecated_get_transaction_checkpoint(
6576 &self,
6577 digest: TransactionDigest,
6578 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6579 Ok(self
6580 .get_checkpoint_cache()
6581 .deprecated_get_transaction_checkpoint(&digest)
6582 .map(|(_epoch, checkpoint)| checkpoint))
6583 }
6584
6585 #[instrument(skip(self))]
6586 async fn get_object(
6587 &self,
6588 object_id: ObjectID,
6589 version: VersionNumber,
6590 ) -> SuiResult<Option<Object>> {
6591 Ok(self
6592 .get_object_cache_reader()
6593 .get_object_by_key(&object_id, version))
6594 }
6595
6596 #[instrument(skip_all)]
6597 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6598 Ok(self
6599 .get_object_cache_reader()
6600 .multi_get_objects_by_key(object_keys))
6601 }
6602
6603 #[instrument(skip(self))]
6604 async fn multi_get_transaction_checkpoint(
6605 &self,
6606 digests: &[TransactionDigest],
6607 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6608 let res = self
6609 .get_checkpoint_cache()
6610 .deprecated_multi_get_transaction_checkpoint(digests);
6611
6612 Ok(res
6613 .into_iter()
6614 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6615 .collect())
6616 }
6617
6618 #[instrument(skip(self))]
6619 async fn multi_get_events_by_tx_digests(
6620 &self,
6621 digests: &[TransactionDigest],
6622 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6623 if digests.is_empty() {
6624 return Ok(vec![]);
6625 }
6626
6627 Ok(self
6628 .get_transaction_cache_reader()
6629 .multi_get_events(digests))
6630 }
6631}
6632
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::collections::BTreeMap;
    use std::{cell::RefCell, collections::BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    thread_local! {
        // Per-thread map of framework package overrides used by simtests.
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    // An override either applies to all validators or is computed per
    // validator through a callback.
    enum PackageOverrideConfig {
        Global(Framework),
        PerValidator(PackageUpgradeCallback),
    }

    /// Serialize each module at its own binary-format version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Install a global (all-validator) override for `package_id`.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|overrides| {
            overrides
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules));
        });
    }

    /// Install a per-validator override callback for `package_id`.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|overrides| {
            overrides
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func));
        });
    }

    /// Replace the full system-package set: packages provided here get global
    /// overrides; any built-in package not provided is overridden with an
    /// empty module list.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        OVERRIDE.with(|overrides| {
            let mut not_included: BTreeSet<_> =
                BuiltInFramework::all_package_ids().into_iter().collect();
            for pkg in &packages {
                not_included.remove(&pkg.id);
            }
            let mut map = overrides.borrow_mut();
            for pkg in packages {
                map.insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
            }
            for empty_pkg in not_included {
                map.insert(empty_pkg, PackageOverrideConfig::Global(vec![]));
            }
        });
    }

    /// Serialized override modules for `package_id` on validator `name`, if any.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let cfg = cfg.borrow();
            match cfg.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            }
        })
    }

    /// Compiled override modules for `package_id` on validator `name`, if any.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let cfg = cfg.borrow();
            match cfg.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            }
        })
    }

    /// Assemble a full `SystemPackage` from the override bytes. Built-in
    /// packages keep their declared dependencies; injected extras depend on
    /// every built-in package.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if is_system_package(*package_id) {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// All overridden packages that are NOT built-in framework packages.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        // Collect the ids first so the OVERRIDE borrow is released before
        // get_override_bytes re-enters the thread-local below.
        let extra_ids: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .copied()
                .filter(|package| !built_in.contains(package))
                .collect()
        });

        extra_ids
            .into_iter()
            .map(|package| SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            })
            .collect()
    }
}
6766
/// Serializable dump record for a single object: its reference triple
/// (id, version, digest) precomputed alongside the full object contents.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// Object id (first component of the object reference).
    pub id: ObjectID,
    /// Object version (second component of the object reference).
    pub version: VersionNumber,
    /// Object digest (third component of the object reference).
    pub digest: ObjectDigest,
    /// The object itself.
    pub object: Object,
}
6774
6775impl ObjDumpFormat {
6776 fn new(object: Object) -> Self {
6777 let oref = object.compute_object_reference();
6778 Self {
6779 id: oref.0,
6780 version: oref.1,
6781 digest: oref.2,
6782 object,
6783 }
6784 }
6785}
6786
/// Serializable dump of the state a node used while executing one
/// transaction, suitable for writing to disk (see `write_to_file`) and
/// reading back for offline inspection or replay.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the dumped transaction.
    pub tx_digest: TransactionDigest,
    /// The transaction's sender-signed payload.
    pub sender_signed_data: SenderSignedData,
    /// Epoch in which the transaction executed.
    pub executed_epoch: u64,
    /// Reference gas price of that epoch.
    pub reference_gas_price: u64,
    /// Protocol version in effect at execution time.
    pub protocol_version: u64,
    /// Start timestamp (ms) of the executing epoch.
    pub epoch_start_timestamp_ms: u64,
    /// Effects this node computed for the transaction.
    pub computed_effects: TransactionEffects,
    /// Effects digest that was expected (may differ from the digest of
    /// `computed_effects` when a divergence is being recorded).
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages present in the store at dump time.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Consensus/shared input objects referenced by the effects.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Child objects loaded at runtime during execution.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions the transaction modified them from.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages loaded from the DB by the runtime during execution.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// The transaction's resolved input objects.
    pub input_objects: Vec<ObjDumpFormat>,
}
6804
6805impl NodeStateDump {
6806 pub fn new(
6807 tx_digest: &TransactionDigest,
6808 effects: &TransactionEffects,
6809 expected_effects_digest: TransactionEffectsDigest,
6810 object_store: &dyn ObjectStore,
6811 epoch_store: &Arc<AuthorityPerEpochStore>,
6812 inner_temporary_store: &InnerTemporaryStore,
6813 certificate: &VerifiedExecutableTransaction,
6814 ) -> SuiResult<Self> {
6815 let executed_epoch = epoch_store.epoch();
6817 let reference_gas_price = epoch_store.reference_gas_price();
6818 let epoch_start_config = epoch_store.epoch_start_config();
6819 let protocol_version = epoch_store.protocol_version().as_u64();
6820 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6821
6822 let mut relevant_system_packages = Vec::new();
6824 for sys_package_id in BuiltInFramework::all_package_ids() {
6825 if let Some(w) = object_store.get_object(&sys_package_id) {
6826 relevant_system_packages.push(ObjDumpFormat::new(w))
6827 }
6828 }
6829
6830 let mut shared_objects = Vec::new();
6832 for kind in effects.input_consensus_objects() {
6833 match kind {
6834 InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6835 if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6836 shared_objects.push(ObjDumpFormat::new(w))
6837 }
6838 }
6839 InputConsensusObject::ReadConsensusStreamEnded(..)
6840 | InputConsensusObject::MutateConsensusStreamEnded(..)
6841 | InputConsensusObject::Cancelled(..) => (), }
6843 }
6844
6845 let mut loaded_child_objects = Vec::new();
6848 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6849 if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6850 loaded_child_objects.push(ObjDumpFormat::new(w))
6851 }
6852 }
6853
6854 let mut modified_at_versions = Vec::new();
6856 for (id, ver) in effects.modified_at_versions() {
6857 if let Some(w) = object_store.get_object_by_key(&id, ver) {
6858 modified_at_versions.push(ObjDumpFormat::new(w))
6859 }
6860 }
6861
6862 let mut runtime_reads = Vec::new();
6865 for obj in inner_temporary_store
6866 .runtime_packages_loaded_from_db
6867 .values()
6868 {
6869 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6870 }
6871
6872 Ok(Self {
6875 tx_digest: *tx_digest,
6876 executed_epoch,
6877 reference_gas_price,
6878 epoch_start_timestamp_ms,
6879 protocol_version,
6880 relevant_system_packages,
6881 shared_objects,
6882 loaded_child_objects,
6883 modified_at_versions,
6884 runtime_reads,
6885 sender_signed_data: certificate.clone().into_message(),
6886 input_objects: inner_temporary_store
6887 .input_objects
6888 .values()
6889 .map(|o| ObjDumpFormat::new(o.clone()))
6890 .collect(),
6891 computed_effects: effects.clone(),
6892 expected_effects_digest,
6893 })
6894 }
6895
6896 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6897 let mut objects = Vec::new();
6898 objects.extend(self.relevant_system_packages.clone());
6899 objects.extend(self.shared_objects.clone());
6900 objects.extend(self.loaded_child_objects.clone());
6901 objects.extend(self.modified_at_versions.clone());
6902 objects.extend(self.runtime_reads.clone());
6903 objects.extend(self.input_objects.clone());
6904 objects
6905 }
6906
6907 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6908 let file_name = format!(
6909 "{}_{}_NODE_DUMP.json",
6910 self.tx_digest,
6911 AuthorityState::unixtime_now_ms()
6912 );
6913 let mut path = path.to_path_buf();
6914 path.push(&file_name);
6915 let mut file = File::create(path.clone())?;
6916 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6917 Ok(path)
6918 }
6919
6920 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6921 let file = File::open(path)?;
6922 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6923 }
6924}