1use futures::StreamExt;
22use mysten_common::{ZipDebugEqIteratorExt, debug_fatal, fatal, izip_debug_eq};
23use parking_lot::Mutex;
24use std::{sync::Arc, time::Instant};
25use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
26use sui_types::base_types::SequenceNumber;
27use sui_types::crypto::RandomnessRound;
28use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
29use sui_types::transaction::{TransactionDataAPI, TransactionKind};
30
31use sui_config::node::{CheckpointExecutorConfig, RunWithRange};
32use sui_macros::fail_point;
33use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
34use sui_types::executable_transaction::VerifiedExecutableTransaction;
35use sui_types::execution_status::{ExecutionErrorKind, ExecutionFailure, ExecutionStatus};
36use sui_types::full_checkpoint_content::Checkpoint;
37use sui_types::global_state_hash::GlobalStateHash;
38use sui_types::message_envelope::Message;
39use sui_types::{
40 base_types::{TransactionDigest, TransactionEffectsDigest},
41 messages_checkpoint::VerifiedCheckpoint,
42 transaction::VerifiedTransaction,
43};
44use tap::{TapFallible, TapOptional};
45use tracing::{debug, info, instrument, warn};
46
47use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
48use crate::authority::backpressure::BackpressureManager;
49use crate::authority::{AuthorityState, ExecutionEnv};
50use crate::execution_scheduler::ExecutionScheduler;
51use crate::execution_scheduler::execution_scheduler_impl::BarrierDependencyBuilder;
52use crate::global_state_hasher::GlobalStateHasher;
53use crate::{
54 checkpoints::CheckpointStore,
55 execution_cache::{ObjectCacheRead, TransactionCacheRead},
56};
57
58mod data_ingestion_handler;
59pub mod metrics;
60pub(crate) mod utils;
61
62use data_ingestion_handler::{load_checkpoint, store_checkpoint_locally};
63use metrics::CheckpointExecutorMetrics;
64use utils::*;
65
/// An `info!` progress line is emitted whenever the executed checkpoint's sequence
/// number is a multiple of this value (see `bump_highest_executed_checkpoint`).
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5_000;
67
/// Why `CheckpointExecutor::run_epoch` stopped.
#[derive(Debug, PartialEq, Eq)]
pub enum StopReason {
    /// The last checkpoint of the epoch was executed, or had already been executed.
    EpochComplete,
    /// The configured `RunWithRange` bound was reached first.
    RunWithRangeCondition,
}
73
/// A checkpoint together with its contents and the digests listed in those contents.
///
/// `tx_digests[i]` and `fx_digests[i]` are the transaction and effects digests of the
/// i-th entry of `checkpoint_contents`, in content order.
pub(crate) struct CheckpointExecutionData {
    /// The checkpoint being executed or verified.
    pub checkpoint: VerifiedCheckpoint,
    /// Contents loaded via `checkpoint.content_digest`.
    pub checkpoint_contents: CheckpointContents,
    /// Transaction digests, in checkpoint order.
    pub tx_digests: Vec<TransactionDigest>,
    /// Effects digests listed by the checkpoint, parallel to `tx_digests`.
    pub fx_digests: Vec<TransactionEffectsDigest>,
}
80
/// Per-transaction data loaded for a checkpoint. Every vector is parallel to the
/// checkpoint's transaction order.
pub(crate) struct CheckpointTransactionData {
    /// Transactions wrapped as executable for this checkpoint's epoch and sequence number.
    pub transactions: Vec<VerifiedExecutableTransaction>,
    /// Effects recorded for each transaction.
    pub effects: Vec<TransactionEffects>,
    /// Effects digest of an existing local execution, or `None` if the transaction has
    /// not been executed locally yet.
    pub executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    /// Accumulator-root version assigned to each transaction, as derived in
    /// `CheckpointTransactionData::new`.
    ///
    /// Entry `i` is the input version of `SUI_ACCUMULATOR_ROOT_OBJECT_ID` from the first
    /// transaction at index `>= i` whose effects change that object. It is `None` when no
    /// such transaction exists.
    pub accumulator_versions: Vec<Option<SequenceNumber>>,
}
91
92impl CheckpointTransactionData {
93 pub fn new(
94 transactions: Vec<VerifiedExecutableTransaction>,
95 effects: Vec<TransactionEffects>,
96 executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
97 ) -> Self {
98 assert_eq!(transactions.len(), effects.len());
99 assert_eq!(transactions.len(), executed_fx_digests.len());
100 let mut accumulator_versions = vec![None; transactions.len()];
101 let mut next_update_index = 0;
102 for (idx, efx) in effects.iter().enumerate() {
103 let acc_version = efx.object_changes().into_iter().find_map(|change| {
108 if change.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
109 change.input_version
110 } else {
111 None
112 }
113 });
114 if let Some(acc_version) = acc_version {
115 for slot in accumulator_versions
117 .iter_mut()
118 .take(idx + 1)
119 .skip(next_update_index)
120 {
121 *slot = Some(acc_version);
122 }
123 next_update_index = idx + 1;
124 }
125 }
126 assert!(
130 next_update_index == 0
131 || next_update_index == transactions.len()
132 || (next_update_index == transactions.len() - 1
133 && transactions
134 .last()
135 .unwrap()
136 .transaction_data()
137 .is_end_of_epoch_tx())
138 );
139 Self {
140 transactions,
141 effects,
142 executed_fx_digests,
143 accumulator_versions,
144 }
145 }
146}
/// Mutable state carried through the execution pipeline for a single checkpoint.
pub(crate) struct CheckpointExecutionState {
    /// The checkpoint, its contents and its digests.
    pub data: CheckpointExecutionData,

