1use futures::StreamExt;
22use mysten_common::{ZipDebugEqIteratorExt, debug_fatal, fatal, izip_debug_eq};
23use parking_lot::Mutex;
24use std::{sync::Arc, time::Instant};
25use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
26use sui_types::base_types::SequenceNumber;
27use sui_types::crypto::RandomnessRound;
28use sui_types::inner_temporary_store::PackageStoreWithFallback;
29use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
30use sui_types::transaction::{TransactionDataAPI, TransactionKind};
31
32use sui_config::node::{CheckpointExecutorConfig, RunWithRange};
33use sui_macros::fail_point;
34use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
35use sui_types::executable_transaction::VerifiedExecutableTransaction;
36use sui_types::execution_status::{ExecutionErrorKind, ExecutionFailure, ExecutionStatus};
37use sui_types::full_checkpoint_content::Checkpoint;
38use sui_types::global_state_hash::GlobalStateHash;
39use sui_types::message_envelope::Message;
40use sui_types::{
41 base_types::{TransactionDigest, TransactionEffectsDigest},
42 messages_checkpoint::VerifiedCheckpoint,
43 transaction::VerifiedTransaction,
44};
45use tap::{TapFallible, TapOptional};
46use tracing::{debug, info, instrument, warn};
47
48use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
49use crate::authority::backpressure::BackpressureManager;
50use crate::authority::{AuthorityState, ExecutionEnv};
51use crate::execution_scheduler::ExecutionScheduler;
52use crate::execution_scheduler::execution_scheduler_impl::BarrierDependencyBuilder;
53use crate::global_state_hasher::GlobalStateHasher;
54use crate::{
55 checkpoints::CheckpointStore,
56 execution_cache::{ObjectCacheRead, TransactionCacheRead},
57};
58
59mod data_ingestion_handler;
60pub mod metrics;
61pub(crate) mod utils;
62
63use data_ingestion_handler::{load_checkpoint, store_checkpoint_locally};
64use metrics::CheckpointExecutorMetrics;
65use utils::*;
66
// Emit an info-level progress log once every this many executed checkpoints.
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;
68
/// Why `CheckpointExecutor::run_epoch` returned.
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    /// The final checkpoint of the epoch was executed.
    EpochComplete,
    /// A configured `RunWithRange` stopping condition was reached.
    RunWithRangeCondition,
}
74
/// Core data for one checkpoint being executed: the verified summary, its
/// contents, and the transaction/effects digests extracted from the contents.
pub(crate) struct CheckpointExecutionData {
    pub checkpoint: VerifiedCheckpoint,
    pub checkpoint_contents: CheckpointContents,
    // Parallel vectors: tx_digests[i] and fx_digests[i] refer to the same transaction.
    pub tx_digests: Vec<TransactionDigest>,
    pub fx_digests: Vec<TransactionEffectsDigest>,
}
81
/// Per-transaction data for one checkpoint. All vectors are parallel
/// (index `i` in each refers to the same transaction).
pub(crate) struct CheckpointTransactionData {
    pub transactions: Vec<VerifiedExecutableTransaction>,
    pub effects: Vec<TransactionEffects>,
    /// For each transaction, its effects digest if it has already been executed
    /// locally, `None` otherwise.
    pub executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    /// For each transaction, the accumulator-root object version assigned to it
    /// (derived in `CheckpointTransactionData::new`), `None` when not covered.
    pub accumulator_versions: Vec<Option<SequenceNumber>>,
}
92
impl CheckpointTransactionData {
    /// Builds the per-transaction data for a checkpoint.
    ///
    /// `transactions`, `effects`, and `executed_fx_digests` must be parallel
    /// vectors (asserted). For each transaction whose effects show an input
    /// version for the accumulator root object, that version is back-filled to
    /// every transaction since the previous accumulator update (inclusive of
    /// the current one).
    ///
    /// # Panics
    /// Panics if the input vectors differ in length, or if the derived
    /// coverage invariant at the end does not hold.
    pub fn new(
        transactions: Vec<VerifiedExecutableTransaction>,
        effects: Vec<TransactionEffects>,
        executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    ) -> Self {
        assert_eq!(transactions.len(), effects.len());
        assert_eq!(transactions.len(), executed_fx_digests.len());
        let mut accumulator_versions = vec![None; transactions.len()];
        // Index of the first transaction not yet covered by an accumulator update.
        let mut next_update_index = 0;
        for (idx, efx) in effects.iter().enumerate() {
            // Input version of the accumulator root object, if this transaction
            // touched it.
            let acc_version = efx.object_changes().into_iter().find_map(|change| {
                if change.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
                    change.input_version
                } else {
                    None
                }
            });
            if let Some(acc_version) = acc_version {
                // Back-fill this version over [next_update_index, idx].
                for slot in accumulator_versions
                    .iter_mut()
                    .take(idx + 1)
                    .skip(next_update_index)
                {
                    *slot = Some(acc_version);
                }
                next_update_index = idx + 1;
            }
        }
        // Invariant: either no transaction touched the accumulator root, or all
        // transactions were covered, or only the trailing end-of-epoch
        // transaction is left uncovered.
        assert!(
            next_update_index == 0
                || next_update_index == transactions.len()
                || (next_update_index == transactions.len() - 1
                    && transactions
                        .last()
                        .unwrap()
                        .transaction_data()
                        .is_end_of_epoch_tx())
        );
        Self {
            transactions,
            effects,
            executed_fx_digests,
            accumulator_versions,
        }
    }
}
/// Mutable state carried through the execution pipeline for one checkpoint.
pub(crate) struct CheckpointExecutionState {
    pub data: CheckpointExecutionData,

