sui_core/checkpoints/checkpoint_executor/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! CheckpointExecutor is a Node component that executes all checkpoints for the
//! given epoch. It acts as a consumer of StateSync
6//! for newly synced checkpoints, taking these checkpoints and
7//! scheduling and monitoring their execution. Its primary goal is to allow
8//! for catching up to the current checkpoint sequence number of the network
9//! as quickly as possible so that a newly joined, or recovering Node can
10//! participate in a timely manner. To that end, CheckpointExecutor attempts
11//! to saturate the CPU with executor tasks (one per checkpoint), each of which
//! handles scheduling and awaiting checkpoint transaction execution.
13//!
14//! CheckpointExecutor is made recoverable in the event of Node shutdown by way of a watermark,
15//! highest_executed_checkpoint, which is guaranteed to be updated sequentially in order,
16//! despite checkpoints themselves potentially being executed nonsequentially and in parallel.
17//! CheckpointExecutor parallelizes checkpoints of the same epoch as much as possible.
18//! CheckpointExecutor enforces the invariant that if `run` returns successfully, we have reached the
19//! end of epoch. This allows us to use it as a signal for reconfig.
20
21use futures::StreamExt;
22use mysten_common::{ZipDebugEqIteratorExt, debug_fatal, fatal, izip_debug_eq};
23use parking_lot::Mutex;
24use std::{sync::Arc, time::Instant};
25use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
26use sui_types::base_types::SequenceNumber;
27use sui_types::crypto::RandomnessRound;
28use sui_types::inner_temporary_store::PackageStoreWithFallback;
29use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
30use sui_types::transaction::{TransactionDataAPI, TransactionKind};
31
32use sui_config::node::{CheckpointExecutorConfig, RunWithRange};
33use sui_macros::fail_point;
34use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
35use sui_types::executable_transaction::VerifiedExecutableTransaction;
36use sui_types::execution_status::{ExecutionErrorKind, ExecutionFailure, ExecutionStatus};
37use sui_types::full_checkpoint_content::Checkpoint;
38use sui_types::global_state_hash::GlobalStateHash;
39use sui_types::message_envelope::Message;
40use sui_types::{
41    base_types::{TransactionDigest, TransactionEffectsDigest},
42    messages_checkpoint::VerifiedCheckpoint,
43    transaction::VerifiedTransaction,
44};
45use tap::{TapFallible, TapOptional};
46use tracing::{debug, info, instrument, warn};
47
48use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
49use crate::authority::backpressure::BackpressureManager;
50use crate::authority::{AuthorityState, ExecutionEnv};
51use crate::execution_scheduler::ExecutionScheduler;
52use crate::execution_scheduler::execution_scheduler_impl::BarrierDependencyBuilder;
53use crate::global_state_hasher::GlobalStateHasher;
54use crate::{
55    checkpoints::CheckpointStore,
56    execution_cache::{ObjectCacheRead, TransactionCacheRead},
57};
58
59mod data_ingestion_handler;
60pub mod metrics;
61pub(crate) mod utils;
62
63use data_ingestion_handler::{load_checkpoint, store_checkpoint_locally};
64use metrics::CheckpointExecutorMetrics;
65use utils::*;
66
/// Number of executed checkpoints between progress log messages.
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;
68
/// Why `CheckpointExecutor::run_epoch` returned.
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    /// The final checkpoint of the epoch was executed; reconfiguration can proceed.
    EpochComplete,
    /// Execution stopped early because the configured `RunWithRange` bound was reached.
    RunWithRangeCondition,
}
74
/// Core per-checkpoint data threaded through the execution pipeline: the
/// verified checkpoint summary, its contents, and the transaction/effects
/// digests extracted from the contents.
pub(crate) struct CheckpointExecutionData {
    pub checkpoint: VerifiedCheckpoint,
    pub checkpoint_contents: CheckpointContents,
    /// Transaction digests, in checkpoint order.
    pub tx_digests: Vec<TransactionDigest>,
    /// Effects digests, 1:1 with `tx_digests`.
    pub fx_digests: Vec<TransactionEffectsDigest>,
}
81
/// Transactions and effects loaded for one checkpoint. All vectors are 1:1
/// with the checkpoint's transactions, in checkpoint order (enforced by
/// `CheckpointTransactionData::new`).
pub(crate) struct CheckpointTransactionData {
    pub transactions: Vec<VerifiedExecutableTransaction>,
    pub effects: Vec<TransactionEffects>,
    /// Per-transaction digest of already-executed effects, or `None` if the
    /// transaction has not been executed locally yet.
    pub executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    /// The accumulator versions for the transactions in the checkpoint.
    /// None only if accumulator is not enabled (either all Some, or all None).
    /// This information is needed for object balance withdraw processing.
    /// The vector should be 1:1 with the transactions in the checkpoint.
    pub accumulator_versions: Vec<Option<SequenceNumber>>,
}
92
93impl CheckpointTransactionData {
94    pub fn new(
95        transactions: Vec<VerifiedExecutableTransaction>,
96        effects: Vec<TransactionEffects>,
97        executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
98    ) -> Self {
99        assert_eq!(transactions.len(), effects.len());
100        assert_eq!(transactions.len(), executed_fx_digests.len());
101        let mut accumulator_versions = vec![None; transactions.len()];
102        let mut next_update_index = 0;
103        for (idx, efx) in effects.iter().enumerate() {
104            // Only barrier settlement transactions mutate the accumulator root object.
105            // This filtering detects whether this transaction is a barrier settlement transaction.
106            // And if so we get the old version of the accumulator root object.
107            // Transactions prior to the barrier settlement transaction reads this accumulator version.
108            let acc_version = efx.object_changes().into_iter().find_map(|change| {
109                if change.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
110                    change.input_version
111                } else {
112                    None
113                }
114            });
115            if let Some(acc_version) = acc_version {
116                // Set version for transactions between [next_update_index, idx] inclusive.
117                for slot in accumulator_versions
118                    .iter_mut()
119                    .take(idx + 1)
120                    .skip(next_update_index)
121                {
122                    *slot = Some(acc_version);
123                }
124                next_update_index = idx + 1;
125            }
126        }
127        // Either accumulator is not enabled, then next_update_index == 0;
128        // or the last transaction is the barrier settlement transaction, and next_update_index == transactions.len();
129        // or the last transaction is the end of epoch transaction, and next_update_index == transactions.len() - 1.
130        assert!(
131            next_update_index == 0
132                || next_update_index == transactions.len()
133                || (next_update_index == transactions.len() - 1
134                    && transactions
135                        .last()
136                        .unwrap()
137                        .transaction_data()
138                        .is_end_of_epoch_tx())
139        );
140        Self {
141            transactions,
142            effects,
143            executed_fx_digests,
144            accumulator_versions,
145        }
146    }
147}
/// Mutable state carried through a checkpoint's execution pipeline.
pub(crate) struct CheckpointExecutionState {
    pub data: CheckpointExecutionData,

