sui_core/checkpoints/checkpoint_executor/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! CheckpointExecutor is a Node component that executes all checkpoints for the
5//! given epoch. It acts as a Consumer to StateSync
6//! for newly synced checkpoints, taking these checkpoints and
7//! scheduling and monitoring their execution. Its primary goal is to allow
8//! for catching up to the current checkpoint sequence number of the network
9//! as quickly as possible so that a newly joined, or recovering Node can
10//! participate in a timely manner. To that end, CheckpointExecutor attempts
//! to saturate the CPU with executor tasks (one per checkpoint), each of which
//! handles scheduling and awaiting checkpoint transaction execution.
13//!
14//! CheckpointExecutor is made recoverable in the event of Node shutdown by way of a watermark,
15//! highest_executed_checkpoint, which is guaranteed to be updated sequentially in order,
16//! despite checkpoints themselves potentially being executed nonsequentially and in parallel.
17//! CheckpointExecutor parallelizes checkpoints of the same epoch as much as possible.
//! CheckpointExecutor enforces the invariant that if `run_epoch` returns
//! `StopReason::EpochComplete`, we have reached the end of epoch. This allows us to use it as
//! a signal for reconfig.
20
21use futures::StreamExt;
22use mysten_common::{ZipDebugEqIteratorExt, debug_fatal, fatal, izip_debug_eq};
23use parking_lot::Mutex;
24use std::{sync::Arc, time::Instant};
25use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
26use sui_types::base_types::SequenceNumber;
27use sui_types::crypto::RandomnessRound;
28use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
29use sui_types::transaction::{TransactionDataAPI, TransactionKind};
30
31use sui_config::node::{CheckpointExecutorConfig, RunWithRange};
32use sui_macros::fail_point;
33use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
34use sui_types::executable_transaction::VerifiedExecutableTransaction;
35use sui_types::execution_status::{ExecutionErrorKind, ExecutionFailure, ExecutionStatus};
36use sui_types::full_checkpoint_content::Checkpoint;
37use sui_types::global_state_hash::GlobalStateHash;
38use sui_types::message_envelope::Message;
39use sui_types::{
40    base_types::{TransactionDigest, TransactionEffectsDigest},
41    messages_checkpoint::VerifiedCheckpoint,
42    transaction::VerifiedTransaction,
43};
44use tap::{TapFallible, TapOptional};
45use tracing::{debug, info, instrument, warn};
46
47use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
48use crate::authority::backpressure::BackpressureManager;
49use crate::authority::{AuthorityState, ExecutionEnv};
50use crate::execution_scheduler::ExecutionScheduler;
51use crate::execution_scheduler::execution_scheduler_impl::BarrierDependencyBuilder;
52use crate::global_state_hasher::GlobalStateHasher;
53use crate::{
54    checkpoints::CheckpointStore,
55    execution_cache::{ObjectCacheRead, TransactionCacheRead},
56};
57
58mod data_ingestion_handler;
59pub mod metrics;
60pub(crate) mod utils;
61
62use data_ingestion_handler::{load_checkpoint, store_checkpoint_locally};
63use metrics::CheckpointExecutorMetrics;
64use utils::*;
65
// NOTE(review): not referenced in the visible part of this file. Judging by the name it is the
// number of checkpoints between progress log lines — confirm against its use site.
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;
67
/// Why `CheckpointExecutor::run_epoch` stopped.
#[derive(Debug, PartialEq, Eq)]
pub enum StopReason {
    /// The last checkpoint of the epoch has been executed, either during this run or
    /// before it started.
    EpochComplete,
    /// Execution stopped because the `RunWithRange` bound was reached.
    RunWithRangeCondition,
}
73
/// Checkpoint-level data needed to execute and finalize one checkpoint.
pub(crate) struct CheckpointExecutionData {
    /// The verified checkpoint summary being executed.
    pub checkpoint: VerifiedCheckpoint,
    /// Contents of `checkpoint`, loaded from the checkpoint store by its content digest.
    pub checkpoint_contents: CheckpointContents,
    /// Transaction digests, in `checkpoint_contents` order.
    pub tx_digests: Vec<TransactionDigest>,
    /// Effects digests listed in `checkpoint_contents`, parallel to `tx_digests`.
    pub fx_digests: Vec<TransactionEffectsDigest>,
}
80
/// Per-transaction data for a checkpoint. All vectors are parallel: entry `i` of each one
/// refers to the `i`-th transaction of the checkpoint (lengths are checked in `new`).
pub(crate) struct CheckpointTransactionData {
    /// The checkpoint's transactions, in checkpoint order.
    pub transactions: Vec<VerifiedExecutableTransaction>,
    /// Effects of each transaction, as included in the checkpoint data.
    pub effects: Vec<TransactionEffects>,
    /// Locally known executed-effects digest for each transaction, as loaded from the
    /// transaction cache; presumably `None` when the transaction has not been executed
    /// locally yet — confirm against `multi_get_executed_effects_digests`.
    pub executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    /// The accumulator versions for the transactions in the checkpoint.
    /// All `None` when the accumulator is not enabled. Otherwise every entry is `Some`,
    /// except a trailing end-of-epoch transaction that follows the last barrier settlement
    /// transaction, which stays `None`.
    /// This information is needed for object balance withdraw processing.
    /// The vector should be 1:1 with the transactions in the checkpoint.
    pub accumulator_versions: Vec<Option<SequenceNumber>>,
}
91
92impl CheckpointTransactionData {
93    pub fn new(
94        transactions: Vec<VerifiedExecutableTransaction>,
95        effects: Vec<TransactionEffects>,
96        executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
97    ) -> Self {
98        assert_eq!(transactions.len(), effects.len());
99        assert_eq!(transactions.len(), executed_fx_digests.len());
100        let mut accumulator_versions = vec![None; transactions.len()];
101        let mut next_update_index = 0;
102        for (idx, efx) in effects.iter().enumerate() {
103            // Only barrier settlement transactions mutate the accumulator root object.
104            // This filtering detects whether this transaction is a barrier settlement transaction.
105            // And if so we get the old version of the accumulator root object.
106            // Transactions prior to the barrier settlement transaction reads this accumulator version.
107            let acc_version = efx.object_changes().into_iter().find_map(|change| {
108                if change.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
109                    change.input_version
110                } else {
111                    None
112                }
113            });
114            if let Some(acc_version) = acc_version {
115                // Set version for transactions between [next_update_index, idx] inclusive.
116                for slot in accumulator_versions
117                    .iter_mut()
118                    .take(idx + 1)
119                    .skip(next_update_index)
120                {
121                    *slot = Some(acc_version);
122                }
123                next_update_index = idx + 1;
124            }
125        }
126        // Either accumulator is not enabled, then next_update_index == 0;
127        // or the last transaction is the barrier settlement transaction, and next_update_index == transactions.len();
128        // or the last transaction is the end of epoch transaction, and next_update_index == transactions.len() - 1.
129        assert!(
130            next_update_index == 0
131                || next_update_index == transactions.len()
132                || (next_update_index == transactions.len() - 1
133                    && transactions
134                        .last()
135                        .unwrap()
136                        .transaction_data()
137                        .is_end_of_epoch_tx())
138        );
139        Self {
140            transactions,
141            effects,
142            executed_fx_digests,
143            accumulator_versions,
144        }
145    }
146}