    // Set once the checkpoint's effects have been accumulated into the state hash.
    state_hasher: Option<GlobalStateHash>,
    // Full checkpoint contents; populated only when indexing, data ingestion, or
    // the subscription service need it (see `process_checkpoint_data`).
    full_data: Option<Checkpoint>,
}
154
155impl CheckpointExecutionState {
156 pub fn new(data: CheckpointExecutionData) -> Self {
157 Self {
158 data,
159 state_hasher: None,
160 full_data: None,
161 }
162 }
163
164 pub fn new_with_global_state_hasher(
165 data: CheckpointExecutionData,
166 hasher: GlobalStateHash,
167 ) -> Self {
168 Self {
169 data,
170 state_hasher: Some(hasher),
171 full_data: None,
172 }
173 }
174}
175
// Marks the given pipeline stage as finished on a pipeline handle, awaiting the
// stage hand-off. A macro (rather than a helper fn) so the `.await` happens in
// the caller's async context.
macro_rules! finish_stage {
    ($handle:expr, $stage:ident) => {
        $handle.finish_stage(PipelineStage::$stage).await;
    };
}
181
/// Drives execution of certified checkpoints, in sequence, for a single epoch.
pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    // Cache readers and scheduler are derived from `state` in `new`.
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    execution_scheduler: Arc<ExecutionScheduler>,
    global_state_hasher: Arc<GlobalStateHasher>,
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    // Estimator behind a mutex so the TPS metric can be updated per checkpoint.
    tps_estimator: Mutex<TPSEstimator>,
    // When present, executed checkpoints are forwarded to the subscription service.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
198
199impl CheckpointExecutor {
    /// Creates a checkpoint executor for the epoch represented by `epoch_store`.
    /// Cache readers and the execution scheduler are derived from `state`.
    pub fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        global_state_hasher: Arc<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
        subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
    ) -> Self {
        Self {
            epoch_store,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            execution_scheduler: state.execution_scheduler().clone(),
            global_state_hasher,
            backpressure_manager,
            config,
            metrics,
            tps_estimator: Mutex::new(TPSEstimator::default()),
            subscription_service_checkpoint_sender,
        }
    }
225
    /// Test-only constructor: default config, test metrics, test backpressure
    /// manager, and no subscription service sender.
    pub fn new_for_tests(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        state_hasher: Arc<GlobalStateHasher>,
    ) -> Self {
        Self::new(
            epoch_store,
            checkpoint_store,
            state,
            state_hasher,
            BackpressureManager::new_for_tests(),
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
            None,
        )
    }