    // Populated at construction on the locally-built (validator) path, or after
    // transaction execution on the synced-checkpoint path.
    state_hasher: Option<GlobalStateHash>,
    // Fully-loaded checkpoint, present only when indexing / data ingestion /
    // subscription output is enabled.
    full_data: Option<Checkpoint>,
}
154
155impl CheckpointExecutionState {
156    pub fn new(data: CheckpointExecutionData) -> Self {
157        Self {
158            data,
159            state_hasher: None,
160            full_data: None,
161        }
162    }
163
164    pub fn new_with_global_state_hasher(
165        data: CheckpointExecutionData,
166        hasher: GlobalStateHash,
167    ) -> Self {
168        Self {
169            data,
170            state_hasher: Some(hasher),
171            full_data: None,
172        }
173    }
174}
175
/// Awaits completion of the given pipeline stage on a pipeline handle,
/// keeping the many call sites in this module terse.
macro_rules! finish_stage {
    ($handle:expr, $stage:ident) => {
        $handle.finish_stage(PipelineStage::$stage).await;
    };
}
181
/// Executes all synced checkpoints for a single epoch; see the module docs for
/// the overall design (parallel execution with a sequential watermark).
pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    // TODO: We should use RocksDbStore in the executor
    // to consolidate DB accesses.
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    execution_scheduler: Arc<ExecutionScheduler>,
    global_state_hasher: Arc<GlobalStateHasher>,
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    // Rolling TPS estimate, updated once per executed checkpoint.
    tps_estimator: Mutex<TPSEstimator>,
    // When present, fully-loaded checkpoints are forwarded to the subscription
    // service after index updates are committed.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