/// A checkpoint as it moves through the execution pipeline: its data plus results that are
/// filled in along the way.
pub(crate) struct CheckpointExecutionState {
    pub data: CheckpointExecutionData,

    // Global state hash for this checkpoint. Supplied up front when the checkpoint was built
    // locally, otherwise set after the checkpoint's transactions are accumulated. Consumed by
    // `accumulate_running_root`.
    state_hasher: Option<GlobalStateHash>,
    // Full checkpoint payload, present only when some consumer of checkpoint data is enabled
    // (see `process_checkpoint_data`).
    full_data: Option<Checkpoint>,
}
153
154impl CheckpointExecutionState {
155    pub fn new(data: CheckpointExecutionData) -> Self {
156        Self {
157            data,
158            state_hasher: None,
159            full_data: None,
160        }
161    }
162
163    pub fn new_with_global_state_hasher(
164        data: CheckpointExecutionData,
165        hasher: GlobalStateHash,
166    ) -> Self {
167        Self {
168            data,
169            state_hasher: Some(hasher),
170            full_data: None,
171        }
172    }
173}
174
/// Calls `$handle.finish_stage(PipelineStage::$stage)` and awaits it.
///
/// The expansion contains `.await`, so this macro can only be used inside `async` code.
/// `$stage` is the bare variant name, e.g. `finish_stage!(pipeline_handle, BuildDbBatch)`.
macro_rules! finish_stage {
    ($handle:expr, $stage:ident) => {
        $handle.finish_stage(PipelineStage::$stage).await;
    };
}
180
/// Executes every checkpoint of a single epoch (see the module documentation).
pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    // TODO: We should use RocksDbStore in the executor
    // to consolidate DB accesses.
    checkpoint_store: Arc<CheckpointStore>,
    // The two cache readers and the execution scheduler are taken from `state` in `new`.
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    execution_scheduler: Arc<ExecutionScheduler>,
    global_state_hasher: Arc<GlobalStateHasher>,
    // Told about the highest certified and highest executed checkpoint sequence numbers.
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    // Fed `network_total_transactions` per checkpoint; its estimate is reported as the
    // `checkpoint_exec_sync_tps` metric.
    tps_estimator: Mutex<TPSEstimator>,
    // When set, full checkpoint data is built for each checkpoint (see
    // `checkpoint_data_enabled`); presumably forwarded to the subscription service — confirm in
    // `commit_index_updates_and_enqueue_to_subscription_service`.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