243
244 fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
247 let highest_executed = self
251 .checkpoint_store
252 .get_highest_executed_checkpoint()
253 .unwrap();
254
255 if let Some(highest_executed) = &highest_executed
256 && self.epoch_store.epoch() == highest_executed.epoch()
257 && highest_executed.is_last_checkpoint_of_epoch()
258 {
259 info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
262 return None;
263 }
264
265 Some(
266 highest_executed
267 .as_ref()
268 .map(|c| c.sequence_number() + 1)
269 .unwrap_or_else(|| {
270 assert_eq!(self.epoch_store.epoch(), 0);
272 0
274 }),
275 )
276 }
277
    /// Runs checkpoint execution for the current epoch until either the
    /// epoch's final checkpoint is executed (`StopReason::EpochComplete`) or
    /// the optional `run_with_range` bound stops the stream
    /// (`StopReason::RunWithRangeCondition`).
    #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
    pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
        let _metrics_scope = mysten_metrics::monitored_scope("CheckpointExecutor::run_epoch");
        info!(?run_with_range, "CheckpointExecutor::run_epoch");
        debug!(
            "Checkpoint executor running for epoch {:?}",
            self.epoch_store.epoch(),
        );

        // If the range bound already excludes this epoch, stop immediately.
        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
            info!("RunWithRange condition satisfied at {:?}", run_with_range,);
            return StopReason::RunWithRangeCondition;
        };

        self.metrics
            .checkpoint_exec_epoch
            .set(self.epoch_store.epoch() as i64);

        // None means the epoch's last checkpoint was already executed.
        let Some(next_to_schedule) = self.get_next_to_schedule() else {
            return StopReason::EpochComplete;
        };

        let this = Arc::new(self);

        // Env var override for concurrency, falling back to config.
        let concurrency = std::env::var("SUI_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(this.config.checkpoint_execution_max_concurrency);

        let pipeline_stages = PipelineStages::new(next_to_schedule, this.metrics.clone());

        // Execute up to `concurrency` checkpoints in parallel; `buffered`
        // preserves checkpoint order, and each `execute_checkpoint` yields
        // whether it was the epoch's final checkpoint.
        let final_checkpoint_executed = stream_synced_checkpoints(
            this.checkpoint_store.clone(),
            next_to_schedule,
            run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
        )
        .map(|checkpoint| {
            let this = this.clone();
            let pipeline_handle = pipeline_stages.handle(*checkpoint.sequence_number());
            async move {
                let pipeline_handle = pipeline_handle.await;
                tokio::spawn(this.execute_checkpoint(checkpoint, pipeline_handle))
                    .await
                    .unwrap()
            }
        })
        .buffered(concurrency)
        // Reduce to "did we execute the final checkpoint?". The assert ensures
        // no checkpoint is processed after the final one.
        .fold(false, |state, is_final_checkpoint| async move {
            assert!(
                !state,
                "fold can't be called again after the final checkpoint"
            );
            is_final_checkpoint
        })
        .await;

        if final_checkpoint_executed {
            StopReason::EpochComplete
        } else {
            StopReason::RunWithRangeCondition
        }
    }