198
impl CheckpointExecutor {
    /// Creates an executor for the epoch of `epoch_store`. The cache readers
    /// and execution scheduler are obtained from `state`.
    pub fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        global_state_hasher: Arc<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
        subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
    ) -> Self {
        Self {
            epoch_store,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            execution_scheduler: state.execution_scheduler().clone(),
            global_state_hasher,
            backpressure_manager,
            config,
            metrics,
            tps_estimator: Mutex::new(TPSEstimator::default()),
            subscription_service_checkpoint_sender,
        }
    }

    /// Test-only constructor: default config, test metrics, a test backpressure
    /// manager, and no subscription-service sender.
    pub fn new_for_tests(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        state_hasher: Arc<GlobalStateHasher>,
    ) -> Self {
        Self::new(
            epoch_store,
            checkpoint_store,
            state,
            state_hasher,
            BackpressureManager::new_for_tests(),
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
            None,
        )
    }

    // Gets the next checkpoint to schedule for execution. If the epoch is already
    // completed, returns None.
    fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
        // Decide the first checkpoint to schedule for execution.
        // If we haven't executed anything in the past, we schedule checkpoint 0.
        // Otherwise we schedule the one after highest executed.
        let highest_executed = self
            .checkpoint_store
            .get_highest_executed_checkpoint()
            .unwrap();

        if let Some(highest_executed) = &highest_executed
            && self.epoch_store.epoch() == highest_executed.epoch()
            && highest_executed.is_last_checkpoint_of_epoch()
        {
            // We can arrive at this point if we bump the highest_executed_checkpoint watermark, and then
            // crash before completing reconfiguration.
            info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
            return None;
        }

        Some(
            highest_executed
                .as_ref()
                .map(|c| c.sequence_number() + 1)
                .unwrap_or_else(|| {
                    // TODO this invariant may no longer hold once we introduce snapshots
                    assert_eq!(self.epoch_store.epoch(), 0);
                    // we need to execute the genesis checkpoint
                    0
                }),
        )
    }

    /// Execute all checkpoints for the current epoch, ensuring that the node has not
    /// forked, and return when finished.
    /// If `run_with_range` is set, execution will stop early.
    #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
    pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
        let _metrics_scope = mysten_metrics::monitored_scope("CheckpointExecutor::run_epoch");
        info!(?run_with_range, "CheckpointExecutor::run_epoch");
        debug!(
            "Checkpoint executor running for epoch {:?}",
            self.epoch_store.epoch(),
        );

        // check if we want to run this epoch based on RunWithRange condition value
        // we want to be inclusive of the defined RunWithRangeEpoch::Epoch
        // i.e Epoch(N) means we will execute epoch N and stop when reaching N+1
        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
            info!("RunWithRange condition satisfied at {:?}", run_with_range,);
            return StopReason::RunWithRangeCondition;
        };

        self.metrics
            .checkpoint_exec_epoch
            .set(self.epoch_store.epoch() as i64);

        // None means the final checkpoint of this epoch was already executed
        // (e.g. we crashed mid-reconfiguration), so there is nothing to run.
        let Some(next_to_schedule) = self.get_next_to_schedule() else {
            return StopReason::EpochComplete;
        };

        // Shared by all per-checkpoint tasks spawned below.
        let this = Arc::new(self);

        // The env var, when set and parseable, overrides the configured
        // checkpoint execution concurrency.
        let concurrency = std::env::var("SUI_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(this.config.checkpoint_execution_max_concurrency);

        let pipeline_stages = PipelineStages::new(next_to_schedule, this.metrics.clone());

        let final_checkpoint_executed = stream_synced_checkpoints(
            this.checkpoint_store.clone(),
            next_to_schedule,
            run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
        )
        // Checkpoint loading and execution is parallelized
        .map(|checkpoint| {
            let this = this.clone();
            let pipeline_handle = pipeline_stages.handle(*checkpoint.sequence_number());
            async move {
                let pipeline_handle = pipeline_handle.await;
                tokio::spawn(this.execute_checkpoint(checkpoint, pipeline_handle))
                    .await
                    .unwrap()
            }
        })
        .buffered(concurrency)
        // Take the last value from the stream to determine if we completed the epoch
        .fold(false, |state, is_final_checkpoint| async move {
            assert!(
                !state,
                "fold can't be called again after the final checkpoint"
            );
            is_final_checkpoint
        })
        .await;

        if final_checkpoint_executed {
            StopReason::EpochComplete
        } else {
            StopReason::RunWithRangeCondition
        }
    }
}
349
350impl CheckpointExecutor {
    /// Load all data for a checkpoint, ensure all transactions are executed, and check for forks.
    ///
    /// Returns `true` iff this checkpoint is the last one of the epoch. The
    /// pipeline stages finished via `pipeline_handle` gate the in-order steps
    /// (db batch commit, finalization, watermark bump) even though checkpoints
    /// are executed concurrently.
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
        mut pipeline_handle: PipelineHandle,
    ) -> bool /* is final checkpoint */ {
        info!("executing checkpoint");
        let sequence_number = checkpoint.sequence_number;

        checkpoint.report_checkpoint_age(
            &self.metrics.checkpoint_contents_age,
            &self.metrics.checkpoint_contents_age_ms,
        );
        self.backpressure_manager
            .update_highest_certified_checkpoint(sequence_number);

        // The end-of-epoch checkpoint waits for every earlier checkpoint to be
        // executed before proceeding.
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard = mysten_metrics::monitored_scope(
                "CheckpointExecutor::wait_for_previous_checkpoints",
            );

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            mysten_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        // Fullnodes execute transaction-by-transaction; validators can usually
        // verify against their locally built checkpoint instead.
        // Note: only `execute_transactions_from_synced_checkpoint` has end-of-epoch logic.
        let ckpt_state = if self.state.is_fullnode(&self.epoch_store)
            || checkpoint.is_last_checkpoint_of_epoch()
        {
            self.execute_transactions_from_synced_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        } else {
            self.verify_locally_built_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        };

        let tps = self.tps_estimator.lock().update(
            Instant::now(),
            ckpt_state.data.checkpoint.network_total_transactions,
        );
        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

        self.backpressure_manager
            .update_highest_executed_checkpoint(*ckpt_state.data.checkpoint.sequence_number());

        let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

        let seq = ckpt_state.data.checkpoint.sequence_number;

        let batch = self
            .state
            .get_cache_commit()
            .build_db_batch(self.epoch_store.epoch(), &ckpt_state.data.tx_digests);

        finish_stage!(pipeline_handle, BuildDbBatch);

        let object_funds_checker = self.state.object_funds_checker.load();
        if let Some(object_funds_checker) = object_funds_checker.as_ref() {
            object_funds_checker.commit_effects(batch.0.iter().map(|o| &o.effects));
        }

        // Committing the batch is blocking (disk) work, so run it on the
        // blocking pool; ckpt_state is moved in and handed back.
        let mut ckpt_state = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                // Commit all transaction effects to disk
                let cache_commit = this.state.get_cache_commit();
                debug!(?seq, "committing checkpoint transactions to disk");
                cache_commit.commit_transaction_outputs(
                    this.epoch_store.epoch(),
                    batch,
                    &ckpt_state.data.tx_digests,
                );
                ckpt_state
            }
        })
        .await
        .unwrap();

        finish_stage!(pipeline_handle, CommitTransactionOutputs);

        self.epoch_store
            .handle_finalized_checkpoint(&ckpt_state.data.checkpoint, &ckpt_state.data.tx_digests)
            .expect("cannot fail");

        let randomness_rounds = self.extract_randomness_rounds(
            &ckpt_state.data.checkpoint,
            &ckpt_state.data.checkpoint_contents,
        );

        // Once the checkpoint is finalized, we know that any randomness contained in this checkpoint has
        // been successfully included in a checkpoint certified by quorum of validators.
        // (RandomnessManager/RandomnessReporter is only present on validators.)
        if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
            for round in randomness_rounds {
                debug!(
                    ?round,
                    "notifying RandomnessReporter that randomness update was executed in checkpoint"
                );
                randomness_reporter
                    .notify_randomness_in_checkpoint(round)
                    .expect("epoch cannot have ended");
            }
        }

        finish_stage!(pipeline_handle, FinalizeCheckpoint);

        // Present only when indexing / ingestion / subscriptions are enabled.
        if let Some(checkpoint_data) = ckpt_state.full_data.take() {
            self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data)
                .await;
        }

        finish_stage!(pipeline_handle, UpdateRpcIndex);

        self.global_state_hasher
            .accumulate_running_root(&self.epoch_store, seq, ckpt_state.state_hasher)
            .expect("Failed to accumulate running root");

        if is_final_checkpoint {
            self.checkpoint_store
                .insert_epoch_last_checkpoint(self.epoch_store.epoch(), &ckpt_state.data.checkpoint)
                .expect("Failed to insert epoch last checkpoint");

            self.global_state_hasher
                .accumulate_epoch(self.epoch_store.clone(), seq)
                .expect("Accumulating epoch cannot fail");

            self.checkpoint_store
                .prune_local_summaries()
                .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
                .ok();
        }

        // Failpoint for crash-recovery testing (no-op in production builds).
        fail_point!("crash");

        self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

        finish_stage!(pipeline_handle, BumpHighestExecutedCheckpoint);

        // Important: code after the last pipeline stage is finished can run out of checkpoint order.

        ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
    }
    // On validators, checkpoints have often already been constructed locally, in which
    // case we can skip many steps of the checkpoint execution process.
    //
    // Asserts that the locally built checkpoint matches the synced one (fork
    // detection), and falls back to the full execution path when no locally
    // built checkpoint exists (e.g. while catching up).
    #[instrument(level = "info", skip_all)]
    async fn verify_locally_built_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pipeline_handle: &mut PipelineHandle,
    ) -> CheckpointExecutionState {
        assert!(
            !checkpoint.is_last_checkpoint_of_epoch(),
            "only fullnode path has end-of-epoch logic"
        );

        let sequence_number = checkpoint.sequence_number;
        let locally_built_checkpoint = self
            .checkpoint_store
            .get_locally_computed_checkpoint(sequence_number)
            .expect("db error");

        let Some(locally_built_checkpoint) = locally_built_checkpoint else {
            // fall back to tx-by-tx execution path if we are catching up.
            return self
                .execute_transactions_from_synced_checkpoint(checkpoint, pipeline_handle)
                .await;
        };

        self.metrics.checkpoint_executor_validator_path.inc();

        // Check for fork
        assert_checkpoint_not_forked(
            &locally_built_checkpoint,
            &checkpoint,
            &self.checkpoint_store,
        );

        // Checkpoint builder triggers accumulation of the checkpoint, so this is guaranteed to finish.
        let state_hasher = {
            let _metrics_scope =
                mysten_metrics::monitored_scope("CheckpointExecutor::notify_read_state_hasher");
            self.epoch_store
                .notify_read_checkpoint_state_hasher(&[sequence_number])
                .await
                .unwrap()
                .pop()
                .unwrap()
        };

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = checkpoint_contents
            .iter()
            .map(|digests| (digests.transaction, digests.effects))
            .unzip();

        // The execution stages up to FinalizeTransactions are skipped on this
        // path since transactions were already executed locally.
        pipeline_handle
            .skip_to(PipelineStage::FinalizeTransactions)
            .await;

        // Currently this code only runs on validators, where this method call does nothing.
        // But in the future, fullnodes may follow the mysticeti dag and build their own checkpoints.
        self.insert_finalized_transactions(&tx_digests, sequence_number);

        pipeline_handle.skip_to(PipelineStage::BuildDbBatch).await;

        CheckpointExecutionState::new_with_global_state_hasher(
            CheckpointExecutionData {
                checkpoint,
                checkpoint_contents,
                tx_digests,
                fx_digests,
            },
            state_hasher,
        )
    }
