sui_core/checkpoints/checkpoint_executor/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! CheckpointExecutor is a Node component that executes all checkpoints for the
5//! given epoch. It acts as a Consumer to StateSync
6//! for newly synced checkpoints, taking these checkpoints and
7//! scheduling and monitoring their execution. Its primary goal is to allow
8//! for catching up to the current checkpoint sequence number of the network
9//! as quickly as possible so that a newly joined, or recovering Node can
10//! participate in a timely manner. To that end, CheckpointExecutor attempts
11//! to saturate the CPU with executor tasks (one per checkpoint), each of which
12//! handle scheduling and awaiting checkpoint transaction execution.
13//!
14//! CheckpointExecutor is made recoverable in the event of Node shutdown by way of a watermark,
15//! highest_executed_checkpoint, which is guaranteed to be updated sequentially in order,
16//! despite checkpoints themselves potentially being executed nonsequentially and in parallel.
17//! CheckpointExecutor parallelizes checkpoints of the same epoch as much as possible.
18//! CheckpointExecutor enforces the invariant that if `run` returns successfully, we have reached the
19//! end of epoch. This allows us to use it as a signal for reconfig.
20
21use futures::StreamExt;
22use mysten_common::{ZipDebugEqIteratorExt, debug_fatal, fatal, izip_debug_eq};
23use parking_lot::Mutex;
24use std::{sync::Arc, time::Instant};
25use sui_types::base_types::SequenceNumber;
26use sui_types::crypto::RandomnessRound;
27use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
28use sui_types::transaction::{TransactionDataAPI, TransactionKind};
29use sui_types::{
30    SUI_ACCUMULATOR_ROOT_OBJECT_ID,
31    node_role::{FullNodeSyncMode, NodeRole},
32};
33
34use sui_config::node::{CheckpointExecutorConfig, RunWithRange};
35use sui_macros::fail_point;
36use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
37use sui_types::executable_transaction::VerifiedExecutableTransaction;
38use sui_types::execution_status::{ExecutionErrorKind, ExecutionFailure, ExecutionStatus};
39use sui_types::full_checkpoint_content::Checkpoint;
40use sui_types::global_state_hash::GlobalStateHash;
41use sui_types::message_envelope::Message;
42use sui_types::{
43    base_types::{TransactionDigest, TransactionEffectsDigest},
44    messages_checkpoint::VerifiedCheckpoint,
45    transaction::VerifiedTransaction,
46};
47use tap::{TapFallible, TapOptional};
48use tracing::{debug, info, instrument};
49
50use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
51use crate::authority::backpressure::BackpressureManager;
52use crate::authority::{AuthorityState, ExecutionEnv};
53use crate::execution_scheduler::ExecutionScheduler;
54use crate::execution_scheduler::execution_scheduler_impl::BarrierDependencyBuilder;
55use crate::global_state_hasher::GlobalStateHasher;
56use crate::{
57    checkpoints::CheckpointStore,
58    execution_cache::{ObjectCacheRead, TransactionCacheRead},
59};
60
61mod data_ingestion_handler;
62pub mod metrics;
63pub(crate) mod utils;
64
65use data_ingestion_handler::{load_checkpoint, store_checkpoint_locally};
66use metrics::CheckpointExecutorMetrics;
67use utils::*;
68
69const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;
70
71#[derive(PartialEq, Eq, Debug)]
72pub enum StopReason {
73    EpochComplete,
74    RunWithRangeCondition,
75}
76
77pub(crate) struct CheckpointExecutionData {
78    pub checkpoint: VerifiedCheckpoint,
79    pub checkpoint_contents: CheckpointContents,
80    pub tx_digests: Vec<TransactionDigest>,
81    pub fx_digests: Vec<TransactionEffectsDigest>,
82}
83
84pub(crate) struct CheckpointTransactionData {
85    pub transactions: Vec<VerifiedExecutableTransaction>,
86    pub effects: Vec<TransactionEffects>,
87    pub executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
88    /// The accumulator versions for the transactions in the checkpoint.
89    /// None only if accumulator is not enabled (either all Some, or all None).
90    /// This information is needed for object balance withdraw processing.
91    /// The vector should be 1:1 with the transactions in the checkpoint.
92    pub accumulator_versions: Vec<Option<SequenceNumber>>,
93}
94
95impl CheckpointTransactionData {
96    pub fn new(
97        transactions: Vec<VerifiedExecutableTransaction>,
98        effects: Vec<TransactionEffects>,
99        executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
100    ) -> Self {
101        assert_eq!(transactions.len(), effects.len());
102        assert_eq!(transactions.len(), executed_fx_digests.len());
103        let mut accumulator_versions = vec![None; transactions.len()];
104        let mut next_update_index = 0;
105        for (idx, efx) in effects.iter().enumerate() {
106            // Only barrier settlement transactions mutate the accumulator root object.
107            // This filtering detects whether this transaction is a barrier settlement transaction.
108            // And if so we get the old version of the accumulator root object.
109            // Transactions prior to the barrier settlement transaction reads this accumulator version.
110            let acc_version = efx.object_changes().into_iter().find_map(|change| {
111                if change.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
112                    change.input_version
113                } else {
114                    None
115                }
116            });
117            if let Some(acc_version) = acc_version {
118                // Set version for transactions between [next_update_index, idx] inclusive.
119                for slot in accumulator_versions
120                    .iter_mut()
121                    .take(idx + 1)
122                    .skip(next_update_index)
123                {
124                    *slot = Some(acc_version);
125                }
126                next_update_index = idx + 1;
127            }
128        }
129        // Either accumulator is not enabled, then next_update_index == 0;
130        // or the last transaction is the barrier settlement transaction, and next_update_index == transactions.len();
131        // or the last transaction is the end of epoch transaction, and next_update_index == transactions.len() - 1.
132        assert!(
133            next_update_index == 0
134                || next_update_index == transactions.len()
135                || (next_update_index == transactions.len() - 1
136                    && transactions
137                        .last()
138                        .unwrap()
139                        .transaction_data()
140                        .is_end_of_epoch_tx())
141        );
142        Self {
143            transactions,
144            effects,
145            executed_fx_digests,
146            accumulator_versions,
147        }
148    }
149}
150pub(crate) struct CheckpointExecutionState {
151    pub data: CheckpointExecutionData,
152
153    state_hasher: Option<GlobalStateHash>,
154    full_data: Option<Checkpoint>,
155}
156
157impl CheckpointExecutionState {
158    pub fn new(data: CheckpointExecutionData) -> Self {
159        Self {
160            data,
161            state_hasher: None,
162            full_data: None,
163        }
164    }
165
166    pub fn new_with_global_state_hasher(
167        data: CheckpointExecutionData,
168        hasher: GlobalStateHash,
169    ) -> Self {
170        Self {
171            data,
172            state_hasher: Some(hasher),
173            full_data: None,
174        }
175    }
176}
177
178macro_rules! finish_stage {
179    ($handle:expr, $stage:ident) => {
180        $handle.finish_stage(PipelineStage::$stage).await;
181    };
182}
183
184pub struct CheckpointExecutor {
185    epoch_store: Arc<AuthorityPerEpochStore>,
186    state: Arc<AuthorityState>,
187    // TODO: We should use RocksDbStore in the executor
188    // to consolidate DB accesses.
189    checkpoint_store: Arc<CheckpointStore>,
190    object_cache_reader: Arc<dyn ObjectCacheRead>,
191    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
192    execution_scheduler: Arc<ExecutionScheduler>,
193    global_state_hasher: Arc<GlobalStateHasher>,
194    backpressure_manager: Arc<BackpressureManager>,
195    config: CheckpointExecutorConfig,
196    metrics: Arc<CheckpointExecutorMetrics>,
197    tps_estimator: Mutex<TPSEstimator>,
198    subscription_service_checkpoint_sender: Option<tokio::sync::broadcast::Sender<Arc<Checkpoint>>>,
199}
200
201impl CheckpointExecutor {
202    pub fn new(
203        epoch_store: Arc<AuthorityPerEpochStore>,
204        checkpoint_store: Arc<CheckpointStore>,
205        state: Arc<AuthorityState>,
206        global_state_hasher: Arc<GlobalStateHasher>,
207        backpressure_manager: Arc<BackpressureManager>,
208        config: CheckpointExecutorConfig,
209        metrics: Arc<CheckpointExecutorMetrics>,
210        subscription_service_checkpoint_sender: Option<
211            tokio::sync::broadcast::Sender<Arc<Checkpoint>>,
212        >,
213    ) -> Self {
214        Self {
215            epoch_store,
216            state: state.clone(),
217            checkpoint_store,
218            object_cache_reader: state.get_object_cache_reader().clone(),
219            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
220            execution_scheduler: state.execution_scheduler().clone(),
221            global_state_hasher,
222            backpressure_manager,
223            config,
224            metrics,
225            tps_estimator: Mutex::new(TPSEstimator::default()),
226            subscription_service_checkpoint_sender,
227        }
228    }
229
230    pub fn new_for_tests(
231        epoch_store: Arc<AuthorityPerEpochStore>,
232        checkpoint_store: Arc<CheckpointStore>,
233        state: Arc<AuthorityState>,
234        state_hasher: Arc<GlobalStateHasher>,
235    ) -> Self {
236        Self::new(
237            epoch_store,
238            checkpoint_store,
239            state,
240            state_hasher,
241            BackpressureManager::new_for_tests(),
242            Default::default(),
243            CheckpointExecutorMetrics::new_for_tests(),
244            None,
245        )
246    }
247
248    // Gets the next checkpoint to schedule for execution. If the epoch is already
249    // completed, returns None.
250    fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
251        // Decide the first checkpoint to schedule for execution.
252        // If we haven't executed anything in the past, we schedule checkpoint 0.
253        // Otherwise we schedule the one after highest executed.
254        let highest_executed = self
255            .checkpoint_store
256            .get_highest_executed_checkpoint()
257            .unwrap();
258
259        if let Some(highest_executed) = &highest_executed
260            && self.epoch_store.epoch() == highest_executed.epoch()
261            && highest_executed.is_last_checkpoint_of_epoch()
262        {
263            // We can arrive at this point if we bump the highest_executed_checkpoint watermark, and then
264            // crash before completing reconfiguration.
265            info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
266            return None;
267        }
268
269        Some(
270            highest_executed
271                .as_ref()
272                .map(|c| c.sequence_number() + 1)
273                .unwrap_or_else(|| {
274                    // TODO this invariant may no longer hold once we introduce snapshots
275                    assert_eq!(self.epoch_store.epoch(), 0);
276                    // we need to execute the genesis checkpoint
277                    0
278                }),
279        )
280    }
281
282    /// Execute all checkpoints for the current epoch, ensuring that the node has not
283    /// forked, and return when finished.
284    /// If `run_with_range` is set, execution will stop early.
285    #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
286    pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
287        let _metrics_scope = mysten_metrics::monitored_scope("CheckpointExecutor::run_epoch");
288        info!(?run_with_range, "CheckpointExecutor::run_epoch");
289        debug!(
290            "Checkpoint executor running for epoch {:?}",
291            self.epoch_store.epoch(),
292        );
293
294        // check if we want to run this epoch based on RunWithRange condition value
295        // we want to be inclusive of the defined RunWithRangeEpoch::Epoch
296        // i.e Epoch(N) means we will execute epoch N and stop when reaching N+1
297        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
298            info!("RunWithRange condition satisfied at {:?}", run_with_range,);
299            return StopReason::RunWithRangeCondition;
300        };
301
302        self.metrics
303            .checkpoint_exec_epoch
304            .set(self.epoch_store.epoch() as i64);
305
306        let Some(next_to_schedule) = self.get_next_to_schedule() else {
307            return StopReason::EpochComplete;
308        };
309
310        let this = Arc::new(self);
311
312        let concurrency = std::env::var("SUI_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
313            .ok()
314            .and_then(|s| s.parse().ok())
315            .unwrap_or(this.config.checkpoint_execution_max_concurrency);
316
317        let pipeline_stages = PipelineStages::new(next_to_schedule, this.metrics.clone());
318
319        let final_checkpoint_executed = stream_synced_checkpoints(
320            this.checkpoint_store.clone(),
321            next_to_schedule,
322            run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
323        )
324        // Checkpoint loading and execution is parallelized
325        .map(|checkpoint| {
326            let this = this.clone();
327            let pipeline_handle = pipeline_stages.handle(*checkpoint.sequence_number());
328            async move {
329                let pipeline_handle = pipeline_handle.await;
330                tokio::spawn(this.execute_checkpoint(checkpoint, pipeline_handle))
331                    .await
332                    .unwrap()
333            }
334        })
335        .buffered(concurrency)
336        // Take the last value from the stream to determine if we completed the epoch
337        .fold(false, |state, is_final_checkpoint| async move {
338            assert!(
339                !state,
340                "fold can't be called again after the final checkpoint"
341            );
342            is_final_checkpoint
343        })
344        .await;
345
346        if final_checkpoint_executed {
347            StopReason::EpochComplete
348        } else {
349            StopReason::RunWithRangeCondition
350        }
351    }
352}
353
354impl CheckpointExecutor {
355    /// Load all data for a checkpoint, ensure all transactions are executed, and check for forks.
356    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
357    async fn execute_checkpoint(
358        self: Arc<Self>,
359        checkpoint: VerifiedCheckpoint,
360        mut pipeline_handle: PipelineHandle,
361    ) -> bool /* is final checkpoint */ {
362        info!("executing checkpoint");
363        let sequence_number = checkpoint.sequence_number;
364
365        checkpoint.report_checkpoint_age(
366            &self.metrics.checkpoint_contents_age,
367            &self.metrics.checkpoint_contents_age_ms,
368        );
369        self.backpressure_manager
370            .update_highest_certified_checkpoint(sequence_number);
371
372        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
373            let _wait_for_previous_checkpoints_guard = mysten_metrics::monitored_scope(
374                "CheckpointExecutor::wait_for_previous_checkpoints",
375            );
376
377            info!(
378                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
379            );
380            self.checkpoint_store
381                .notify_read_executed_checkpoint(sequence_number - 1)
382                .await;
383        }
384
385        let _parallel_step_guard =
386            mysten_metrics::monitored_scope("CheckpointExecutor::parallel_step");
387
388        // Note: only `execute_transactions_from_synced_checkpoint` has end-of-epoch logic.
389        // For nodes that are not running consensus (Full nodes with checkpoint state sync only), always
390        // execute transactions via synced checkpoints. The rest of nodes should attempt only to verify the locally
391        // build checkpoints as those should (potentially) be already executed.
392        let ckpt_state = if !self.epoch_store.node_role().runs_consensus()
393            || checkpoint.is_last_checkpoint_of_epoch()
394        {
395            self.execute_transactions_from_synced_checkpoint(checkpoint, &mut pipeline_handle)
396                .await
397        } else {
398            self.verify_locally_built_checkpoint(checkpoint, &mut pipeline_handle)
399                .await
400        };
401
402        let tps = self.tps_estimator.lock().update(
403            Instant::now(),
404            ckpt_state.data.checkpoint.network_total_transactions,
405        );
406        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);
407
408        self.backpressure_manager
409            .update_highest_executed_checkpoint(*ckpt_state.data.checkpoint.sequence_number());
410
411        let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();
412
413        let seq = ckpt_state.data.checkpoint.sequence_number;
414
415        let mut batch = self
416            .state
417            .get_cache_commit()
418            .build_db_batch(self.epoch_store.epoch(), &ckpt_state.data.tx_digests);
419
420        // Stamp the highest-committed-checkpoint watermark into the same batch
421        // as the outputs, so it lands atomically with the object writes. This
422        // gives consumers that read the live object set directly (the embedded
423        // rpc-store restore) a watermark that never lags the durable objects,
424        // unlike the separately-bumped `highest_executed` watermark below.
425        self.state
426            .get_cache_commit()
427            .set_highest_committed_checkpoint_in_batch(&mut batch, seq);
428
429        finish_stage!(pipeline_handle, BuildDbBatch);
430
431        let object_funds_checker = self.state.object_funds_checker.load();
432        if let Some(object_funds_checker) = object_funds_checker.as_ref() {
433            object_funds_checker.commit_effects(batch.0.iter().map(|o| &o.effects));
434        }
435
436        let mut ckpt_state = tokio::task::spawn_blocking({
437            let this = self.clone();
438            move || {
439                // Commit all transaction effects to disk
440                let cache_commit = this.state.get_cache_commit();
441                debug!(?seq, "committing checkpoint transactions to disk");
442                cache_commit.commit_transaction_outputs(
443                    this.epoch_store.epoch(),
444                    batch,
445                    &ckpt_state.data.tx_digests,
446                );
447                ckpt_state
448            }
449        })
450        .await
451        .unwrap();
452
453        finish_stage!(pipeline_handle, CommitTransactionOutputs);
454
455        self.epoch_store
456            .handle_finalized_checkpoint(&ckpt_state.data.checkpoint, &ckpt_state.data.tx_digests)
457            .expect("cannot fail");
458
459        let randomness_rounds = self.extract_randomness_rounds(
460            &ckpt_state.data.checkpoint,
461            &ckpt_state.data.checkpoint_contents,
462        );
463
464        // Once the checkpoint is finalized, we know that any randomness contained in this checkpoint has
465        // been successfully included in a checkpoint certified by quorum of validators.
466        // (RandomnessManager/RandomnessReporter is only present on validators.)
467        if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
468            for round in randomness_rounds {
469                debug!(
470                    ?round,
471                    "notifying RandomnessReporter that randomness update was executed in checkpoint"
472                );
473                randomness_reporter
474                    .notify_randomness_in_checkpoint(round)
475                    .expect("epoch cannot have ended");
476            }
477        }
478
479        finish_stage!(pipeline_handle, FinalizeCheckpoint);
480
481        if let Some(checkpoint_data) = ckpt_state.full_data.take() {
482            self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data);
483        }
484
485        finish_stage!(pipeline_handle, UpdateRpcIndex);
486
487        self.global_state_hasher
488            .accumulate_running_root(&self.epoch_store, seq, ckpt_state.state_hasher)
489            .expect("Failed to accumulate running root");
490
491        if is_final_checkpoint {
492            self.checkpoint_store
493                .insert_epoch_last_checkpoint(self.epoch_store.epoch(), &ckpt_state.data.checkpoint)
494                .expect("Failed to insert epoch last checkpoint");
495
496            self.global_state_hasher
497                .accumulate_epoch(self.epoch_store.clone(), seq)
498                .expect("Accumulating epoch cannot fail");
499
500            self.checkpoint_store
501                .prune_local_summaries()
502                .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
503                .ok();
504        }
505
506        fail_point!("crash");
507
508        self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);
509
510        finish_stage!(pipeline_handle, BumpHighestExecutedCheckpoint);
511
512        // Important: code after the last pipeline stage is finished can run out of checkpoint order.
513
514        ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
515    }
516
517    // On validators, checkpoints have often already been constructed locally, in which
518    // case we can skip many steps of the checkpoint execution process.
519    // If the node is a validator, then the checkpoint execution state will not contain the full data.
520    // If the node is a full node that has consensus state sync enabled, then the full data will be populated as they are required
521    // by downstream components.
522    #[instrument(level = "info", skip_all)]
523    async fn verify_locally_built_checkpoint(
524        &self,
525        checkpoint: VerifiedCheckpoint,
526        pipeline_handle: &mut PipelineHandle,
527    ) -> CheckpointExecutionState {
528        assert!(
529            !checkpoint.is_last_checkpoint_of_epoch(),
530            "only fullnode path has end-of-epoch logic"
531        );
532
533        let sequence_number = checkpoint.sequence_number;
534        let locally_built_checkpoint = self
535            .checkpoint_store
536            .get_locally_computed_checkpoint(sequence_number)
537            .expect("db error");
538
539        let Some(locally_built_checkpoint) = locally_built_checkpoint else {
540            // fall back to tx-by-tx execution path if we are catching up.
541            self.metrics
542                .checkpoint_executor_validator_sync_fallback_path
543                .inc();
544            return self
545                .execute_transactions_from_synced_checkpoint(checkpoint, pipeline_handle)
546                .await;
547        };
548
549        self.metrics.checkpoint_executor_validator_path.inc();
550
551        // Check for fork
552        assert_checkpoint_not_forked(
553            &locally_built_checkpoint,
554            &checkpoint,
555            &self.checkpoint_store,
556        );
557
558        // Checkpoint builder triggers accumulation of the checkpoint, so this is guaranteed to finish.
559        let state_hasher = {
560            let _metrics_scope =
561                mysten_metrics::monitored_scope("CheckpointExecutor::notify_read_state_hasher");
562            self.epoch_store
563                .notify_read_checkpoint_state_hasher(&[sequence_number])
564                .await
565                .unwrap()
566                .pop()
567                .unwrap()
568        };
569
570        // Observer fullnodes have already executed these transactions through consensus,
571        // but still need the fullnode side effects that the synced path normally performs.
572        if matches!(
573            self.epoch_store.node_role(),
574            NodeRole::FullNode(FullNodeSyncMode::ConsensusObserver)
575        ) {
576            pipeline_handle
577                .skip_to(PipelineStage::FinalizeTransactions)
578                .await;
579
580            let (state, tx_data) =
581                self.load_checkpoint_transactions(checkpoint, Some(state_hasher));
582
583            return self
584                .finalize_executed_checkpoint_transactions(state, &tx_data, pipeline_handle)
585                .await;
586        }
587
588        let checkpoint_contents = self
589            .checkpoint_store
590            .get_checkpoint_contents(&checkpoint.content_digest)
591            .expect("db error")
592            .expect("checkpoint contents not found");
593
594        let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = checkpoint_contents
595            .iter()
596            .map(|digests| (digests.transaction, digests.effects))
597            .unzip();
598
599        pipeline_handle
600            .skip_to(PipelineStage::FinalizeTransactions)
601            .await;
602
603        self.insert_finalized_transactions(&tx_digests, sequence_number);
604
605        pipeline_handle.skip_to(PipelineStage::BuildDbBatch).await;
606
607        let ckpt_data = CheckpointExecutionData {
608            checkpoint,
609            checkpoint_contents,
610            tx_digests,
611            fx_digests,
612        };
613        CheckpointExecutionState::new_with_global_state_hasher(ckpt_data, state_hasher)
614    }
615
616    #[instrument(level = "info", skip_all)]
617    async fn execute_transactions_from_synced_checkpoint(
618        &self,
619        checkpoint: VerifiedCheckpoint,
620        pipeline_handle: &mut PipelineHandle,
621    ) -> CheckpointExecutionState {
622        let (ckpt_state, tx_data, unexecuted_tx_digests) = {
623            let _scope =
624                mysten_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
625            let (ckpt_state, tx_data) = self.load_checkpoint_transactions(checkpoint, None);
626            let unexecuted_tx_digests = self.schedule_transaction_execution(&ckpt_state, &tx_data);
627            (ckpt_state, tx_data, unexecuted_tx_digests)
628        };
629
630        finish_stage!(pipeline_handle, ExecuteTransactions);
631
632        {
633            self.transaction_cache_reader
634                .notify_read_executed_effects_digests(
635                    "CheckpointExecutor::notify_read_executed_effects_digests",
636                    &unexecuted_tx_digests,
637                )
638                .await;
639        }
640
641        finish_stage!(pipeline_handle, WaitForTransactions);
642
643        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
644            self.execute_change_epoch_tx(&tx_data).await;
645        }
646
647        self.finalize_executed_checkpoint_transactions(ckpt_state, &tx_data, pipeline_handle)
648            .await
649    }
650
651    async fn finalize_executed_checkpoint_transactions(
652        &self,
653        mut ckpt_state: CheckpointExecutionState,
654        tx_data: &CheckpointTransactionData,
655        pipeline_handle: &mut PipelineHandle,
656    ) -> CheckpointExecutionState {
657        let sequence_number = ckpt_state.data.checkpoint.sequence_number;
658
659        self.commit_post_processing_index_batches(&ckpt_state.data.tx_digests)
660            .await;
661
662        let _scope = mysten_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");
663
664        if self.state.is_fullnode(&self.epoch_store) {
665            self.state.congestion_tracker.process_checkpoint_effects(
666                &*self.transaction_cache_reader,
667                &ckpt_state.data.checkpoint,
668                &tx_data.effects,
669            );
670        }
671
672        self.insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number);
673
674        // The early versions of the hasher (prior to effectsv2) rely on db
675        // state, so we must wait until all transactions have been executed
676        // before accumulating the checkpoint.
677        if ckpt_state.state_hasher.is_none() {
678            ckpt_state.state_hasher = Some(
679                self.global_state_hasher
680                    .accumulate_checkpoint(&tx_data.effects, sequence_number, &self.epoch_store)
681                    .expect("epoch cannot have ended"),
682            );
683        }
684
685        finish_stage!(pipeline_handle, FinalizeTransactions);
686
687        ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state.data, tx_data);
688
689        finish_stage!(pipeline_handle, ProcessCheckpointData);
690
691        ckpt_state
692    }
693
694    // Collect index batches from post-processing and commit atomically.
695    // This must happen AFTER all transactions have completed execution and BEFORE
696    // insert_finalized_transactions (so that index data is available when
697    // transactions_executed_in_checkpoint_notify fires).
698    async fn commit_post_processing_index_batches(&self, tx_digests: &[TransactionDigest]) {
699        let mut raw_batches = Vec::new();
700        let mut cache_updates = Vec::new();
701        for tx_digest in tx_digests {
702            if let Some((raw_batch, cu)) = self.state.await_post_processing(tx_digest).await {
703                raw_batches.push(raw_batch);
704                cache_updates.push(cu);
705            }
706        }
707        if !raw_batches.is_empty()
708            && let Some(indexes) = &self.state.indexes
709        {
710            let mut db_batch = indexes.new_db_batch();
711            db_batch
712                .concat(raw_batches)
713                .expect("failed to build index batch");
714            indexes
715                .commit_index_batch(db_batch, cache_updates)
716                .expect("failed to commit index batch");
717        }
718    }
719
720    fn checkpoint_data_enabled(&self) -> bool {
721        self.subscription_service_checkpoint_sender.is_some()
722            || self.state.rpc_index.is_some()
723            || self.config.data_ingestion_dir.is_some()
724    }
725
726    fn insert_finalized_transactions(
727        &self,
728        tx_digests: &[TransactionDigest],
729        sequence_number: CheckpointSequenceNumber,
730    ) {
731        self.epoch_store
732            .insert_finalized_transactions(tx_digests, sequence_number)
733            .expect("failed to insert finalized transactions");
734
735        if self.state.is_fullnode(&self.epoch_store) {
736            // TODO remove once we no longer need to support this table for read RPC
737            self.state
738                .get_checkpoint_cache()
739                .deprecated_insert_finalized_transactions(
740                    tx_digests,
741                    self.epoch_store.epoch(),
742                    sequence_number,
743                );
744        }
745    }
746
747    #[instrument(level = "info", skip_all)]
748    fn process_checkpoint_data(
749        &self,
750        ckpt_data: &CheckpointExecutionData,
751        tx_data: &CheckpointTransactionData,
752    ) -> Option<Checkpoint> {
753        if !self.checkpoint_data_enabled() {
754            return None;
755        }
756
757        let checkpoint = load_checkpoint(
758            ckpt_data,
759            tx_data,
760            self.state.get_object_store(),
761            &*self.transaction_cache_reader,
762        )
763        .expect("failed to load checkpoint data");
764
765        // Index the checkpoint. This is done out of order and is not written
766        // and committed to the DB until later (committing must be done in-order).
767        if let Some(rpc_index) = &self.state.rpc_index {
768            rpc_index.index_checkpoint(&checkpoint);
769        }
770
771        if let Some(path) = &self.config.data_ingestion_dir {
772            store_checkpoint_locally(path, &checkpoint)
773                .expect("failed to store checkpoint locally");
774        }
775
776        Some(checkpoint)
777    }
778
779    // Load all required transaction and effects data for the checkpoint.
780    // When `state_hasher` is provided the returned `CheckpointExecutionState`
781    // carries the hasher (used by the verify-locally-built-checkpoint path).
782    #[instrument(level = "info", skip_all)]
783    fn load_checkpoint_transactions(
784        &self,
785        checkpoint: VerifiedCheckpoint,
786        state_hasher: Option<GlobalStateHash>,
787    ) -> (CheckpointExecutionState, CheckpointTransactionData) {
788        let seq = checkpoint.sequence_number;
789        let epoch = checkpoint.epoch;
790
791        let checkpoint_contents = self
792            .checkpoint_store
793            .get_checkpoint_contents(&checkpoint.content_digest)
794            .expect("db error")
795            .expect("checkpoint contents not found");
796
797        let checkpoint_state = |data| match state_hasher {
798            Some(hasher) => CheckpointExecutionState::new_with_global_state_hasher(data, hasher),
799            None => CheckpointExecutionState::new(data),
800        };
801
802        // attempt to load full checkpoint contents in bulk
803        // Tolerate db error in case of data corruption.
804        // We will fall back to loading items one-by-one below in case of error.
805        if let Some(full_contents) = self
806            .checkpoint_store
807            .get_full_checkpoint_contents_by_sequence_number(seq)
808            .tap_err(|e| debug_fatal!("Failed to get checkpoint contents from store: {e}"))
809            .ok()
810            .flatten()
811            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
812        {
813            let num_txns = full_contents.size();
814            let mut tx_digests = Vec::with_capacity(num_txns);
815            let mut transactions = Vec::with_capacity(num_txns);
816            let mut effects = Vec::with_capacity(num_txns);
817            let mut fx_digests = Vec::with_capacity(num_txns);
818
819            full_contents
820                .into_iter()
821                .zip_debug_eq(checkpoint_contents.iter())
822                .for_each(|(execution_data, digests)| {
823                    let tx_digest = digests.transaction;
824                    let fx_digest = digests.effects;
825                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
826                    debug_assert_eq!(fx_digest, execution_data.effects.digest());
827
828                    tx_digests.push(tx_digest);
829                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
830                        VerifiedTransaction::new_unchecked(execution_data.transaction),
831                        epoch,
832                        seq,
833                    ));
834                    effects.push(execution_data.effects);
835                    fx_digests.push(fx_digest);
836                });
837
838            let executed_fx_digests = self
839                .transaction_cache_reader
840                .multi_get_executed_effects_digests(&tx_digests);
841
842            (
843                checkpoint_state(CheckpointExecutionData {
844                    checkpoint,
845                    checkpoint_contents,
846                    tx_digests,
847                    fx_digests,
848                }),
849                CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
850            )
851        } else {
852            // load items one-by-one
853            // TODO: If we used RocksDbStore in the executor instead,
854            // all the logic below could be removed.
855
856            let digests = checkpoint_contents.inner();
857
858            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = digests
859                .digests_iter()
860                .map(|d| (d.transaction, d.effects))
861                .unzip();
862            let transactions = self
863                .transaction_cache_reader
864                .multi_get_transaction_blocks(&tx_digests)
865                .into_iter()
866                .enumerate()
867                .map(|(i, tx)| {
868                    let tx = tx
869                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
870                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
871                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
872                })
873                .collect();
874            let effects = self
875                .transaction_cache_reader
876                .multi_get_effects(&fx_digests)
877                .into_iter()
878                .enumerate()
879                .map(|(i, effect)| {
880                    effect.unwrap_or_else(|| {
881                        fatal!("checkpoint effect not found for {:?}", digests[i])
882                    })
883                })
884                .collect();
885
886            let executed_fx_digests = self
887                .transaction_cache_reader
888                .multi_get_executed_effects_digests(&tx_digests);
889
890            (
891                checkpoint_state(CheckpointExecutionData {
892                    checkpoint,
893                    checkpoint_contents,
894                    tx_digests,
895                    fx_digests,
896                }),
897                CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
898            )
899        }
900    }
901
902    // Schedule all unexecuted transactions in the checkpoint for execution
903    #[instrument(level = "info", skip_all)]
904    fn schedule_transaction_execution(
905        &self,
906        ckpt_state: &CheckpointExecutionState,
907        tx_data: &CheckpointTransactionData,
908    ) -> Vec<TransactionDigest> {
909        let mut barrier_deps_builder = BarrierDependencyBuilder::new();
910
911        // Find unexecuted transactions and their expected effects digests
912        let (unexecuted_tx_digests, unexecuted_txns): (Vec<_>, Vec<_>) = itertools::multiunzip(
913            izip_debug_eq!(
914                tx_data.transactions.iter(),
915                ckpt_state.data.tx_digests.iter(),
916                ckpt_state.data.fx_digests.iter(),
917                tx_data.effects.iter(),
918                tx_data.executed_fx_digests.iter(),
919                tx_data.accumulator_versions.iter()
920            )
921            .filter_map(
922                |(
923                    txn,
924                    tx_digest,
925                    expected_fx_digest,
926                    effects,
927                    executed_fx_digest,
928                    accumulator_version,
929                )| {
930                    let barrier_deps =
931                        barrier_deps_builder.process_tx(*tx_digest, txn.transaction_data());
932
933                    if let Some(executed_fx_digest) = executed_fx_digest {
934                        assert_not_forked(
935                            &ckpt_state.data.checkpoint,
936                            tx_digest,
937                            expected_fx_digest,
938                            executed_fx_digest,
939                            &*self.transaction_cache_reader,
940                        );
941                        None
942                    } else if txn.transaction_data().is_end_of_epoch_tx() {
943                        None
944                    } else {
945                        let assigned_versions = self
946                            .epoch_store
947                            .acquire_shared_version_assignments_from_effects(
948                                txn,
949                                effects,
950                                *accumulator_version,
951                                &*self.object_cache_reader,
952                            )
953                            .expect("failed to acquire shared version assignments");
954
955                        let mut env = ExecutionEnv::new()
956                            .with_assigned_versions(assigned_versions)
957                            .with_expected_effects_digest(*expected_fx_digest)
958                            .with_barrier_dependencies(barrier_deps);
959
960                        // Check if the expected effects indicate insufficient balance
961                        if let &ExecutionStatus::Failure(ExecutionFailure {
962                            error: ExecutionErrorKind::InsufficientFundsForWithdraw,
963                            ..
964                        }) = effects.status()
965                        {
966                            env = env.with_insufficient_funds();
967                        }
968
969                        Some((tx_digest, (txn.clone(), env)))
970                    }
971                },
972            ),
973        );
974
975        // Enqueue unexecuted transactions with their expected effects digests
976        self.execution_scheduler
977            .enqueue_transactions(unexecuted_txns, &self.epoch_store);
978
979        unexecuted_tx_digests
980    }
981
982    // Execute the change epoch txn
983    #[instrument(level = "error", skip_all)]
984    async fn execute_change_epoch_tx(&self, tx_data: &CheckpointTransactionData) {
985        let change_epoch_tx = tx_data.transactions.last().unwrap();
986        let change_epoch_fx = tx_data.effects.last().unwrap();
987        assert_eq!(
988            change_epoch_tx.digest(),
989            change_epoch_fx.transaction_digest()
990        );
991        assert!(
992            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
993            "final txn must be an end of epoch txn"
994        );
995
996        // Ordinarily we would assert that the change epoch txn has not been executed yet.
997        // However, during crash recovery, it is possible that we already passed this point and
998        // the txn has been executed. You can uncomment this assert if you are debugging a problem
999        // related to reconfig. If you hit this assert and it is not because of crash-recovery,
1000        // it may indicate a bug in the checkpoint executor.
1001        //
1002        //     if self
1003        //         .transaction_cache_reader
1004        //         .get_executed_effects(change_epoch_tx.digest())
1005        //         .is_some()
1006        //     {
1007        //         fatal!(
1008        //             "end of epoch txn must not have been executed: {:?}",
1009        //             change_epoch_tx.digest()
1010        //         );
1011        //     }
1012
1013        let assigned_versions = self
1014            .epoch_store
1015            .acquire_shared_version_assignments_from_effects(
1016                change_epoch_tx,
1017                change_epoch_fx,
1018                None,
1019                self.object_cache_reader.as_ref(),
1020            )
1021            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");
1022
1023        info!(
1024            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}, assigned versions: {:?}",
1025            change_epoch_tx.digest(),
1026            change_epoch_fx.digest(),
1027            assigned_versions
1028        );
1029        self.execution_scheduler.enqueue_transactions(
1030            vec![(
1031                change_epoch_tx.clone(),
1032                ExecutionEnv::new()
1033                    .with_assigned_versions(assigned_versions)
1034                    .with_expected_effects_digest(change_epoch_fx.digest()),
1035            )],
1036            &self.epoch_store,
1037        );
1038
1039        self.transaction_cache_reader
1040            .notify_read_executed_effects_digests(
1041                "CheckpointExecutor::notify_read_advance_epoch_tx",
1042                &[*change_epoch_tx.digest()],
1043            )
1044            .await;
1045    }
1046
1047    // Increment the highest executed checkpoint watermark and prune old full-checkpoint contents
1048    #[instrument(level = "debug", skip_all)]
1049    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
1050        // Ensure that we are not skipping checkpoints at any point
1051        let seq = *checkpoint.sequence_number();
1052        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
1053        if let Some(prev_highest) = self
1054            .checkpoint_store
1055            .get_highest_executed_checkpoint_seq_number()
1056            .unwrap()
1057        {
1058            assert_eq!(prev_highest + 1, seq);
1059        } else {
1060            assert_eq!(seq, 0);
1061        }
1062        if seq.is_multiple_of(CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL) {
1063            info!("Finished syncing and executing checkpoint {}", seq);
1064        }
1065
1066        fail_point!("highest-executed-checkpoint");
1067
1068        // We store a fixed number of additional FullCheckpointContents after execution is complete
1069        // for use in state sync.
1070        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
1071        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
1072            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
1073            if let Some(prune_checkpoint) = self
1074                .checkpoint_store
1075                .get_checkpoint_by_sequence_number(prune_seq)
1076                .expect("Failed to fetch checkpoint")
1077            {
1078                self.checkpoint_store
1079                    .delete_full_checkpoint_contents(prune_seq)
1080                    .expect("Failed to delete full checkpoint contents");
1081                self.checkpoint_store
1082                    .delete_contents_digest_sequence_number_mapping(
1083                        &prune_checkpoint.content_digest,
1084                    )
1085                    .expect("Failed to delete contents digest -> sequence number mapping");
1086            } else {
1087                // If this is directly after a snapshot restore with skiplisting,
1088                // this is expected for the first `NUM_SAVED_FULL_CHECKPOINT_CONTENTS`
1089                // checkpoints.
1090                debug!(
1091                    "Failed to fetch checkpoint with sequence number {:?}",
1092                    prune_seq
1093                );
1094            }
1095        }
1096
1097        self.checkpoint_store
1098            .update_highest_executed_checkpoint(checkpoint)
1099            .unwrap();
1100        self.metrics.last_executed_checkpoint.set(seq as i64);
1101
1102        self.metrics
1103            .last_executed_checkpoint_timestamp_ms
1104            .set(checkpoint.timestamp_ms as i64);
1105        checkpoint.report_checkpoint_age(
1106            &self.metrics.last_executed_checkpoint_age,
1107            &self.metrics.last_executed_checkpoint_age_ms,
1108        );
1109    }
1110
1111    /// If configured, commit the pending index updates for the provided checkpoint as well as
1112    /// enqueuing the checkpoint to the subscription service
1113    #[instrument(level = "info", skip_all)]
1114    fn commit_index_updates_and_enqueue_to_subscription_service(&self, checkpoint: Checkpoint) {
1115        if let Some(rpc_index) = &self.state.rpc_index {
1116            rpc_index
1117                .commit_update_for_checkpoint(checkpoint.summary.sequence_number)
1118                .expect("failed to update rpc_indexes");
1119        }
1120
1121        // Best-effort, non-blocking publish to the broadcast stream. A send
1122        // error just means there are no live subscribers right now, which is
1123        // fine: subscribers (the RPC subscription service and the embedded
1124        // rpc-store indexer) recover any checkpoints they miss by fetching from
1125        // the local stores.
1126        if let Some(sender) = &self.subscription_service_checkpoint_sender {
1127            let _ = sender.send(Arc::new(checkpoint));
1128        }
1129    }
1130
1131    // Extract randomness rounds from the checkpoint version-specific data (if available).
1132    // Otherwise, extract randomness rounds from the first transaction in the checkpoint
1133    #[instrument(level = "debug", skip_all)]
1134    fn extract_randomness_rounds(
1135        &self,
1136        checkpoint: &VerifiedCheckpoint,
1137        checkpoint_contents: &CheckpointContents,
1138    ) -> Vec<RandomnessRound> {
1139        if let Some(version_specific_data) = checkpoint
1140            .version_specific_data(self.epoch_store.protocol_config())
1141            .expect("unable to get version_specific_data")
1142        {
1143            // With version-specific data, randomness rounds are stored in checkpoint summary.
1144            version_specific_data.into_v1().randomness_rounds
1145        } else {
1146            // Before version-specific data, checkpoint batching must be disabled. In this case,
1147            // randomness state update tx must be first if it exists, because all other
1148            // transactions in a checkpoint that includes a randomness state update are causally
1149            // dependent on it.
1150            assert_eq!(
1151                0,
1152                self.epoch_store
1153                    .protocol_config()
1154                    .min_checkpoint_interval_ms_as_option()
1155                    .unwrap_or_default(),
1156            );
1157            if let Some(first_digest) = checkpoint_contents.inner().first_digests() {
1158                let maybe_randomness_tx = self.transaction_cache_reader.get_transaction_block(&first_digest.transaction)
1159                .unwrap_or_else(||
1160                    fatal!(
1161                        "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
1162                        checkpoint.sequence_number()
1163                    )
1164                );
1165                if let TransactionKind::RandomnessStateUpdate(rsu) =
1166                    maybe_randomness_tx.data().transaction_data().kind()
1167                {
1168                    vec![rsu.randomness_round]
1169                } else {
1170                    Vec::new()
1171                }
1172            } else {
1173                Vec::new()
1174            }
1175        }
1176    }
1177}