348}
349
350impl CheckpointExecutor {
    /// Executes a single checkpoint end-to-end: runs (or verifies) its
    /// transactions, commits outputs to disk, finalizes, indexes, accumulates
    /// state hashes, and bumps the executed watermark. Pipeline stages are
    /// finished in order via `finish_stage!` so concurrent checkpoints pass
    /// each stage sequentially.
    ///
    /// Returns `true` iff this was the last checkpoint of the epoch.
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
        mut pipeline_handle: PipelineHandle,
    ) -> bool {
        info!("executing checkpoint");
        let sequence_number = checkpoint.sequence_number;

        checkpoint.report_checkpoint_age(
            &self.metrics.checkpoint_contents_age,
            &self.metrics.checkpoint_contents_age_ms,
        );
        self.backpressure_manager
            .update_highest_certified_checkpoint(sequence_number);

        // The end-of-epoch checkpoint must not start until every prior
        // checkpoint has been fully executed.
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard = mysten_metrics::monitored_scope(
                "CheckpointExecutor::wait_for_previous_checkpoints",
            );

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            mysten_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        // Fullnodes (and the end-of-epoch checkpoint on any node) execute the
        // synced transactions; otherwise verify against the locally built
        // checkpoint.
        let ckpt_state = if self.state.is_fullnode(&self.epoch_store)
            || checkpoint.is_last_checkpoint_of_epoch()
        {
            self.execute_transactions_from_synced_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        } else {
            self.verify_locally_built_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        };

        let tps = self.tps_estimator.lock().update(
            Instant::now(),
            ckpt_state.data.checkpoint.network_total_transactions,
        );
        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

        self.backpressure_manager
            .update_highest_executed_checkpoint(*ckpt_state.data.checkpoint.sequence_number());

        let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

        let seq = ckpt_state.data.checkpoint.sequence_number;

        // Build the DB batch for all of this checkpoint's transaction outputs.
        let batch = self
            .state
            .get_cache_commit()
            .build_db_batch(self.epoch_store.epoch(), &ckpt_state.data.tx_digests);

        finish_stage!(pipeline_handle, BuildDbBatch);

        let object_funds_checker = self.state.object_funds_checker.load();
        if let Some(object_funds_checker) = object_funds_checker.as_ref() {
            object_funds_checker.commit_effects(batch.0.iter().map(|o| &o.effects));
        }

        // Commit outputs to disk on a blocking thread; ckpt_state is moved in
        // and returned so it can be used afterwards.
        let mut ckpt_state = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                let cache_commit = this.state.get_cache_commit();
                debug!(?seq, "committing checkpoint transactions to disk");
                cache_commit.commit_transaction_outputs(
                    this.epoch_store.epoch(),
                    batch,
                    &ckpt_state.data.tx_digests,
                );
                ckpt_state
            }
        })
        .await
        .unwrap();

        finish_stage!(pipeline_handle, CommitTransactionOutputs);

        self.epoch_store
            .handle_finalized_checkpoint(&ckpt_state.data.checkpoint, &ckpt_state.data.tx_digests)
            .expect("cannot fail");

        // Notify the randomness machinery about randomness rounds executed in
        // this checkpoint so it can release per-round resources.
        let randomness_rounds = self.extract_randomness_rounds(
            &ckpt_state.data.checkpoint,
            &ckpt_state.data.checkpoint_contents,
        );

        if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
            for round in randomness_rounds {
                debug!(
                    ?round,
                    "notifying RandomnessReporter that randomness update was executed in checkpoint"
                );
                randomness_reporter
                    .notify_randomness_in_checkpoint(round)
                    .expect("epoch cannot have ended");
            }
        }

        finish_stage!(pipeline_handle, FinalizeCheckpoint);

        // full_data is only present when indexing/ingestion/subscriptions are enabled.
        if let Some(checkpoint_data) = ckpt_state.full_data.take() {
            self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data)
                .await;
        }

        finish_stage!(pipeline_handle, UpdateRpcIndex);

        self.global_state_hasher
            .accumulate_running_root(&self.epoch_store, seq, ckpt_state.state_hasher)
            .expect("Failed to accumulate running root");

        if is_final_checkpoint {
            self.checkpoint_store
                .insert_epoch_last_checkpoint(self.epoch_store.epoch(), &ckpt_state.data.checkpoint)
                .expect("Failed to insert epoch last checkpoint");

            self.global_state_hasher
                .accumulate_epoch(self.epoch_store.clone(), seq)
                .expect("Accumulating epoch cannot fail");

            self.checkpoint_store
                .prune_local_summaries()
                .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
                .ok();
        }

        fail_point!("crash");

        self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

        finish_stage!(pipeline_handle, BumpHighestExecutedCheckpoint);

        ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
    }
501
    /// Validator fast path: instead of re-executing the synced checkpoint,
    /// verify it matches the checkpoint this node built locally (panicking on
    /// a fork) and reuse the locally computed state hash. Falls back to the
    /// execute path if no locally built checkpoint exists.
    ///
    /// Must not be called for the last checkpoint of the epoch (asserted);
    /// only the fullnode path carries end-of-epoch logic.
    #[instrument(level = "info", skip_all)]
    async fn verify_locally_built_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pipeline_handle: &mut PipelineHandle,
    ) -> CheckpointExecutionState {
        assert!(
            !checkpoint.is_last_checkpoint_of_epoch(),
            "only fullnode path has end-of-epoch logic"
        );

        let sequence_number = checkpoint.sequence_number;
        let locally_built_checkpoint = self
            .checkpoint_store
            .get_locally_computed_checkpoint(sequence_number)
            .expect("db error");

        // No locally computed checkpoint yet — execute it like a fullnode would.
        let Some(locally_built_checkpoint) = locally_built_checkpoint else {
            return self
                .execute_transactions_from_synced_checkpoint(checkpoint, pipeline_handle)
                .await;
        };

        self.metrics.checkpoint_executor_validator_path.inc();

        // Panics if local and certified checkpoints disagree (a fork).
        assert_checkpoint_not_forked(
            &locally_built_checkpoint,
            &checkpoint,
            &self.checkpoint_store,
        );

        // Wait for the state hash computed during local checkpoint building.
        let state_hasher = {
            let _metrics_scope =
                mysten_metrics::monitored_scope("CheckpointExecutor::notify_read_state_hasher");
            self.epoch_store
                .notify_read_checkpoint_state_hasher(&[sequence_number])
                .await
                .unwrap()
                .pop()
                .unwrap()
        };

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = checkpoint_contents
            .iter()
            .map(|digests| (digests.transaction, digests.effects))
            .unzip();

        // Execution/wait stages do not apply on this path — skip ahead.
        pipeline_handle
            .skip_to(PipelineStage::FinalizeTransactions)
            .await;

        self.insert_finalized_transactions(&tx_digests, sequence_number);

        pipeline_handle.skip_to(PipelineStage::BuildDbBatch).await;

        CheckpointExecutionState::new_with_global_state_hasher(
            CheckpointExecutionData {
                checkpoint,
                checkpoint_contents,
                tx_digests,
                fx_digests,
            },
            state_hasher,
        )
    }