580
    /// Full execution path for a synced checkpoint: load transactions, schedule
    /// any that have not executed yet, wait for their effects, then finalize
    /// (index batches, state hash, optional full checkpoint data). This is the
    /// only path that handles the end-of-epoch (change-epoch) transaction.
    #[instrument(level = "info", skip_all)]
    async fn execute_transactions_from_synced_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pipeline_handle: &mut PipelineHandle,
    ) -> CheckpointExecutionState {
        let sequence_number = checkpoint.sequence_number;
        let (mut ckpt_state, tx_data, unexecuted_tx_digests) = {
            let _scope =
                mysten_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
            let (ckpt_state, tx_data) = self.load_checkpoint_transactions(checkpoint);
            let unexecuted_tx_digests = self.schedule_transaction_execution(&ckpt_state, &tx_data);
            (ckpt_state, tx_data, unexecuted_tx_digests)
        };

        finish_stage!(pipeline_handle, ExecuteTransactions);

        // Block until every transaction that wasn't already executed has
        // produced effects.
        {
            self.transaction_cache_reader
                .notify_read_executed_effects_digests(
                    "CheckpointExecutor::notify_read_executed_effects_digests",
                    &unexecuted_tx_digests,
                )
                .await;
        }

        finish_stage!(pipeline_handle, WaitForTransactions);

        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&tx_data).await;
        }

        // Collect index batches from post-processing and commit atomically.
        // This must happen AFTER execute_change_epoch_tx (so that all transactions
        // including the end-of-epoch tx have completed post-processing) and BEFORE
        // insert_finalized_transactions (so that index data is available when
        // transactions_executed_in_checkpoint_notify fires).
        {
            let mut raw_batches = Vec::new();
            let mut cache_updates = Vec::new();
            for tx_digest in &ckpt_state.data.tx_digests {
                if let Some((raw_batch, cu)) = self.state.await_post_processing(tx_digest).await {
                    raw_batches.push(raw_batch);
                    cache_updates.push(cu);
                }
            }
            if !raw_batches.is_empty()
                && let Some(indexes) = &self.state.indexes
            {
                let mut db_batch = indexes.new_db_batch();
                db_batch
                    .concat(raw_batches)
                    .expect("failed to build index batch");
                indexes
                    .commit_index_batch(db_batch, cache_updates)
                    .expect("failed to commit index batch");
            }
        }

        let _scope = mysten_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");

        if self.state.is_fullnode(&self.epoch_store) {
            self.state.congestion_tracker.process_checkpoint_effects(
                &*self.transaction_cache_reader,
                &ckpt_state.data.checkpoint,
                &tx_data.effects,
            );
        }

        self.insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number);

        // The early versions of the hasher (prior to effectsv2) rely on db
        // state, so we must wait until all transactions have been executed
        // before accumulating the checkpoint.
        ckpt_state.state_hasher = Some(
            self.global_state_hasher
                .accumulate_checkpoint(&tx_data.effects, sequence_number, &self.epoch_store)
                .expect("epoch cannot have ended"),
        );

        finish_stage!(pipeline_handle, FinalizeTransactions);

        ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state.data, &tx_data);

        finish_stage!(pipeline_handle, ProcessCheckpointData);

        ckpt_state
    }