197
198impl CheckpointExecutor {
199    pub fn new(
200        epoch_store: Arc<AuthorityPerEpochStore>,
201        checkpoint_store: Arc<CheckpointStore>,
202        state: Arc<AuthorityState>,
203        global_state_hasher: Arc<GlobalStateHasher>,
204        backpressure_manager: Arc<BackpressureManager>,
205        config: CheckpointExecutorConfig,
206        metrics: Arc<CheckpointExecutorMetrics>,
207        subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
208    ) -> Self {
209        Self {
210            epoch_store,
211            state: state.clone(),
212            checkpoint_store,
213            object_cache_reader: state.get_object_cache_reader().clone(),
214            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
215            execution_scheduler: state.execution_scheduler().clone(),
216            global_state_hasher,
217            backpressure_manager,
218            config,
219            metrics,
220            tps_estimator: Mutex::new(TPSEstimator::default()),
221            subscription_service_checkpoint_sender,
222        }
223    }
224
225    pub fn new_for_tests(
226        epoch_store: Arc<AuthorityPerEpochStore>,
227        checkpoint_store: Arc<CheckpointStore>,
228        state: Arc<AuthorityState>,
229        state_hasher: Arc<GlobalStateHasher>,
230    ) -> Self {
231        Self::new(
232            epoch_store,
233            checkpoint_store,
234            state,
235            state_hasher,
236            BackpressureManager::new_for_tests(),
237            Default::default(),
238            CheckpointExecutorMetrics::new_for_tests(),
239            None,
240        )
241    }
242
243    // Gets the next checkpoint to schedule for execution. If the epoch is already
244    // completed, returns None.
245    fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
246        // Decide the first checkpoint to schedule for execution.
247        // If we haven't executed anything in the past, we schedule checkpoint 0.
248        // Otherwise we schedule the one after highest executed.
249        let highest_executed = self
250            .checkpoint_store
251            .get_highest_executed_checkpoint()
252            .unwrap();
253
254        if let Some(highest_executed) = &highest_executed
255            && self.epoch_store.epoch() == highest_executed.epoch()
256            && highest_executed.is_last_checkpoint_of_epoch()
257        {
258            // We can arrive at this point if we bump the highest_executed_checkpoint watermark, and then
259            // crash before completing reconfiguration.
260            info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
261            return None;
262        }
263
264        Some(
265            highest_executed
266                .as_ref()
267                .map(|c| c.sequence_number() + 1)
268                .unwrap_or_else(|| {
269                    // TODO this invariant may no longer hold once we introduce snapshots
270                    assert_eq!(self.epoch_store.epoch(), 0);
271                    // we need to execute the genesis checkpoint
272                    0
273                }),
274        )
275    }
276
277    /// Execute all checkpoints for the current epoch, ensuring that the node has not
278    /// forked, and return when finished.
279    /// If `run_with_range` is set, execution will stop early.
280    #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
281    pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
282        let _metrics_scope = mysten_metrics::monitored_scope("CheckpointExecutor::run_epoch");
283        info!(?run_with_range, "CheckpointExecutor::run_epoch");
284        debug!(
285            "Checkpoint executor running for epoch {:?}",
286            self.epoch_store.epoch(),
287        );
288
289        // check if we want to run this epoch based on RunWithRange condition value
290        // we want to be inclusive of the defined RunWithRangeEpoch::Epoch
291        // i.e Epoch(N) means we will execute epoch N and stop when reaching N+1
292        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
293            info!("RunWithRange condition satisfied at {:?}", run_with_range,);
294            return StopReason::RunWithRangeCondition;
295        };
296
297        self.metrics
298            .checkpoint_exec_epoch
299            .set(self.epoch_store.epoch() as i64);
300
301        let Some(next_to_schedule) = self.get_next_to_schedule() else {
302            return StopReason::EpochComplete;
303        };
304
305        let this = Arc::new(self);
306
307        let concurrency = std::env::var("SUI_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
308            .ok()
309            .and_then(|s| s.parse().ok())
310            .unwrap_or(this.config.checkpoint_execution_max_concurrency);
311
312        let pipeline_stages = PipelineStages::new(next_to_schedule, this.metrics.clone());
313
314        let final_checkpoint_executed = stream_synced_checkpoints(
315            this.checkpoint_store.clone(),
316            next_to_schedule,
317            run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
318        )
319        // Checkpoint loading and execution is parallelized
320        .map(|checkpoint| {
321            let this = this.clone();
322            let pipeline_handle = pipeline_stages.handle(*checkpoint.sequence_number());
323            async move {
324                let pipeline_handle = pipeline_handle.await;
325                tokio::spawn(this.execute_checkpoint(checkpoint, pipeline_handle))
326                    .await
327                    .unwrap()
328            }
329        })
330        .buffered(concurrency)
331        // Take the last value from the stream to determine if we completed the epoch
332        .fold(false, |state, is_final_checkpoint| async move {
333            assert!(
334                !state,
335                "fold can't be called again after the final checkpoint"
336            );
337            is_final_checkpoint
338        })
339        .await;
340
341        if final_checkpoint_executed {
342            StopReason::EpochComplete
343        } else {
344            StopReason::RunWithRangeCondition
345        }
346    }
347}
348
349impl CheckpointExecutor {
    /// Load all data for a checkpoint, ensure all transactions are executed, and check for forks.
    ///
    /// Runs the checkpoint through the pipeline stages in order: BuildDbBatch,
    /// CommitTransactionOutputs, FinalizeCheckpoint, UpdateRpcIndex,
    /// BumpHighestExecutedCheckpoint. Earlier stages are finished by the per-path helper.
    ///
    /// Returns `true` if `checkpoint` is the last checkpoint of its epoch.
    ///
    /// # Panics
    ///
    /// On database/commit failures and on the failed `expect`s below, and if the blocking
    /// commit task panics.
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
        mut pipeline_handle: PipelineHandle,
    ) -> bool /* is final checkpoint */ {
        info!("executing checkpoint");
        let sequence_number = checkpoint.sequence_number;

        checkpoint.report_checkpoint_age(
            &self.metrics.checkpoint_contents_age,
            &self.metrics.checkpoint_contents_age_ms,
        );
        self.backpressure_manager
            .update_highest_certified_checkpoint(sequence_number);

        // The end-of-epoch checkpoint only starts once checkpoint `sequence_number - 1`
        // has been executed.
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard = mysten_metrics::monitored_scope(
                "CheckpointExecutor::wait_for_previous_checkpoints",
            );

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            mysten_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        // Note: only `execute_transactions_from_synced_checkpoint` has end-of-epoch logic.
        // Fullnodes and the final checkpoint always re-execute; validators try to reuse
        // their locally built checkpoint.
        let ckpt_state = if self.state.is_fullnode(&self.epoch_store)
            || checkpoint.is_last_checkpoint_of_epoch()
        {
            self.execute_transactions_from_synced_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        } else {
            self.verify_locally_built_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        };

        let tps = self.tps_estimator.lock().update(
            Instant::now(),
            ckpt_state.data.checkpoint.network_total_transactions,
        );
        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

        self.backpressure_manager
            .update_highest_executed_checkpoint(*ckpt_state.data.checkpoint.sequence_number());

        let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

        let seq = ckpt_state.data.checkpoint.sequence_number;

        let batch = self
            .state
            .get_cache_commit()
            .build_db_batch(self.epoch_store.epoch(), &ckpt_state.data.tx_digests);

        finish_stage!(pipeline_handle, BuildDbBatch);

        // If an object funds checker is installed, hand it the effects contained in the
        // batch before the batch is committed.
        let object_funds_checker = self.state.object_funds_checker.load();
        if let Some(object_funds_checker) = object_funds_checker.as_ref() {
            object_funds_checker.commit_effects(batch.0.iter().map(|o| &o.effects));
        }

        // The commit runs on a blocking thread; `ckpt_state` is moved in and handed back.
        let mut ckpt_state = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                // Commit all transaction effects to disk
                let cache_commit = this.state.get_cache_commit();
                debug!(?seq, "committing checkpoint transactions to disk");
                cache_commit.commit_transaction_outputs(
                    this.epoch_store.epoch(),
                    batch,
                    &ckpt_state.data.tx_digests,
                );
                ckpt_state
            }
        })
        .await
        .unwrap();

        finish_stage!(pipeline_handle, CommitTransactionOutputs);

        self.epoch_store
            .handle_finalized_checkpoint(&ckpt_state.data.checkpoint, &ckpt_state.data.tx_digests)
            .expect("cannot fail");

        let randomness_rounds = self.extract_randomness_rounds(
            &ckpt_state.data.checkpoint,
            &ckpt_state.data.checkpoint_contents,
        );

        // Once the checkpoint is finalized, we know that any randomness contained in this checkpoint has
        // been successfully included in a checkpoint certified by quorum of validators.
        // (RandomnessManager/RandomnessReporter is only present on validators.)
        if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
            for round in randomness_rounds {
                debug!(
                    ?round,
                    "notifying RandomnessReporter that randomness update was executed in checkpoint"
                );
                randomness_reporter
                    .notify_randomness_in_checkpoint(round)
                    .expect("epoch cannot have ended");
            }
        }

        finish_stage!(pipeline_handle, FinalizeCheckpoint);

        // `full_data` is only present when some consumer of checkpoint data is enabled.
        if let Some(checkpoint_data) = ckpt_state.full_data.take() {
            self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data)
                .await;
        }

        finish_stage!(pipeline_handle, UpdateRpcIndex);

        self.global_state_hasher
            .accumulate_running_root(&self.epoch_store, seq, ckpt_state.state_hasher)
            .expect("Failed to accumulate running root");

        // End-of-epoch bookkeeping: record the epoch's last checkpoint, accumulate the epoch,
        // and (best effort) prune local summaries.
        if is_final_checkpoint {
            self.checkpoint_store
                .insert_epoch_last_checkpoint(self.epoch_store.epoch(), &ckpt_state.data.checkpoint)
                .expect("Failed to insert epoch last checkpoint");

            self.global_state_hasher
                .accumulate_epoch(self.epoch_store.clone(), seq)
                .expect("Accumulating epoch cannot fail");

            self.checkpoint_store
                .prune_local_summaries()
                .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
                .ok();
        }

        // Fault-injection point placed just before the watermark bump (presumably only active
        // in test builds — confirm in `sui_macros::fail_point`).
        fail_point!("crash");

        // Advance the highest-executed watermark used for crash recovery.
        self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

        finish_stage!(pipeline_handle, BumpHighestExecutedCheckpoint);

        // Important: code after the last pipeline stage is finished can run out of checkpoint order.

        ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
    }