580
    /// Fullnode path: schedules all not-yet-executed transactions of the
    /// checkpoint, waits for their effects, runs the change-epoch transaction
    /// if this is the epoch's last checkpoint, then finalizes (indexing,
    /// congestion tracking, state-hash accumulation, checkpoint data).
    #[instrument(level = "info", skip_all)]
    async fn execute_transactions_from_synced_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pipeline_handle: &mut PipelineHandle,
    ) -> CheckpointExecutionState {
        let sequence_number = checkpoint.sequence_number;
        let (mut ckpt_state, tx_data, unexecuted_tx_digests) = {
            let _scope =
                mysten_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
            let (ckpt_state, tx_data) = self.load_checkpoint_transactions(checkpoint);
            // Enqueues everything not already executed; returns what we must wait on.
            let unexecuted_tx_digests = self.schedule_transaction_execution(&ckpt_state, &tx_data);
            (ckpt_state, tx_data, unexecuted_tx_digests)
        };

        finish_stage!(pipeline_handle, ExecuteTransactions);

        {
            self.transaction_cache_reader
                .notify_read_executed_effects_digests(
                    "CheckpointExecutor::notify_read_executed_effects_digests",
                    &unexecuted_tx_digests,
                )
                .await;
        }

        finish_stage!(pipeline_handle, WaitForTransactions);

        // The change-epoch transaction is deliberately excluded from normal
        // scheduling and run here, after all other transactions completed.
        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&tx_data).await;
        }

        {
            // Collect any post-processing index batches produced per transaction
            // and commit them as a single batch.
            let mut raw_batches = Vec::new();
            let mut cache_updates = Vec::new();
            for tx_digest in &ckpt_state.data.tx_digests {
                if let Some((raw_batch, cu)) = self.state.await_post_processing(tx_digest).await {
                    raw_batches.push(raw_batch);
                    cache_updates.push(cu);
                }
            }
            if !raw_batches.is_empty()
                && let Some(indexes) = &self.state.indexes
            {
                let mut db_batch = indexes.new_db_batch();
                db_batch
                    .concat(raw_batches)
                    .expect("failed to build index batch");
                indexes
                    .commit_index_batch(db_batch, cache_updates)
                    .expect("failed to commit index batch");
            }
        }

        let _scope = mysten_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");

        if self.state.is_fullnode(&self.epoch_store) {
            self.state.congestion_tracker.process_checkpoint_effects(
                &*self.transaction_cache_reader,
                &ckpt_state.data.checkpoint,
                &tx_data.effects,
            );
        }

        self.insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number);

        // Accumulate this checkpoint's effects into the global state hash.
        ckpt_state.state_hasher = Some(
            self.global_state_hasher
                .accumulate_checkpoint(&tx_data.effects, sequence_number, &self.epoch_store)
                .expect("epoch cannot have ended"),
        );

        finish_stage!(pipeline_handle, FinalizeTransactions);

        // None unless indexing/ingestion/subscriptions are enabled.
        ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state.data, &tx_data);

        finish_stage!(pipeline_handle, ProcessCheckpointData);

        ckpt_state
    }