669
670    fn checkpoint_data_enabled(&self) -> bool {
671        self.subscription_service_checkpoint_sender.is_some()
672            || self.state.rpc_index.is_some()
673            || self.config.data_ingestion_dir.is_some()
674    }
675
    /// Records the checkpoint's transactions as finalized in the epoch store,
    /// and (fullnodes only) in the deprecated checkpoint-cache table still
    /// served by read RPC.
    fn insert_finalized_transactions(
        &self,
        tx_digests: &[TransactionDigest],
        sequence_number: CheckpointSequenceNumber,
    ) {
        self.epoch_store
            .insert_finalized_transactions(tx_digests, sequence_number)
            .expect("failed to insert finalized transactions");

        if self.state.is_fullnode(&self.epoch_store) {
            // TODO remove once we no longer need to support this table for read RPC
            self.state
                .get_checkpoint_cache()
                .deprecated_insert_finalized_transactions(
                    tx_digests,
                    self.epoch_store.epoch(),
                    sequence_number,
                );
        }
    }
696
    /// Loads the full checkpoint data when any consumer needs it, indexes it
    /// for RPC and/or writes it to the data-ingestion directory, and returns
    /// the loaded checkpoint (or `None` if no consumer is configured).
    #[instrument(level = "info", skip_all)]
    fn process_checkpoint_data(
        &self,
        ckpt_data: &CheckpointExecutionData,
        tx_data: &CheckpointTransactionData,
    ) -> Option<Checkpoint> {
        if !self.checkpoint_data_enabled() {
            return None;
        }

        let checkpoint = load_checkpoint(
            ckpt_data,
            tx_data,
            self.state.get_object_store(),
            &*self.transaction_cache_reader,
        )
        .expect("failed to load checkpoint data");

        if self.state.rpc_index.is_some() || self.config.data_ingestion_dir.is_some() {
            let checkpoint_data = checkpoint.clone().into();
            // Index the checkpoint. this is done out of order and is not written and committed to the
            // DB until later (committing must be done in-order)
            if let Some(rpc_index) = &self.state.rpc_index {
                // Layout resolution falls back to the backing package store for
                // packages not included in the checkpoint data itself.
                let mut layout_resolver = self.epoch_store.executor().type_layout_resolver(
                    self.epoch_store.protocol_config(),
                    Box::new(PackageStoreWithFallback::new(
                        self.state.get_backing_package_store(),
                        &checkpoint_data,
                    )),
                );

                rpc_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut());
            }

            if let Some(path) = &self.config.data_ingestion_dir {
                store_checkpoint_locally(path, &checkpoint_data)
                    .expect("failed to store checkpoint locally");
            }
        }

        Some(checkpoint)
    }