500
501    // On validators, checkpoints have often already been constructed locally, in which
502    // case we can skip many steps of the checkpoint execution process.
503    #[instrument(level = "info", skip_all)]
504    async fn verify_locally_built_checkpoint(
505        &self,
506        checkpoint: VerifiedCheckpoint,
507        pipeline_handle: &mut PipelineHandle,
508    ) -> CheckpointExecutionState {
509        assert!(
510            !checkpoint.is_last_checkpoint_of_epoch(),
511            "only fullnode path has end-of-epoch logic"
512        );
513
514        let sequence_number = checkpoint.sequence_number;
515        let locally_built_checkpoint = self
516            .checkpoint_store
517            .get_locally_computed_checkpoint(sequence_number)
518            .expect("db error");
519
520        let Some(locally_built_checkpoint) = locally_built_checkpoint else {
521            // fall back to tx-by-tx execution path if we are catching up.
522            return self
523                .execute_transactions_from_synced_checkpoint(checkpoint, pipeline_handle)
524                .await;
525        };
526
527        self.metrics.checkpoint_executor_validator_path.inc();
528
529        // Check for fork
530        assert_checkpoint_not_forked(
531            &locally_built_checkpoint,
532            &checkpoint,
533            &self.checkpoint_store,
534        );
535
536        // Checkpoint builder triggers accumulation of the checkpoint, so this is guaranteed to finish.
537        let state_hasher = {
538            let _metrics_scope =
539                mysten_metrics::monitored_scope("CheckpointExecutor::notify_read_state_hasher");
540            self.epoch_store
541                .notify_read_checkpoint_state_hasher(&[sequence_number])
542                .await
543                .unwrap()
544                .pop()
545                .unwrap()
546        };
547
548        let checkpoint_contents = self
549            .checkpoint_store
550            .get_checkpoint_contents(&checkpoint.content_digest)
551            .expect("db error")
552            .expect("checkpoint contents not found");
553
554        let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = checkpoint_contents
555            .iter()
556            .map(|digests| (digests.transaction, digests.effects))
557            .unzip();
558
559        pipeline_handle
560            .skip_to(PipelineStage::FinalizeTransactions)
561            .await;
562
563        // Currently this code only runs on validators, where this method call does nothing.
564        // But in the future, fullnodes may follow the mysticeti dag and build their own checkpoints.
565        self.insert_finalized_transactions(&tx_digests, sequence_number);
566
567        pipeline_handle.skip_to(PipelineStage::BuildDbBatch).await;
568
569        CheckpointExecutionState::new_with_global_state_hasher(
570            CheckpointExecutionData {
571                checkpoint,
572                checkpoint_contents,
573                tx_digests,
574                fx_digests,
575            },
576            state_hasher,
577        )
578    }
579
    /// Executes (or waits for) every transaction of a synced checkpoint, then finalizes it.
    ///
    /// Stages finished here, in order: ExecuteTransactions, WaitForTransactions,
    /// FinalizeTransactions, ProcessCheckpointData. For the last checkpoint of the epoch it
    /// also runs `execute_change_epoch_tx`.
    ///
    /// Returns the checkpoint state with `state_hasher` set. `full_data` is set only when
    /// some consumer of checkpoint data is enabled.
    ///
    /// # Panics
    ///
    /// If index batches cannot be built/committed, or if checkpoint accumulation fails.
    #[instrument(level = "info", skip_all)]
    async fn execute_transactions_from_synced_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pipeline_handle: &mut PipelineHandle,
    ) -> CheckpointExecutionState {
        let sequence_number = checkpoint.sequence_number;
        // Load the checkpoint's transactions and schedule them. The returned digests are what
        // we wait on below; presumably those not yet executed locally — confirm in
        // `schedule_transaction_execution`.
        let (mut ckpt_state, tx_data, unexecuted_tx_digests) = {
            let _scope =
                mysten_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
            let (ckpt_state, tx_data) = self.load_checkpoint_transactions(checkpoint);
            let unexecuted_tx_digests = self.schedule_transaction_execution(&ckpt_state, &tx_data);
            (ckpt_state, tx_data, unexecuted_tx_digests)
        };

        finish_stage!(pipeline_handle, ExecuteTransactions);

        // Block until executed effects are available for every scheduled transaction.
        {
            self.transaction_cache_reader
                .notify_read_executed_effects_digests(
                    "CheckpointExecutor::notify_read_executed_effects_digests",
                    &unexecuted_tx_digests,
                )
                .await;
        }

        finish_stage!(pipeline_handle, WaitForTransactions);

        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&tx_data).await;
        }

        // Collect index batches from post-processing and commit atomically.
        // This must happen AFTER execute_change_epoch_tx (so that all transactions
        // including the end-of-epoch tx have completed post-processing) and BEFORE
        // insert_finalized_transactions (so that index data is available when
        // transactions_executed_in_checkpoint_notify fires).
        // If batches were produced but `self.state.indexes` is `None`, they are dropped.
        {
            let mut raw_batches = Vec::new();
            let mut cache_updates = Vec::new();
            for tx_digest in &ckpt_state.data.tx_digests {
                if let Some((raw_batch, cu)) = self.state.await_post_processing(tx_digest).await {
                    raw_batches.push(raw_batch);
                    cache_updates.push(cu);
                }
            }
            if !raw_batches.is_empty()
                && let Some(indexes) = &self.state.indexes
            {
                let mut db_batch = indexes.new_db_batch();
                db_batch
                    .concat(raw_batches)
                    .expect("failed to build index batch");
                indexes
                    .commit_index_batch(db_batch, cache_updates)
                    .expect("failed to commit index batch");
            }
        }

        // Held until the end of the function, so the scope covers all finalization below.
        let _scope = mysten_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");

        if self.state.is_fullnode(&self.epoch_store) {
            self.state.congestion_tracker.process_checkpoint_effects(
                &*self.transaction_cache_reader,
                &ckpt_state.data.checkpoint,
                &tx_data.effects,
            );
        }

        self.insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number);

        // The early versions of the hasher (prior to effectsv2) rely on db
        // state, so we must wait until all transactions have been executed
        // before accumulating the checkpoint.
        ckpt_state.state_hasher = Some(
            self.global_state_hasher
                .accumulate_checkpoint(&tx_data.effects, sequence_number, &self.epoch_store)
                .expect("epoch cannot have ended"),
        );

        finish_stage!(pipeline_handle, FinalizeTransactions);

        ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state.data, &tx_data);

        finish_stage!(pipeline_handle, ProcessCheckpointData);

        ckpt_state
    }
