mod causal_order;
pub mod checkpoint_executor;
mod checkpoint_output;
mod metrics;

use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
use crate::authority::AuthorityState;
use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
use crate::checkpoints::causal_order::CausalOrder;
use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
pub use crate::checkpoints::checkpoint_output::{
    LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
};
pub use crate::checkpoints::metrics::CheckpointMetrics;
use crate::consensus_manager::ReplayWaiter;
use crate::execution_cache::TransactionCacheRead;

use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
use crate::global_state_hasher::GlobalStateHasher;
use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
use consensus_core::CommitRef;
use diffy::create_patch;
use itertools::Itertools;
use mysten_common::random::get_rng;
use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
use mysten_common::{assert_reachable, debug_fatal, fatal};
use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
use nonempty::NonEmpty;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
use serde::{Deserialize, Serialize};
use sui_macros::fail_point_arg;
use sui_network::default_mysten_network_config;
use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
use sui_types::base_types::{ConciseableName, SequenceNumber};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::execution::ExecutionTimeObservationKey;
use sui_types::messages_checkpoint::{
    CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
};
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use tokio::sync::{mpsc, watch};
use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::authority_store_pruner::PrunerWatermarks;
use crate::consensus_handler::SequencedConsensusTransactionKey;
use rand::seq::SliceRandom;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fs::File;
use std::future::Future;
use std::io::Write;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Weak;
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};
use sui_protocol_config::ProtocolVersion;
use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
use sui_types::committee::StakeUnit;
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
use sui_types::digests::{
    CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
};
use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
use sui_types::error::{SuiErrorKind, SuiResult};
use sui_types::gas::GasCostSummary;
use sui_types::message_envelope::Message;
use sui_types::messages_checkpoint::{
    CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
    CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
    EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
    VerifiedCheckpointContents,
};
use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
use sui_types::messages_consensus::ConsensusTransactionKey;
use sui_types::signature::GenericSignature;
use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
use sui_types::transaction::{
    TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
};
use tokio::{sync::Notify, time::timeout};
use tracing::{debug, error, info, instrument, trace, warn};
use typed_store::DBMapUtils;
use typed_store::Map;
use typed_store::{
    TypedStoreError,
    rocks::{DBMap, MetricConf},
};
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

pub type CheckpointHeight = u64;

pub struct EpochStats {
    pub checkpoint_count: u64,
    pub transaction_count: u64,
    pub total_gas_reward: u64,
}

#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    pub timestamp_ms: CheckpointTimestamp,
    pub last_of_epoch: bool,
    pub checkpoint_height: CheckpointHeight,
    pub consensus_commit_ref: CommitRef,
    pub rejected_transactions_digest: Digest,
}

#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    pub roots: Vec<TransactionKey>,
    pub details: PendingCheckpointInfo,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    pub checkpoint_height: Option<CheckpointHeight>,
    pub position_in_commit: usize,
}

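/// Column families backing the checkpoint store: checkpoint contents and their
/// sequence-number index, full checkpoint contents (legacy and versioned), certified
/// checkpoints indexed by sequence number and digest, locally computed (builder)
/// summaries, the per-epoch last-checkpoint map, watermarks, and fork markers.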
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,

    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
}

fn full_checkpoint_content_table_default_config() -> DBOptions {
    DBOptions {
        options: default_db_options().options,
        rw_options: ReadWriteOptions::default().set_log_value_hash(true),
    }
}

impl CheckpointStoreTables {
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 4;
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(64_000)
            .with_value_cache_size(default_value_cache_size());
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
        let configs = vec![
            (
                "checkpoint_content",
                digest_config.clone().with_config(
                    lru_config
                        .clone()
                        .with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
            ),
            (
                "transaction_fork_detected",
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "full_checkpoint_content_v2",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }
}

pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}

impl CheckpointStore {
    pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
        let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
        Arc::new(Self {
            tables,
            synced_checkpoint_notify_read: NotifyRead::new(),
            executed_checkpoint_notify_read: NotifyRead::new(),
        })
    }

    pub fn new_for_tests() -> Arc<Self> {
        let ckpt_dir = mysten_common::tempdir().unwrap();
        CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
    }

    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
        let tables = CheckpointStoreTables::new(
            path,
            "db_checkpoint",
            Arc::new(PrunerWatermarks::default()),
        );
        Arc::new(Self {
            tables,
            synced_checkpoint_notify_read: NotifyRead::new(),
            executed_checkpoint_notify_read: NotifyRead::new(),
        })
    }

    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }

    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        match self.get_checkpoint_by_sequence_number(0).unwrap() {
            Some(existing_checkpoint) => {
                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
            }
            None => {
                if epoch_store.epoch() == checkpoint.epoch {
                    epoch_store
                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                        .unwrap();
                } else {
                    debug!(
                        validator_epoch =% epoch_store.epoch(),
                        genesis_epoch =% checkpoint.epoch(),
                        "Not inserting checkpoint builder data for genesis checkpoint",
                    );
                }
                self.insert_checkpoint_contents(contents).unwrap();
                self.insert_verified_checkpoint(&checkpoint).unwrap();
                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
            }
        }
    }

    pub fn get_checkpoint_by_digest(
        &self,
        digest: &CheckpointDigest,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.tables
            .checkpoint_by_digest
            .get(digest)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.tables
            .certified_checkpoints
            .get(&sequence_number)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.tables
            .locally_computed_checkpoints
            .get(&sequence_number)
    }

    pub fn multi_get_locally_computed_checkpoints(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
        let checkpoints = self
            .tables
            .locally_computed_checkpoints
            .multi_get(sequence_numbers)?;

        Ok(checkpoints)
    }

    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }

    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }

    pub fn get_latest_certified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        Ok(self
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(_, v)| v.into()))
    }

    pub fn get_latest_locally_computed_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        Ok(self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(_, v)| v))
    }

    pub fn multi_get_checkpoint_by_sequence_number(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
        let checkpoints = self
            .tables
            .certified_checkpoints
            .multi_get(sequence_numbers)?
            .into_iter()
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
            .collect();

        Ok(checkpoints)
    }

    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }

    pub fn get_highest_verified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_verified = if let Some(highest_verified) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestVerified)?
        {
            highest_verified
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_verified.1)
    }

    pub fn get_highest_synced_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_synced = if let Some(highest_synced) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestSynced)?
        {
            highest_synced
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_synced.1)
    }

    pub fn get_highest_synced_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_synced) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestSynced)?
        {
            Ok(Some(highest_synced.0))
        } else {
            Ok(None)
        }
    }

    pub fn get_highest_executed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_executed) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestExecuted)?
        {
            Ok(Some(highest_executed.0))
        } else {
            Ok(None)
        }
    }

    pub fn get_highest_executed_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_executed = if let Some(highest_executed) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestExecuted)?
        {
            highest_executed
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_executed.1)
    }

    pub fn get_highest_pruned_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .watermarks
            .get(&CheckpointWatermark::HighestPruned)
            .map(|watermark| watermark.map(|w| w.0))
    }

    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }

    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content_v2.get(&seq)
    }

    fn prune_local_summaries(&self) -> SuiResult {
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }

    pub fn clear_locally_computed_checkpoints_from(
        &self,
        from_seq: CheckpointSequenceNumber,
    ) -> SuiResult {
        let keys: Vec<_> = self
            .tables
            .locally_computed_checkpoints
            .safe_iter_with_bounds(Some(from_seq), None)
            .map(|r| r.map(|(k, _)| k))
            .collect::<Result<_, _>>()?;
        if let Some(&last_local_summary) = keys.last() {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch
                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
                .expect("Failed to delete locally computed checkpoints");
            batch
                .write()
                .expect("Failed to delete locally computed checkpoints");
            warn!(
                from_seq,
                last_local_summary,
                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
                from_seq,
                last_local_summary
            );
        }
        Ok(())
    }

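    /// Compares a locally computed checkpoint summary against the certified checkpoint for the
    /// same sequence number. On mismatch, logs both summaries along with their contents, records
    /// the fork in the watermarks table, and terminates the node via `fatal!`.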
    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );

            if let Err(e) = self.record_checkpoint_fork_detected(
                *local_checkpoint.sequence_number(),
                local_checkpoint.digest(),
            ) {
                error!("Failed to record checkpoint fork in database: {:?}", e);
            }

            fail_point_arg!(
                "kill_checkpoint_fork_node",
                |checkpoint_overrides: std::sync::Arc<
                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
                >| {
                    #[cfg(msim)]
                    {
                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
                            overrides.insert(
                                local_checkpoint.sequence_number,
                                verified_checkpoint.digest().to_string(),
                            );
                        }
                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
                            local_checkpoint.sequence_number(),
                            verified_checkpoint.digest()
                        );
                        sui_simulator::task::shutdown_current_node();
                    }
                }
            );

            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }

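    /// Stores a certified checkpoint and indexes it by digest; if the checkpoint carries a
    /// next-epoch committee it is also recorded as the last checkpoint of its epoch. After
    /// writing, the locally computed summary (if any) is checked for a fork.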
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }

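    /// Inserts a certified checkpoint and advances the highest-verified watermark.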
    #[instrument(level = "debug", skip_all)]
    pub fn insert_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.insert_certified_checkpoint(checkpoint)?;
        self.update_highest_verified_checkpoint(checkpoint)
    }

    pub fn update_highest_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if Some(*checkpoint.sequence_number())
            > self
                .get_highest_verified_checkpoint()?
                .map(|x| *x.sequence_number())
        {
            debug!(
                checkpoint_seq = checkpoint.sequence_number(),
                "Updating highest verified checkpoint",
            );
            self.tables.watermarks.insert(
                &CheckpointWatermark::HighestVerified,
                &(*checkpoint.sequence_number(), *checkpoint.digest()),
            )?;
        }

        Ok(())
    }

    pub fn update_highest_synced_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestSynced,
            &(seq, *checkpoint.digest()),
        )?;
        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
        Ok(())
    }

    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        notify_read
            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
                let seq = seqs[0];
                let Some(highest) = get_watermark() else {
                    return vec![None];
                };
                if highest < seq {
                    return vec![None];
                }
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                vec![Some(checkpoint)]
            })
            .await
            .into_iter()
            .next()
            .unwrap()
    }

    pub async fn notify_read_synced_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
            self.get_highest_synced_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }

    pub async fn notify_read_executed_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
            self.get_highest_executed_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }

    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(seq, *checkpoint.digest()),
        )?;
        self.executed_checkpoint_notify_read
            .notify(&seq, checkpoint);
        Ok(())
    }

    pub fn update_highest_pruned_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestPruned,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

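    /// Sets the highest-executed watermark directly, bypassing the contiguity assertion in
    /// `update_highest_executed_checkpoint` and without notifying waiting readers; callers are
    /// responsible for ensuring the watermark move is valid.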
    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn insert_checkpoint_contents(
        &self,
        contents: CheckpointContents,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = ?contents.digest(),
            "Inserting checkpoint contents",
        );
        self.tables
            .checkpoint_content
            .insert(contents.digest(), &contents)
    }

    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content_v2.batch();
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content_v2,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        batch.write()
    }

    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.tables.full_checkpoint_content.remove(&seq)?;
        self.tables.full_checkpoint_content_v2.remove(&seq)
    }

    pub fn get_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
    ) -> SuiResult<Option<VerifiedCheckpoint>> {
        let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
        let checkpoint = match seq {
            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
            None => None,
        };
        Ok(checkpoint)
    }

    pub fn get_epoch_last_checkpoint_seq_number(
        &self,
        epoch_id: EpochId,
    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
        Ok(seq)
    }

    pub fn insert_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
        checkpoint: &VerifiedCheckpoint,
    ) -> SuiResult {
        self.tables
            .epoch_last_checkpoint_map
            .insert(&epoch_id, checkpoint.sequence_number())?;
        Ok(())
    }

    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
            checkpoint
                .end_of_epoch_data
                .as_ref()
                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
                .epoch_commitments
                .clone()
        });
        Ok(commitments)
    }

    pub fn get_epoch_stats(
        &self,
        epoch: EpochId,
        last_checkpoint: &CheckpointSummary,
    ) -> Option<EpochStats> {
        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
            (0, 0)
        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
            (
                checkpoint.sequence_number + 1,
                checkpoint.network_total_transactions,
            )
        } else {
            return None;
        };
        Some(EpochStats {
            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
            transaction_count: last_checkpoint.network_total_transactions
                - prev_epoch_network_transactions,
            total_gas_reward: last_checkpoint
                .epoch_rolling_gas_cost_summary
                .computation_cost,
        })
    }

    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
        self.tables
            .checkpoint_content
            .checkpoint_db(path)
            .map_err(Into::into)
    }

    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
        let mut wb = self.tables.watermarks.batch();
        wb.delete_batch(
            &self.tables.watermarks,
            std::iter::once(CheckpointWatermark::HighestExecuted),
        )?;
        wb.write()?;
        Ok(())
    }

    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
        self.delete_highest_executed_checkpoint_test_only()?;
        Ok(())
    }

    pub fn record_checkpoint_fork_detected(
        &self,
        checkpoint_seq: CheckpointSequenceNumber,
        checkpoint_digest: CheckpointDigest,
    ) -> Result<(), TypedStoreError> {
        info!(
            checkpoint_seq = checkpoint_seq,
            checkpoint_digest = ?checkpoint_digest,
            "Recording checkpoint fork detection in database"
        );
        self.tables.watermarks.insert(
            &CheckpointWatermark::CheckpointForkDetected,
            &(checkpoint_seq, checkpoint_digest),
        )
    }

    pub fn get_checkpoint_fork_detected(
        &self,
    ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
        self.tables
            .watermarks
            .get(&CheckpointWatermark::CheckpointForkDetected)
    }

    pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
        self.tables
            .watermarks
            .remove(&CheckpointWatermark::CheckpointForkDetected)
    }

    pub fn record_transaction_fork_detected(
        &self,
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
    ) -> Result<(), TypedStoreError> {
        info!(
            tx_digest = ?tx_digest,
            expected_effects_digest = ?expected_effects_digest,
            actual_effects_digest = ?actual_effects_digest,
            "Recording transaction fork detection in database"
        );
        self.tables.transaction_fork_detected.insert(
            &TRANSACTION_FORK_DETECTED_KEY,
            &(tx_digest, expected_effects_digest, actual_effects_digest),
        )
    }

    pub fn get_transaction_fork_detected(
        &self,
    ) -> Result<
        Option<(
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        )>,
        TypedStoreError,
    > {
        self.tables
            .transaction_fork_detected
            .get(&TRANSACTION_FORK_DETECTED_KEY)
    }

    pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
        self.tables
            .transaction_fork_detected
            .remove(&TRANSACTION_FORK_DETECTED_KEY)
    }
}

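/// Keys of the `watermarks` table; each maps to a `(CheckpointSequenceNumber, CheckpointDigest)`
/// pair.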
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
    CheckpointForkDetected,
}

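/// Receives per-checkpoint transaction effects from the checkpoint builder and feeds them into
/// the `GlobalStateHasher`; stops once the hasher has been dropped.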
struct CheckpointStateHasher {
    epoch_store: Arc<AuthorityPerEpochStore>,
    hasher: Weak<GlobalStateHasher>,
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}

impl CheckpointStateHasher {
    fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        hasher: Weak<GlobalStateHasher>,
        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    ) -> Self {
        Self {
            epoch_store,
            hasher,
            receive_from_builder,
        }
    }

    async fn run(self) {
        let Self {
            epoch_store,
            hasher,
            mut receive_from_builder,
        } = self;
        while let Some((seq, effects)) = receive_from_builder.recv().await {
            let Some(hasher) = hasher.upgrade() else {
                info!("Object state hasher was dropped, stopping checkpoint accumulation");
                break;
            };
            hasher
                .accumulate_checkpoint(&effects, seq, &epoch_store)
                .expect("epoch ended while accumulating checkpoint");
        }
    }
}

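/// Errors surfaced by the checkpoint builder. `ChangeEpochTxAlreadyExecuted` and
/// `SystemPackagesMissing` stop the builder loop; `Retry` causes it to retry after a delay.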
#[derive(Debug)]
pub(crate) enum CheckpointBuilderError {
    ChangeEpochTxAlreadyExecuted,
    SystemPackagesMissing,
    Retry(anyhow::Error),
}

impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
    for CheckpointBuilderError
{
    fn from(e: SuiError) -> Self {
        Self::Retry(e.into())
    }
}

pub(crate) type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;

pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    notify_aggregator: Arc<Notify>,
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    global_state_hasher: Weak<GlobalStateHasher>,
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}

pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}

pub struct CheckpointSignatureAggregator {
    next_index: u64,
    summary: CheckpointSummary,
    digest: CheckpointDigest,
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}

impl CheckpointBuilder {
    fn new(
        state: Arc<AuthorityState>,
        store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            store,
            epoch_store,
            notify,
            effects_store,
            global_state_hasher,
            send_to_hasher,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }

    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
        if let Some(replay_waiter) = consensus_replay_waiter {
            info!("Waiting for consensus commits to replay ...");
            replay_waiter.wait_for_replay().await;
            info!("Consensus commits finished replaying");
        }
        info!("Starting CheckpointBuilder");
        loop {
            match self.maybe_build_checkpoints().await {
                Ok(()) => {}
                err @ Err(
                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
                    | CheckpointBuilderError::SystemPackagesMissing,
                ) => {
                    info!("CheckpointBuilder stopping: {:?}", err);
                    return;
                }
                Err(CheckpointBuilderError::Retry(inner)) => {
                    let msg = format!("{:?}", inner);
                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    continue;
                }
            }

            self.notify.notified().await;
        }
    }

    async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                Some(last_timestamp) => {
                    current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                }
                None => true,
            } || checkpoints_iter
                .peek()
                .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height_from = grouped_pending_checkpoints
                    .first()
                    .unwrap()
                    .details()
                    .checkpoint_height,
                checkpoint_commit_height_to = last_height,
                "Making checkpoint with commit height range"
            );

            let seq = self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await?;

            self.last_built.send_if_modified(|cur| {
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );

        Ok(())
    }

    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &mut self,
        pendings: Vec<PendingCheckpoint>,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        let pending_ckpt_str = pendings
            .iter()
            .map(|p| {
                format!(
                    "height={}, commit={}",
                    p.details().checkpoint_height,
                    p.details().consensus_commit_ref
                )
            })
            .join("; ");

        let last_details = pendings.last().unwrap().details().clone();

        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &last_details,
                &all_roots,
            )
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        let new_ckpt_str = new_checkpoints
            .iter()
            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
            .join("; ");

        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            "Made new checkpoint {} from pending checkpoint {}",
            new_ckpt_str, pending_ckpt_str
        );

        Ok(highest_sequence)
    }

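    /// Builds the accumulator settlement transactions for the checkpoint under construction,
    /// hands them (plus a trailing barrier transaction) to execution via the epoch store, waits
    /// for their effects, and returns the settlement key together with the combined settlement
    /// and barrier effects.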
    async fn construct_and_execute_settlement_transactions(
        &self,
        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> (TransactionKey, Vec<TransactionEffects>) {
        let _scope =
            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");

        let tx_key =
            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);

        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.effects_store.as_ref()),
            sorted_tx_effects_included_in_checkpoint,
            checkpoint_seq,
            tx_index_offset,
        );

        let funds_changes = builder.collect_funds_changes();
        let num_updates = builder.num_updates();
        let settlement_txns = builder.build_tx(
            self.epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            checkpoint_seq,
        );

        let settlement_txns: Vec<_> = settlement_txns
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    self.epoch_store.epoch(),
                )
            })
            .collect();

        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();

        debug!(
            ?settlement_digests,
            ?tx_key,
            "created settlement transactions with {num_updates} updates"
        );

        self.epoch_store
            .notify_settlement_transactions_ready(tx_key, settlement_txns);

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_settlement_effects",
            &settlement_digests,
            tx_key,
        )
        .await;

        let barrier_tx = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            &settlement_effects,
        );

        let barrier_tx = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier_tx),
            self.epoch_store.epoch(),
        );
        let barrier_digest = *barrier_tx.digest();

        self.epoch_store
            .notify_barrier_transaction_ready(tx_key, barrier_tx);

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_barrier_effects",
            &[barrier_digest],
            tx_key,
        )
        .await;

        let settlement_effects: Vec<_> = settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect();

        let mut next_accumulator_version = None;
        for fx in settlement_effects.iter() {
            assert!(
                fx.status().is_ok(),
                "settlement transaction cannot fail (digest: {:?}) {:#?}",
                fx.transaction_digest(),
                fx
            );
            if let Some(version) = fx
                .mutated()
                .iter()
                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
            {
                assert!(
                    next_accumulator_version.is_none(),
                    "Only one settlement transaction should mutate the accumulator root object"
                );
                next_accumulator_version = Some(version);
            }
        }
        let settlements = FundsSettlement {
            next_accumulator_version: next_accumulator_version
                .expect("Accumulator root object should be mutated in the settlement transactions"),
            funds_changes,
        };

        self.state.execution_scheduler().settle_funds(settlements);

        (tx_key, settlement_effects)
    }

1596
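    /// Resolves each pending checkpoint's roots to executed effects, completes the causal
    /// history, causally sorts it, and (when accumulators are enabled) appends the settlement
    /// transaction effects. Returns the ordered effects and the set of root digests.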
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        pending_checkpoints: Vec<PendingCheckpoint>,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        let mut effects_in_current_checkpoint = BTreeSet::new();

        let mut tx_effects = Vec::new();
        let mut tx_roots = HashSet::new();

        for pending_checkpoint in pending_checkpoints.into_iter() {
            let mut pending = pending_checkpoint;
            debug!(
                checkpoint_commit_height = pending.details.checkpoint_height,
                "Resolving checkpoint transactions for pending checkpoint.",
            );

            trace!(
                "roots for pending checkpoint {:?}: {:?}",
                pending.details.checkpoint_height, pending.roots,
            );

            let settlement_root = if self.epoch_store.accumulators_enabled() {
                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
                    pending.roots.pop()
                else {
                    fatal!("No settlement root found");
                };
                Some(settlement_root)
            } else {
                None
            };

            let roots = &pending.roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(roots.len() as u64);

            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;
            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;

            let consensus_commit_prologue = if self
                .epoch_store
                .protocol_config()
                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
            {
                let consensus_commit_prologue =
                    self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

                if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                    let unsorted_ccp = self.complete_checkpoint_effects(
                        vec![ccp_effects.clone()],
                        &mut effects_in_current_checkpoint,
                    )?;

                    if unsorted_ccp.len() != 1 {
                        fatal!(
                            "Expected 1 consensus commit prologue, got {:?}",
                            unsorted_ccp
                                .iter()
                                .map(|e| e.transaction_digest())
                                .collect::<Vec<_>>()
                        );
                    }
                    assert_eq!(unsorted_ccp.len(), 1);
                    assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
                }
                consensus_commit_prologue
            } else {
                None
            };

            let unsorted =
                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
                if cfg!(debug_assertions) {
                    for tx in unsorted.iter() {
                        assert!(tx.transaction_digest() != &ccp_digest);
                    }
                }
                sorted.push(ccp_effects);
            }
            sorted.extend(CausalOrder::causal_sort(unsorted));

            if let Some(settlement_root) = settlement_root {
                let last_checkpoint =
                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
                let next_checkpoint_seq = last_checkpoint
                    .as_ref()
                    .map(|(seq, _)| *seq)
                    .unwrap_or_default()
                    + 1;
                let tx_index_offset = tx_effects.len() as u64;

                let (tx_key, settlement_effects) = self
                    .construct_and_execute_settlement_transactions(
                        &sorted,
                        pending.details.checkpoint_height,
                        next_checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                debug!(?tx_key, "executed settlement transactions");

                assert_eq!(settlement_root, tx_key);

                sorted.extend(settlement_effects);
            }

            #[cfg(msim)]
            {
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            tx_effects.extend(sorted);
            tx_roots.extend(root_digests);
        }

        Ok((tx_effects, tx_roots))
    }

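    /// Returns the digest and effects of the consensus commit prologue if it is the first root
    /// of the commit; the prologue is later placed at the front of the checkpoint.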
    fn extract_consensus_commit_prologue(
        &self,
        root_digests: &[TransactionDigest],
        root_effects: &[TransactionEffects],
    ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
        if root_digests.is_empty() {
            return Ok(None);
        }

        let first_tx = self
            .state
            .get_transaction_cache_reader()
            .get_transaction_block(&root_digests[0])
            .expect("Transaction block must exist");

        Ok(first_tx
            .transaction_data()
            .is_consensus_commit_prologue()
            .then(|| {
                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
                (*first_tx.digest(), root_effects[0].clone())
            }))
    }

    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &mut self,
        height: CheckpointHeight,
        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
    ) -> SuiResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.store.tables.checkpoint_content.batch();
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );

            if let Some(previously_computed_summary) = self
                .store
                .tables
                .locally_computed_checkpoints
                .get(&summary.sequence_number)?
                && previously_computed_summary.digest() != summary.digest()
            {
                fatal!(
                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
                    summary.sequence_number,
                    previously_computed_summary.digest(),
                    summary.digest()
                );
            }

            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            batch.insert_batch(
                &self.store.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.store.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        batch.write()?;

        for (summary, contents) in &new_checkpoints {
            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
                .await?;
        }

        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .store
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.store
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_constructed_checkpoint(height, new_checkpoints);
        Ok(())
    }

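    /// Splits the effects (with their transaction and signature sizes) into chunks that respect
    /// `max_transactions_per_checkpoint` and `max_checkpoint_size_bytes`; a single oversized
    /// transaction is allowed through as its own chunk. Always returns at least one chunk.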
1875 #[allow(clippy::type_complexity)]
1876 fn split_checkpoint_chunks(
1877 &self,
1878 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1879 signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
1880 ) -> CheckpointBuilderResult<
1881 Vec<
1882 Vec<(
1883 TransactionEffects,
1884 Vec<(GenericSignature, Option<SequenceNumber>)>,
1885 )>,
1886 >,
1887 > {
1888 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1889 let mut chunks = Vec::new();
1890 let mut chunk = Vec::new();
1891 let mut chunk_size: usize = 0;
1892 for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1893 .into_iter()
1894 .zip(signatures.into_iter())
1895 {
1896 let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
1901 bcs::serialized_size(&signatures)?
1902 } else {
1903 let signatures: Vec<&GenericSignature> =
1904 signatures.iter().map(|(s, _)| s).collect();
1905 bcs::serialized_size(&signatures)?
1906 };
1907 let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
1908 if chunk.len() == self.max_transactions_per_checkpoint
1909 || (chunk_size + size) > self.max_checkpoint_size_bytes
1910 {
1911 if chunk.is_empty() {
1912 warn!(
1914 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1915 self.max_checkpoint_size_bytes
1916 );
1917 } else {
1918 chunks.push(chunk);
1919 chunk = Vec::new();
1920 chunk_size = 0;
1921 }
1922 }
1923
1924 chunk.push((effects, signatures));
1925 chunk_size += size;
1926 }
1927
1928 if !chunk.is_empty() || chunks.is_empty() {
1929 chunks.push(chunk);
1934 }
1939 Ok(chunks)
1940 }
1941
1942 fn load_last_built_checkpoint_summary(
1943 epoch_store: &AuthorityPerEpochStore,
1944 store: &CheckpointStore,
1945 ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1946 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1947 if last_checkpoint.is_none() {
1948 let epoch = epoch_store.epoch();
1949 if epoch > 0 {
1950 let previous_epoch = epoch - 1;
1951 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1952 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1953 if let Some((ref seq, _)) = last_checkpoint {
1954 debug!(
1955 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1956 );
1957 } else {
1958 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1960 }
1961 }
1962 }
1963 Ok(last_checkpoint)
1964 }
1965
1966 #[instrument(level = "debug", skip_all)]
1967 async fn create_checkpoints(
1968 &self,
1969 all_effects: Vec<TransactionEffects>,
1970 details: &PendingCheckpointInfo,
1971 all_roots: &HashSet<TransactionDigest>,
1972 ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1973 let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1974
1975 let total = all_effects.len();
1976 let mut last_checkpoint =
1977 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1978 let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1979 info!(
1980 checkpoint_commit_height = details.checkpoint_height,
1981 next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1982 checkpoint_timestamp = details.timestamp_ms,
1983 "Creating checkpoint(s) for {} transactions",
1984 all_effects.len(),
1985 );
1986
1987 let all_digests: Vec<_> = all_effects
1988 .iter()
1989 .map(|effect| *effect.transaction_digest())
1990 .collect();
1991 let transactions_and_sizes = self
1992 .state
1993 .get_transaction_cache_reader()
1994 .get_transactions_and_serialized_sizes(&all_digests)?;
1995 let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1996 let mut transactions = Vec::with_capacity(all_effects.len());
1997 let mut transaction_keys = Vec::with_capacity(all_effects.len());
1998 let mut randomness_rounds = BTreeMap::new();
1999 {
2000 let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
2001 debug!(
2002 ?last_checkpoint_seq,
2003 "Waiting for {:?} certificates to appear in consensus",
2004 all_effects.len()
2005 );
2006
2007 for (effects, transaction_and_size) in all_effects
2008 .into_iter()
2009 .zip(transactions_and_sizes.into_iter())
2010 {
2011 let (transaction, size) = transaction_and_size
2012 .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
2013 match transaction.inner().transaction_data().kind() {
2014 TransactionKind::ConsensusCommitPrologue(_)
2015 | TransactionKind::ConsensusCommitPrologueV2(_)
2016 | TransactionKind::ConsensusCommitPrologueV3(_)
2017 | TransactionKind::ConsensusCommitPrologueV4(_)
2018 | TransactionKind::AuthenticatorStateUpdate(_) => {
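// System transactions derived directly from the consensus commit; there is
// no separately sequenced certificate to wait for.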
2019 }
2022 TransactionKind::ProgrammableSystemTransaction(_) => {
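// System-generated programmable transactions are likewise not user
// certificates, so no extra bookkeeping is needed here.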
2023 }
2025 TransactionKind::ChangeEpoch(_)
2026 | TransactionKind::Genesis(_)
2027 | TransactionKind::EndOfEpochTransaction(_) => {
2028 fatal!(
2029 "unexpected transaction in checkpoint effects: {:?}",
2030 transaction
2031 );
2032 }
2033 TransactionKind::RandomnessStateUpdate(rsu) => {
2034 randomness_rounds
2035 .insert(*effects.transaction_digest(), rsu.randomness_round);
2036 }
2037 TransactionKind::ProgrammableTransaction(_) => {
2038 let digest = *effects.transaction_digest();
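// Transactions pulled in as dependencies (rather than as checkpoint roots) may not
// have had their consensus messages processed yet; collect them so we can wait below
// before reading their user signatures.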
2042 if !all_roots.contains(&digest) {
2043 transaction_keys.push(SequencedConsensusTransactionKey::External(
2044 ConsensusTransactionKey::Certificate(digest),
2045 ));
2046 }
2047 }
2048 }
2049 transactions.push(transaction);
2050 all_effects_and_transaction_sizes.push((effects, size));
2051 }
2052
2053 self.epoch_store
2054 .consensus_messages_processed_notify(transaction_keys)
2055 .await?;
2056 }
2057
2058 let signatures = self
2059 .epoch_store
2060 .user_signatures_for_checkpoint(&transactions, &all_digests);
2061 debug!(
2062 ?last_checkpoint_seq,
2063 "Received {} checkpoint user signatures from consensus",
2064 signatures.len()
2065 );
2066
2067 let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
2068 Some(
2069 transactions
2070 .iter()
2071 .flat_map(|tx| {
2072 if let TransactionKind::ProgrammableTransaction(ptb) =
2073 tx.transaction_data().kind()
2074 {
2075 itertools::Either::Left(
2076 ptb.commands
2077 .iter()
2078 .map(ExecutionTimeObservationKey::from_command),
2079 )
2080 } else {
2081 itertools::Either::Right(std::iter::empty())
2082 }
2083 })
2084 .collect(),
2085 )
2086 } else {
2087 None
2088 };
2089
2090 let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2091 let chunks_count = chunks.len();
2092
2093 let mut checkpoints = Vec::with_capacity(chunks_count);
2094 debug!(
2095 ?last_checkpoint_seq,
2096 "Creating {} checkpoints with {} transactions", chunks_count, total,
2097 );
2098
2099 let epoch = self.epoch_store.epoch();
2100 for (index, transactions) in chunks.into_iter().enumerate() {
2101 let first_checkpoint_of_epoch = index == 0
2102 && last_checkpoint
2103 .as_ref()
2104 .map(|(_, c)| c.epoch != epoch)
2105 .unwrap_or(true);
2106 if first_checkpoint_of_epoch {
2107 self.epoch_store
2108 .record_epoch_first_checkpoint_creation_time_metric();
2109 }
2110 let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2111
2112 let sequence_number = last_checkpoint
2113 .as_ref()
2114 .map(|(_, c)| c.sequence_number + 1)
2115 .unwrap_or_default();
2116 let mut timestamp_ms = details.timestamp_ms;
2117 if let Some((_, last_checkpoint)) = &last_checkpoint
2118 && last_checkpoint.timestamp_ms > timestamp_ms
2119 {
2120 debug!(
2122 "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2123 sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2124 );
2125 if self
2126 .epoch_store
2127 .protocol_config()
2128 .enforce_checkpoint_timestamp_monotonicity()
2129 {
2130 timestamp_ms = last_checkpoint.timestamp_ms;
2131 }
2132 }
2133
2134 let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
2135 let epoch_rolling_gas_cost_summary =
2136 self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2137
2138 let end_of_epoch_data = if last_checkpoint_of_epoch {
2139 let system_state_obj = self
2140 .augment_epoch_last_checkpoint(
2141 &epoch_rolling_gas_cost_summary,
2142 timestamp_ms,
2143 &mut effects,
2144 &mut signatures,
2145 sequence_number,
2146 std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2147 last_checkpoint_seq.unwrap_or_default(),
2148 )
2149 .await?;
2150
2151 let committee = system_state_obj
2152 .get_current_epoch_committee()
2153 .committee()
2154 .clone();
2155
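// Finalize the epoch's global state accumulator: fold in this checkpoint, wait for
// the previous running root, then compute the epoch's root state digest.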
2156 let root_state_digest = {
2159 let state_acc = self
2160 .global_state_hasher
2161 .upgrade()
2162 .expect("No checkpoints should be getting built after local reconfiguration");
2163 let acc = state_acc.accumulate_checkpoint(
2164 &effects,
2165 sequence_number,
2166 &self.epoch_store,
2167 )?;
2168
2169 state_acc
2170 .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2171 .await?;
2172
2173 state_acc.accumulate_running_root(
2174 &self.epoch_store,
2175 sequence_number,
2176 Some(acc),
2177 )?;
2178 state_acc
2179 .digest_epoch(self.epoch_store.clone(), sequence_number)
2180 .await?
2181 };
2182 self.metrics.highest_accumulated_epoch.set(epoch as i64);
2183 info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2184
2185 let epoch_commitments = if self
2186 .epoch_store
2187 .protocol_config()
2188 .check_commit_root_state_digest_supported()
2189 {
2190 vec![root_state_digest.into()]
2191 } else {
2192 vec![]
2193 };
2194
2195 Some(EndOfEpochData {
2196 next_epoch_committee: committee.voting_rights,
2197 next_epoch_protocol_version: ProtocolVersion::new(
2198 system_state_obj.protocol_version(),
2199 ),
2200 epoch_commitments,
2201 })
2202 } else {
2203 self.send_to_hasher
2204 .send((sequence_number, effects.clone()))
2205 .await?;
2206
2207 None
2208 };
2209 let contents = if self.epoch_store.protocol_config().address_aliases() {
2210 CheckpointContents::new_v2(&effects, signatures)
2211 } else {
2212 CheckpointContents::new_with_digests_and_signatures(
2213 effects.iter().map(TransactionEffects::execution_digests),
2214 signatures
2215 .into_iter()
2216 .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
2217 .collect(),
2218 )
2219 };
2220
2221 let num_txns = contents.size() as u64;
2222
2223 let network_total_transactions = last_checkpoint
2224 .as_ref()
2225 .map(|(_, c)| c.network_total_transactions + num_txns)
2226 .unwrap_or(num_txns);
2227
2228 let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2229
2230 let matching_randomness_rounds: Vec<_> = effects
2231 .iter()
2232 .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2233 .copied()
2234 .collect();
2235
2236 let checkpoint_commitments = if self
2237 .epoch_store
2238 .protocol_config()
2239 .include_checkpoint_artifacts_digest_in_summary()
2240 {
2241 let artifacts = CheckpointArtifacts::from(&effects[..]);
2242 let artifacts_digest = artifacts.digest()?;
2243 vec![artifacts_digest.into()]
2244 } else {
2245 Default::default()
2246 };
2247
2248 let summary = CheckpointSummary::new(
2249 self.epoch_store.protocol_config(),
2250 epoch,
2251 sequence_number,
2252 network_total_transactions,
2253 &contents,
2254 previous_digest,
2255 epoch_rolling_gas_cost_summary,
2256 end_of_epoch_data,
2257 timestamp_ms,
2258 matching_randomness_rounds,
2259 checkpoint_commitments,
2260 );
2261 summary.report_checkpoint_age(
2262 &self.metrics.last_created_checkpoint_age,
2263 &self.metrics.last_created_checkpoint_age_ms,
2264 );
2265 if last_checkpoint_of_epoch {
2266 info!(
2267 checkpoint_seq = sequence_number,
2268 "creating last checkpoint of epoch {}", epoch
2269 );
2270 if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2271 self.epoch_store
2272 .report_epoch_metrics_at_last_checkpoint(stats);
2273 }
2274 }
2275 last_checkpoint = Some((sequence_number, summary.clone()));
2276 checkpoints.push((summary, contents));
2277 }
2278
2279 Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2280 }
2281
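/// Computes the epoch-rolling gas cost summary by adding this checkpoint's costs to the
/// previous checkpoint's running total, resetting at epoch boundaries.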
2282 fn get_epoch_total_gas_cost(
2283 &self,
2284 last_checkpoint: Option<&CheckpointSummary>,
2285 cur_checkpoint_effects: &[TransactionEffects],
2286 ) -> GasCostSummary {
2287 let (previous_epoch, previous_gas_costs) = last_checkpoint
2288 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2289 .unwrap_or_default();
2290 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2291 if previous_epoch == self.epoch_store.epoch() {
2292 GasCostSummary::new(
2294 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2295 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2296 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2297 previous_gas_costs.non_refundable_storage_fee
2298 + current_gas_costs.non_refundable_storage_fee,
2299 )
2300 } else {
2301 current_gas_costs
2302 }
2303 }
2304
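/// Creates and executes the advance-epoch transaction and appends its effects (with an
/// empty signature set) to the final checkpoint of the epoch, returning the resulting
/// system state object.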
2305 #[instrument(level = "error", skip_all)]
2306 async fn augment_epoch_last_checkpoint(
2307 &self,
2308 epoch_total_gas_cost: &GasCostSummary,
2309 epoch_start_timestamp_ms: CheckpointTimestamp,
2310 checkpoint_effects: &mut Vec<TransactionEffects>,
2311 signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2312 checkpoint: CheckpointSequenceNumber,
2313 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2314 last_checkpoint: CheckpointSequenceNumber,
2317 ) -> CheckpointBuilderResult<SuiSystemState> {
2318 let (system_state, effects) = self
2319 .state
2320 .create_and_execute_advance_epoch_tx(
2321 &self.epoch_store,
2322 epoch_total_gas_cost,
2323 checkpoint,
2324 epoch_start_timestamp_ms,
2325 end_of_epoch_observation_keys,
2326 last_checkpoint,
2327 )
2328 .await?;
2329 checkpoint_effects.push(effects);
2330 signatures.push(vec![]);
2331 Ok(system_state)
2332 }
2333
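/// Expands the checkpoint roots with all of their not-yet-checkpointed dependencies
/// executed in the current epoch, returning the full set of effects to include.
/// `existing_tx_digests_in_checkpoint` is used to skip, and is extended with, digests
/// already placed in this checkpoint.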
2334 #[instrument(level = "debug", skip_all)]
2341 fn complete_checkpoint_effects(
2342 &self,
2343 mut roots: Vec<TransactionEffects>,
2344 existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
2345 ) -> SuiResult<Vec<TransactionEffects>> {
2346 let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
2347 let mut results = vec![];
2348 let mut seen = HashSet::new();
2349 loop {
2350 let mut pending = HashSet::new();
2351
2352 let transactions_included = self
2353 .epoch_store
2354 .builder_included_transactions_in_checkpoint(
2355 roots.iter().map(|e| e.transaction_digest()),
2356 )?;
2357
2358 for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
2359 let digest = effect.transaction_digest();
2360 seen.insert(*digest);
2362
2363 if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
2365 continue;
2366 }
2367
2368 if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
2370 continue;
2371 }
2372
2373 let existing_effects = self
2374 .epoch_store
2375 .transactions_executed_in_cur_epoch(effect.dependencies())?;
2376
2377 for (dependency, effects_signature_exists) in
2378 effect.dependencies().iter().zip(existing_effects.iter())
2379 {
2380 if !effects_signature_exists {
2385 continue;
2386 }
2387 if seen.insert(*dependency) {
2388 pending.insert(*dependency);
2389 }
2390 }
2391 results.push(effect);
2392 }
2393 if pending.is_empty() {
2394 break;
2395 }
2396 let pending = pending.into_iter().collect::<Vec<_>>();
2397 let effects = self.effects_store.multi_get_executed_effects(&pending);
2398 let effects = effects
2399 .into_iter()
2400 .zip(pending)
2401 .map(|(opt, digest)| match opt {
2402 Some(x) => x,
2403 None => panic!(
2404 "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
2405 digest
2406 ),
2407 })
2408 .collect::<Vec<_>>();
2409 roots = effects;
2410 }
2411
2412 existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
2413 Ok(results)
2414 }
2415
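/// Simulation-only sanity check: at most one consensus commit prologue may appear among
/// the roots, and if one is present it must be the first transaction of the sorted checkpoint.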
2416 #[cfg(msim)]
2419 fn expensive_consensus_commit_prologue_invariants_check(
2420 &self,
2421 root_digests: &[TransactionDigest],
2422 sorted: &[TransactionEffects],
2423 ) {
2424 if !self
2425 .epoch_store
2426 .protocol_config()
2427 .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
2428 {
2429 return;
2430 }
2431
2432 let root_txs = self
2434 .state
2435 .get_transaction_cache_reader()
2436 .multi_get_transaction_blocks(root_digests);
2437 let ccps = root_txs
2438 .iter()
2439 .filter_map(|tx| {
2440 if let Some(tx) = tx {
2441 if tx.transaction_data().is_consensus_commit_prologue() {
2442 Some(tx)
2443 } else {
2444 None
2445 }
2446 } else {
2447 None
2448 }
2449 })
2450 .collect::<Vec<_>>();
2451
2452 assert!(ccps.len() <= 1);
2454
2455 let txs = self
2457 .state
2458 .get_transaction_cache_reader()
2459 .multi_get_transaction_blocks(
2460 &sorted
2461 .iter()
2462 .map(|tx| *tx.transaction_digest())
2463 .collect::<Vec<_>>(),
2464 );
2465
2466 if ccps.is_empty() {
2467 for tx in txs.iter() {
2470 if let Some(tx) = tx {
2471 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2472 }
2473 }
2474 } else {
2475 assert!(
2477 txs[0]
2478 .as_ref()
2479 .unwrap()
2480 .transaction_data()
2481 .is_consensus_commit_prologue()
2482 );
2483
2484 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2485
2486 for tx in txs.iter().skip(1) {
2487 if let Some(tx) = tx {
2488 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2489 }
2490 }
2491 }
2492 }
2493}
2494
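/// Waits for the executed effects of `digests`, logging a debug_fatal and retrying if they
/// do not become available within 5 seconds.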
2495async fn wait_for_effects_with_retry(
2496 effects_store: &dyn TransactionCacheRead,
2497 task_name: &'static str,
2498 digests: &[TransactionDigest],
2499 tx_key: TransactionKey,
2500) -> Vec<TransactionEffects> {
2501 loop {
2502 match tokio::time::timeout(Duration::from_secs(5), async {
2503 effects_store
2504 .notify_read_executed_effects(task_name, digests)
2505 .await
2506 })
2507 .await
2508 {
2509 Ok(effects) => break effects,
2510 Err(_) => {
2511 debug_fatal!(
2512 "Timeout waiting for transactions to be executed {:?}, retrying...",
2513 tx_key
2514 );
2515 }
2516 }
2517 }
2518}
2519
2520impl CheckpointAggregator {
2521 fn new(
2522 tables: Arc<CheckpointStore>,
2523 epoch_store: Arc<AuthorityPerEpochStore>,
2524 notify: Arc<Notify>,
2525 output: Box<dyn CertifiedCheckpointOutput>,
2526 state: Arc<AuthorityState>,
2527 metrics: Arc<CheckpointMetrics>,
2528 ) -> Self {
2529 let current = None;
2530 Self {
2531 store: tables,
2532 epoch_store,
2533 notify,
2534 current,
2535 output,
2536 state,
2537 metrics,
2538 }
2539 }
2540
2541 async fn run(mut self) {
2542 info!("Starting CheckpointAggregator");
2543 loop {
2544 if let Err(e) = self.run_and_notify().await {
2545 error!(
2546 "Error while aggregating checkpoint, will retry in 1s: {:?}",
2547 e
2548 );
2549 self.metrics.checkpoint_errors.inc();
2550 tokio::time::sleep(Duration::from_secs(1)).await;
2551 continue;
2552 }
2553
2554 let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
2555 }
2556 }
2557
2558 async fn run_and_notify(&mut self) -> SuiResult {
2559 let summaries = self.run_inner()?;
2560 for summary in summaries {
2561 self.output.certified_checkpoint_created(&summary).await?;
2562 }
2563 Ok(())
2564 }
2565
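/// Drains pending checkpoint signatures and aggregates them into certified checkpoints,
/// returning every checkpoint certified during this pass.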
2566 fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
2567 let _scope = monitored_scope("CheckpointAggregator");
2568 let mut result = vec![];
2569 'outer: loop {
2570 let next_to_certify = self.next_checkpoint_to_certify()?;
2571 let current = if let Some(current) = &mut self.current {
2572 if current.summary.sequence_number < next_to_certify {
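// This sequence number has already been certified elsewhere (e.g. via state sync),
// so the in-progress aggregation is obsolete; drop it and move on.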
2578 assert_reachable!("skip checkpoint certification");
2579 self.current = None;
2580 continue;
2581 }
2582 current
2583 } else {
2584 let Some(summary) = self
2585 .epoch_store
2586 .get_built_checkpoint_summary(next_to_certify)?
2587 else {
2588 return Ok(result);
2589 };
2590 self.current = Some(CheckpointSignatureAggregator {
2591 next_index: 0,
2592 digest: summary.digest(),
2593 summary,
2594 signatures_by_digest: MultiStakeAggregator::new(
2595 self.epoch_store.committee().clone(),
2596 ),
2597 store: self.store.clone(),
2598 state: self.state.clone(),
2599 metrics: self.metrics.clone(),
2600 });
2601 self.current.as_mut().unwrap()
2602 };
2603
2604 let epoch_tables = self
2605 .epoch_store
2606 .tables()
2607 .expect("should not run past end of epoch");
2608 let iter = epoch_tables
2609 .pending_checkpoint_signatures
2610 .safe_iter_with_bounds(
2611 Some((current.summary.sequence_number, current.next_index)),
2612 None,
2613 );
2614 for item in iter {
2615 let ((seq, index), data) = item?;
2616 if seq != current.summary.sequence_number {
2617 trace!(
2618 checkpoint_seq =? current.summary.sequence_number,
2619 "Not enough checkpoint signatures",
2620 );
2621 return Ok(result);
2623 }
2624 trace!(
2625 checkpoint_seq = current.summary.sequence_number,
2626 "Processing signature for checkpoint (digest: {:?}) from {:?}",
2627 current.summary.digest(),
2628 data.summary.auth_sig().authority.concise()
2629 );
2630 self.metrics
2631 .checkpoint_participation
2632 .with_label_values(&[&format!(
2633 "{:?}",
2634 data.summary.auth_sig().authority.concise()
2635 )])
2636 .inc();
2637 if let Ok(auth_signature) = current.try_aggregate(data) {
2638 debug!(
2639 checkpoint_seq = current.summary.sequence_number,
2640 "Successfully aggregated signatures for checkpoint (digest: {:?})",
2641 current.summary.digest(),
2642 );
2643 let summary = VerifiedCheckpoint::new_unchecked(
2644 CertifiedCheckpointSummary::new_from_data_and_sig(
2645 current.summary.clone(),
2646 auth_signature,
2647 ),
2648 );
2649
2650 self.store.insert_certified_checkpoint(&summary)?;
2651 self.metrics
2652 .last_certified_checkpoint
2653 .set(current.summary.sequence_number as i64);
2654 current.summary.report_checkpoint_age(
2655 &self.metrics.last_certified_checkpoint_age,
2656 &self.metrics.last_certified_checkpoint_age_ms,
2657 );
2658 result.push(summary.into_inner());
2659 self.current = None;
2660 continue 'outer;
2661 } else {
2662 current.next_index = index + 1;
2663 }
2664 }
2665 break;
2666 }
2667 Ok(result)
2668 }
2669
2670 fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
2671 Ok(self
2672 .store
2673 .tables
2674 .certified_checkpoints
2675 .reversed_safe_iter_with_bounds(None, None)?
2676 .next()
2677 .transpose()?
2678 .map(|(seq, _)| seq + 1)
2679 .unwrap_or_default())
2680 }
2681}
2682
2683impl CheckpointSignatureAggregator {
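/// Adds one validator signature to the aggregator. Returns the quorum signature once
/// enough stake has signed the locally computed digest; otherwise returns `Err(())`.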
2684 #[allow(clippy::result_unit_err)]
2685 pub fn try_aggregate(
2686 &mut self,
2687 data: CheckpointSignatureMessage,
2688 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2689 let their_digest = *data.summary.digest();
2690 let (_, signature) = data.summary.into_data_and_sig();
2691 let author = signature.authority;
2692 let envelope =
2693 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2694 match self.signatures_by_digest.insert(their_digest, envelope) {
2695 InsertResult::Failed { error }
2697 if matches!(
2698 error.as_inner(),
2699 SuiErrorKind::StakeAggregatorRepeatedSigner {
2700 conflicting_sig: false,
2701 ..
2702 },
2703 ) =>
2704 {
2705 Err(())
2706 }
2707 InsertResult::Failed { error } => {
2708 warn!(
2709 checkpoint_seq = self.summary.sequence_number,
2710 "Failed to aggregate new signature from validator {:?}: {:?}",
2711 author.concise(),
2712 error
2713 );
2714 self.check_for_split_brain();
2715 Err(())
2716 }
2717 InsertResult::QuorumReached(cert) => {
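// If the quorum formed on a digest different from our locally built checkpoint,
// record a remote fork instead of producing a certificate.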
2718 if their_digest != self.digest {
2721 self.metrics.remote_checkpoint_forks.inc();
2722 warn!(
2723 checkpoint_seq = self.summary.sequence_number,
2724 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2725 author.concise(),
2726 their_digest,
2727 self.digest
2728 );
2729 return Err(());
2730 }
2731 Ok(cert)
2732 }
2733 InsertResult::NotEnoughVotes {
2734 bad_votes: _,
2735 bad_authorities: _,
2736 } => {
2737 self.check_for_split_brain();
2738 Err(())
2739 }
2740 }
2741 }
2742
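/// When quorum can no longer be reached on any single digest, reports a split-brain
/// condition: logs the competing digests by stake, bumps metrics, and spawns
/// `diagnose_split_brain` to collect diffs from disagreeing validators.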
2743 fn check_for_split_brain(&self) {
2747 debug!(
2748 checkpoint_seq = self.summary.sequence_number,
2749 "Checking for split brain condition"
2750 );
2751 if self.signatures_by_digest.quorum_unreachable() {
2752 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2758 let digests_by_stake_messages = all_unique_values
2759 .iter()
2760 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2761 .map(|(digest, (_authorities, total_stake))| {
2762 format!("{:?} (total stake: {})", digest, total_stake)
2763 })
2764 .collect::<Vec<String>>();
2765 fail_point_arg!("kill_split_brain_node", |(
2766 checkpoint_overrides,
2767 forked_authorities,
2768 ): (
2769 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
2770 std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
2771 )| {
2772 #[cfg(msim)]
2773 {
2774 if let (Ok(mut overrides), Ok(forked_authorities_set)) =
2775 (checkpoint_overrides.lock(), forked_authorities.lock())
2776 {
2777 let correct_digest = all_unique_values
2779 .iter()
2780 .find(|(_, (authorities, _))| {
2781 authorities
2783 .iter()
2784 .any(|auth| !forked_authorities_set.contains(auth))
2785 })
2786 .map(|(digest, _)| digest.to_string())
2787 .unwrap_or_else(|| {
2788 all_unique_values
2790 .iter()
2791 .max_by_key(|(_, (_, stake))| *stake)
2792 .map(|(digest, _)| digest.to_string())
2793 .unwrap_or_else(|| self.digest.to_string())
2794 });
2795
2796 overrides.insert(self.summary.sequence_number, correct_digest.clone());
2797
2798 tracing::error!(
2799 fatal = true,
2800 "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
2801 self.summary.sequence_number,
2802 correct_digest
2803 );
2804 }
2805 }
2806 });
2807
2808 debug_fatal!(
2809 "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
2810 self.summary.sequence_number,
2811 self.signatures_by_digest.uncommitted_stake(),
2812 digests_by_stake_messages
2813 );
2814 self.metrics.split_brain_checkpoint_forks.inc();
2815
2816 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2817 let local_summary = self.summary.clone();
2818 let state = self.state.clone();
2819 let tables = self.store.clone();
2820
2821 tokio::spawn(async move {
2822 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2823 });
2824 }
2825 }
2826}
2827
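/// Fetches the conflicting checkpoint from one validator per disagreeing digest and writes
/// a diff of summaries, contents, transactions, and effects to a temp file for forensics.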
2828async fn diagnose_split_brain(
2834 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2835 local_summary: CheckpointSummary,
2836 state: Arc<AuthorityState>,
2837 tables: Arc<CheckpointStore>,
2838) {
2839 debug!(
2840 checkpoint_seq = local_summary.sequence_number,
2841 "Running split brain diagnostics..."
2842 );
2843 let time = SystemTime::now();
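// Pick one validator at random for every digest that differs from ours so we can
// request their version of the checkpoint.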
2844 let digest_to_validator = all_unique_values
2846 .iter()
2847 .filter_map(|(digest, (validators, _))| {
2848 if *digest != local_summary.digest() {
2849 let random_validator = validators.choose(&mut get_rng()).unwrap();
2850 Some((*digest, *random_validator))
2851 } else {
2852 None
2853 }
2854 })
2855 .collect::<HashMap<_, _>>();
2856 if digest_to_validator.is_empty() {
2857 panic!(
2858 "Given split brain condition, there should be at \
2859 least one validator that disagrees with local signature"
2860 );
2861 }
2862
2863 let epoch_store = state.load_epoch_store_one_call_per_task();
2864 let committee = epoch_store
2865 .epoch_start_state()
2866 .get_sui_committee_with_network_metadata();
2867 let network_config = default_mysten_network_config();
2868 let network_clients =
2869 make_network_authority_clients_with_network_config(&committee, &network_config);
2870
2871 let response_futures = digest_to_validator
2873 .values()
2874 .cloned()
2875 .map(|validator| {
2876 let client = network_clients
2877 .get(&validator)
2878 .expect("Failed to get network client");
2879 let request = CheckpointRequestV2 {
2880 sequence_number: Some(local_summary.sequence_number),
2881 request_content: true,
2882 certified: false,
2883 };
2884 client.handle_checkpoint_v2(request)
2885 })
2886 .collect::<Vec<_>>();
2887
2888 let digest_name_pair = digest_to_validator.iter();
2889 let response_data = futures::future::join_all(response_futures)
2890 .await
2891 .into_iter()
2892 .zip(digest_name_pair)
2893 .filter_map(|(response, (digest, name))| match response {
2894 Ok(response) => match response {
2895 CheckpointResponseV2 {
2896 checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2897 contents: Some(contents),
2898 } => Some((*name, *digest, summary, contents)),
2899 CheckpointResponseV2 {
2900 checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2901 contents: _,
2902 } => {
2903 panic!("Expected pending checkpoint, but got certified checkpoint");
2904 }
2905 CheckpointResponseV2 {
2906 checkpoint: None,
2907 contents: _,
2908 } => {
2909 error!(
2910 "Summary for checkpoint {:?} not found on validator {:?}",
2911 local_summary.sequence_number, name
2912 );
2913 None
2914 }
2915 CheckpointResponseV2 {
2916 checkpoint: _,
2917 contents: None,
2918 } => {
2919 error!(
2920 "Contents for checkpoint {:?} not found on validator {:?}",
2921 local_summary.sequence_number, name
2922 );
2923 None
2924 }
2925 },
2926 Err(e) => {
2927 error!(
2928 "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2929 e
2930 );
2931 None
2932 }
2933 })
2934 .collect::<Vec<_>>();
2935
2936 let local_checkpoint_contents = tables
2937 .get_checkpoint_contents(&local_summary.content_digest)
2938 .unwrap_or_else(|_| {
2939 panic!(
2940 "Could not find checkpoint contents for digest {:?}",
2941 local_summary.digest()
2942 )
2943 })
2944 .unwrap_or_else(|| {
2945 panic!(
2946 "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2947 local_summary.sequence_number,
2948 local_summary.digest()
2949 )
2950 });
2951 let local_contents_text = format!("{local_checkpoint_contents:?}");
2952
2953 let local_summary_text = format!("{local_summary:?}");
2954 let local_validator = state.name.concise();
2955 let diff_patches = response_data
2956 .iter()
2957 .map(|(name, other_digest, other_summary, contents)| {
2958 let other_contents_text = format!("{contents:?}");
2959 let other_summary_text = format!("{other_summary:?}");
2960 let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2961 .enumerate_transactions(&local_summary)
2962 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2963 .unzip();
2964 let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2965 .enumerate_transactions(other_summary)
2966 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2967 .unzip();
2968 let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2969 let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2970 let local_transactions_text = format!("{local_transactions:#?}");
2971 let other_transactions_text = format!("{other_transactions:#?}");
2972 let transactions_patch =
2973 create_patch(&local_transactions_text, &other_transactions_text);
2974 let local_effects_text = format!("{local_effects:#?}");
2975 let other_effects_text = format!("{other_effects:#?}");
2976 let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2977 let seq_number = local_summary.sequence_number;
2978 let local_digest = local_summary.digest();
2979 let other_validator = name.concise();
2980 format!(
2981 "Checkpoint: {seq_number:?}\n\
2982 Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2983 Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2984 Summary Diff: \n{summary_patch}\n\n\
2985 Contents Diff: \n{contents_patch}\n\n\
2986 Transactions Diff: \n{transactions_patch}\n\n\
2987 Effects Diff: \n{effects_patch}",
2988 )
2989 })
2990 .collect::<Vec<_>>()
2991 .join("\n\n\n");
2992
2993 let header = format!(
2994 "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2995 Datetime: {:?}",
2996 time
2997 );
2998 let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2999 let path = tempfile::tempdir()
3000 .expect("Failed to create tempdir")
3001 .keep()
3002 .join(Path::new("checkpoint_fork_dump.txt"));
3003 let mut file = File::create(path).unwrap();
3004 write!(file, "{}", fork_logs_text).unwrap();
3005 debug!("{}", fork_logs_text);
3006}
3007
3008pub trait CheckpointServiceNotify {
3009 fn notify_checkpoint_signature(
3010 &self,
3011 epoch_store: &AuthorityPerEpochStore,
3012 info: &CheckpointSignatureMessage,
3013 ) -> SuiResult;
3014
3015 fn notify_checkpoint(&self) -> SuiResult;
3016}
3017
3018#[allow(clippy::large_enum_variant)]
3019enum CheckpointServiceState {
3020 Unstarted(
3021 (
3022 CheckpointBuilder,
3023 CheckpointAggregator,
3024 CheckpointStateHasher,
3025 ),
3026 ),
3027 Started,
3028}
3029
3030impl CheckpointServiceState {
3031 fn take_unstarted(
3032 &mut self,
3033 ) -> (
3034 CheckpointBuilder,
3035 CheckpointAggregator,
3036 CheckpointStateHasher,
3037 ) {
3038 let mut state = CheckpointServiceState::Started;
3039 std::mem::swap(self, &mut state);
3040
3041 match state {
3042 CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3043 (builder, aggregator, hasher)
3044 }
3045 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3046 }
3047 }
3048}
3049
3050pub struct CheckpointService {
3051 tables: Arc<CheckpointStore>,
3052 notify_builder: Arc<Notify>,
3053 notify_aggregator: Arc<Notify>,
3054 last_signature_index: Mutex<u64>,
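// Publishes the sequence number of the most recently built checkpoint in this epoch.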
3055 highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
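// Highest checkpoint sequence number that had already been built before this instance
// was created; `wait_for_rebuilt_checkpoints` waits until the builder catches back up to it.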
3057 highest_previously_built_seq: CheckpointSequenceNumber,
3060 metrics: Arc<CheckpointMetrics>,
3061 state: Mutex<CheckpointServiceState>,
3062}
3063
3064impl CheckpointService {
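/// Constructs the checkpoint service together with its builder, aggregator, and state
/// hasher; the returned service does nothing until `spawn` is called.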
3065 pub fn build(
3067 state: Arc<AuthorityState>,
3068 checkpoint_store: Arc<CheckpointStore>,
3069 epoch_store: Arc<AuthorityPerEpochStore>,
3070 effects_store: Arc<dyn TransactionCacheRead>,
3071 global_state_hasher: Weak<GlobalStateHasher>,
3072 checkpoint_output: Box<dyn CheckpointOutput>,
3073 certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
3074 metrics: Arc<CheckpointMetrics>,
3075 max_transactions_per_checkpoint: usize,
3076 max_checkpoint_size_bytes: usize,
3077 ) -> Arc<Self> {
3078 info!(
3079 "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
3080 );
3081 let notify_builder = Arc::new(Notify::new());
3082 let notify_aggregator = Arc::new(Notify::new());
3083
3084 let highest_previously_built_seq = checkpoint_store
3086 .get_latest_locally_computed_checkpoint()
3087 .expect("failed to get latest locally computed checkpoint")
3088 .map(|s| s.sequence_number)
3089 .unwrap_or(0);
3090
3091 let highest_currently_built_seq =
3092 CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
3093 .expect("epoch should not have ended")
3094 .map(|(seq, _)| seq)
3095 .unwrap_or(0);
3096
3097 let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
3098
3099 let aggregator = CheckpointAggregator::new(
3100 checkpoint_store.clone(),
3101 epoch_store.clone(),
3102 notify_aggregator.clone(),
3103 certified_checkpoint_output,
3104 state.clone(),
3105 metrics.clone(),
3106 );
3107
3108 let (send_to_hasher, receive_from_builder) = mpsc::channel(16);
3109
3110 let ckpt_state_hasher = CheckpointStateHasher::new(
3111 epoch_store.clone(),
3112 global_state_hasher.clone(),
3113 receive_from_builder,
3114 );
3115
3116 let builder = CheckpointBuilder::new(
3117 state.clone(),
3118 checkpoint_store.clone(),
3119 epoch_store.clone(),
3120 notify_builder.clone(),
3121 effects_store,
3122 global_state_hasher,
3123 send_to_hasher,
3124 checkpoint_output,
3125 notify_aggregator.clone(),
3126 highest_currently_built_seq_tx.clone(),
3127 metrics.clone(),
3128 max_transactions_per_checkpoint,
3129 max_checkpoint_size_bytes,
3130 );
3131
3132 let last_signature_index = epoch_store
3133 .get_last_checkpoint_signature_index()
3134 .expect("should not cross end of epoch");
3135 let last_signature_index = Mutex::new(last_signature_index);
3136
3137 Arc::new(Self {
3138 tables: checkpoint_store,
3139 notify_builder,
3140 notify_aggregator,
3141 last_signature_index,
3142 highest_currently_built_seq_tx,
3143 highest_previously_built_seq,
3144 metrics,
3145 state: Mutex::new(CheckpointServiceState::Unstarted((
3146 builder,
3147 aggregator,
3148 ckpt_state_hasher,
3149 ))),
3150 })
3151 }
3152
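/// Starts the builder, aggregator, and state-hasher tasks for this epoch and waits
/// (bounded) for previously built checkpoints to be rebuilt before returning.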
3153 pub async fn spawn(
3161 &self,
3162 epoch_store: Arc<AuthorityPerEpochStore>,
3163 consensus_replay_waiter: Option<ReplayWaiter>,
3164 ) {
3165 let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();
3166
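// Drop any state hashes recorded past the last executed checkpoint so that ECMH
// accumulation resumes from a consistent point after a restart.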
3167 if let Some(last_committed_seq) = self
3170 .tables
3171 .get_highest_executed_checkpoint()
3172 .expect("Failed to get highest executed checkpoint")
3173 .map(|checkpoint| *checkpoint.sequence_number())
3174 {
3175 if let Err(e) = builder
3176 .epoch_store
3177 .clear_state_hashes_after_checkpoint(last_committed_seq)
3178 {
3179 error!(
3180 "Failed to clear state hashes after checkpoint {}: {:?}",
3181 last_committed_seq, e
3182 );
3183 } else {
3184 info!(
3185 "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
3186 last_committed_seq
3187 );
3188 }
3189 }
3190
3191 let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();
3192
3193 let state_hasher_task = spawn_monitored_task!(state_hasher.run());
3194 let aggregator_task = spawn_monitored_task!(aggregator.run());
3195
3196 spawn_monitored_task!(async move {
3197 epoch_store
3198 .within_alive_epoch(async move {
3199 builder.run(consensus_replay_waiter).await;
3200 builder_finished_tx.send(()).ok();
3201 })
3202 .await
3203 .ok();
3204
3205 state_hasher_task
3207 .await
3208 .expect("state hasher should exit normally");
3209
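// The aggregator loops forever; once the builder and state hasher are done for this
// epoch, abort it and wait for the task to wind down.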
3210 aggregator_task.abort();
3213 aggregator_task.await.ok();
3214 });
3215
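// Give the builder up to two minutes to either finish or rebuild everything it had
// built before the restart; report a debug_fatal if it cannot.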
3216 if tokio::time::timeout(Duration::from_secs(120), async move {
3222 tokio::select! {
3223 _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
3224 _ = self.wait_for_rebuilt_checkpoints() => (),
3225 }
3226 })
3227 .await
3228 .is_err()
3229 {
3230 debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
3231 }
3232 }
3233}
3234
3235impl CheckpointService {
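/// Blocks until the builder has re-created every checkpoint that existed locally before
/// this service instance was constructed.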
3236 pub async fn wait_for_rebuilt_checkpoints(&self) {
3242 let highest_previously_built_seq = self.highest_previously_built_seq;
3243 let mut rx = self.highest_currently_built_seq_tx.subscribe();
3244 let mut highest_currently_built_seq = *rx.borrow_and_update();
3245 info!(
3246 "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3247 );
3248 loop {
3249 if highest_currently_built_seq >= highest_previously_built_seq {
3250 info!("Checkpoint rebuild complete");
3251 break;
3252 }
3253 rx.changed().await.unwrap();
3254 highest_currently_built_seq = *rx.borrow_and_update();
3255 }
3256 }
3257
3258 #[cfg(test)]
3259 fn write_and_notify_checkpoint_for_testing(
3260 &self,
3261 epoch_store: &AuthorityPerEpochStore,
3262 checkpoint: PendingCheckpoint,
3263 ) -> SuiResult {
3264 use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3265
3266 let mut output = ConsensusCommitOutput::new(0);
3267 epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3268 output.set_default_commit_stats_for_testing();
3269 epoch_store.push_consensus_output_for_tests(output);
3270 self.notify_checkpoint()?;
3271 Ok(())
3272 }
3273}
3274
3275impl CheckpointServiceNotify for CheckpointService {
3276 fn notify_checkpoint_signature(
3277 &self,
3278 epoch_store: &AuthorityPerEpochStore,
3279 info: &CheckpointSignatureMessage,
3280 ) -> SuiResult {
3281 let sequence = info.summary.sequence_number;
3282 let signer = info.summary.auth_sig().authority.concise();
3283
3284 if let Some(highest_verified_checkpoint) = self
3285 .tables
3286 .get_highest_verified_checkpoint()?
3287 .map(|x| *x.sequence_number())
3288 && sequence <= highest_verified_checkpoint
3289 {
3290 trace!(
3291 checkpoint_seq = sequence,
3292 "Ignore checkpoint signature from {} - already certified", signer,
3293 );
3294 self.metrics
3295 .last_ignored_checkpoint_signature_received
3296 .set(sequence as i64);
3297 return Ok(());
3298 }
3299 trace!(
3300 checkpoint_seq = sequence,
3301 "Received checkpoint signature, digest {} from {}",
3302 info.summary.digest(),
3303 signer,
3304 );
3305 self.metrics
3306 .last_received_checkpoint_signatures
3307 .with_label_values(&[&signer.to_string()])
3308 .set(sequence as i64);
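// Assign the next signature index, persist the signature, and wake the aggregator.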
3309 let mut index = self.last_signature_index.lock();
3312 *index += 1;
3313 epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
3314 self.notify_aggregator.notify_one();
3315 Ok(())
3316 }
3317
3318 fn notify_checkpoint(&self) -> SuiResult {
3319 self.notify_builder.notify_one();
3320 Ok(())
3321 }
3322}
3323
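/// A `CheckpointServiceNotify` implementation that ignores all notifications; useful for
/// components and tests that do not produce checkpoints.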
3324pub struct CheckpointServiceNoop {}
3326impl CheckpointServiceNotify for CheckpointServiceNoop {
3327 fn notify_checkpoint_signature(
3328 &self,
3329 _: &AuthorityPerEpochStore,
3330 _: &CheckpointSignatureMessage,
3331 ) -> SuiResult {
3332 Ok(())
3333 }
3334
3335 fn notify_checkpoint(&self) -> SuiResult {
3336 Ok(())
3337 }
3338}
3339
3340impl PendingCheckpoint {
3341 pub fn height(&self) -> CheckpointHeight {
3342 self.details.checkpoint_height
3343 }
3344
3345 pub fn roots(&self) -> &Vec<TransactionKey> {
3346 &self.roots
3347 }
3348
3349 pub fn details(&self) -> &PendingCheckpointInfo {
3350 &self.details
3351 }
3352}
3353
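// Future adapter that counts how many times the wrapped future has been polled; the final
// poll count is returned alongside the future's output (see `poll_count` below).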
3354pin_project! {
3355 pub struct PollCounter<Fut> {
3356 #[pin]
3357 future: Fut,
3358 count: usize,
3359 }
3360}
3361
3362impl<Fut> PollCounter<Fut> {
3363 pub fn new(future: Fut) -> Self {
3364 Self { future, count: 0 }
3365 }
3366
3367 pub fn count(&self) -> usize {
3368 self.count
3369 }
3370}
3371
3372impl<Fut: Future> Future for PollCounter<Fut> {
3373 type Output = (usize, Fut::Output);
3374
3375 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3376 let this = self.project();
3377 *this.count += 1;
3378 match this.future.poll(cx) {
3379 Poll::Ready(output) => Poll::Ready((*this.count, output)),
3380 Poll::Pending => Poll::Pending,
3381 }
3382 }
3383}
3384
3385fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
3386 PollCounter::new(future)
3387}
3388
3389#[cfg(test)]
3390mod tests {
3391 use super::*;
3392 use crate::authority::test_authority_builder::TestAuthorityBuilder;
3393 use crate::transaction_outputs::TransactionOutputs;
3394 use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3395 use futures::FutureExt as _;
3396 use futures::future::BoxFuture;
3397 use std::collections::HashMap;
3398 use std::ops::Deref;
3399 use sui_macros::sim_test;
3400 use sui_protocol_config::{Chain, ProtocolConfig};
3401 use sui_types::accumulator_event::AccumulatorEvent;
3402 use sui_types::authenticator_state::ActiveJwk;
3403 use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3404 use sui_types::crypto::Signature;
3405 use sui_types::effects::{TransactionEffects, TransactionEvents};
3406 use sui_types::messages_checkpoint::SignedCheckpointSummary;
3407 use sui_types::transaction::VerifiedTransaction;
3408 use tokio::sync::mpsc;
3409
3410 #[tokio::test]
3411 async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
3412 let store = CheckpointStore::new_for_tests();
3413 let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
3414 for seq in 70u64..=80u64 {
3415 let contents =
3416 sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
3417 [sui_types::base_types::ExecutionDigests::new(
3418 sui_types::digests::TransactionDigest::random(),
3419 sui_types::digests::TransactionEffectsDigest::ZERO,
3420 )],
3421 );
3422 let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
3423 &protocol,
3424 0,
3425 seq,
3426 0,
3427 &contents,
3428 None,
3429 sui_types::gas::GasCostSummary::default(),
3430 None,
3431 0,
3432 Vec::new(),
3433 Vec::new(),
3434 );
3435 store
3436 .tables
3437 .locally_computed_checkpoints
3438 .insert(&seq, &summary)
3439 .unwrap();
3440 }
3441
3442 store
3443 .clear_locally_computed_checkpoints_from(76)
3444 .expect("clear should succeed");
3445
3446 assert!(
3448 store
3449 .tables
3450 .locally_computed_checkpoints
3451 .get(&75)
3452 .unwrap()
3453 .is_some()
3454 );
3455 assert!(
3456 store
3457 .tables
3458 .locally_computed_checkpoints
3459 .get(&76)
3460 .unwrap()
3461 .is_none()
3462 );
3463
3464 for seq in 70u64..76u64 {
3465 assert!(
3466 store
3467 .tables
3468 .locally_computed_checkpoints
3469 .get(&seq)
3470 .unwrap()
3471 .is_some()
3472 );
3473 }
3474 for seq in 76u64..=80u64 {
3475 assert!(
3476 store
3477 .tables
3478 .locally_computed_checkpoints
3479 .get(&seq)
3480 .unwrap()
3481 .is_none()
3482 );
3483 }
3484 }
3485
3486 #[tokio::test]
3487 async fn test_fork_detection_storage() {
3488 let store = CheckpointStore::new_for_tests();
3489 let seq_num = 42;
3491 let digest = CheckpointDigest::random();
3492
3493 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3494
3495 store
3496 .record_checkpoint_fork_detected(seq_num, digest)
3497 .unwrap();
3498
3499 let retrieved = store.get_checkpoint_fork_detected().unwrap();
3500 assert!(retrieved.is_some());
3501 let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3502 assert_eq!(retrieved_seq, seq_num);
3503 assert_eq!(retrieved_digest, digest);
3504
3505 store.clear_checkpoint_fork_detected().unwrap();
3506 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3507
3508 let tx_digest = TransactionDigest::random();
3510 let expected_effects = TransactionEffectsDigest::random();
3511 let actual_effects = TransactionEffectsDigest::random();
3512
3513 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3514
3515 store
3516 .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3517 .unwrap();
3518
3519 let retrieved = store.get_transaction_fork_detected().unwrap();
3520 assert!(retrieved.is_some());
3521 let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3522 assert_eq!(retrieved_tx, tx_digest);
3523 assert_eq!(retrieved_expected, expected_effects);
3524 assert_eq!(retrieved_actual, actual_effects);
3525
3526 store.clear_transaction_fork_detected().unwrap();
3527 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3528 }
3529
3530 #[sim_test]
3531 pub async fn checkpoint_builder_test() {
3532 telemetry_subscribers::init_for_testing();
3533
3534 let mut protocol_config =
3535 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
3536 protocol_config.disable_accumulators_for_testing();
3537 protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
3538 let state = TestAuthorityBuilder::new()
3539 .with_protocol_config(protocol_config)
3540 .build()
3541 .await;
3542
3543 let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
3544 0,
3545 0,
3546 vec![],
3547 SequenceNumber::new(),
3548 );
3549
3550 let jwks = {
3551 let mut jwks = Vec::new();
3552 while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
3553 jwks.push(ActiveJwk {
3554 jwk_id: JwkId::new(
3555 "https://accounts.google.com".to_string(),
3556 "1234567890".to_string(),
3557 ),
3558 jwk: JWK {
3559 kty: "RSA".to_string(),
3560 e: "AQAB".to_string(),
3561 n: "1234567890".to_string(),
3562 alg: "RS256".to_string(),
3563 },
3564 epoch: 0,
3565 });
3566 }
3567 jwks
3568 };
3569
3570 let dummy_tx_with_data =
3571 VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());
3572
3573 for i in 0..15 {
3574 state
3575 .database_for_testing()
3576 .perpetual_tables
3577 .transactions
3578 .insert(&d(i), dummy_tx.serializable_ref())
3579 .unwrap();
3580 }
3581 for i in 15..20 {
3582 state
3583 .database_for_testing()
3584 .perpetual_tables
3585 .transactions
3586 .insert(&d(i), dummy_tx_with_data.serializable_ref())
3587 .unwrap();
3588 }
3589
3590 let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
3591 commit_cert_for_test(
3592 &mut store,
3593 state.clone(),
3594 d(1),
3595 vec![d(2), d(3)],
3596 GasCostSummary::new(11, 12, 11, 1),
3597 );
3598 commit_cert_for_test(
3599 &mut store,
3600 state.clone(),
3601 d(2),
3602 vec![d(3), d(4)],
3603 GasCostSummary::new(21, 22, 21, 1),
3604 );
3605 commit_cert_for_test(
3606 &mut store,
3607 state.clone(),
3608 d(3),
3609 vec![],
3610 GasCostSummary::new(31, 32, 31, 1),
3611 );
3612 commit_cert_for_test(
3613 &mut store,
3614 state.clone(),
3615 d(4),
3616 vec![],
3617 GasCostSummary::new(41, 42, 41, 1),
3618 );
3619 for i in [5, 6, 7, 10, 11, 12, 13] {
3620 commit_cert_for_test(
3621 &mut store,
3622 state.clone(),
3623 d(i),
3624 vec![],
3625 GasCostSummary::new(41, 42, 41, 1),
3626 );
3627 }
3628 for i in [15, 16, 17] {
3629 commit_cert_for_test(
3630 &mut store,
3631 state.clone(),
3632 d(i),
3633 vec![],
3634 GasCostSummary::new(51, 52, 51, 1),
3635 );
3636 }
3637 let all_digests: Vec<_> = store.keys().copied().collect();
3638 for digest in all_digests {
3639 let signature = Signature::Ed25519SuiSignature(Default::default()).into();
3640 state
3641 .epoch_store_for_testing()
3642 .test_insert_user_signature(digest, vec![(signature, None)]);
3643 }
3644
3645 let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
3646 let (certified_output, mut certified_result) =
3647 mpsc::channel::<CertifiedCheckpointSummary>(10);
3648 let store = Arc::new(store);
3649
3650 let ckpt_dir = tempfile::tempdir().unwrap();
3651 let checkpoint_store =
3652 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
3653 let epoch_store = state.epoch_store_for_testing();
3654
3655 let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
3656 state.get_global_state_hash_store().clone(),
3657 ));
3658
3659 let checkpoint_service = CheckpointService::build(
3660 state.clone(),
3661 checkpoint_store,
3662 epoch_store.clone(),
3663 store,
3664 Arc::downgrade(&global_state_hasher),
3665 Box::new(output),
3666 Box::new(certified_output),
3667 CheckpointMetrics::new_for_tests(),
3668 3,
3669 100_000,
3670 );
3671 checkpoint_service.spawn(epoch_store.clone(), None).await;
3672
3673 checkpoint_service
3674 .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
3675 .unwrap();
3676 checkpoint_service
3677 .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
3678 .unwrap();
3679 checkpoint_service
3680 .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
3681 .unwrap();
3682 checkpoint_service
3683 .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
3684 .unwrap();
3685 checkpoint_service
3686 .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
3687 .unwrap();
3688 checkpoint_service
3689 .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
3690 .unwrap();
3691
3692 let (c1c, c1s) = result.recv().await.unwrap();
3693 let (c2c, c2s) = result.recv().await.unwrap();
3694
3695 let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3696 let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3697 assert_eq!(c1t, vec![d(4)]);
3698 assert_eq!(c1s.previous_digest, None);
3699 assert_eq!(c1s.sequence_number, 0);
3700 assert_eq!(
3701 c1s.epoch_rolling_gas_cost_summary,
3702 GasCostSummary::new(41, 42, 41, 1)
3703 );
3704
3705 assert_eq!(c2t, vec![d(3), d(2), d(1)]);
3706 assert_eq!(c2s.previous_digest, Some(c1s.digest()));
3707 assert_eq!(c2s.sequence_number, 1);
3708 assert_eq!(
3709 c2s.epoch_rolling_gas_cost_summary,
3710 GasCostSummary::new(104, 108, 104, 4)
3711 );
3712
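// Pending checkpoint 2 has four roots but max_transactions_per_checkpoint is 3, so it
// is split into two checkpoints: [10, 11, 12] and [13].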
3713 let (c3c, c3s) = result.recv().await.unwrap();
3716 let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3717 let (c4c, c4s) = result.recv().await.unwrap();
3718 let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3719 assert_eq!(c3s.sequence_number, 2);
3720 assert_eq!(c3s.previous_digest, Some(c2s.digest()));
3721 assert_eq!(c4s.sequence_number, 3);
3722 assert_eq!(c4s.previous_digest, Some(c3s.digest()));
3723 assert_eq!(c3t, vec![d(10), d(11), d(12)]);
3724 assert_eq!(c4t, vec![d(13)]);
3725
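// Pending checkpoint 3 fits within the transaction limit, but the oversized JWK
// transactions exceed the 100_000 byte size budget, splitting it into [15, 16] and [17].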
3726 let (c5c, c5s) = result.recv().await.unwrap();
3729 let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3730 let (c6c, c6s) = result.recv().await.unwrap();
3731 let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3732 assert_eq!(c5s.sequence_number, 4);
3733 assert_eq!(c5s.previous_digest, Some(c4s.digest()));
3734 assert_eq!(c6s.sequence_number, 5);
3735 assert_eq!(c6s.previous_digest, Some(c5s.digest()));
3736 assert_eq!(c5t, vec![d(15), d(16)]);
3737 assert_eq!(c6t, vec![d(17)]);
3738
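// Pending heights 4 and 5 end up in a single checkpoint; heights whose timestamps fall
// within the 100 ms minimum checkpoint interval are batched together.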
3739 let (c7c, c7s) = result.recv().await.unwrap();
3742 let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3743 assert_eq!(c7t, vec![d(5), d(6)]);
3744 assert_eq!(c7s.previous_digest, Some(c6s.digest()));
3745 assert_eq!(c7s.sequence_number, 6);
3746
3747 let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
3748 let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);
3749
3750 checkpoint_service
3751 .notify_checkpoint_signature(
3752 &epoch_store,
3753 &CheckpointSignatureMessage { summary: c2ss },
3754 )
3755 .unwrap();
3756 checkpoint_service
3757 .notify_checkpoint_signature(
3758 &epoch_store,
3759 &CheckpointSignatureMessage { summary: c1ss },
3760 )
3761 .unwrap();
3762
3763 let c1sc = certified_result.recv().await.unwrap();
3764 let c2sc = certified_result.recv().await.unwrap();
3765 assert_eq!(c1sc.sequence_number, 0);
3766 assert_eq!(c2sc.sequence_number, 1);
3767 }
3768
3769 impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
3770 fn notify_read_executed_effects(
3771 &self,
3772 _: &str,
3773 digests: &[TransactionDigest],
3774 ) -> BoxFuture<'_, Vec<TransactionEffects>> {
3775 std::future::ready(
3776 digests
3777 .iter()
3778 .map(|d| self.get(d).expect("effects not found").clone())
3779 .collect(),
3780 )
3781 .boxed()
3782 }
3783
3784 fn notify_read_executed_effects_digests(
3785 &self,
3786 _: &str,
3787 digests: &[TransactionDigest],
3788 ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
3789 std::future::ready(
3790 digests
3791 .iter()
3792 .map(|d| {
3793 self.get(d)
3794 .map(|fx| fx.digest())
3795 .expect("effects not found")
3796 })
3797 .collect(),
3798 )
3799 .boxed()
3800 }
3801
3802 fn multi_get_executed_effects(
3803 &self,
3804 digests: &[TransactionDigest],
3805 ) -> Vec<Option<TransactionEffects>> {
3806 digests.iter().map(|d| self.get(d).cloned()).collect()
3807 }
3808
3809 fn multi_get_transaction_blocks(
3815 &self,
3816 _: &[TransactionDigest],
3817 ) -> Vec<Option<Arc<VerifiedTransaction>>> {
3818 unimplemented!()
3819 }
3820
3821 fn multi_get_executed_effects_digests(
3822 &self,
3823 _: &[TransactionDigest],
3824 ) -> Vec<Option<TransactionEffectsDigest>> {
3825 unimplemented!()
3826 }
3827
3828 fn multi_get_effects(
3829 &self,
3830 _: &[TransactionEffectsDigest],
3831 ) -> Vec<Option<TransactionEffects>> {
3832 unimplemented!()
3833 }
3834
3835 fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
3836 unimplemented!()
3837 }
3838
3839 fn get_mysticeti_fastpath_outputs(
3840 &self,
3841 _: &TransactionDigest,
3842 ) -> Option<Arc<TransactionOutputs>> {
3843 unimplemented!()
3844 }
3845
3846 fn notify_read_fastpath_transaction_outputs<'a>(
3847 &'a self,
3848 _: &'a [TransactionDigest],
3849 ) -> BoxFuture<'a, Vec<Arc<crate::transaction_outputs::TransactionOutputs>>> {
3850 unimplemented!()
3851 }
3852
3853 fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
3854 unimplemented!()
3855 }
3856
3857 fn get_unchanged_loaded_runtime_objects(
3858 &self,
3859 _digest: &TransactionDigest,
3860 ) -> Option<Vec<sui_types::storage::ObjectKey>> {
3861 unimplemented!()
3862 }
3863
3864 fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
3865 unimplemented!()
3866 }
3867 }
3868
3869 #[async_trait::async_trait]
3870 impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
3871 async fn checkpoint_created(
3872 &self,
3873 summary: &CheckpointSummary,
3874 contents: &CheckpointContents,
3875 _epoch_store: &Arc<AuthorityPerEpochStore>,
3876 _checkpoint_store: &Arc<CheckpointStore>,
3877 ) -> SuiResult {
3878 self.try_send((contents.clone(), summary.clone())).unwrap();
3879 Ok(())
3880 }
3881 }
3882
3883 #[async_trait::async_trait]
3884 impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
3885 async fn certified_checkpoint_created(
3886 &self,
3887 summary: &CertifiedCheckpointSummary,
3888 ) -> SuiResult {
3889 self.try_send(summary.clone()).unwrap();
3890 Ok(())
3891 }
3892 }
3893
3894 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
3895 PendingCheckpoint {
3896 roots: t
3897 .into_iter()
3898 .map(|t| TransactionKey::Digest(d(t)))
3899 .collect(),
3900 details: PendingCheckpointInfo {
3901 timestamp_ms,
3902 last_of_epoch: false,
3903 checkpoint_height: i,
3904 consensus_commit_ref: CommitRef::default(),
3905 rejected_transactions_digest: Digest::default(),
3906 },
3907 }
3908 }
3909
3910 fn d(i: u8) -> TransactionDigest {
3911 let mut bytes: [u8; 32] = Default::default();
3912 bytes[0] = i;
3913 TransactionDigest::new(bytes)
3914 }
3915
3916 fn e(
3917 transaction_digest: TransactionDigest,
3918 dependencies: Vec<TransactionDigest>,
3919 gas_used: GasCostSummary,
3920 ) -> TransactionEffects {
3921 let mut effects = TransactionEffects::default();
3922 *effects.transaction_digest_mut_for_testing() = transaction_digest;
3923 *effects.dependencies_mut_for_testing() = dependencies;
3924 *effects.gas_cost_summary_mut_for_testing() = gas_used;
3925 effects
3926 }
3927
3928 fn commit_cert_for_test(
3929 store: &mut HashMap<TransactionDigest, TransactionEffects>,
3930 state: Arc<AuthorityState>,
3931 digest: TransactionDigest,
3932 dependencies: Vec<TransactionDigest>,
3933 gas_used: GasCostSummary,
3934 ) {
3935 let epoch_store = state.epoch_store_for_testing();
3936 let effects = e(digest, dependencies, gas_used);
3937 store.insert(digest, effects.clone());
3938 epoch_store.insert_executed_in_epoch(&digest);
3939 }
3940}