739
    // Load all required transaction and effects data for the checkpoint.
    //
    // Returns the per-checkpoint execution state (checkpoint, contents, and the
    // parallel tx/effects digest vectors) together with the transaction data
    // (executable transactions, effects, and any already-executed effects
    // digests) needed to schedule execution.
    //
    // Missing data is treated as a bug: state sync is expected to have
    // persisted everything before execution, so lookups `expect`/`fatal!` on
    // absence rather than returning errors.
    #[instrument(level = "info", skip_all)]
    fn load_checkpoint_transactions(
        &self,
        checkpoint: VerifiedCheckpoint,
    ) -> (CheckpointExecutionState, CheckpointTransactionData) {
        let seq = checkpoint.sequence_number;
        let epoch = checkpoint.epoch;

        // Digest-only contents (tx digest + effects digest per transaction).
        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        // attempt to load full checkpoint contents in bulk
        // Tolerate db error in case of data corruption.
        // We will fall back to loading items one-by-one below in case of error.
        if let Some(full_contents) = self
            .checkpoint_store
            .get_full_checkpoint_contents_by_sequence_number(seq)
            .tap_err(|e| debug_fatal!("Failed to get checkpoint contents from store: {e}"))
            .ok()
            .flatten()
            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
        {
            // Fast path: full contents already contain both transactions and
            // effects; build the four parallel vectors in one pass.
            let num_txns = full_contents.size();
            let mut tx_digests = Vec::with_capacity(num_txns);
            let mut transactions = Vec::with_capacity(num_txns);
            let mut effects = Vec::with_capacity(num_txns);
            let mut fx_digests = Vec::with_capacity(num_txns);

            // Walk the bulk contents in lockstep with the digest list; the
            // debug asserts verify the two sources agree (debug builds only).
            full_contents
                .into_iter()
                .zip_debug_eq(checkpoint_contents.iter())
                .for_each(|(execution_data, digests)| {
                    let tx_digest = digests.transaction;
                    let fx_digest = digests.effects;
                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
                    debug_assert_eq!(fx_digest, execution_data.effects.digest());

                    tx_digests.push(tx_digest);
                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
                        // new_unchecked: signatures were already verified
                        // upstream — presumably by state sync; TODO confirm.
                        VerifiedTransaction::new_unchecked(execution_data.transaction),
                        epoch,
                        seq,
                    ));
                    effects.push(execution_data.effects);
                    fx_digests.push(fx_digest);
                });

            // For each tx, the effects digest if it has already been executed
            // locally (used later to skip re-execution / detect forks).
            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            (
                CheckpointExecutionState::new(CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                }),
                CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
            )
        } else {
            // load items one-by-one
            // TODO: If we used RocksDbStore in the executor instead,
            // all the logic below could be removed.

            let digests = checkpoint_contents.inner();

            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = digests
                .digests_iter()
                .map(|d| (d.transaction, d.effects))
                .unzip();
            // Fetch each transaction from the cache; absence is fatal (state
            // sync should have written it).
            let transactions = self
                .transaction_cache_reader
                .multi_get_transaction_blocks(&tx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, tx)| {
                    let tx = tx
                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
                    // Avoid a clone when we hold the only Arc reference.
                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
                })
                .collect();
            // Fetch the corresponding effects by digest, also fatal if missing.
            let effects = self
                .transaction_cache_reader
                .multi_get_effects(&fx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, effect)| {
                    effect.unwrap_or_else(|| {
                        fatal!("checkpoint effect not found for {:?}", digests[i])
                    })
                })
                .collect();

            // Same already-executed lookup as in the bulk path above.
            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            (
                CheckpointExecutionState::new(CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                }),
                CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
            )
        }
    }