669
670 fn checkpoint_data_enabled(&self) -> bool {
671 self.subscription_service_checkpoint_sender.is_some()
672 || self.state.rpc_index.is_some()
673 || self.config.data_ingestion_dir.is_some()
674 }
675
    /// Records the given transactions as finalized in `sequence_number`'s
    /// checkpoint, both in the per-epoch store and (on fullnodes) in the
    /// deprecated checkpoint cache.
    fn insert_finalized_transactions(
        &self,
        tx_digests: &[TransactionDigest],
        sequence_number: CheckpointSequenceNumber,
    ) {
        self.epoch_store
            .insert_finalized_transactions(tx_digests, sequence_number)
            .expect("failed to insert finalized transactions");

        // Fullnodes also maintain the legacy finalized-transaction mapping.
        if self.state.is_fullnode(&self.epoch_store) {
            self.state
                .get_checkpoint_cache()
                .deprecated_insert_finalized_transactions(
                    tx_digests,
                    self.epoch_store.epoch(),
                    sequence_number,
                );
        }
    }
696
    /// Builds the full `Checkpoint` data object when any consumer needs it
    /// (RPC index, local data ingestion, or the subscription service);
    /// returns `None` when none are enabled. Also performs RPC indexing and
    /// local storage as side effects.
    #[instrument(level = "info", skip_all)]
    fn process_checkpoint_data(
        &self,
        ckpt_data: &CheckpointExecutionData,
        tx_data: &CheckpointTransactionData,
    ) -> Option<Checkpoint> {
        if !self.checkpoint_data_enabled() {
            return None;
        }

        let checkpoint = load_checkpoint(
            ckpt_data,
            tx_data,
            self.state.get_object_store(),
            &*self.transaction_cache_reader,
        )
        .expect("failed to load checkpoint data");

        if self.state.rpc_index.is_some() || self.config.data_ingestion_dir.is_some() {
            // Converted form shared by the index and the ingestion writer.
            let checkpoint_data = checkpoint.clone().into();
            if let Some(rpc_index) = &self.state.rpc_index {
                // Resolve type layouts against the backing package store, with
                // the checkpoint data itself as fallback (for packages
                // published inside this checkpoint).
                let mut layout_resolver = self.epoch_store.executor().type_layout_resolver(
                    self.epoch_store.protocol_config(),
                    Box::new(PackageStoreWithFallback::new(
                        self.state.get_backing_package_store(),
                        &checkpoint_data,
                    )),
                );

                rpc_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut());
            }

            if let Some(path) = &self.config.data_ingestion_dir {
                store_checkpoint_locally(path, &checkpoint_data)
                    .expect("failed to store checkpoint locally");
            }
        }

        Some(checkpoint)
    }
739
    /// Loads the checkpoint's transactions and effects, preferring the bulk
    /// full-checkpoint contents when available and otherwise fetching each
    /// transaction and effects individually from the cache. Also looks up
    /// which transactions were already executed locally.
    ///
    /// # Panics
    /// `fatal!`s if a transaction or effects referenced by the checkpoint is
    /// missing (state sync should have made them available).
    #[instrument(level = "info", skip_all)]
    fn load_checkpoint_transactions(
        &self,
        checkpoint: VerifiedCheckpoint,
    ) -> (CheckpointExecutionState, CheckpointTransactionData) {
        let seq = checkpoint.sequence_number;
        let epoch = checkpoint.epoch;

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        // Fast path: full contents stored in bulk by state sync.
        if let Some(full_contents) = self
            .checkpoint_store
            .get_full_checkpoint_contents_by_sequence_number(seq)
            .tap_err(|e| debug_fatal!("Failed to get checkpoint contents from store: {e}"))
            .ok()
            .flatten()
            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
        {
            let num_txns = full_contents.size();
            let mut tx_digests = Vec::with_capacity(num_txns);
            let mut transactions = Vec::with_capacity(num_txns);
            let mut effects = Vec::with_capacity(num_txns);
            let mut fx_digests = Vec::with_capacity(num_txns);

            // zip_debug_eq asserts (in debug builds) both sides have equal length.
            full_contents
                .into_iter()
                .zip_debug_eq(checkpoint_contents.iter())
                .for_each(|(execution_data, digests)| {
                    let tx_digest = digests.transaction;
                    let fx_digest = digests.effects;
                    // Sanity: bulk contents must match the digest list.
                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
                    debug_assert_eq!(fx_digest, execution_data.effects.digest());

                    tx_digests.push(tx_digest);
                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(execution_data.transaction),
                        epoch,
                        seq,
                    ));
                    effects.push(execution_data.effects);
                    fx_digests.push(fx_digest);
                });

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            (
                CheckpointExecutionState::new(CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                }),
                CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
            )
        } else {
            // Slow path: fetch transactions and effects one by one from the cache.
            let digests = checkpoint_contents.inner();

            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = digests
                .digests_iter()
                .map(|d| (d.transaction, d.effects))
                .unzip();
            let transactions = self
                .transaction_cache_reader
                .multi_get_transaction_blocks(&tx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, tx)| {
                    let tx = tx
                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
                    // Avoid a clone when we hold the only Arc reference.
                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
                })
                .collect();
            let effects = self
                .transaction_cache_reader
                .multi_get_effects(&fx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, effect)| {
                    effect.unwrap_or_else(|| {
                        fatal!("checkpoint effect not found for {:?}", digests[i])
                    })
                })
                .collect();

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            (
                CheckpointExecutionState::new(CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                }),
                CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
            )
        }
    }
