1use futures::StreamExt;
22use mysten_common::{ZipDebugEqIteratorExt, debug_fatal, fatal, izip_debug_eq};
23use parking_lot::Mutex;
24use std::{sync::Arc, time::Instant};
25use sui_types::base_types::SequenceNumber;
26use sui_types::crypto::RandomnessRound;
27use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
28use sui_types::transaction::{TransactionDataAPI, TransactionKind};
29use sui_types::{
30 SUI_ACCUMULATOR_ROOT_OBJECT_ID,
31 node_role::{FullNodeSyncMode, NodeRole},
32};
33
34use sui_config::node::{CheckpointExecutorConfig, RunWithRange};
35use sui_macros::fail_point;
36use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
37use sui_types::executable_transaction::VerifiedExecutableTransaction;
38use sui_types::execution_status::{ExecutionErrorKind, ExecutionFailure, ExecutionStatus};
39use sui_types::full_checkpoint_content::Checkpoint;
40use sui_types::global_state_hash::GlobalStateHash;
41use sui_types::message_envelope::Message;
42use sui_types::{
43 base_types::{TransactionDigest, TransactionEffectsDigest},
44 messages_checkpoint::VerifiedCheckpoint,
45 transaction::VerifiedTransaction,
46};
47use tap::{TapFallible, TapOptional};
48use tracing::{debug, info, instrument};
49
50use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
51use crate::authority::backpressure::BackpressureManager;
52use crate::authority::{AuthorityState, ExecutionEnv};
53use crate::execution_scheduler::ExecutionScheduler;
54use crate::execution_scheduler::execution_scheduler_impl::BarrierDependencyBuilder;
55use crate::global_state_hasher::GlobalStateHasher;
56use crate::{
57 checkpoints::CheckpointStore,
58 execution_cache::{ObjectCacheRead, TransactionCacheRead},
59};
60
61mod data_ingestion_handler;
62pub mod metrics;
63pub(crate) mod utils;
64
65use data_ingestion_handler::{load_checkpoint, store_checkpoint_locally};
66use metrics::CheckpointExecutorMetrics;
67use utils::*;
68
/// Checkpoints whose sequence number is a multiple of this value produce an
/// `info!`-level progress log when the executed watermark is bumped.
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5_000;
70
/// The reason `CheckpointExecutor::run_epoch` stopped.
#[derive(Debug, PartialEq, Eq)]
pub enum StopReason {
    /// The final checkpoint of the epoch has been executed, either during this
    /// call or before it.
    EpochComplete,
    /// The run-with-range bound was satisfied, or the checkpoint stream ended
    /// before the epoch's final checkpoint was executed.
    RunWithRangeCondition,
}
76
/// Per-checkpoint data carried through the execution pipeline: the verified
/// summary, its contents, and the digests taken from those contents.
pub(crate) struct CheckpointExecutionData {
    pub checkpoint: VerifiedCheckpoint,
    pub checkpoint_contents: CheckpointContents,
    /// Transaction digests, in checkpoint-contents order.
    pub tx_digests: Vec<TransactionDigest>,
    /// Effects digests recorded in the checkpoint, index-aligned with
    /// `tx_digests`; used as the expected effects when executing.
    pub fx_digests: Vec<TransactionEffectsDigest>,
}
83
/// Full transactions and effects of a checkpoint, index-aligned with the
/// checkpoint's contents.
pub(crate) struct CheckpointTransactionData {
    pub transactions: Vec<VerifiedExecutableTransaction>,
    pub effects: Vec<TransactionEffects>,
    /// Locally known executed-effects digest per transaction (from
    /// `multi_get_executed_effects_digests`); `None` means the transaction
    /// still has to be scheduled for execution.
    pub executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    /// Accumulator root input version each transaction should observe; derived
    /// in `CheckpointTransactionData::new`.
    pub accumulator_versions: Vec<Option<SequenceNumber>>,
}
94
95impl CheckpointTransactionData {
96 pub fn new(
97 transactions: Vec<VerifiedExecutableTransaction>,
98 effects: Vec<TransactionEffects>,
99 executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
100 ) -> Self {
101 assert_eq!(transactions.len(), effects.len());
102 assert_eq!(transactions.len(), executed_fx_digests.len());
103 let mut accumulator_versions = vec![None; transactions.len()];
104 let mut next_update_index = 0;
105 for (idx, efx) in effects.iter().enumerate() {
106 let acc_version = efx.object_changes().into_iter().find_map(|change| {
111 if change.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
112 change.input_version
113 } else {
114 None
115 }
116 });
117 if let Some(acc_version) = acc_version {
118 for slot in accumulator_versions
120 .iter_mut()
121 .take(idx + 1)
122 .skip(next_update_index)
123 {
124 *slot = Some(acc_version);
125 }
126 next_update_index = idx + 1;
127 }
128 }
129 assert!(
133 next_update_index == 0
134 || next_update_index == transactions.len()
135 || (next_update_index == transactions.len() - 1
136 && transactions
137 .last()
138 .unwrap()
139 .transaction_data()
140 .is_end_of_epoch_tx())
141 );
142 Self {
143 transactions,
144 effects,
145 executed_fx_digests,
146 accumulator_versions,
147 }
148 }
149}
/// Execution state of a single checkpoint as it moves through the pipeline.
pub(crate) struct CheckpointExecutionState {
    pub data: CheckpointExecutionData,