854
    // Schedule all unexecuted transactions in the checkpoint for execution
    //
    // Walks the checkpoint's transactions in lockstep with their digests,
    // expected effects, and locally-executed effects digests; enqueues every
    // transaction that still needs to run, and returns the digests of the
    // transactions that were enqueued (callers can await those executions).
    #[instrument(level = "info", skip_all)]
    fn schedule_transaction_execution(
        &self,
        ckpt_state: &CheckpointExecutionState,
        tx_data: &CheckpointTransactionData,
    ) -> Vec<TransactionDigest> {
        // Accumulates barrier dependencies as transactions are processed in
        // checkpoint order.
        let mut barrier_deps_builder = BarrierDependencyBuilder::new();

        // Find unexecuted transactions and their expected effects digests.
        // izip_debug_eq! zips the parallel vectors (presumably debug-checking
        // that their lengths agree — TODO confirm macro semantics).
        let (unexecuted_tx_digests, unexecuted_txns): (Vec<_>, Vec<_>) = itertools::multiunzip(
            izip_debug_eq!(
                tx_data.transactions.iter(),
                ckpt_state.data.tx_digests.iter(),
                ckpt_state.data.fx_digests.iter(),
                tx_data.effects.iter(),
                tx_data.executed_fx_digests.iter(),
                tx_data.accumulator_versions.iter()
            )
            .filter_map(
                |(
                    txn,
                    tx_digest,
                    expected_fx_digest,
                    effects,
                    executed_fx_digest,
                    accumulator_version,
                )| {
                    // Every txn is fed to the barrier builder — even ones we
                    // skip below — so later txns see complete dependency info.
                    let barrier_deps =
                        barrier_deps_builder.process_tx(*tx_digest, txn.transaction_data());

                    if let Some(executed_fx_digest) = executed_fx_digest {
                        // Already executed locally: only verify that the local
                        // effects match the checkpoint's expected effects
                        // (fork detection); do not schedule again.
                        assert_not_forked(
                            &ckpt_state.data.checkpoint,
                            tx_digest,
                            expected_fx_digest,
                            executed_fx_digest,
                            &*self.transaction_cache_reader,
                        );
                        None
                    } else if txn.transaction_data().is_end_of_epoch_tx() {
                        // The end-of-epoch txn is not scheduled here; it is
                        // handled separately by execute_change_epoch_tx.
                        None
                    } else {
                        // Pin shared-object versions from the known effects so
                        // execution reproduces the checkpointed results.
                        let assigned_versions = self
                            .epoch_store
                            .acquire_shared_version_assignments_from_effects(
                                txn,
                                effects,
                                *accumulator_version,
                                &*self.object_cache_reader,
                            )
                            .expect("failed to acquire shared version assignments");

                        let mut env = ExecutionEnv::new()
                            .with_assigned_versions(assigned_versions)
                            .with_expected_effects_digest(*expected_fx_digest)
                            .with_barrier_dependencies(barrier_deps);

                        // Check if the expected effects indicate insufficient balance
                        if let &ExecutionStatus::Failure(ExecutionFailure {
                            error: ExecutionErrorKind::InsufficientFundsForWithdraw,
                            ..
                        }) = effects.status()
                        {
                            env = env.with_insufficient_funds();
                        }

                        Some((tx_digest, (txn.clone(), env)))
                    }
                },
            ),
        );

        // Enqueue unexecuted transactions with their expected effects digests
        self.execution_scheduler
            .enqueue_transactions(unexecuted_txns, &self.epoch_store);

        unexecuted_tx_digests
    }