854
    /// Enqueues for execution every checkpoint transaction that has not
    /// already been executed locally, and returns the digests to wait on.
    ///
    /// Already-executed transactions are checked for forks (panics via
    /// `assert_not_forked` on mismatch); the end-of-epoch transaction is
    /// excluded here and executed separately by `execute_change_epoch_tx`.
    #[instrument(level = "info", skip_all)]
    fn schedule_transaction_execution(
        &self,
        ckpt_state: &CheckpointExecutionState,
        tx_data: &CheckpointTransactionData,
    ) -> Vec<TransactionDigest> {
        let mut barrier_deps_builder = BarrierDependencyBuilder::new();

        // izip_debug_eq asserts (in debug builds) all iterators have equal length.
        let (unexecuted_tx_digests, unexecuted_txns): (Vec<_>, Vec<_>) = itertools::multiunzip(
            izip_debug_eq!(
                tx_data.transactions.iter(),
                ckpt_state.data.tx_digests.iter(),
                ckpt_state.data.fx_digests.iter(),
                tx_data.effects.iter(),
                tx_data.executed_fx_digests.iter(),
                tx_data.accumulator_versions.iter()
            )
            .filter_map(
                |(
                    txn,
                    tx_digest,
                    expected_fx_digest,
                    effects,
                    executed_fx_digest,
                    accumulator_version,
                )| {
                    // Must be computed for every transaction (not only the
                    // scheduled ones) so barrier bookkeeping stays consistent.
                    let barrier_deps =
                        barrier_deps_builder.process_tx(*tx_digest, txn.transaction_data());

                    if let Some(executed_fx_digest) = executed_fx_digest {
                        // Already executed: verify we did not fork, skip scheduling.
                        assert_not_forked(
                            &ckpt_state.data.checkpoint,
                            tx_digest,
                            expected_fx_digest,
                            executed_fx_digest,
                            &*self.transaction_cache_reader,
                        );
                        None
                    } else if txn.transaction_data().is_end_of_epoch_tx() {
                        // Deferred to execute_change_epoch_tx after all others.
                        None
                    } else {
                        // Shared-object versions are replayed from the certified effects.
                        let assigned_versions = self
                            .epoch_store
                            .acquire_shared_version_assignments_from_effects(
                                txn,
                                effects,
                                *accumulator_version,
                                &*self.object_cache_reader,
                            )
                            .expect("failed to acquire shared version assignments");

                        let mut env = ExecutionEnv::new()
                            .with_assigned_versions(assigned_versions)
                            .with_expected_effects_digest(*expected_fx_digest)
                            .with_barrier_dependencies(barrier_deps);

                        // Replay the insufficient-funds failure so execution
                        // reproduces the certified effects.
                        if let &ExecutionStatus::Failure(ExecutionFailure {
                            error: ExecutionErrorKind::InsufficientFundsForWithdraw,
                            ..
                        }) = effects.status()
                        {
                            env = env.with_insufficient_funds();
                        }

                        Some((tx_digest, (txn.clone(), env)))
                    }
                },
            ),
        );

        self.execution_scheduler
            .enqueue_transactions(unexecuted_txns, &self.epoch_store);

        unexecuted_tx_digests
    }