668
669    fn checkpoint_data_enabled(&self) -> bool {
670        self.subscription_service_checkpoint_sender.is_some()
671            || self.state.rpc_index.is_some()
672            || self.config.data_ingestion_dir.is_some()
673    }
674
675    fn insert_finalized_transactions(
676        &self,
677        tx_digests: &[TransactionDigest],
678        sequence_number: CheckpointSequenceNumber,
679    ) {
680        self.epoch_store
681            .insert_finalized_transactions(tx_digests, sequence_number)
682            .expect("failed to insert finalized transactions");
683
684        if self.state.is_fullnode(&self.epoch_store) {
685            // TODO remove once we no longer need to support this table for read RPC
686            self.state
687                .get_checkpoint_cache()
688                .deprecated_insert_finalized_transactions(
689                    tx_digests,
690                    self.epoch_store.epoch(),
691                    sequence_number,
692                );
693        }
694    }
695
696    #[instrument(level = "info", skip_all)]
697    fn process_checkpoint_data(
698        &self,
699        ckpt_data: &CheckpointExecutionData,
700        tx_data: &CheckpointTransactionData,
701    ) -> Option<Checkpoint> {
702        if !self.checkpoint_data_enabled() {
703            return None;
704        }
705
706        let checkpoint = load_checkpoint(
707            ckpt_data,
708            tx_data,
709            self.state.get_object_store(),
710            &*self.transaction_cache_reader,
711        )
712        .expect("failed to load checkpoint data");
713
714        // Index the checkpoint. This is done out of order and is not written
715        // and committed to the DB until later (committing must be done in-order).
716        if let Some(rpc_index) = &self.state.rpc_index {
717            rpc_index.index_checkpoint(&checkpoint);
718        }
719
720        if let Some(path) = &self.config.data_ingestion_dir {
721            store_checkpoint_locally(path, &checkpoint)
722                .expect("failed to store checkpoint locally");
723        }
724
725        Some(checkpoint)
726    }
727
728    // Load all required transaction and effects data for the checkpoint.
729    #[instrument(level = "info", skip_all)]
730    fn load_checkpoint_transactions(
731        &self,
732        checkpoint: VerifiedCheckpoint,
733    ) -> (CheckpointExecutionState, CheckpointTransactionData) {
734        let seq = checkpoint.sequence_number;
735        let epoch = checkpoint.epoch;
736
737        let checkpoint_contents = self
738            .checkpoint_store
739            .get_checkpoint_contents(&checkpoint.content_digest)
740            .expect("db error")
741            .expect("checkpoint contents not found");
742
743        // attempt to load full checkpoint contents in bulk
744        // Tolerate db error in case of data corruption.
745        // We will fall back to loading items one-by-one below in case of error.
746        if let Some(full_contents) = self
747            .checkpoint_store
748            .get_full_checkpoint_contents_by_sequence_number(seq)
749            .tap_err(|e| debug_fatal!("Failed to get checkpoint contents from store: {e}"))
750            .ok()
751            .flatten()
752            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
753        {
754            let num_txns = full_contents.size();
755            let mut tx_digests = Vec::with_capacity(num_txns);
756            let mut transactions = Vec::with_capacity(num_txns);
757            let mut effects = Vec::with_capacity(num_txns);
758            let mut fx_digests = Vec::with_capacity(num_txns);
759
760            full_contents
761                .into_iter()
762                .zip_debug_eq(checkpoint_contents.iter())
763                .for_each(|(execution_data, digests)| {
764                    let tx_digest = digests.transaction;
765                    let fx_digest = digests.effects;
766                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
767                    debug_assert_eq!(fx_digest, execution_data.effects.digest());
768
769                    tx_digests.push(tx_digest);
770                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
771                        VerifiedTransaction::new_unchecked(execution_data.transaction),
772                        epoch,
773                        seq,
774                    ));
775                    effects.push(execution_data.effects);
776                    fx_digests.push(fx_digest);
777                });
778
779            let executed_fx_digests = self
780                .transaction_cache_reader
781                .multi_get_executed_effects_digests(&tx_digests);
782
783            (
784                CheckpointExecutionState::new(CheckpointExecutionData {
785                    checkpoint,
786                    checkpoint_contents,
787                    tx_digests,
788                    fx_digests,
789                }),
790                CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
791            )
792        } else {
793            // load items one-by-one
794            // TODO: If we used RocksDbStore in the executor instead,
795            // all the logic below could be removed.
796
797            let digests = checkpoint_contents.inner();
798
799            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = digests
800                .digests_iter()
801                .map(|d| (d.transaction, d.effects))
802                .unzip();
803            let transactions = self
804                .transaction_cache_reader
805                .multi_get_transaction_blocks(&tx_digests)
806                .into_iter()
807                .enumerate()
808                .map(|(i, tx)| {
809                    let tx = tx
810                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
811                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
812                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
813                })
814                .collect();
815            let effects = self
816                .transaction_cache_reader
817                .multi_get_effects(&fx_digests)
818                .into_iter()
819                .enumerate()
820                .map(|(i, effect)| {
821                    effect.unwrap_or_else(|| {
822                        fatal!("checkpoint effect not found for {:?}", digests[i])
823                    })
824                })
825                .collect();
826
827            let executed_fx_digests = self
828                .transaction_cache_reader
829                .multi_get_executed_effects_digests(&tx_digests);
830
831            (
832                CheckpointExecutionState::new(CheckpointExecutionData {
833                    checkpoint,
834                    checkpoint_contents,
835                    tx_digests,
836                    fx_digests,
837                }),
838                CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
839            )
840        }
841    }
842
843    // Schedule all unexecuted transactions in the checkpoint for execution
844    #[instrument(level = "info", skip_all)]
845    fn schedule_transaction_execution(
846        &self,
847        ckpt_state: &CheckpointExecutionState,
848        tx_data: &CheckpointTransactionData,
849    ) -> Vec<TransactionDigest> {
850        let mut barrier_deps_builder = BarrierDependencyBuilder::new();
851
852        // Find unexecuted transactions and their expected effects digests
853        let (unexecuted_tx_digests, unexecuted_txns): (Vec<_>, Vec<_>) = itertools::multiunzip(
854            izip_debug_eq!(
855                tx_data.transactions.iter(),
856                ckpt_state.data.tx_digests.iter(),
857                ckpt_state.data.fx_digests.iter(),
858                tx_data.effects.iter(),
859                tx_data.executed_fx_digests.iter(),
860                tx_data.accumulator_versions.iter()
861            )
862            .filter_map(
863                |(
864                    txn,
865                    tx_digest,
866                    expected_fx_digest,
867                    effects,
868                    executed_fx_digest,
869                    accumulator_version,
870                )| {
871                    let barrier_deps =
872                        barrier_deps_builder.process_tx(*tx_digest, txn.transaction_data());
873
874                    if let Some(executed_fx_digest) = executed_fx_digest {
875                        assert_not_forked(
876                            &ckpt_state.data.checkpoint,
877                            tx_digest,
878                            expected_fx_digest,
879                            executed_fx_digest,
880                            &*self.transaction_cache_reader,
881                        );
882                        None
883                    } else if txn.transaction_data().is_end_of_epoch_tx() {
884                        None
885                    } else {
886                        let assigned_versions = self
887                            .epoch_store
888                            .acquire_shared_version_assignments_from_effects(
889                                txn,
890                                effects,
891                                *accumulator_version,
892                                &*self.object_cache_reader,
893                            )
894                            .expect("failed to acquire shared version assignments");
895
896                        let mut env = ExecutionEnv::new()
897                            .with_assigned_versions(assigned_versions)
898                            .with_expected_effects_digest(*expected_fx_digest)
899                            .with_barrier_dependencies(barrier_deps);
900
901                        // Check if the expected effects indicate insufficient balance
902                        if let &ExecutionStatus::Failure(ExecutionFailure {
903                            error: ExecutionErrorKind::InsufficientFundsForWithdraw,
904                            ..
905                        }) = effects.status()
906                        {
907                            env = env.with_insufficient_funds();
908                        }
909
910                        Some((tx_digest, (txn.clone(), env)))
911                    }
912                },
913            ),
914        );
915
916        // Enqueue unexecuted transactions with their expected effects digests
917        self.execution_scheduler
918            .enqueue_transactions(unexecuted_txns, &self.epoch_store);
919
920        unexecuted_tx_digests
921    }
922
923    // Execute the change epoch txn
924    #[instrument(level = "error", skip_all)]
925    async fn execute_change_epoch_tx(&self, tx_data: &CheckpointTransactionData) {
926        let change_epoch_tx = tx_data.transactions.last().unwrap();
927        let change_epoch_fx = tx_data.effects.last().unwrap();
928        assert_eq!(
929            change_epoch_tx.digest(),
930            change_epoch_fx.transaction_digest()
931        );
932        assert!(
933            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
934            "final txn must be an end of epoch txn"
935        );
936
937        // Ordinarily we would assert that the change epoch txn has not been executed yet.
938        // However, during crash recovery, it is possible that we already passed this point and
939        // the txn has been executed. You can uncomment this assert if you are debugging a problem
940        // related to reconfig. If you hit this assert and it is not because of crash-recovery,
941        // it may indicate a bug in the checkpoint executor.
942        //
943        //     if self
944        //         .transaction_cache_reader
945        //         .get_executed_effects(change_epoch_tx.digest())
946        //         .is_some()
947        //     {
948        //         fatal!(
949        //             "end of epoch txn must not have been executed: {:?}",
950        //             change_epoch_tx.digest()
951        //         );
952        //     }
953
954        let assigned_versions = self
955            .epoch_store
956            .acquire_shared_version_assignments_from_effects(
957                change_epoch_tx,
958                change_epoch_fx,
959                None,
960                self.object_cache_reader.as_ref(),
961            )
962            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");
963
964        info!(
965            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}, assigned versions: {:?}",
966            change_epoch_tx.digest(),
967            change_epoch_fx.digest(),
968            assigned_versions
969        );
970        self.execution_scheduler.enqueue_transactions(
971            vec![(
972                change_epoch_tx.clone(),
973                ExecutionEnv::new()
974                    .with_assigned_versions(assigned_versions)
975                    .with_expected_effects_digest(change_epoch_fx.digest()),
976            )],
977            &self.epoch_store,
978        );
979
980        self.transaction_cache_reader
981            .notify_read_executed_effects_digests(
982                "CheckpointExecutor::notify_read_advance_epoch_tx",
983                &[*change_epoch_tx.digest()],
984            )
985            .await;
986    }
987
988    // Increment the highest executed checkpoint watermark and prune old full-checkpoint contents
989    #[instrument(level = "debug", skip_all)]
990    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
991        // Ensure that we are not skipping checkpoints at any point
992        let seq = *checkpoint.sequence_number();
993        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
994        if let Some(prev_highest) = self
995            .checkpoint_store
996            .get_highest_executed_checkpoint_seq_number()
997            .unwrap()
998        {
999            assert_eq!(prev_highest + 1, seq);
1000        } else {
1001            assert_eq!(seq, 0);
1002        }
1003        if seq.is_multiple_of(CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL) {
1004            info!("Finished syncing and executing checkpoint {}", seq);
1005        }
1006
1007        fail_point!("highest-executed-checkpoint");
1008
1009        // We store a fixed number of additional FullCheckpointContents after execution is complete
1010        // for use in state sync.
1011        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
1012        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
1013            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
1014            if let Some(prune_checkpoint) = self
1015                .checkpoint_store
1016                .get_checkpoint_by_sequence_number(prune_seq)
1017                .expect("Failed to fetch checkpoint")
1018            {
1019                self.checkpoint_store
1020                    .delete_full_checkpoint_contents(prune_seq)
1021                    .expect("Failed to delete full checkpoint contents");
1022                self.checkpoint_store
1023                    .delete_contents_digest_sequence_number_mapping(
1024                        &prune_checkpoint.content_digest,
1025                    )
1026                    .expect("Failed to delete contents digest -> sequence number mapping");
1027            } else {
1028                // If this is directly after a snapshot restore with skiplisting,
1029                // this is expected for the first `NUM_SAVED_FULL_CHECKPOINT_CONTENTS`
1030                // checkpoints.
1031                debug!(
1032                    "Failed to fetch checkpoint with sequence number {:?}",
1033                    prune_seq
1034                );
1035            }
1036        }
1037
1038        self.checkpoint_store
1039            .update_highest_executed_checkpoint(checkpoint)
1040            .unwrap();
1041        self.metrics.last_executed_checkpoint.set(seq as i64);
1042
1043        self.metrics
1044            .last_executed_checkpoint_timestamp_ms
1045            .set(checkpoint.timestamp_ms as i64);
1046        checkpoint.report_checkpoint_age(
1047            &self.metrics.last_executed_checkpoint_age,
1048            &self.metrics.last_executed_checkpoint_age_ms,
1049        );
1050    }
1051
1052    /// If configured, commit the pending index updates for the provided checkpoint as well as
1053    /// enqueuing the checkpoint to the subscription service
1054    #[instrument(level = "info", skip_all)]
1055    async fn commit_index_updates_and_enqueue_to_subscription_service(
1056        &self,
1057        checkpoint: Checkpoint,
1058    ) {
1059        if let Some(rpc_index) = &self.state.rpc_index {
1060            rpc_index
1061                .commit_update_for_checkpoint(checkpoint.summary.sequence_number)
1062                .expect("failed to update rpc_indexes");
1063        }
1064
1065        if let Some(sender) = &self.subscription_service_checkpoint_sender
1066            && let Err(e) = sender.send(checkpoint).await
1067        {
1068            warn!("unable to send checkpoint to subscription service: {e}");
1069        }
1070    }
1071
1072    // Extract randomness rounds from the checkpoint version-specific data (if available).
1073    // Otherwise, extract randomness rounds from the first transaction in the checkpoint
1074    #[instrument(level = "debug", skip_all)]
1075    fn extract_randomness_rounds(
1076        &self,
1077        checkpoint: &VerifiedCheckpoint,
1078        checkpoint_contents: &CheckpointContents,
1079    ) -> Vec<RandomnessRound> {
1080        if let Some(version_specific_data) = checkpoint
1081            .version_specific_data(self.epoch_store.protocol_config())
1082            .expect("unable to get version_specific_data")
1083        {
1084            // With version-specific data, randomness rounds are stored in checkpoint summary.
1085            version_specific_data.into_v1().randomness_rounds
1086        } else {
1087            // Before version-specific data, checkpoint batching must be disabled. In this case,
1088            // randomness state update tx must be first if it exists, because all other
1089            // transactions in a checkpoint that includes a randomness state update are causally
1090            // dependent on it.
1091            assert_eq!(
1092                0,
1093                self.epoch_store
1094                    .protocol_config()
1095                    .min_checkpoint_interval_ms_as_option()
1096                    .unwrap_or_default(),
1097            );
1098            if let Some(first_digest) = checkpoint_contents.inner().first_digests() {
1099                let maybe_randomness_tx = self.transaction_cache_reader.get_transaction_block(&first_digest.transaction)
1100                .unwrap_or_else(||
1101                    fatal!(
1102                        "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
1103                        checkpoint.sequence_number()
1104                    )
1105                );
1106                if let TransactionKind::RandomnessStateUpdate(rsu) =
1107                    maybe_randomness_tx.data().transaction_data().kind()
1108                {
1109                    vec![rsu.randomness_round]
1110                } else {
1111                    Vec::new()
1112                }
1113            } else {
1114                Vec::new()
1115            }
1116        }
1117    }
1118}