934
935    // Execute the change epoch txn
936    #[instrument(level = "error", skip_all)]
937    async fn execute_change_epoch_tx(&self, tx_data: &CheckpointTransactionData) {
938        let change_epoch_tx = tx_data.transactions.last().unwrap();
939        let change_epoch_fx = tx_data.effects.last().unwrap();
940        assert_eq!(
941            change_epoch_tx.digest(),
942            change_epoch_fx.transaction_digest()
943        );
944        assert!(
945            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
946            "final txn must be an end of epoch txn"
947        );
948
949        // Ordinarily we would assert that the change epoch txn has not been executed yet.
950        // However, during crash recovery, it is possible that we already passed this point and
951        // the txn has been executed. You can uncomment this assert if you are debugging a problem
952        // related to reconfig. If you hit this assert and it is not because of crash-recovery,
953        // it may indicate a bug in the checkpoint executor.
954        //
955        //     if self
956        //         .transaction_cache_reader
957        //         .get_executed_effects(change_epoch_tx.digest())
958        //         .is_some()
959        //     {
960        //         fatal!(
961        //             "end of epoch txn must not have been executed: {:?}",
962        //             change_epoch_tx.digest()
963        //         );
964        //     }
965
966        let assigned_versions = self
967            .epoch_store
968            .acquire_shared_version_assignments_from_effects(
969                change_epoch_tx,
970                change_epoch_fx,
971                None,
972                self.object_cache_reader.as_ref(),
973            )
974            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");
975
976        info!(
977            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}, assigned versions: {:?}",
978            change_epoch_tx.digest(),
979            change_epoch_fx.digest(),
980            assigned_versions
981        );
982        self.execution_scheduler.enqueue_transactions(
983            vec![(
984                change_epoch_tx.clone(),
985                ExecutionEnv::new()
986                    .with_assigned_versions(assigned_versions)
987                    .with_expected_effects_digest(change_epoch_fx.digest()),
988            )],
989            &self.epoch_store,
990        );
991
992        self.transaction_cache_reader
993            .notify_read_executed_effects_digests(
994                "CheckpointExecutor::notify_read_advance_epoch_tx",
995                &[*change_epoch_tx.digest()],
996            )
997            .await;
998    }
999
1000    // Increment the highest executed checkpoint watermark and prune old full-checkpoint contents
1001    #[instrument(level = "debug", skip_all)]
1002    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
1003        // Ensure that we are not skipping checkpoints at any point
1004        let seq = *checkpoint.sequence_number();
1005        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
1006        if let Some(prev_highest) = self
1007            .checkpoint_store
1008            .get_highest_executed_checkpoint_seq_number()
1009            .unwrap()
1010        {
1011            assert_eq!(prev_highest + 1, seq);
1012        } else {
1013            assert_eq!(seq, 0);
1014        }
1015        if seq.is_multiple_of(CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL) {
1016            info!("Finished syncing and executing checkpoint {}", seq);
1017        }
1018
1019        fail_point!("highest-executed-checkpoint");
1020
1021        // We store a fixed number of additional FullCheckpointContents after execution is complete
1022        // for use in state sync.
1023        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
1024        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
1025            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
1026            if let Some(prune_checkpoint) = self
1027                .checkpoint_store
1028                .get_checkpoint_by_sequence_number(prune_seq)
1029                .expect("Failed to fetch checkpoint")
1030            {
1031                self.checkpoint_store
1032                    .delete_full_checkpoint_contents(prune_seq)
1033                    .expect("Failed to delete full checkpoint contents");
1034                self.checkpoint_store
1035                    .delete_contents_digest_sequence_number_mapping(
1036                        &prune_checkpoint.content_digest,
1037                    )
1038                    .expect("Failed to delete contents digest -> sequence number mapping");
1039            } else {
1040                // If this is directly after a snapshot restore with skiplisting,
1041                // this is expected for the first `NUM_SAVED_FULL_CHECKPOINT_CONTENTS`
1042                // checkpoints.
1043                debug!(
1044                    "Failed to fetch checkpoint with sequence number {:?}",
1045                    prune_seq
1046                );
1047            }
1048        }
1049
1050        self.checkpoint_store
1051            .update_highest_executed_checkpoint(checkpoint)
1052            .unwrap();
1053        self.metrics.last_executed_checkpoint.set(seq as i64);
1054
1055        self.metrics
1056            .last_executed_checkpoint_timestamp_ms
1057            .set(checkpoint.timestamp_ms as i64);
1058        checkpoint.report_checkpoint_age(
1059            &self.metrics.last_executed_checkpoint_age,
1060            &self.metrics.last_executed_checkpoint_age_ms,
1061        );
1062    }
1063
1064    /// If configured, commit the pending index updates for the provided checkpoint as well as
1065    /// enqueuing the checkpoint to the subscription service
1066    #[instrument(level = "info", skip_all)]
1067    async fn commit_index_updates_and_enqueue_to_subscription_service(
1068        &self,
1069        checkpoint: Checkpoint,
1070    ) {
1071        if let Some(rpc_index) = &self.state.rpc_index {
1072            rpc_index
1073                .commit_update_for_checkpoint(checkpoint.summary.sequence_number)
1074                .expect("failed to update rpc_indexes");
1075        }
1076
1077        if let Some(sender) = &self.subscription_service_checkpoint_sender
1078            && let Err(e) = sender.send(checkpoint).await
1079        {
1080            warn!("unable to send checkpoint to subscription service: {e}");
1081        }
1082    }
1083
1084    // Extract randomness rounds from the checkpoint version-specific data (if available).
1085    // Otherwise, extract randomness rounds from the first transaction in the checkpoint
1086    #[instrument(level = "debug", skip_all)]
1087    fn extract_randomness_rounds(
1088        &self,
1089        checkpoint: &VerifiedCheckpoint,
1090        checkpoint_contents: &CheckpointContents,
1091    ) -> Vec<RandomnessRound> {
1092        if let Some(version_specific_data) = checkpoint
1093            .version_specific_data(self.epoch_store.protocol_config())
1094            .expect("unable to get version_specific_data")
1095        {
1096            // With version-specific data, randomness rounds are stored in checkpoint summary.
1097            version_specific_data.into_v1().randomness_rounds
1098        } else {
1099            // Before version-specific data, checkpoint batching must be disabled. In this case,
1100            // randomness state update tx must be first if it exists, because all other
1101            // transactions in a checkpoint that includes a randomness state update are causally
1102            // dependent on it.
1103            assert_eq!(
1104                0,
1105                self.epoch_store
1106                    .protocol_config()
1107                    .min_checkpoint_interval_ms_as_option()
1108                    .unwrap_or_default(),
1109            );
1110            if let Some(first_digest) = checkpoint_contents.inner().first_digests() {
1111                let maybe_randomness_tx = self.transaction_cache_reader.get_transaction_block(&first_digest.transaction)
1112                .unwrap_or_else(||
1113                    fatal!(
1114                        "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
1115                        checkpoint.sequence_number()
1116                    )
1117                );
1118                if let TransactionKind::RandomnessStateUpdate(rsu) =
1119                    maybe_randomness_tx.data().transaction_data().kind()
1120                {
1121                    vec![rsu.randomness_round]
1122                } else {
1123                    Vec::new()
1124                }
1125            } else {
1126                Vec::new()
1127            }
1128        }
1129    }
1130}