    /// Global state hash for this checkpoint. It is either supplied up front
    /// (read back from the epoch store) or accumulated from the checkpoint's
    /// effects during finalization.
    state_hasher: Option<GlobalStateHash>,
    /// Full checkpoint data. It is only populated when some consumer needs it:
    /// the subscription service, the RPC index, or a data-ingestion directory.
    full_data: Option<Checkpoint>,
}
156
157impl CheckpointExecutionState {
158 pub fn new(data: CheckpointExecutionData) -> Self {
159 Self {
160 data,
161 state_hasher: None,
162 full_data: None,
163 }
164 }
165
166 pub fn new_with_global_state_hasher(
167 data: CheckpointExecutionData,
168 hasher: GlobalStateHash,
169 ) -> Self {
170 Self {
171 data,
172 state_hasher: Some(hasher),
173 full_data: None,
174 }
175 }
176}
177
// Awaits `finish_stage` on a pipeline handle for the named `PipelineStage`
// variant, e.g. `finish_stage!(pipeline_handle, BuildDbBatch)`.
macro_rules! finish_stage {
    ($pipeline:expr, $stage_name:ident) => {
        $pipeline.finish_stage(PipelineStage::$stage_name).await;
    };
}
183
/// Executes certified checkpoints for a single epoch (see `run_epoch`) and
/// advances the highest-executed-checkpoint watermark one checkpoint at a time.
pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    // The two cache readers and the scheduler are taken from `state` in `new`.
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    execution_scheduler: Arc<ExecutionScheduler>,
    global_state_hasher: Arc<GlobalStateHasher>,
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    // Fed with each checkpoint's `network_total_transactions`; its output drives
    // the `checkpoint_exec_sync_tps` metric.
    tps_estimator: Mutex<TPSEstimator>,
    // When set, the full data of each executed checkpoint is sent here; send
    // errors are ignored.
    subscription_service_checkpoint_sender: Option<tokio::sync::broadcast::Sender<Arc<Checkpoint>>>,
}
200
201impl CheckpointExecutor {
202 pub fn new(
203 epoch_store: Arc<AuthorityPerEpochStore>,
204 checkpoint_store: Arc<CheckpointStore>,
205 state: Arc<AuthorityState>,
206 global_state_hasher: Arc<GlobalStateHasher>,
207 backpressure_manager: Arc<BackpressureManager>,
208 config: CheckpointExecutorConfig,
209 metrics: Arc<CheckpointExecutorMetrics>,
210 subscription_service_checkpoint_sender: Option<
211 tokio::sync::broadcast::Sender<Arc<Checkpoint>>,
212 >,
213 ) -> Self {
214 Self {
215 epoch_store,
216 state: state.clone(),
217 checkpoint_store,
218 object_cache_reader: state.get_object_cache_reader().clone(),
219 transaction_cache_reader: state.get_transaction_cache_reader().clone(),
220 execution_scheduler: state.execution_scheduler().clone(),
221 global_state_hasher,
222 backpressure_manager,
223 config,
224 metrics,
225 tps_estimator: Mutex::new(TPSEstimator::default()),
226 subscription_service_checkpoint_sender,
227 }
228 }
229
230 pub fn new_for_tests(
231 epoch_store: Arc<AuthorityPerEpochStore>,
232 checkpoint_store: Arc<CheckpointStore>,
233 state: Arc<AuthorityState>,
234 state_hasher: Arc<GlobalStateHasher>,
235 ) -> Self {
236 Self::new(
237 epoch_store,
238 checkpoint_store,
239 state,
240 state_hasher,
241 BackpressureManager::new_for_tests(),
242 Default::default(),
243 CheckpointExecutorMetrics::new_for_tests(),
244 None,
245 )
246 }
247
248 fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
251 let highest_executed = self
255 .checkpoint_store
256 .get_highest_executed_checkpoint()
257 .unwrap();
258
259 if let Some(highest_executed) = &highest_executed
260 && self.epoch_store.epoch() == highest_executed.epoch()
261 && highest_executed.is_last_checkpoint_of_epoch()
262 {
263 info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
266 return None;
267 }
268
269 Some(
270 highest_executed
271 .as_ref()
272 .map(|c| c.sequence_number() + 1)
273 .unwrap_or_else(|| {
274 assert_eq!(self.epoch_store.epoch(), 0);
276 0
278 }),
279 )
280 }
281
282 #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
286 pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
287 let _metrics_scope = mysten_metrics::monitored_scope("CheckpointExecutor::run_epoch");
288 info!(?run_with_range, "CheckpointExecutor::run_epoch");
289 debug!(
290 "Checkpoint executor running for epoch {:?}",
291 self.epoch_store.epoch(),
292 );
293
294 if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
298 info!("RunWithRange condition satisfied at {:?}", run_with_range,);
299 return StopReason::RunWithRangeCondition;
300 };
301
302 self.metrics
303 .checkpoint_exec_epoch
304 .set(self.epoch_store.epoch() as i64);
305
306 let Some(next_to_schedule) = self.get_next_to_schedule() else {
307 return StopReason::EpochComplete;
308 };
309
310 let this = Arc::new(self);
311
312 let concurrency = std::env::var("SUI_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
313 .ok()
314 .and_then(|s| s.parse().ok())
315 .unwrap_or(this.config.checkpoint_execution_max_concurrency);
316
317 let pipeline_stages = PipelineStages::new(next_to_schedule, this.metrics.clone());
318
319 let final_checkpoint_executed = stream_synced_checkpoints(
320 this.checkpoint_store.clone(),
321 next_to_schedule,
322 run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
323 )
324 .map(|checkpoint| {
326 let this = this.clone();
327 let pipeline_handle = pipeline_stages.handle(*checkpoint.sequence_number());
328 async move {
329 let pipeline_handle = pipeline_handle.await;
330 tokio::spawn(this.execute_checkpoint(checkpoint, pipeline_handle))
331 .await
332 .unwrap()
333 }
334 })
335 .buffered(concurrency)
336 .fold(false, |state, is_final_checkpoint| async move {
338 assert!(
339 !state,
340 "fold can't be called again after the final checkpoint"
341 );
342 is_final_checkpoint
343 })
344 .await;
345
346 if final_checkpoint_executed {
347 StopReason::EpochComplete
348 } else {
349 StopReason::RunWithRangeCondition
350 }
351 }
352}
353
impl CheckpointExecutor {
    /// Executes one checkpoint end to end and returns whether it was the last
    /// checkpoint of the epoch.
    ///
    /// Steps, in order:
    /// 1. Obtain the checkpoint's transactions, either by executing them from the
    ///    synced checkpoint or by reusing the locally built checkpoint.
    /// 2. Build the DB batch and commit transaction outputs.
    /// 3. Finalize the checkpoint in the epoch store.
    /// 4. Update the RPC index and subscription service.
    /// 5. Accumulate the running state root.
    /// 6. Bump the highest-executed watermark.
    ///
    /// Each step is marked complete on `pipeline_handle` via `finish_stage!`.
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
        mut pipeline_handle: PipelineHandle,
    ) -> bool {
        info!("executing checkpoint");
        let sequence_number = checkpoint.sequence_number;

        checkpoint.report_checkpoint_age(
            &self.metrics.checkpoint_contents_age,
            &self.metrics.checkpoint_contents_age_ms,
        );
        self.backpressure_manager
            .update_highest_certified_checkpoint(sequence_number);

        // The end-of-epoch checkpoint waits until every earlier checkpoint has
        // been executed before doing any work.
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard = mysten_metrics::monitored_scope(
                "CheckpointExecutor::wait_for_previous_checkpoints",
            );

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            mysten_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        // Only `execute_transactions_from_synced_checkpoint` contains end-of-epoch
        // logic, so the epoch's last checkpoint always takes that path.
        let ckpt_state = if !self.epoch_store.node_role().runs_consensus()
            || checkpoint.is_last_checkpoint_of_epoch()
        {
            self.execute_transactions_from_synced_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        } else {
            self.verify_locally_built_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        };

        let tps = self.tps_estimator.lock().update(
            Instant::now(),
            ckpt_state.data.checkpoint.network_total_transactions,
        );
        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

        self.backpressure_manager
            .update_highest_executed_checkpoint(*ckpt_state.data.checkpoint.sequence_number());

        let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

        let seq = ckpt_state.data.checkpoint.sequence_number;

        // Stage the checkpoint's transaction outputs and the highest committed
        // checkpoint marker into the same `batch`.
        let mut batch = self
            .state
            .get_cache_commit()
            .build_db_batch(self.epoch_store.epoch(), &ckpt_state.data.tx_digests);

        self.state
            .get_cache_commit()
            .set_highest_committed_checkpoint_in_batch(&mut batch, seq);

        finish_stage!(pipeline_handle, BuildDbBatch);

        let object_funds_checker = self.state.object_funds_checker.load();
        if let Some(object_funds_checker) = object_funds_checker.as_ref() {
            object_funds_checker.commit_effects(batch.0.iter().map(|o| &o.effects));
        }

        // Commit via `spawn_blocking`, presumably because the commit does
        // synchronous DB writes. `ckpt_state` is moved in and handed back.
        let mut ckpt_state = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                let cache_commit = this.state.get_cache_commit();
                debug!(?seq, "committing checkpoint transactions to disk");
                cache_commit.commit_transaction_outputs(
                    this.epoch_store.epoch(),
                    batch,
                    &ckpt_state.data.tx_digests,
                );
                ckpt_state
            }
        })
        .await
        .unwrap();

        finish_stage!(pipeline_handle, CommitTransactionOutputs);

        self.epoch_store
            .handle_finalized_checkpoint(&ckpt_state.data.checkpoint, &ckpt_state.data.tx_digests)
            .expect("cannot fail");

        let randomness_rounds = self.extract_randomness_rounds(
            &ckpt_state.data.checkpoint,
            &ckpt_state.data.checkpoint_contents,
        );

        if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
            for round in randomness_rounds {
                debug!(
                    ?round,
                    "notifying RandomnessReporter that randomness update was executed in checkpoint"
                );
                randomness_reporter
                    .notify_randomness_in_checkpoint(round)
                    .expect("epoch cannot have ended");
            }
        }

        finish_stage!(pipeline_handle, FinalizeCheckpoint);

        // `full_data` is only present when some consumer of checkpoint data is
        // configured (see `process_checkpoint_data`).
        if let Some(checkpoint_data) = ckpt_state.full_data.take() {
            self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data);
        }

        finish_stage!(pipeline_handle, UpdateRpcIndex);

        self.global_state_hasher
            .accumulate_running_root(&self.epoch_store, seq, ckpt_state.state_hasher)
            .expect("Failed to accumulate running root");

        if is_final_checkpoint {
            self.checkpoint_store
                .insert_epoch_last_checkpoint(self.epoch_store.epoch(), &ckpt_state.data.checkpoint)
                .expect("Failed to insert epoch last checkpoint");

            self.global_state_hasher
                .accumulate_epoch(self.epoch_store.clone(), seq)
                .expect("Accumulating epoch cannot fail");

            // Pruning failure is reported but does not stop execution.
            self.checkpoint_store
                .prune_local_summaries()
                .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
                .ok();
        }

        fail_point!("crash");

        // The watermark is bumped only after all of the above has completed.
        self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

        finish_stage!(pipeline_handle, BumpHighestExecutedCheckpoint);

        ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
    }

    /// Consensus-running path: checks the synced checkpoint against the locally
    /// built one and reuses local results instead of re-executing transactions.
    ///
    /// Falls back to `execute_transactions_from_synced_checkpoint` when there is
    /// no locally computed checkpoint for this sequence number.
    ///
    /// # Panics
    ///
    /// Panics if called for the last checkpoint of an epoch or on DB errors.
    /// `assert_checkpoint_not_forked` presumably panics when the two
    /// checkpoints disagree (TODO confirm).
    #[instrument(level = "info", skip_all)]
    async fn verify_locally_built_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pipeline_handle: &mut PipelineHandle,
    ) -> CheckpointExecutionState {
        assert!(
            !checkpoint.is_last_checkpoint_of_epoch(),
            "only fullnode path has end-of-epoch logic"
        );

        let sequence_number = checkpoint.sequence_number;
        let locally_built_checkpoint = self
            .checkpoint_store
            .get_locally_computed_checkpoint(sequence_number)
            .expect("db error");

        let Some(locally_built_checkpoint) = locally_built_checkpoint else {
            self.metrics
                .checkpoint_executor_validator_sync_fallback_path
                .inc();
            return self
                .execute_transactions_from_synced_checkpoint(checkpoint, pipeline_handle)
                .await;
        };

        self.metrics.checkpoint_executor_validator_path.inc();

        assert_checkpoint_not_forked(
            &locally_built_checkpoint,
            &checkpoint,
            &self.checkpoint_store,
        );

        // Wait for the epoch store's state hash for this checkpoint, so it does
        // not have to be re-accumulated from effects during finalization.
        let state_hasher = {
            let _metrics_scope =
                mysten_metrics::monitored_scope("CheckpointExecutor::notify_read_state_hasher");
            self.epoch_store
                .notify_read_checkpoint_state_hasher(&[sequence_number])
                .await
                .unwrap()
                .pop()
                .unwrap()
        };

        // Consensus observers skip execution but still run the full
        // finalization step, which loads transactions and effects.
        if matches!(
            self.epoch_store.node_role(),
            NodeRole::FullNode(FullNodeSyncMode::ConsensusObserver)
        ) {
            pipeline_handle
                .skip_to(PipelineStage::FinalizeTransactions)
                .await;

            let (state, tx_data) =
                self.load_checkpoint_transactions(checkpoint, Some(state_hasher));

            return self
                .finalize_executed_checkpoint_transactions(state, &tx_data, pipeline_handle)
                .await;
        }

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = checkpoint_contents
            .iter()
            .map(|digests| (digests.transaction, digests.effects))
            .unzip();

        pipeline_handle
            .skip_to(PipelineStage::FinalizeTransactions)
            .await;

        self.insert_finalized_transactions(&tx_digests, sequence_number);

        pipeline_handle.skip_to(PipelineStage::BuildDbBatch).await;

        let ckpt_data = CheckpointExecutionData {
            checkpoint,
            checkpoint_contents,
            tx_digests,
            fx_digests,
        };
        CheckpointExecutionState::new_with_global_state_hasher(ckpt_data, state_hasher)
    }

    /// Executes a synced checkpoint's transactions locally, then finalizes it.
    ///
    /// Steps:
    /// 1. Schedule every transaction that has not been executed yet.
    /// 2. Wait for those transactions to finish.
    /// 3. For the epoch's final checkpoint, run the change-epoch transaction.
    /// 4. Finalize the checkpoint.
    #[instrument(level = "info", skip_all)]
    async fn execute_transactions_from_synced_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pipeline_handle: &mut PipelineHandle,
    ) -> CheckpointExecutionState {
        let (ckpt_state, tx_data, unexecuted_tx_digests) = {
            let _scope =
                mysten_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
            let (ckpt_state, tx_data) = self.load_checkpoint_transactions(checkpoint, None);
            let unexecuted_tx_digests = self.schedule_transaction_execution(&ckpt_state, &tx_data);
            (ckpt_state, tx_data, unexecuted_tx_digests)
        };

        finish_stage!(pipeline_handle, ExecuteTransactions);

        {
            self.transaction_cache_reader
                .notify_read_executed_effects_digests(
                    "CheckpointExecutor::notify_read_executed_effects_digests",
                    &unexecuted_tx_digests,
                )
                .await;
        }

        finish_stage!(pipeline_handle, WaitForTransactions);

        // `schedule_transaction_execution` does not schedule an unexecuted
        // end-of-epoch transaction. It is enqueued here, after the wait above.
        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&tx_data).await;
        }

        self.finalize_executed_checkpoint_transactions(ckpt_state, &tx_data, pipeline_handle)
            .await
    }

    /// Finalizes a checkpoint whose transactions have all been executed.
    ///
    /// Steps:
    /// 1. Commit pending post-processing index batches.
    /// 2. Feed the congestion tracker (fullnodes only).
    /// 3. Record the transactions as finalized.
    /// 4. Compute the state hash if it is not already known.
    /// 5. Build full checkpoint data when any consumer needs it.
    async fn finalize_executed_checkpoint_transactions(
        &self,
        mut ckpt_state: CheckpointExecutionState,
        tx_data: &CheckpointTransactionData,
        pipeline_handle: &mut PipelineHandle,
    ) -> CheckpointExecutionState {
        let sequence_number = ckpt_state.data.checkpoint.sequence_number;

        self.commit_post_processing_index_batches(&ckpt_state.data.tx_digests)
            .await;

        let _scope = mysten_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");

        if self.state.is_fullnode(&self.epoch_store) {
            self.state.congestion_tracker.process_checkpoint_effects(
                &*self.transaction_cache_reader,
                &ckpt_state.data.checkpoint,
                &tx_data.effects,
            );
        }

        self.insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number);

        // Keep a state hash supplied by the caller (the consensus-observer path
        // of `verify_locally_built_checkpoint`); otherwise derive it from effects.
        if ckpt_state.state_hasher.is_none() {
            ckpt_state.state_hasher = Some(
                self.global_state_hasher
                    .accumulate_checkpoint(&tx_data.effects, sequence_number, &self.epoch_store)
                    .expect("epoch cannot have ended"),
            );
        }

        finish_stage!(pipeline_handle, FinalizeTransactions);

        ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state.data, tx_data);

        finish_stage!(pipeline_handle, ProcessCheckpointData);

        ckpt_state
    }

    /// Awaits post-processing of each transaction, one at a time and in order.
    /// If `state.indexes` is configured and any raw index batches were
    /// produced, concatenates them and commits them, with their cache updates,
    /// as one index batch.
    async fn commit_post_processing_index_batches(&self, tx_digests: &[TransactionDigest]) {
        let mut raw_batches = Vec::new();
        let mut cache_updates = Vec::new();
        for tx_digest in tx_digests {
            if let Some((raw_batch, cu)) = self.state.await_post_processing(tx_digest).await {
                raw_batches.push(raw_batch);
                cache_updates.push(cu);
            }
        }
        if !raw_batches.is_empty()
            && let Some(indexes) = &self.state.indexes
        {
            let mut db_batch = indexes.new_db_batch();
            db_batch
                .concat(raw_batches)
                .expect("failed to build index batch");
            indexes
                .commit_index_batch(db_batch, cache_updates)
                .expect("failed to commit index batch");
        }
    }

    /// Whether any consumer of full checkpoint data is configured: the
    /// subscription service, the RPC index, or a data-ingestion directory.
    fn checkpoint_data_enabled(&self) -> bool {
        self.subscription_service_checkpoint_sender.is_some()
            || self.state.rpc_index.is_some()
            || self.config.data_ingestion_dir.is_some()
    }

    /// Records `tx_digests` as finalized in checkpoint `sequence_number`. This
    /// always updates the epoch store and, on fullnodes, also the checkpoint
    /// cache.
    fn insert_finalized_transactions(
        &self,
        tx_digests: &[TransactionDigest],
        sequence_number: CheckpointSequenceNumber,
    ) {
        self.epoch_store
            .insert_finalized_transactions(tx_digests, sequence_number)
            .expect("failed to insert finalized transactions");

        if self.state.is_fullnode(&self.epoch_store) {
            self.state
                .get_checkpoint_cache()
                .deprecated_insert_finalized_transactions(
                    tx_digests,
                    self.epoch_store.epoch(),
                    sequence_number,
                );
        }
    }

    /// Builds the full `Checkpoint` data when any consumer needs it, and
    /// returns `None` when no consumer is configured.
    ///
    /// When configured, the data is passed to `rpc_index.index_checkpoint` and
    /// written to the data-ingestion directory.
    #[instrument(level = "info", skip_all)]
    fn process_checkpoint_data(
        &self,
        ckpt_data: &CheckpointExecutionData,
        tx_data: &CheckpointTransactionData,
    ) -> Option<Checkpoint> {
        if !self.checkpoint_data_enabled() {
            return None;
        }

        let checkpoint = load_checkpoint(
            ckpt_data,
            tx_data,
            self.state.get_object_store(),
            &*self.transaction_cache_reader,
        )
        .expect("failed to load checkpoint data");

        if let Some(rpc_index) = &self.state.rpc_index {
            rpc_index.index_checkpoint(&checkpoint);
        }

        if let Some(path) = &self.config.data_ingestion_dir {
            store_checkpoint_locally(path, &checkpoint)
                .expect("failed to store checkpoint locally");
        }

        Some(checkpoint)
    }

    /// Loads the contents, transactions, and effects of `checkpoint`, and looks
    /// up which transactions already have executed effects locally.
    ///
    /// Full checkpoint contents stored for this sequence number are used when
    /// available. Otherwise each transaction and effect is read from the
    /// transaction cache, and a missing entry is fatal. `state_hasher`, if
    /// given, is attached to the returned state.
    #[instrument(level = "info", skip_all)]
    fn load_checkpoint_transactions(
        &self,
        checkpoint: VerifiedCheckpoint,
        state_hasher: Option<GlobalStateHash>,
    ) -> (CheckpointExecutionState, CheckpointTransactionData) {
        let seq = checkpoint.sequence_number;
        let epoch = checkpoint.epoch;

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        let checkpoint_state = |data| match state_hasher {
            Some(hasher) => CheckpointExecutionState::new_with_global_state_hasher(data, hasher),
            None => CheckpointExecutionState::new(data),
        };

        // Bulk path: full contents stored by sequence number. A read error is
        // reported via `debug_fatal!` and then treated like "not found", which
        // falls through to the per-digest path below.
        if let Some(full_contents) = self
            .checkpoint_store
            .get_full_checkpoint_contents_by_sequence_number(seq)
            .tap_err(|e| debug_fatal!("Failed to get checkpoint contents from store: {e}"))
            .ok()
            .flatten()
            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
        {
            let num_txns = full_contents.size();
            let mut tx_digests = Vec::with_capacity(num_txns);
            let mut transactions = Vec::with_capacity(num_txns);
            let mut effects = Vec::with_capacity(num_txns);
            let mut fx_digests = Vec::with_capacity(num_txns);

            full_contents
                .into_iter()
                .zip_debug_eq(checkpoint_contents.iter())
                .for_each(|(execution_data, digests)| {
                    let tx_digest = digests.transaction;
                    let fx_digest = digests.effects;
                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
                    debug_assert_eq!(fx_digest, execution_data.effects.digest());

                    tx_digests.push(tx_digest);
                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(execution_data.transaction),
                        epoch,
                        seq,
                    ));
                    effects.push(execution_data.effects);
                    fx_digests.push(fx_digest);
                });

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            (
                checkpoint_state(CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                }),
                CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
            )
        } else {
            // Per-digest path: fetch every transaction and effects from the cache.
            let digests = checkpoint_contents.inner();

            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = digests
                .digests_iter()
                .map(|d| (d.transaction, d.effects))
                .unzip();
            let transactions = self
                .transaction_cache_reader
                .multi_get_transaction_blocks(&tx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, tx)| {
                    let tx = tx
                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
                    // Take ownership without cloning when this is the only reference.
                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
                })
                .collect();
            let effects = self
                .transaction_cache_reader
                .multi_get_effects(&fx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, effect)| {
                    effect.unwrap_or_else(|| {
                        fatal!("checkpoint effect not found for {:?}", digests[i])
                    })
                })
                .collect();

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            (
                checkpoint_state(CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                }),
                CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
            )
        }
    }

    /// Enqueues every not-yet-executed transaction of the checkpoint, except an
    /// end-of-epoch transaction, and returns the enqueued digests.
    ///
    /// Transactions that already have executed effects are not enqueued.
    /// Instead, their effects digest is checked against the checkpoint via
    /// `assert_not_forked`. Each enqueued transaction gets:
    /// - shared-object versions assigned from its checkpointed effects,
    /// - the expected effects digest,
    /// - barrier dependencies.
    #[instrument(level = "info", skip_all)]
    fn schedule_transaction_execution(
        &self,
        ckpt_state: &CheckpointExecutionState,
        tx_data: &CheckpointTransactionData,
    ) -> Vec<TransactionDigest> {
        let mut barrier_deps_builder = BarrierDependencyBuilder::new();

        let (unexecuted_tx_digests, unexecuted_txns): (Vec<_>, Vec<_>) = itertools::multiunzip(
            izip_debug_eq!(
                tx_data.transactions.iter(),
                ckpt_state.data.tx_digests.iter(),
                ckpt_state.data.fx_digests.iter(),
                tx_data.effects.iter(),
                tx_data.executed_fx_digests.iter(),
                tx_data.accumulator_versions.iter()
            )
            .filter_map(
                |(
                    txn,
                    tx_digest,
                    expected_fx_digest,
                    effects,
                    executed_fx_digest,
                    accumulator_version,
                )| {
                    // `process_tx` is called for every transaction, executed or not.
                    let barrier_deps =
                        barrier_deps_builder.process_tx(*tx_digest, txn.transaction_data());

                    if let Some(executed_fx_digest) = executed_fx_digest {
                        assert_not_forked(
                            &ckpt_state.data.checkpoint,
                            tx_digest,
                            expected_fx_digest,
                            executed_fx_digest,
                            &*self.transaction_cache_reader,
                        );
                        None
                    } else if txn.transaction_data().is_end_of_epoch_tx() {
                        // Enqueued separately by `execute_change_epoch_tx`.
                        None
                    } else {
                        let assigned_versions = self
                            .epoch_store
                            .acquire_shared_version_assignments_from_effects(
                                txn,
                                effects,
                                *accumulator_version,
                                &*self.object_cache_reader,
                            )
                            .expect("failed to acquire shared version assignments");

                        let mut env = ExecutionEnv::new()
                            .with_assigned_versions(assigned_versions)
                            .with_expected_effects_digest(*expected_fx_digest)
                            .with_barrier_dependencies(barrier_deps);

                        // The checkpointed effects recorded an insufficient-funds
                        // withdraw failure; flag the execution env accordingly.
                        if let &ExecutionStatus::Failure(ExecutionFailure {
                            error: ExecutionErrorKind::InsufficientFundsForWithdraw,
                            ..
                        }) = effects.status()
                        {
                            env = env.with_insufficient_funds();
                        }

                        Some((tx_digest, (txn.clone(), env)))
                    }
                },
            ),
        );

        self.execution_scheduler
            .enqueue_transactions(unexecuted_txns, &self.epoch_store);

        unexecuted_tx_digests
    }

    /// Enqueues the checkpoint's final, end-of-epoch transaction and waits
    /// until it has been executed. Its shared-object versions are assigned
    /// from its checkpointed effects.
    ///
    /// # Panics
    ///
    /// Panics in any of these cases:
    /// - the checkpoint has no transactions;
    /// - the last transaction's digest differs from the last effects'
    ///   transaction digest;
    /// - the last transaction is not an end-of-epoch transaction.
    #[instrument(level = "error", skip_all)]
    async fn execute_change_epoch_tx(&self, tx_data: &CheckpointTransactionData) {
        let change_epoch_tx = tx_data.transactions.last().unwrap();
        let change_epoch_fx = tx_data.effects.last().unwrap();
        assert_eq!(
            change_epoch_tx.digest(),
            change_epoch_fx.transaction_digest()
        );
        assert!(
            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
            "final txn must be an end of epoch txn"
        );

        let assigned_versions = self
            .epoch_store
            .acquire_shared_version_assignments_from_effects(
                change_epoch_tx,
                change_epoch_fx,
                None,
                self.object_cache_reader.as_ref(),
            )
            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");

        info!(
            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}, assigned versions: {:?}",
            change_epoch_tx.digest(),
            change_epoch_fx.digest(),
            assigned_versions
        );
        self.execution_scheduler.enqueue_transactions(
            vec![(
                change_epoch_tx.clone(),
                ExecutionEnv::new()
                    .with_assigned_versions(assigned_versions)
                    .with_expected_effects_digest(change_epoch_fx.digest()),
            )],
            &self.epoch_store,
        );

        self.transaction_cache_reader
            .notify_read_executed_effects_digests(
                "CheckpointExecutor::notify_read_advance_epoch_tx",
                &[*change_epoch_tx.digest()],
            )
            .await;
    }

    /// Advances the highest-executed-checkpoint watermark to `checkpoint`.
    ///
    /// Also deletes the full contents (and the contents-digest mapping) of the
    /// checkpoint `NUM_SAVED_FULL_CHECKPOINT_CONTENTS` behind, and updates the
    /// executed-checkpoint metrics.
    ///
    /// # Panics
    ///
    /// Panics on checkpoint-store errors. Also panics unless `checkpoint`
    /// directly follows the previous watermark, or is checkpoint 0 when there
    /// is no previous watermark.
    #[instrument(level = "debug", skip_all)]
    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        if seq.is_multiple_of(CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL) {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        // Full checkpoint contents are kept only for the most recent
        // checkpoints; drop the one that just fell out of the window.
        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age(
            &self.metrics.last_executed_checkpoint_age,
            &self.metrics.last_executed_checkpoint_age_ms,
        );
    }

    /// Commits the RPC index update for `checkpoint`, if an RPC index is
    /// configured. Then sends the checkpoint to the subscription service, if a
    /// sender is configured.
    #[instrument(level = "info", skip_all)]
    fn commit_index_updates_and_enqueue_to_subscription_service(&self, checkpoint: Checkpoint) {
        if let Some(rpc_index) = &self.state.rpc_index {
            rpc_index
                .commit_update_for_checkpoint(checkpoint.summary.sequence_number)
                .expect("failed to update rpc_indexes");
        }

        if let Some(sender) = &self.subscription_service_checkpoint_sender {
            // Best effort: a broadcast send error, e.g. when there are no active
            // receivers, is ignored.
            let _ = sender.send(Arc::new(checkpoint));
        }
    }

    /// Returns the randomness rounds whose updates were executed in `checkpoint`.
    ///
    /// If the checkpoint has version-specific data, the rounds recorded there
    /// are returned. Otherwise, which is only allowed when
    /// `min_checkpoint_interval_ms` is unset or 0, the first transaction of the
    /// checkpoint is inspected; if it is a `RandomnessStateUpdate`, its round
    /// is returned.
    #[instrument(level = "debug", skip_all)]
    fn extract_randomness_rounds(
        &self,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_contents: &CheckpointContents,
    ) -> Vec<RandomnessRound> {
        if let Some(version_specific_data) = checkpoint
            .version_specific_data(self.epoch_store.protocol_config())
            .expect("unable to get version_specific_data")
        {
            version_specific_data.into_v1().randomness_rounds
        } else {
            assert_eq!(
                0,
                self.epoch_store
                    .protocol_config()
                    .min_checkpoint_interval_ms_as_option()
                    .unwrap_or_default(),
            );
            if let Some(first_digest) = checkpoint_contents.inner().first_digests() {
                let maybe_randomness_tx = self.transaction_cache_reader.get_transaction_block(&first_digest.transaction)
                    .unwrap_or_else(||
                        fatal!(
                            "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
                            checkpoint.sequence_number()
                        )
                    );
                if let TransactionKind::RandomnessStateUpdate(rsu) =
                    maybe_randomness_tx.data().transaction_data().kind()
                {
                    vec![rsu.randomness_round]
                } else {
                    Vec::new()
                }
            } else {
                Vec::new()
            }
        }
    }
}
1177}