    // State hash for this checkpoint. On the validator path it is read from the epoch
    // store; on the execution path it is computed by `accumulate_checkpoint`. It is later
    // consumed by `accumulate_running_root`.
    state_hasher: Option<GlobalStateHash>,
    // Full checkpoint data. It is produced only when a consumer (RPC index, data
    // ingestion, subscription service) is configured, and is taken when index updates
    // are committed.
    full_data: Option<Checkpoint>,
}
153
154impl CheckpointExecutionState {
155 pub fn new(data: CheckpointExecutionData) -> Self {
156 Self {
157 data,
158 state_hasher: None,
159 full_data: None,
160 }
161 }
162
163 pub fn new_with_global_state_hasher(
164 data: CheckpointExecutionData,
165 hasher: GlobalStateHash,
166 ) -> Self {
167 Self {
168 data,
169 state_hasher: Some(hasher),
170 full_data: None,
171 }
172 }
173}
174
/// Awaits `$pipeline.finish_stage(PipelineStage::$stage_name)`.
///
/// `$stage_name` is a bare `PipelineStage` variant name. Because the expansion contains
/// `.await`, the macro can only be used inside async code.
macro_rules! finish_stage {
    ($pipeline:expr, $stage_name:ident) => {{
        $pipeline.finish_stage(PipelineStage::$stage_name).await;
    }};
}
180
/// Executes synced checkpoints for a single epoch.
///
/// [`CheckpointExecutor::run_epoch`] consumes the executor. It streams checkpoints starting
/// right after the highest executed one and runs several of them concurrently through a
/// staged pipeline.
pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    // The two cache readers and the scheduler below are cloned out of `state` in `new`.
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    execution_scheduler: Arc<ExecutionScheduler>,
    global_state_hasher: Arc<GlobalStateHasher>,
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    // Feeds the `checkpoint_exec_sync_tps` metric.
    tps_estimator: Mutex<TPSEstimator>,
    // When set, receives the full checkpoint data produced on the execution path.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