934
    /// Schedules the end-of-epoch (change-epoch) transaction — which must be
    /// the checkpoint's last transaction — and waits for it to be executed.
    ///
    /// # Panics
    /// Asserts the final transaction is an end-of-epoch transaction and that
    /// its digest matches the final effects entry.
    #[instrument(level = "error", skip_all)]
    async fn execute_change_epoch_tx(&self, tx_data: &CheckpointTransactionData) {
        let change_epoch_tx = tx_data.transactions.last().unwrap();
        let change_epoch_fx = tx_data.effects.last().unwrap();
        assert_eq!(
            change_epoch_tx.digest(),
            change_epoch_fx.transaction_digest()
        );
        assert!(
            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
            "final txn must be an end of epoch txn"
        );

        // Shared-object versions come from the certified effects; no
        // accumulator version applies to the change-epoch tx.
        let assigned_versions = self
            .epoch_store
            .acquire_shared_version_assignments_from_effects(
                change_epoch_tx,
                change_epoch_fx,
                None,
                self.object_cache_reader.as_ref(),
            )
            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");

        info!(
            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}, assigned versions: {:?}",
            change_epoch_tx.digest(),
            change_epoch_fx.digest(),
            assigned_versions
        );
        self.execution_scheduler.enqueue_transactions(
            vec![(
                change_epoch_tx.clone(),
                ExecutionEnv::new()
                    .with_assigned_versions(assigned_versions)
                    .with_expected_effects_digest(change_epoch_fx.digest()),
            )],
            &self.epoch_store,
        );

        // Block until the change-epoch transaction has produced effects.
        self.transaction_cache_reader
            .notify_read_executed_effects_digests(
                "CheckpointExecutor::notify_read_advance_epoch_tx",
                &[*change_epoch_tx.digest()],
            )
            .await;
    }
999
    /// Advances the highest-executed-checkpoint watermark to `checkpoint`,
    /// prunes old full checkpoint contents, and updates execution metrics.
    ///
    /// # Panics
    /// Asserts the watermark advances by exactly one (or that this is
    /// checkpoint 0 when no watermark exists yet).
    #[instrument(level = "debug", skip_all)]
    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        // Watermark must advance strictly sequentially — no gaps.
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        if seq.is_multiple_of(CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL) {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        // Keep only a bounded window of full checkpoint contents; prune the
        // entry that just fell out of the window.
        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                // Missing summary at prune_seq is tolerated (e.g. already pruned).
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age(
            &self.metrics.last_executed_checkpoint_age,
            &self.metrics.last_executed_checkpoint_age_ms,
        );
    }
1063
1064 #[instrument(level = "info", skip_all)]
1067 async fn commit_index_updates_and_enqueue_to_subscription_service(
1068 &self,
1069 checkpoint: Checkpoint,
1070 ) {
1071 if let Some(rpc_index) = &self.state.rpc_index {
1072 rpc_index
1073 .commit_update_for_checkpoint(checkpoint.summary.sequence_number)
1074 .expect("failed to update rpc_indexes");
1075 }
1076
1077 if let Some(sender) = &self.subscription_service_checkpoint_sender
1078 && let Err(e) = sender.send(checkpoint).await
1079 {
1080 warn!("unable to send checkpoint to subscription service: {e}");
1081 }
1082 }
1083
    /// Returns the randomness rounds whose updates were executed in this
    /// checkpoint. Prefers the checkpoint's version-specific data; for older
    /// protocol versions without it, falls back to inspecting the first
    /// transaction (a RandomnessStateUpdate, when present, is always first).
    #[instrument(level = "debug", skip_all)]
    fn extract_randomness_rounds(
        &self,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_contents: &CheckpointContents,
    ) -> Vec<RandomnessRound> {
        if let Some(version_specific_data) = checkpoint
            .version_specific_data(self.epoch_store.protocol_config())
            .expect("unable to get version_specific_data")
        {
            // Modern path: rounds are recorded directly in the checkpoint summary.
            version_specific_data.into_v1().randomness_rounds
        } else {
            // Legacy fallback is only sound when checkpoints are not
            // interval-batched; assert that configuration holds.
            assert_eq!(
                0,
                self.epoch_store
                    .protocol_config()
                    .min_checkpoint_interval_ms_as_option()
                    .unwrap_or_default(),
            );
            if let Some(first_digest) = checkpoint_contents.inner().first_digests() {
                let maybe_randomness_tx = self.transaction_cache_reader.get_transaction_block(&first_digest.transaction)
                    .unwrap_or_else(||
                        fatal!(
                            "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
                            checkpoint.sequence_number()
                        )
                    );
                if let TransactionKind::RandomnessStateUpdate(rsu) =
                    maybe_randomness_tx.data().transaction_data().kind()
                {
                    vec![rsu.randomness_round]
                } else {
                    Vec::new()
                }
            } else {
                Vec::new()
            }
        }
    }
1130}