197
198impl CheckpointExecutor {
199 pub fn new(
200 epoch_store: Arc<AuthorityPerEpochStore>,
201 checkpoint_store: Arc<CheckpointStore>,
202 state: Arc<AuthorityState>,
203 global_state_hasher: Arc<GlobalStateHasher>,
204 backpressure_manager: Arc<BackpressureManager>,
205 config: CheckpointExecutorConfig,
206 metrics: Arc<CheckpointExecutorMetrics>,
207 subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
208 ) -> Self {
209 Self {
210 epoch_store,
211 state: state.clone(),
212 checkpoint_store,
213 object_cache_reader: state.get_object_cache_reader().clone(),
214 transaction_cache_reader: state.get_transaction_cache_reader().clone(),
215 execution_scheduler: state.execution_scheduler().clone(),
216 global_state_hasher,
217 backpressure_manager,
218 config,
219 metrics,
220 tps_estimator: Mutex::new(TPSEstimator::default()),
221 subscription_service_checkpoint_sender,
222 }
223 }
224
225 pub fn new_for_tests(
226 epoch_store: Arc<AuthorityPerEpochStore>,
227 checkpoint_store: Arc<CheckpointStore>,
228 state: Arc<AuthorityState>,
229 state_hasher: Arc<GlobalStateHasher>,
230 ) -> Self {
231 Self::new(
232 epoch_store,
233 checkpoint_store,
234 state,
235 state_hasher,
236 BackpressureManager::new_for_tests(),
237 Default::default(),
238 CheckpointExecutorMetrics::new_for_tests(),
239 None,
240 )
241 }
242
243 fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
246 let highest_executed = self
250 .checkpoint_store
251 .get_highest_executed_checkpoint()
252 .unwrap();
253
254 if let Some(highest_executed) = &highest_executed
255 && self.epoch_store.epoch() == highest_executed.epoch()
256 && highest_executed.is_last_checkpoint_of_epoch()
257 {
258 info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
261 return None;
262 }
263
264 Some(
265 highest_executed
266 .as_ref()
267 .map(|c| c.sequence_number() + 1)
268 .unwrap_or_else(|| {
269 assert_eq!(self.epoch_store.epoch(), 0);
271 0
273 }),
274 )
275 }
276
277 #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
281 pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
282 let _metrics_scope = mysten_metrics::monitored_scope("CheckpointExecutor::run_epoch");
283 info!(?run_with_range, "CheckpointExecutor::run_epoch");
284 debug!(
285 "Checkpoint executor running for epoch {:?}",
286 self.epoch_store.epoch(),
287 );
288
289 if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
293 info!("RunWithRange condition satisfied at {:?}", run_with_range,);
294 return StopReason::RunWithRangeCondition;
295 };
296
297 self.metrics
298 .checkpoint_exec_epoch
299 .set(self.epoch_store.epoch() as i64);
300
301 let Some(next_to_schedule) = self.get_next_to_schedule() else {
302 return StopReason::EpochComplete;
303 };
304
305 let this = Arc::new(self);
306
307 let concurrency = std::env::var("SUI_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
308 .ok()
309 .and_then(|s| s.parse().ok())
310 .unwrap_or(this.config.checkpoint_execution_max_concurrency);
311
312 let pipeline_stages = PipelineStages::new(next_to_schedule, this.metrics.clone());
313
314 let final_checkpoint_executed = stream_synced_checkpoints(
315 this.checkpoint_store.clone(),
316 next_to_schedule,
317 run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
318 )
319 .map(|checkpoint| {
321 let this = this.clone();
322 let pipeline_handle = pipeline_stages.handle(*checkpoint.sequence_number());
323 async move {
324 let pipeline_handle = pipeline_handle.await;
325 tokio::spawn(this.execute_checkpoint(checkpoint, pipeline_handle))
326 .await
327 .unwrap()
328 }
329 })
330 .buffered(concurrency)
331 .fold(false, |state, is_final_checkpoint| async move {
333 assert!(
334 !state,
335 "fold can't be called again after the final checkpoint"
336 );
337 is_final_checkpoint
338 })
339 .await;
340
341 if final_checkpoint_executed {
342 StopReason::EpochComplete
343 } else {
344 StopReason::RunWithRangeCondition
345 }
346 }
347}
348
349impl CheckpointExecutor {
/// Executes (or, on the validator path, verifies) one checkpoint and commits its results.
///
/// Steps, in order, each reported to `pipeline_handle` via `finish_stage!`:
/// 1. Obtain the checkpoint's execution state, by executing the synced checkpoint or by
///    reusing a locally built one.
/// 2. Build the DB batch of transaction outputs (`BuildDbBatch`).
/// 3. Commit it to disk (`CommitTransactionOutputs`).
/// 4. Finalize the checkpoint in the epoch store and notify the randomness reporter
///    (`FinalizeCheckpoint`).
/// 5. Commit RPC index updates and forward full data to subscribers (`UpdateRpcIndex`).
/// 6. Accumulate the running state root, and do end-of-epoch bookkeeping if this is the
///    last checkpoint.
/// 7. Bump the highest-executed watermark (`BumpHighestExecutedCheckpoint`).
///
/// NOTE(review): `PipelineHandle`/`PipelineStages` live in `utils`. They presumably order
/// these stages across concurrently executing checkpoints; confirm there.
///
/// Returns `true` if this was the last checkpoint of the epoch. Panics on storage failures.
#[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
async fn execute_checkpoint(
    self: Arc<Self>,
    checkpoint: VerifiedCheckpoint,
    mut pipeline_handle: PipelineHandle,
) -> bool {
    info!("executing checkpoint");
    let sequence_number = checkpoint.sequence_number;

    // Record the checkpoint's age on arrival and report it to backpressure as certified.
    checkpoint.report_checkpoint_age(
        &self.metrics.checkpoint_contents_age,
        &self.metrics.checkpoint_contents_age_ms,
    );
    self.backpressure_manager
        .update_highest_certified_checkpoint(sequence_number);

    // The end-of-epoch checkpoint is processed only after every earlier checkpoint has
    // been executed.
    if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
        let _wait_for_previous_checkpoints_guard = mysten_metrics::monitored_scope(
            "CheckpointExecutor::wait_for_previous_checkpoints",
        );

        info!(
            "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
        );
        self.checkpoint_store
            .notify_read_executed_checkpoint(sequence_number - 1)
            .await;
    }

    let _parallel_step_guard =
        mysten_metrics::monitored_scope("CheckpointExecutor::parallel_step");

    // Fullnodes, and every node on the end-of-epoch checkpoint, execute the synced
    // transactions. Otherwise the locally built checkpoint is verified and reused (that
    // path itself falls back to execution if no local checkpoint exists).
    let ckpt_state = if self.state.is_fullnode(&self.epoch_store)
        || checkpoint.is_last_checkpoint_of_epoch()
    {
        self.execute_transactions_from_synced_checkpoint(checkpoint, &mut pipeline_handle)
            .await
    } else {
        self.verify_locally_built_checkpoint(checkpoint, &mut pipeline_handle)
            .await
    };

    // Update the TPS estimate from the network-wide transaction count.
    let tps = self.tps_estimator.lock().update(
        Instant::now(),
        ckpt_state.data.checkpoint.network_total_transactions,
    );
    self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

    // NOTE(review): this is reported to backpressure before the outputs are committed
    // to disk below; confirm the ordering is intended.
    self.backpressure_manager
        .update_highest_executed_checkpoint(*ckpt_state.data.checkpoint.sequence_number());

    let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

    let seq = ckpt_state.data.checkpoint.sequence_number;

    // Gather the cached outputs of this checkpoint's transactions into one DB batch.
    let batch = self
        .state
        .get_cache_commit()
        .build_db_batch(self.epoch_store.epoch(), &ckpt_state.data.tx_digests);

    finish_stage!(pipeline_handle, BuildDbBatch);

    // If an object funds checker is installed, hand it the effects in the batch.
    let object_funds_checker = self.state.object_funds_checker.load();
    if let Some(object_funds_checker) = object_funds_checker.as_ref() {
        object_funds_checker.commit_effects(batch.0.iter().map(|o| &o.effects));
    }

    // Writing to disk is blocking work, so it runs via `spawn_blocking`. `ckpt_state` is
    // moved into the closure and handed back as its result.
    let mut ckpt_state = tokio::task::spawn_blocking({
        let this = self.clone();
        move || {
            let cache_commit = this.state.get_cache_commit();
            debug!(?seq, "committing checkpoint transactions to disk");
            cache_commit.commit_transaction_outputs(
                this.epoch_store.epoch(),
                batch,
                &ckpt_state.data.tx_digests,
            );
            ckpt_state
        }
    })
    .await
    .unwrap();

    finish_stage!(pipeline_handle, CommitTransactionOutputs);

    self.epoch_store
        .handle_finalized_checkpoint(&ckpt_state.data.checkpoint, &ckpt_state.data.tx_digests)
        .expect("cannot fail");

    let randomness_rounds = self.extract_randomness_rounds(
        &ckpt_state.data.checkpoint,
        &ckpt_state.data.checkpoint_contents,
    );

    // Only notify when the epoch store has a randomness reporter.
    if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
        for round in randomness_rounds {
            debug!(
                ?round,
                "notifying RandomnessReporter that randomness update was executed in checkpoint"
            );
            randomness_reporter
                .notify_randomness_in_checkpoint(round)
                .expect("epoch cannot have ended");
        }
    }

    finish_stage!(pipeline_handle, FinalizeCheckpoint);

    // Full data is present only if it was produced on the execution path.
    if let Some(checkpoint_data) = ckpt_state.full_data.take() {
        self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data)
            .await;
    }

    finish_stage!(pipeline_handle, UpdateRpcIndex);

    self.global_state_hasher
        .accumulate_running_root(&self.epoch_store, seq, ckpt_state.state_hasher)
        .expect("Failed to accumulate running root");

    // End-of-epoch bookkeeping. Pruning local summaries is best-effort and only
    // reported via `debug_fatal!`.
    if is_final_checkpoint {
        self.checkpoint_store
            .insert_epoch_last_checkpoint(self.epoch_store.epoch(), &ckpt_state.data.checkpoint)
            .expect("Failed to insert epoch last checkpoint");

        self.global_state_hasher
            .accumulate_epoch(self.epoch_store.clone(), seq)
            .expect("Accumulating epoch cannot fail");

        self.checkpoint_store
            .prune_local_summaries()
            .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
            .ok();
    }

    fail_point!("crash");

    // The watermark is bumped last, after all of the above has been persisted.
    self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

    finish_stage!(pipeline_handle, BumpHighestExecutedCheckpoint);

    ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
}
500
501 #[instrument(level = "info", skip_all)]
504 async fn verify_locally_built_checkpoint(
505 &self,
506 checkpoint: VerifiedCheckpoint,
507 pipeline_handle: &mut PipelineHandle,
508 ) -> CheckpointExecutionState {
509 assert!(
510 !checkpoint.is_last_checkpoint_of_epoch(),
511 "only fullnode path has end-of-epoch logic"
512 );
513
514 let sequence_number = checkpoint.sequence_number;
515 let locally_built_checkpoint = self
516 .checkpoint_store
517 .get_locally_computed_checkpoint(sequence_number)
518 .expect("db error");
519
520 let Some(locally_built_checkpoint) = locally_built_checkpoint else {
521 return self
523 .execute_transactions_from_synced_checkpoint(checkpoint, pipeline_handle)
524 .await;
525 };
526
527 self.metrics.checkpoint_executor_validator_path.inc();
528
529 assert_checkpoint_not_forked(
531 &locally_built_checkpoint,
532 &checkpoint,
533 &self.checkpoint_store,
534 );
535
536 let state_hasher = {
538 let _metrics_scope =
539 mysten_metrics::monitored_scope("CheckpointExecutor::notify_read_state_hasher");
540 self.epoch_store
541 .notify_read_checkpoint_state_hasher(&[sequence_number])
542 .await
543 .unwrap()
544 .pop()
545 .unwrap()
546 };
547
548 let checkpoint_contents = self
549 .checkpoint_store
550 .get_checkpoint_contents(&checkpoint.content_digest)
551 .expect("db error")
552 .expect("checkpoint contents not found");
553
554 let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = checkpoint_contents
555 .iter()
556 .map(|digests| (digests.transaction, digests.effects))
557 .unzip();
558
559 pipeline_handle
560 .skip_to(PipelineStage::FinalizeTransactions)
561 .await;
562
563 self.insert_finalized_transactions(&tx_digests, sequence_number);
566
567 pipeline_handle.skip_to(PipelineStage::BuildDbBatch).await;
568
569 CheckpointExecutionState::new_with_global_state_hasher(
570 CheckpointExecutionData {
571 checkpoint,
572 checkpoint_contents,
573 tx_digests,
574 fx_digests,
575 },
576 state_hasher,
577 )
578 }
579
/// Executes the transactions of a synced checkpoint and builds its execution state.
///
/// Steps:
/// - Schedule every transaction that has not been executed locally, then wait for those
///   to finish.
/// - Run the change-epoch transaction last (end-of-epoch checkpoints only).
/// - Commit post-processing index batches.
/// - Mark the transactions finalized and compute the checkpoint's state hash.
/// - When enabled, produce the full checkpoint data.
///
/// Reports the `ExecuteTransactions`, `WaitForTransactions`, `FinalizeTransactions` and
/// `ProcessCheckpointData` stages to `pipeline_handle`.
#[instrument(level = "info", skip_all)]
async fn execute_transactions_from_synced_checkpoint(
    &self,
    checkpoint: VerifiedCheckpoint,
    pipeline_handle: &mut PipelineHandle,
) -> CheckpointExecutionState {
    let sequence_number = checkpoint.sequence_number;
    // `unexecuted_tx_digests` holds only the transactions that were actually scheduled.
    // Already-executed ones and the end-of-epoch transaction are excluded.
    let (mut ckpt_state, tx_data, unexecuted_tx_digests) = {
        let _scope =
            mysten_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
        let (ckpt_state, tx_data) = self.load_checkpoint_transactions(checkpoint);
        let unexecuted_tx_digests = self.schedule_transaction_execution(&ckpt_state, &tx_data);
        (ckpt_state, tx_data, unexecuted_tx_digests)
    };

    finish_stage!(pipeline_handle, ExecuteTransactions);

    // Wait until every scheduled transaction has executed.
    {
        self.transaction_cache_reader
            .notify_read_executed_effects_digests(
                "CheckpointExecutor::notify_read_executed_effects_digests",
                &unexecuted_tx_digests,
            )
            .await;
    }

    finish_stage!(pipeline_handle, WaitForTransactions);

    // The change-epoch transaction is not scheduled above; it runs here, after all others.
    if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
        self.execute_change_epoch_tx(&tx_data).await;
    }

    // Collect each transaction's post-processing output, if any. When an index store is
    // configured, commit all of it as a single index batch.
    {
        let mut raw_batches = Vec::new();
        let mut cache_updates = Vec::new();
        for tx_digest in &ckpt_state.data.tx_digests {
            if let Some((raw_batch, cu)) = self.state.await_post_processing(tx_digest).await {
                raw_batches.push(raw_batch);
                cache_updates.push(cu);
            }
        }
        if !raw_batches.is_empty()
            && let Some(indexes) = &self.state.indexes
        {
            let mut db_batch = indexes.new_db_batch();
            db_batch
                .concat(raw_batches)
                .expect("failed to build index batch");
            indexes
                .commit_index_batch(db_batch, cache_updates)
                .expect("failed to commit index batch");
        }
    }

    let _scope = mysten_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");

    // Congestion tracking from checkpoint effects runs on fullnodes only.
    if self.state.is_fullnode(&self.epoch_store) {
        self.state.congestion_tracker.process_checkpoint_effects(
            &*self.transaction_cache_reader,
            &ckpt_state.data.checkpoint,
            &tx_data.effects,
        );
    }

    self.insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number);

    // Compute this checkpoint's state hash from its effects. Later it is folded into the
    // running root by `execute_checkpoint`.
    ckpt_state.state_hasher = Some(
        self.global_state_hasher
            .accumulate_checkpoint(&tx_data.effects, sequence_number, &self.epoch_store)
            .expect("epoch cannot have ended"),
    );

    finish_stage!(pipeline_handle, FinalizeTransactions);

    // `None` unless some consumer of full checkpoint data is configured.
    ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state.data, &tx_data);

    finish_stage!(pipeline_handle, ProcessCheckpointData);

    ckpt_state
}
668
669 fn checkpoint_data_enabled(&self) -> bool {
670 self.subscription_service_checkpoint_sender.is_some()
671 || self.state.rpc_index.is_some()
672 || self.config.data_ingestion_dir.is_some()
673 }
674
675 fn insert_finalized_transactions(
676 &self,
677 tx_digests: &[TransactionDigest],
678 sequence_number: CheckpointSequenceNumber,
679 ) {
680 self.epoch_store
681 .insert_finalized_transactions(tx_digests, sequence_number)
682 .expect("failed to insert finalized transactions");
683
684 if self.state.is_fullnode(&self.epoch_store) {
685 self.state
687 .get_checkpoint_cache()
688 .deprecated_insert_finalized_transactions(
689 tx_digests,
690 self.epoch_store.epoch(),
691 sequence_number,
692 );
693 }
694 }
695
696 #[instrument(level = "info", skip_all)]
697 fn process_checkpoint_data(
698 &self,
699 ckpt_data: &CheckpointExecutionData,
700 tx_data: &CheckpointTransactionData,
701 ) -> Option<Checkpoint> {
702 if !self.checkpoint_data_enabled() {
703 return None;
704 }
705
706 let checkpoint = load_checkpoint(
707 ckpt_data,
708 tx_data,
709 self.state.get_object_store(),
710 &*self.transaction_cache_reader,
711 )
712 .expect("failed to load checkpoint data");
713
714 if let Some(rpc_index) = &self.state.rpc_index {
717 rpc_index.index_checkpoint(&checkpoint);
718 }
719
720 if let Some(path) = &self.config.data_ingestion_dir {
721 store_checkpoint_locally(path, &checkpoint)
722 .expect("failed to store checkpoint locally");
723 }
724
725 Some(checkpoint)
726 }
727
/// Loads the contents, transactions and effects of `checkpoint`.
///
/// Full checkpoint contents stored in bulk for this sequence number are preferred. If
/// they are absent, or the read fails (reported via `debug_fatal!`), transactions and
/// effects are fetched individually from the transaction cache instead. Both paths also
/// look up which transactions already have locally executed effects.
///
/// Panics if the checkpoint contents are missing. On the fallback path it also panics
/// (via `fatal!`) if any transaction or effects entry is missing.
#[instrument(level = "info", skip_all)]
fn load_checkpoint_transactions(
    &self,
    checkpoint: VerifiedCheckpoint,
) -> (CheckpointExecutionState, CheckpointTransactionData) {
    let seq = checkpoint.sequence_number;
    let epoch = checkpoint.epoch;

    let checkpoint_contents = self
        .checkpoint_store
        .get_checkpoint_contents(&checkpoint.content_digest)
        .expect("db error")
        .expect("checkpoint contents not found");

    // Bulk path. A read error is reported and then treated like "not present".
    if let Some(full_contents) = self
        .checkpoint_store
        .get_full_checkpoint_contents_by_sequence_number(seq)
        .tap_err(|e| debug_fatal!("Failed to get checkpoint contents from store: {e}"))
        .ok()
        .flatten()
        .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
    {
        let num_txns = full_contents.size();
        let mut tx_digests = Vec::with_capacity(num_txns);
        let mut transactions = Vec::with_capacity(num_txns);
        let mut effects = Vec::with_capacity(num_txns);
        let mut fx_digests = Vec::with_capacity(num_txns);

        // Walk the full contents alongside the digest list. Digest agreement is only
        // checked in debug builds.
        full_contents
            .into_iter()
            .zip_debug_eq(checkpoint_contents.iter())
            .for_each(|(execution_data, digests)| {
                let tx_digest = digests.transaction;
                let fx_digest = digests.effects;
                debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
                debug_assert_eq!(fx_digest, execution_data.effects.digest());

                tx_digests.push(tx_digest);
                transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
                    VerifiedTransaction::new_unchecked(execution_data.transaction),
                    epoch,
                    seq,
                ));
                effects.push(execution_data.effects);
                fx_digests.push(fx_digest);
            });

        let executed_fx_digests = self
            .transaction_cache_reader
            .multi_get_executed_effects_digests(&tx_digests);

        (
            CheckpointExecutionState::new(CheckpointExecutionData {
                checkpoint,
                checkpoint_contents,
                tx_digests,
                fx_digests,
            }),
            CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
        )
    } else {
        // Fallback path: fetch each transaction and effects entry from the cache.
        let digests = checkpoint_contents.inner();

        let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = digests
            .digests_iter()
            .map(|d| (d.transaction, d.effects))
            .unzip();
        let transactions = self
            .transaction_cache_reader
            .multi_get_transaction_blocks(&tx_digests)
            .into_iter()
            .enumerate()
            .map(|(i, tx)| {
                let tx = tx
                    .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
                // Avoid a deep clone when this is the only reference to the transaction.
                let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
                VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
            })
            .collect();
        let effects = self
            .transaction_cache_reader
            .multi_get_effects(&fx_digests)
            .into_iter()
            .enumerate()
            .map(|(i, effect)| {
                effect.unwrap_or_else(|| {
                    fatal!("checkpoint effect not found for {:?}", digests[i])
                })
            })
            .collect();

        let executed_fx_digests = self
            .transaction_cache_reader
            .multi_get_executed_effects_digests(&tx_digests);

        (
            CheckpointExecutionState::new(CheckpointExecutionData {
                checkpoint,
                checkpoint_contents,
                tx_digests,
                fx_digests,
            }),
            CheckpointTransactionData::new(transactions, effects, executed_fx_digests),
        )
    }
}
842
/// Enqueues every transaction of the checkpoint that has not been executed locally, and
/// returns the digests of the transactions it enqueued.
///
/// - Transactions that already have local effects are not scheduled. Instead,
///   `assert_not_forked` checks them against the checkpoint's expected effects digest.
/// - The end-of-epoch transaction is never scheduled here.
/// - All other transactions get shared-object version assignments derived from their
///   checkpointed effects and accumulator version, and their expected effects digest is
///   pinned.
///
/// Every transaction, including skipped ones, is passed through
/// `BarrierDependencyBuilder` in checkpoint order.
#[instrument(level = "info", skip_all)]
fn schedule_transaction_execution(
    &self,
    ckpt_state: &CheckpointExecutionState,
    tx_data: &CheckpointTransactionData,
) -> Vec<TransactionDigest> {
    let mut barrier_deps_builder = BarrierDependencyBuilder::new();

    let (unexecuted_tx_digests, unexecuted_txns): (Vec<_>, Vec<_>) = itertools::multiunzip(
        // All six sequences are parallel and in checkpoint order.
        izip_debug_eq!(
            tx_data.transactions.iter(),
            ckpt_state.data.tx_digests.iter(),
            ckpt_state.data.fx_digests.iter(),
            tx_data.effects.iter(),
            tx_data.executed_fx_digests.iter(),
            tx_data.accumulator_versions.iter()
        )
        .filter_map(
            |(
                txn,
                tx_digest,
                expected_fx_digest,
                effects,
                executed_fx_digest,
                accumulator_version,
            )| {
                // Called for every transaction so the builder observes the full order.
                let barrier_deps =
                    barrier_deps_builder.process_tx(*tx_digest, txn.transaction_data());

                if let Some(executed_fx_digest) = executed_fx_digest {
                    assert_not_forked(
                        &ckpt_state.data.checkpoint,
                        tx_digest,
                        expected_fx_digest,
                        executed_fx_digest,
                        &*self.transaction_cache_reader,
                    );
                    None
                } else if txn.transaction_data().is_end_of_epoch_tx() {
                    // Executed separately by `execute_change_epoch_tx`.
                    None
                } else {
                    let assigned_versions = self
                        .epoch_store
                        .acquire_shared_version_assignments_from_effects(
                            txn,
                            effects,
                            *accumulator_version,
                            &*self.object_cache_reader,
                        )
                        .expect("failed to acquire shared version assignments");

                    let mut env = ExecutionEnv::new()
                        .with_assigned_versions(assigned_versions)
                        .with_expected_effects_digest(*expected_fx_digest)
                        .with_barrier_dependencies(barrier_deps);

                    // The checkpointed effects failed with InsufficientFundsForWithdraw.
                    // Flag the env, presumably so that execution reproduces that outcome.
                    if let &ExecutionStatus::Failure(ExecutionFailure {
                        error: ExecutionErrorKind::InsufficientFundsForWithdraw,
                        ..
                    }) = effects.status()
                    {
                        env = env.with_insufficient_funds();
                    }

                    Some((tx_digest, (txn.clone(), env)))
                }
            },
        ),
    );

    self.execution_scheduler
        .enqueue_transactions(unexecuted_txns, &self.epoch_store);

    unexecuted_tx_digests
}
922
923 #[instrument(level = "error", skip_all)]
925 async fn execute_change_epoch_tx(&self, tx_data: &CheckpointTransactionData) {
926 let change_epoch_tx = tx_data.transactions.last().unwrap();
927 let change_epoch_fx = tx_data.effects.last().unwrap();
928 assert_eq!(
929 change_epoch_tx.digest(),
930 change_epoch_fx.transaction_digest()
931 );
932 assert!(
933 change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
934 "final txn must be an end of epoch txn"
935 );
936
937 let assigned_versions = self
955 .epoch_store
956 .acquire_shared_version_assignments_from_effects(
957 change_epoch_tx,
958 change_epoch_fx,
959 None,
960 self.object_cache_reader.as_ref(),
961 )
962 .expect("Acquiring shared version assignments for change_epoch tx cannot fail");
963
964 info!(
965 "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}, assigned versions: {:?}",
966 change_epoch_tx.digest(),
967 change_epoch_fx.digest(),
968 assigned_versions
969 );
970 self.execution_scheduler.enqueue_transactions(
971 vec![(
972 change_epoch_tx.clone(),
973 ExecutionEnv::new()
974 .with_assigned_versions(assigned_versions)
975 .with_expected_effects_digest(change_epoch_fx.digest()),
976 )],
977 &self.epoch_store,
978 );
979
980 self.transaction_cache_reader
981 .notify_read_executed_effects_digests(
982 "CheckpointExecutor::notify_read_advance_epoch_tx",
983 &[*change_epoch_tx.digest()],
984 )
985 .await;
986 }
987
988 #[instrument(level = "debug", skip_all)]
990 fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
991 let seq = *checkpoint.sequence_number();
993 debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
994 if let Some(prev_highest) = self
995 .checkpoint_store
996 .get_highest_executed_checkpoint_seq_number()
997 .unwrap()
998 {
999 assert_eq!(prev_highest + 1, seq);
1000 } else {
1001 assert_eq!(seq, 0);
1002 }
1003 if seq.is_multiple_of(CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL) {
1004 info!("Finished syncing and executing checkpoint {}", seq);
1005 }
1006
1007 fail_point!("highest-executed-checkpoint");
1008
1009 const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
1012 if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
1013 let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
1014 if let Some(prune_checkpoint) = self
1015 .checkpoint_store
1016 .get_checkpoint_by_sequence_number(prune_seq)
1017 .expect("Failed to fetch checkpoint")
1018 {
1019 self.checkpoint_store
1020 .delete_full_checkpoint_contents(prune_seq)
1021 .expect("Failed to delete full checkpoint contents");
1022 self.checkpoint_store
1023 .delete_contents_digest_sequence_number_mapping(
1024 &prune_checkpoint.content_digest,
1025 )
1026 .expect("Failed to delete contents digest -> sequence number mapping");
1027 } else {
1028 debug!(
1032 "Failed to fetch checkpoint with sequence number {:?}",
1033 prune_seq
1034 );
1035 }
1036 }
1037
1038 self.checkpoint_store
1039 .update_highest_executed_checkpoint(checkpoint)
1040 .unwrap();
1041 self.metrics.last_executed_checkpoint.set(seq as i64);
1042
1043 self.metrics
1044 .last_executed_checkpoint_timestamp_ms
1045 .set(checkpoint.timestamp_ms as i64);
1046 checkpoint.report_checkpoint_age(
1047 &self.metrics.last_executed_checkpoint_age,
1048 &self.metrics.last_executed_checkpoint_age_ms,
1049 );
1050 }
1051
1052 #[instrument(level = "info", skip_all)]
1055 async fn commit_index_updates_and_enqueue_to_subscription_service(
1056 &self,
1057 checkpoint: Checkpoint,
1058 ) {
1059 if let Some(rpc_index) = &self.state.rpc_index {
1060 rpc_index
1061 .commit_update_for_checkpoint(checkpoint.summary.sequence_number)
1062 .expect("failed to update rpc_indexes");
1063 }
1064
1065 if let Some(sender) = &self.subscription_service_checkpoint_sender
1066 && let Err(e) = sender.send(checkpoint).await
1067 {
1068 warn!("unable to send checkpoint to subscription service: {e}");
1069 }
1070 }
1071
1072 #[instrument(level = "debug", skip_all)]
1075 fn extract_randomness_rounds(
1076 &self,
1077 checkpoint: &VerifiedCheckpoint,
1078 checkpoint_contents: &CheckpointContents,
1079 ) -> Vec<RandomnessRound> {
1080 if let Some(version_specific_data) = checkpoint
1081 .version_specific_data(self.epoch_store.protocol_config())
1082 .expect("unable to get version_specific_data")
1083 {
1084 version_specific_data.into_v1().randomness_rounds
1086 } else {
1087 assert_eq!(
1092 0,
1093 self.epoch_store
1094 .protocol_config()
1095 .min_checkpoint_interval_ms_as_option()
1096 .unwrap_or_default(),
1097 );
1098 if let Some(first_digest) = checkpoint_contents.inner().first_digests() {
1099 let maybe_randomness_tx = self.transaction_cache_reader.get_transaction_block(&first_digest.transaction)
1100 .unwrap_or_else(||
1101 fatal!(
1102 "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
1103 checkpoint.sequence_number()
1104 )
1105 );
1106 if let TransactionKind::RandomnessStateUpdate(rsu) =
1107 maybe_randomness_tx.data().transaction_data().kind()
1108 {
1109 vec![rsu.randomness_round]
1110 } else {
1111 Vec::new()
1112 }
1113 } else {
1114 Vec::new()
1115 }
1116 }
1117 }
1118}