1pub(crate) mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16 LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::random::get_rng;
29use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
30use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
31use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
32use nonempty::NonEmpty;
33use parking_lot::Mutex;
34use pin_project_lite::pin_project;
35use serde::{Deserialize, Serialize};
36use sui_macros::fail_point_arg;
37use sui_network::default_mysten_network_config;
38use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43 CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use tokio::sync::{mpsc, watch};
47use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
48
49use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
50use crate::authority::authority_store_pruner::PrunerWatermarks;
51use crate::consensus_handler::SequencedConsensusTransactionKey;
52use rand::seq::SliceRandom;
53use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
54use std::fs::File;
55use std::future::Future;
56use std::io::Write;
57use std::path::Path;
58use std::pin::Pin;
59use std::sync::Arc;
60use std::sync::Weak;
61use std::task::{Context, Poll};
62use std::time::{Duration, SystemTime};
63use sui_protocol_config::ProtocolVersion;
64use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
65use sui_types::committee::StakeUnit;
66use sui_types::crypto::AuthorityStrongQuorumSignInfo;
67use sui_types::digests::{
68 CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
69};
70use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
71use sui_types::error::{SuiErrorKind, SuiResult};
72use sui_types::gas::GasCostSummary;
73use sui_types::message_envelope::Message;
74use sui_types::messages_checkpoint::{
75 CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
76 CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
77 EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
78 VerifiedCheckpointContents,
79};
80use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
81use sui_types::messages_consensus::ConsensusTransactionKey;
82use sui_types::signature::GenericSignature;
83use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
84use sui_types::transaction::{
85 TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
86};
87use tokio::sync::Notify;
88use tracing::{debug, error, info, instrument, trace, warn};
89use typed_store::DBMapUtils;
90use typed_store::Map;
91use typed_store::{
92 TypedStoreError,
93 rocks::{DBMap, MetricConf},
94};
95
// Singleton key for the `transaction_fork_detected` table: the table holds at
// most one row, so a fixed single-byte key is used.
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

/// Height counter used to order pending checkpoints produced by the builder.
pub type CheckpointHeight = u64;
99
/// Aggregate statistics for a completed epoch, computed in
/// [`CheckpointStore::get_epoch_stats`] from the epoch's last checkpoint.
pub struct EpochStats {
    // Number of checkpoints produced during the epoch (last seq - first seq + 1).
    pub checkpoint_count: u64,
    // Transactions executed during the epoch (delta of `network_total_transactions`).
    pub transaction_count: u64,
    // Computation cost from the epoch's rolling gas cost summary.
    pub total_gas_reward: u64,
}
105
/// Metadata attached to a pending (not-yet-built) checkpoint.
#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    // Timestamp to stamp onto the resulting checkpoint summary.
    pub timestamp_ms: CheckpointTimestamp,
    // True if this will be the final checkpoint of the epoch.
    pub last_of_epoch: bool,
    // Builder-assigned height of this pending checkpoint.
    pub checkpoint_height: CheckpointHeight,
    // Consensus commit this pending checkpoint was derived from.
    pub consensus_commit_ref: CommitRef,
    // Digest summarizing the transactions rejected in this commit.
    pub rejected_transactions_digest: Digest,
    // Pre-assigned checkpoint sequence number, when known ahead of building.
    pub checkpoint_seq: Option<CheckpointSequenceNumber>,
}
120
/// A pending checkpoint: the root transactions to include plus build metadata.
#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    // Root transaction keys; dependencies are resolved at build time.
    pub roots: Vec<TransactionKey>,
    pub details: PendingCheckpointInfo,
}
126
/// One height's worth of checkpoint roots, used by [`PendingCheckpointV2`].
#[derive(Clone, Debug, Default)]
pub struct CheckpointRoots {
    // Root transaction keys at this height.
    pub tx_roots: Vec<TransactionKey>,
    // Optional accumulator-settlement root transaction for this height.
    pub settlement_root: Option<TransactionKey>,
    // The builder height these roots belong to.
    pub height: CheckpointHeight,
}
133
/// V2 pending checkpoint: groups roots per height rather than as a flat list.
#[derive(Clone, Debug)]
pub struct PendingCheckpointV2 {
    pub roots: Vec<CheckpointRoots>,
    pub details: PendingCheckpointInfo,
}
145
/// A locally built checkpoint summary persisted by the checkpoint builder.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    // Builder height the summary was produced at; `None` for summaries that
    // were not built from a pending checkpoint (e.g. genesis).
    pub checkpoint_height: Option<CheckpointHeight>,
    // Index of this checkpoint within its consensus commit.
    pub position_in_commit: usize,
}
153
/// Column families backing the checkpoint store. Field order and names are
/// significant: `DBMapUtils` derives the column-family names from them, and the
/// tidehunter configs in `CheckpointStoreTables::new` are keyed by those names.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    // Checkpoint contents keyed by their digest.
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    // Reverse index: contents digest -> checkpoint sequence number.
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    // Legacy full contents table; superseded by `full_checkpoint_content_v2`
    // but still cleared in `delete_full_checkpoint_contents`.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    // Certified (quorum-signed) checkpoints by sequence number.
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    // Certified checkpoints by digest.
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    // Summaries this node computed itself, kept for fork detection against
    // certified checkpoints.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    // Epoch -> sequence number of that epoch's last checkpoint.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    // Progress watermarks (highest verified/synced/executed/pruned, fork marker).
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    // Singleton row (key TRANSACTION_FORK_DETECTED_KEY) recording a detected
    // transaction-effects fork: (tx digest, expected effects, actual effects).
    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
    // Versioned full checkpoint contents, the current write path.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
}
201
202fn full_checkpoint_content_table_default_config() -> DBOptions {
203 DBOptions {
204 options: default_db_options().options,
205 rw_options: ReadWriteOptions::default().set_log_value_hash(true),
209 }
210}
211
impl CheckpointStoreTables {
    /// Opens all column families read-write at `path` (RocksDB build).
    /// Pruner watermarks are ignored here; they only drive tidehunter's
    /// relocation filters.
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    /// Opens all key spaces at `path` (tidehunter build), wiring per-table key
    /// layouts, value caches, and relocation filters that drop data below the
    /// pruner watermarks. The string keys in `configs` must match the field
    /// names of `CheckpointStoreTables`.
    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 4;
        // u64 keys are mostly-sequential; index on the top 6 bytes of the key.
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(64_000)
            .with_value_cache_size(default_value_cache_size());
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        // 32-byte digest keys, stored with an 8-byte length prefix stripped.
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        // Watermarks are tiny and hot: cache them and never unload.
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
        let configs = vec![
            (
                "checkpoint_content",
                // Pruned by simply removing entries on relocation.
                digest_config.clone().with_config(
                    lru_config
                        .clone()
                        .with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                // Pruned against the checkpoint-id watermark, keyed by the value.
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                // Pruned against the epoch watermark, extracted from the value.
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
            ),
            (
                "transaction_fork_detected",
                // Single-row table; never relocated, just removed.
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "full_checkpoint_content_v2",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    /// Read-only handle for tooling/inspection (RocksDB build).
    #[cfg(not(tidehunter))]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }

    /// Tidehunter has no separate read-only handle; opens read-write with
    /// default (no-op) pruner watermarks.
    #[cfg(tidehunter)]
    pub fn open_readonly(path: &Path) -> Self {
        Self::new(path, "checkpoint", Arc::new(PrunerWatermarks::default()))
    }
}
347
/// Durable store of checkpoints plus notify channels that wake tasks waiting
/// for a checkpoint to become synced or executed.
pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    // Fired when the highest-synced watermark advances past a sequence number.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    // Fired when the highest-executed watermark advances past a sequence number.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
353
354impl CheckpointStore {
355 pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
356 let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
357 Arc::new(Self {
358 tables,
359 synced_checkpoint_notify_read: NotifyRead::new(),
360 executed_checkpoint_notify_read: NotifyRead::new(),
361 })
362 }
363
364 pub fn new_for_tests() -> Arc<Self> {
365 let ckpt_dir = mysten_common::tempdir().unwrap();
366 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
367 }
368
369 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
370 let tables = CheckpointStoreTables::new(
371 path,
372 "db_checkpoint",
373 Arc::new(PrunerWatermarks::default()),
374 );
375 Arc::new(Self {
376 tables,
377 synced_checkpoint_notify_read: NotifyRead::new(),
378 executed_checkpoint_notify_read: NotifyRead::new(),
379 })
380 }
381
    /// Read-only handle for tooling (RocksDB build); delegates to the tables.
    #[cfg(not(tidehunter))]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }

    /// Tidehunter build: no dedicated read-only type, returns full tables.
    #[cfg(tidehunter)]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTables {
        CheckpointStoreTables::open_readonly(path)
    }
391
    /// Inserts the genesis checkpoint (epoch 0, sequence 0) and marks it
    /// synced. Idempotent: if checkpoint 0 already exists, only asserts that
    /// its digest matches. Panics on any store error or digest mismatch.
    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        // A pre-existing checkpoint 0 must be the same checkpoint (fork check).
        match self.get_checkpoint_by_sequence_number(0).unwrap() {
            Some(existing_checkpoint) => {
                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
            }
            None => {
                // Seed the builder only if this node is still in the genesis
                // epoch; otherwise the builder data would be stale.
                if epoch_store.epoch() == checkpoint.epoch {
                    epoch_store
                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                        .unwrap();
                } else {
                    debug!(
                        validator_epoch =% epoch_store.epoch(),
                        genesis_epoch =% checkpoint.epoch(),
                        "Not inserting checkpoint builder data for genesis checkpoint",
                    );
                }
                self.insert_checkpoint_contents(contents).unwrap();
                self.insert_verified_checkpoint(&checkpoint).unwrap();
                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
            }
        }
    }
433
434 pub fn get_checkpoint_by_digest(
435 &self,
436 digest: &CheckpointDigest,
437 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
438 self.tables
439 .checkpoint_by_digest
440 .get(digest)
441 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
442 }
443
444 pub fn get_checkpoint_by_sequence_number(
445 &self,
446 sequence_number: CheckpointSequenceNumber,
447 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
448 self.tables
449 .certified_checkpoints
450 .get(&sequence_number)
451 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
452 }
453
454 pub fn get_locally_computed_checkpoint(
455 &self,
456 sequence_number: CheckpointSequenceNumber,
457 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
458 self.tables
459 .locally_computed_checkpoints
460 .get(&sequence_number)
461 }
462
463 pub fn multi_get_locally_computed_checkpoints(
464 &self,
465 sequence_numbers: &[CheckpointSequenceNumber],
466 ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
467 let checkpoints = self
468 .tables
469 .locally_computed_checkpoints
470 .multi_get(sequence_numbers)?;
471
472 Ok(checkpoints)
473 }
474
    /// Resolves a checkpoint contents digest to the sequence number it was
    /// stored under (populated by `insert_verified_checkpoint_contents`).
    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }
483
    /// Removes the contents-digest -> sequence-number mapping for `digest`
    /// (used during pruning).
    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }
492
493 pub fn get_latest_certified_checkpoint(
494 &self,
495 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
496 Ok(self
497 .tables
498 .certified_checkpoints
499 .reversed_safe_iter_with_bounds(None, None)?
500 .next()
501 .transpose()?
502 .map(|(_, v)| v.into()))
503 }
504
505 pub fn get_latest_locally_computed_checkpoint(
506 &self,
507 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
508 Ok(self
509 .tables
510 .locally_computed_checkpoints
511 .reversed_safe_iter_with_bounds(None, None)?
512 .next()
513 .transpose()?
514 .map(|(_, v)| v))
515 }
516
517 pub fn multi_get_checkpoint_by_sequence_number(
518 &self,
519 sequence_numbers: &[CheckpointSequenceNumber],
520 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
521 let checkpoints = self
522 .tables
523 .certified_checkpoints
524 .multi_get(sequence_numbers)?
525 .into_iter()
526 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
527 .collect();
528
529 Ok(checkpoints)
530 }
531
    /// Batch lookup of checkpoint contents by digest; `None` per missing entry.
    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }
538
539 pub fn get_highest_verified_checkpoint(
540 &self,
541 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
542 let highest_verified = if let Some(highest_verified) = self
543 .tables
544 .watermarks
545 .get(&CheckpointWatermark::HighestVerified)?
546 {
547 highest_verified
548 } else {
549 return Ok(None);
550 };
551 self.get_checkpoint_by_digest(&highest_verified.1)
552 }
553
554 pub fn get_highest_synced_checkpoint(
555 &self,
556 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
557 let highest_synced = if let Some(highest_synced) = self
558 .tables
559 .watermarks
560 .get(&CheckpointWatermark::HighestSynced)?
561 {
562 highest_synced
563 } else {
564 return Ok(None);
565 };
566 self.get_checkpoint_by_digest(&highest_synced.1)
567 }
568
569 pub fn get_highest_synced_checkpoint_seq_number(
570 &self,
571 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
572 if let Some(highest_synced) = self
573 .tables
574 .watermarks
575 .get(&CheckpointWatermark::HighestSynced)?
576 {
577 Ok(Some(highest_synced.0))
578 } else {
579 Ok(None)
580 }
581 }
582
583 pub fn get_highest_executed_checkpoint_seq_number(
584 &self,
585 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
586 if let Some(highest_executed) = self
587 .tables
588 .watermarks
589 .get(&CheckpointWatermark::HighestExecuted)?
590 {
591 Ok(Some(highest_executed.0))
592 } else {
593 Ok(None)
594 }
595 }
596
597 pub fn get_highest_executed_checkpoint(
598 &self,
599 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
600 let highest_executed = if let Some(highest_executed) = self
601 .tables
602 .watermarks
603 .get(&CheckpointWatermark::HighestExecuted)?
604 {
605 highest_executed
606 } else {
607 return Ok(None);
608 };
609 self.get_checkpoint_by_digest(&highest_executed.1)
610 }
611
612 pub fn get_highest_pruned_checkpoint_seq_number(
613 &self,
614 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
615 self.tables
616 .watermarks
617 .get(&CheckpointWatermark::HighestPruned)
618 .map(|watermark| watermark.map(|w| w.0))
619 }
620
    /// Fetches checkpoint contents by their digest.
    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }
627
    /// Fetches the versioned full contents for `seq` from the v2 table only
    /// (the legacy `full_checkpoint_content` table is not consulted).
    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content_v2.get(&seq)
    }
634
    /// Deletes all locally computed summaries strictly below the latest one
    /// (the range delete is [0, last) — the newest summary is kept for fork
    /// detection). No-op if the table is empty.
    fn prune_local_summaries(&self) -> SuiResult {
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }
654
    /// Deletes every locally computed summary with sequence number >=
    /// `from_seq` (used when rolling back local state). Panics if the batched
    /// delete fails; no-op when nothing is at or above `from_seq`.
    pub fn clear_locally_computed_checkpoints_from(
        &self,
        from_seq: CheckpointSequenceNumber,
    ) -> SuiResult {
        // Materialize the keys first so we know the actual deleted range.
        let keys: Vec<_> = self
            .tables
            .locally_computed_checkpoints
            .safe_iter_with_bounds(Some(from_seq), None)
            .map(|r| r.map(|(k, _)| k))
            .collect::<Result<_, _>>()?;
        if let Some(&last_local_summary) = keys.last() {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch
                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
                .expect("Failed to delete locally computed checkpoints");
            batch
                .write()
                .expect("Failed to delete locally computed checkpoints");
            warn!(
                from_seq,
                last_local_summary,
                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
                from_seq,
                last_local_summary
            );
        }
        Ok(())
    }
683
    /// Compares a locally computed summary against the certified checkpoint
    /// for the same sequence number. On mismatch (a fork): logs both summaries
    /// and their contents, records the fork marker in the watermarks table,
    /// fires the `kill_checkpoint_fork_node` fail point (simulator builds),
    /// and then aborts the process via `fatal!`. Returns normally only when
    /// the summaries are equal.
    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            // Best-effort: render both contents for the log; fall back to a
            // descriptive message when the contents are missing or unreadable.
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );

            // Persist the fork marker so the node can detect it on restart;
            // a write failure here must not mask the fork itself.
            if let Err(e) = self.record_checkpoint_fork_detected(
                *local_checkpoint.sequence_number(),
                local_checkpoint.digest(),
            ) {
                error!("Failed to record checkpoint fork in database: {:?}", e);
            }

            // Simulator-only hook: record the correct digest for fork-recovery
            // tests and kill the simulated node instead of the whole process.
            fail_point_arg!(
                "kill_checkpoint_fork_node",
                |checkpoint_overrides: std::sync::Arc<
                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
                >| {
                    #[cfg(msim)]
                    {
                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
                            overrides.insert(
                                local_checkpoint.sequence_number,
                                verified_checkpoint.digest().to_string(),
                            );
                        }
                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
                            local_checkpoint.sequence_number(),
                            verified_checkpoint.digest()
                        );
                        sui_simulator::task::shutdown_current_node();
                    }
                }
            );

            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }
777
    /// Atomically writes a certified checkpoint into the by-sequence and
    /// by-digest tables (plus the epoch-last map when it carries a next-epoch
    /// committee, i.e. is the last checkpoint of its epoch). After the write,
    /// compares against any locally computed summary for the same sequence —
    /// which aborts the process if a fork is detected.
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        // Fork detection runs only after the certified checkpoint is durable.
        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }
819
    /// Inserts a certified checkpoint and advances the HighestVerified
    /// watermark to it (if it is ahead of the current watermark).
    #[instrument(level = "debug", skip_all)]
    pub fn insert_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.insert_certified_checkpoint(checkpoint)?;
        self.update_highest_verified_checkpoint(checkpoint)
    }
830
831 pub fn update_highest_verified_checkpoint(
832 &self,
833 checkpoint: &VerifiedCheckpoint,
834 ) -> Result<(), TypedStoreError> {
835 if Some(*checkpoint.sequence_number())
836 > self
837 .get_highest_verified_checkpoint()?
838 .map(|x| *x.sequence_number())
839 {
840 debug!(
841 checkpoint_seq = checkpoint.sequence_number(),
842 "Updating highest verified checkpoint",
843 );
844 self.tables.watermarks.insert(
845 &CheckpointWatermark::HighestVerified,
846 &(*checkpoint.sequence_number(), *checkpoint.digest()),
847 )?;
848 }
849
850 Ok(())
851 }
852
    /// Sets the HighestSynced watermark to `checkpoint` (unconditionally —
    /// callers are responsible for monotonicity) and wakes any tasks waiting
    /// on this sequence number via `notify_read_synced_checkpoint`.
    pub fn update_highest_synced_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestSynced,
            &(seq, *checkpoint.digest()),
        )?;
        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
        Ok(())
    }
866
    /// Waits until the watermark reported by `get_watermark` reaches `seq`,
    /// then returns the certified checkpoint at `seq`. The closure passed to
    /// `NotifyRead::read` checks the watermark after registration, so a
    /// concurrent `notify` cannot be missed. Panics if the DB errs or if the
    /// checkpoint is missing despite the watermark covering it.
    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        notify_read
            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
                let seq = seqs[0];
                // `None` means "not available yet" — keep waiting for notify.
                let Some(highest) = get_watermark() else {
                    return vec![None];
                };
                if highest < seq {
                    return vec![None];
                }
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                vec![Some(checkpoint)]
            })
            .await
            .into_iter()
            .next()
            .unwrap()
    }
896
    /// Waits until checkpoint `seq` has been synced, then returns it.
    pub async fn notify_read_synced_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
            self.get_highest_synced_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }
907
    /// Waits until checkpoint `seq` has been executed, then returns it.
    pub async fn notify_read_executed_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
            self.get_highest_executed_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }
918
    /// Advances the HighestExecuted watermark to `checkpoint`. Silently
    /// returns when the watermark is already at or past it; panics if the
    /// update would skip a sequence number (execution must be contiguous).
    /// Wakes tasks waiting via `notify_read_executed_checkpoint`.
    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            // Executed checkpoints must advance one at a time.
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(seq, *checkpoint.digest()),
        )?;
        self.executed_checkpoint_notify_read
            .notify(&seq, checkpoint);
        Ok(())
    }
945
    /// Sets the HighestPruned watermark to `checkpoint` (no monotonicity
    /// check; callers enforce ordering).
    pub fn update_highest_pruned_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestPruned,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }
955
    /// Force-sets the HighestExecuted watermark WITHOUT the contiguity assert
    /// or the notify performed by `update_highest_executed_checkpoint` —
    /// hence "subtle". Use only when deliberately rewriting the watermark.
    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }
969
    /// Stores checkpoint contents keyed by their digest.
    pub fn insert_checkpoint_contents(
        &self,
        contents: CheckpointContents,
    ) -> Result<(), TypedStoreError> {
        // NOTE(review): the log field is named `checkpoint_seq` but carries
        // the contents digest — confirm whether the field name is intentional.
        debug!(
            checkpoint_seq = ?contents.digest(),
            "Inserting checkpoint contents",
        );
        self.tables
            .checkpoint_content
            .insert(contents.digest(), &contents)
    }
982
    /// Atomically stores the full contents for a checkpoint: the digest ->
    /// sequence index, the versioned full contents (v2 table), and the plain
    /// contents. Panics if the contents' digest does not match the summary's
    /// `content_digest`.
    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content_v2.batch();
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content_v2,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        // Deriving the plain contents consumes the full contents; verify they
        // really hash to the digest the summary commits to.
        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        batch.write()
    }
1009
    /// Removes the full contents for `seq` from both the legacy and v2 tables
    /// (used by pruning).
    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.tables.full_checkpoint_content.remove(&seq)?;
        self.tables.full_checkpoint_content_v2.remove(&seq)
    }
1017
1018 pub fn get_epoch_last_checkpoint(
1019 &self,
1020 epoch_id: EpochId,
1021 ) -> SuiResult<Option<VerifiedCheckpoint>> {
1022 let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1023 let checkpoint = match seq {
1024 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1025 None => None,
1026 };
1027 Ok(checkpoint)
1028 }
1029
1030 pub fn get_epoch_last_checkpoint_seq_number(
1031 &self,
1032 epoch_id: EpochId,
1033 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1034 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1035 Ok(seq)
1036 }
1037
    /// Records `checkpoint` as the last checkpoint of `epoch_id`.
    pub fn insert_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
        checkpoint: &VerifiedCheckpoint,
    ) -> SuiResult {
        self.tables
            .epoch_last_checkpoint_map
            .insert(&epoch_id, checkpoint.sequence_number())?;
        Ok(())
    }
1048
    /// Returns the state commitments from the end-of-epoch data of `epoch`'s
    /// last checkpoint. `None` when the epoch's last checkpoint is unknown;
    /// panics if that checkpoint unexpectedly lacks `EndOfEpochData`.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
            checkpoint
                .end_of_epoch_data
                .as_ref()
                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
                .epoch_commitments
                .clone()
        });
        Ok(commitments)
    }
1063
    /// Computes per-epoch statistics from `last_checkpoint` (the epoch's final
    /// summary). For epoch 0 the baseline is zero; otherwise it is read from
    /// the previous epoch's last checkpoint. Returns `None` when that
    /// baseline checkpoint cannot be loaded.
    pub fn get_epoch_stats(
        &self,
        epoch: EpochId,
        last_checkpoint: &CheckpointSummary,
    ) -> Option<EpochStats> {
        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
            (0, 0)
        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
            (
                // First checkpoint of this epoch follows the previous epoch's last.
                checkpoint.sequence_number + 1,
                checkpoint.network_total_transactions,
            )
        } else {
            return None;
        };
        Some(EpochStats {
            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
            // `network_total_transactions` is cumulative; subtract the baseline.
            transaction_count: last_checkpoint.network_total_transactions
                - prev_epoch_network_transactions,
            total_gas_reward: last_checkpoint
                .epoch_rolling_gas_cost_summary
                .computation_cost,
        })
    }
1089
    /// Creates a DB checkpoint (backup snapshot) at `path`.
    // NOTE(review): invoked through the `checkpoint_content` table handle —
    // presumably this snapshots the whole DB, not just that column family;
    // confirm against the typed_store `checkpoint_db` implementation.
    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
        self.tables
            .checkpoint_content
            .checkpoint_db(path)
            .map_err(Into::into)
    }
1097
    /// Test-only: removes the HighestExecuted watermark so execution can be
    /// replayed from scratch.
    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
        let mut wb = self.tables.watermarks.batch();
        wb.delete_batch(
            &self.tables.watermarks,
            std::iter::once(CheckpointWatermark::HighestExecuted),
        )?;
        wb.write()?;
        Ok(())
    }
1107
1108 pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1109 self.delete_highest_executed_checkpoint_test_only()?;
1110 Ok(())
1111 }
1112
1113 pub fn record_checkpoint_fork_detected(
1114 &self,
1115 checkpoint_seq: CheckpointSequenceNumber,
1116 checkpoint_digest: CheckpointDigest,
1117 ) -> Result<(), TypedStoreError> {
1118 info!(
1119 checkpoint_seq = checkpoint_seq,
1120 checkpoint_digest = ?checkpoint_digest,
1121 "Recording checkpoint fork detection in database"
1122 );
1123 self.tables.watermarks.insert(
1124 &CheckpointWatermark::CheckpointForkDetected,
1125 &(checkpoint_seq, checkpoint_digest),
1126 )
1127 }
1128
1129 pub fn get_checkpoint_fork_detected(
1130 &self,
1131 ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1132 self.tables
1133 .watermarks
1134 .get(&CheckpointWatermark::CheckpointForkDetected)
1135 }
1136
1137 pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1138 self.tables
1139 .watermarks
1140 .remove(&CheckpointWatermark::CheckpointForkDetected)
1141 }
1142
1143 pub fn record_transaction_fork_detected(
1144 &self,
1145 tx_digest: TransactionDigest,
1146 expected_effects_digest: TransactionEffectsDigest,
1147 actual_effects_digest: TransactionEffectsDigest,
1148 ) -> Result<(), TypedStoreError> {
1149 info!(
1150 tx_digest = ?tx_digest,
1151 expected_effects_digest = ?expected_effects_digest,
1152 actual_effects_digest = ?actual_effects_digest,
1153 "Recording transaction fork detection in database"
1154 );
1155 self.tables.transaction_fork_detected.insert(
1156 &TRANSACTION_FORK_DETECTED_KEY,
1157 &(tx_digest, expected_effects_digest, actual_effects_digest),
1158 )
1159 }
1160
1161 pub fn get_transaction_fork_detected(
1162 &self,
1163 ) -> Result<
1164 Option<(
1165 TransactionDigest,
1166 TransactionEffectsDigest,
1167 TransactionEffectsDigest,
1168 )>,
1169 TypedStoreError,
1170 > {
1171 self.tables
1172 .transaction_fork_detected
1173 .get(&TRANSACTION_FORK_DETECTED_KEY)
1174 }
1175
1176 pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1177 self.tables
1178 .transaction_fork_detected
1179 .remove(&TRANSACTION_FORK_DETECTED_KEY)
1180 }
1181}
1182
/// Keys of the `watermarks` table; each variant identifies one persisted
/// progress marker.
///
/// NOTE: variants are serialized and used as database keys, so reordering
/// them can change their on-disk representation.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
    // Stores the (sequence number, digest) of a detected checkpoint fork.
    CheckpointForkDetected,
}
1191
/// Background task state: receives (sequence, effects) pairs from the
/// checkpoint builder and feeds them into the global state hasher.
struct CheckpointStateHasher {
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Weak reference: the task stops accumulating once the hasher's owner
    // drops it (see `run`).
    hasher: Weak<GlobalStateHasher>,
    // Channel fed by the checkpoint builder with each checkpoint's effects.
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
1197
1198impl CheckpointStateHasher {
1199 fn new(
1200 epoch_store: Arc<AuthorityPerEpochStore>,
1201 hasher: Weak<GlobalStateHasher>,
1202 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1203 ) -> Self {
1204 Self {
1205 epoch_store,
1206 hasher,
1207 receive_from_builder,
1208 }
1209 }
1210
1211 async fn run(self) {
1212 let Self {
1213 epoch_store,
1214 hasher,
1215 mut receive_from_builder,
1216 } = self;
1217 while let Some((seq, effects)) = receive_from_builder.recv().await {
1218 let Some(hasher) = hasher.upgrade() else {
1219 info!("Object state hasher was dropped, stopping checkpoint accumulation");
1220 break;
1221 };
1222 hasher
1223 .accumulate_checkpoint(&effects, seq, &epoch_store)
1224 .expect("epoch ended while accumulating checkpoint");
1225 }
1226 }
1227}
1228
/// Errors surfaced by the checkpoint builder loop.
///
/// The first two variants are terminal and stop the builder (see
/// `CheckpointBuilder::run`); `Retry` is treated as transient and retried
/// with a short backoff.
#[derive(Debug)]
pub enum CheckpointBuilderError {
    // The change-epoch transaction has already been executed.
    ChangeEpochTxAlreadyExecuted,
    // Required system packages are not available.
    SystemPackagesMissing,
    // Any other error, wrapped with its underlying cause; retried.
    Retry(anyhow::Error),
}
1235
1236impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1237 for CheckpointBuilderError
1238{
1239 fn from(e: SuiError) -> Self {
1240 Self::Retry(e.into())
1241 }
1242}
1243
/// Result alias for checkpoint-builder operations; the success type
/// defaults to `()`.
pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1245
/// Builds checkpoints from pending checkpoints produced via consensus.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Signaled when new pending checkpoints may be available (awaited in `run`).
    notify: Arc<Notify>,
    // Signaled after checkpoints are written, waking the aggregator.
    notify_aggregator: Arc<Notify>,
    // Watermark of the highest checkpoint sequence number built so far.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    global_state_hasher: Weak<GlobalStateHasher>,
    // Channel to the state-hasher task: (sequence, effects in checkpoint).
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    // Sink for newly created checkpoints.
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    // Chunking limits used by `split_checkpoint_chunks` (legacy path only;
    // ignored when splitting happens in the consensus handler).
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
1261
/// Aggregates validator checkpoint signatures into certified checkpoints.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Signaled when there may be new work (e.g. a newly built checkpoint).
    notify: Arc<Notify>,
    // Incoming checkpoint signature messages.
    receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
    // Signature messages buffered by checkpoint sequence number.
    pending: BTreeMap<CheckpointSequenceNumber, Vec<CheckpointSignatureMessage>>,
    // Aggregation state for the checkpoint currently being processed.
    current: Option<CheckpointSignatureAggregator>,
    // Sink for certified checkpoints.
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1273
/// Signature-collection state for a single checkpoint summary.
pub struct CheckpointSignatureAggregator {
    // The locally built summary being certified.
    summary: CheckpointSummary,
    // Digest of `summary`.
    digest: CheckpointDigest,
    // Stake-weighted aggregation, bucketed by the digest each validator
    // signed (validators on a different fork may sign a different digest).
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1284
1285impl CheckpointBuilder {
    /// Creates a `CheckpointBuilder` wired to the given stores, output sink,
    /// and notification channels.
    ///
    /// `notify` wakes the builder loop when new pending checkpoints arrive;
    /// `notify_aggregator` is signaled after checkpoints are written so the
    /// aggregator can start collecting signatures.
    fn new(
        state: Arc<AuthorityState>,
        store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            store,
            epoch_store,
            notify,
            effects_store,
            global_state_hasher,
            send_to_hasher,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }
1319
    /// Main builder loop.
    ///
    /// Optionally waits for consensus commit replay to finish first, then
    /// repeatedly attempts to build checkpoints. Terminal errors
    /// (`ChangeEpochTxAlreadyExecuted`, `SystemPackagesMissing`) stop the
    /// loop; `Retry` errors are logged and retried after a 1s backoff.
    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
        if let Some(replay_waiter) = consensus_replay_waiter {
            info!("Waiting for consensus commits to replay ...");
            replay_waiter.wait_for_replay().await;
            info!("Consensus commits finished replaying");
        }
        info!("Starting CheckpointBuilder");
        loop {
            match self.maybe_build_checkpoints().await {
                Ok(()) => {}
                err @ Err(
                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
                    | CheckpointBuilderError::SystemPackagesMissing,
                ) => {
                    info!("CheckpointBuilder stopping: {:?}", err);
                    return;
                }
                Err(CheckpointBuilderError::Retry(inner)) => {
                    let msg = format!("{:?}", inner);
                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    continue;
                }
            }

            // Sleep until more pending checkpoints are signaled.
            self.notify.notified().await;
        }
    }
1356
1357 async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1358 if self
1359 .epoch_store
1360 .protocol_config()
1361 .split_checkpoints_in_consensus_handler()
1362 {
1363 self.maybe_build_checkpoints_v2().await
1364 } else {
1365 self.maybe_build_checkpoints_v1().await
1366 }
1367 }
1368
    /// Legacy (v1) checkpoint building: accumulates pending checkpoints into
    /// groups until the minimum checkpoint interval has elapsed (or the
    /// epoch is ending), then builds one checkpoint per group.
    async fn maybe_build_checkpoints_v1(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        // Resume from the last summary this builder produced, if any.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            let current_timestamp = pending.details().timestamp_ms;
            // A group becomes buildable once the minimum interval has
            // elapsed, or unconditionally when this pending checkpoint (or
            // the next one, peeked) is the last of the epoch.
            let can_build = match last_timestamp {
                Some(last_timestamp) => {
                    current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                }
                None => true,
            } || checkpoints_iter
                .peek()
                .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height_from = grouped_pending_checkpoints
                    .first()
                    .unwrap()
                    .details()
                    .checkpoint_height,
                checkpoint_commit_height_to = last_height,
                "Making checkpoint with commit height range"
            );

            // Build from the accumulated group, leaving the vector empty for
            // the next batch.
            let seq = self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await?;

            // Advance the last-built watermark monotonically.
            self.last_built.send_if_modified(|cur| {
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            // Yield so a long backlog does not starve other tasks.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );

        Ok(())
    }
1457
    /// V2 checkpoint building: each pending checkpoint maps to exactly one
    /// checkpoint, since splitting already happened in the consensus handler.
    async fn maybe_build_checkpoints_v2(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        // Resume after the last summary this builder produced, if any.
        let last_height = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .and_then(|s| s.checkpoint_height);

        for (height, pending) in self
            .epoch_store
            .get_pending_checkpoints_v2(last_height)
            .expect("unexpected epoch store error")
        {
            debug!(checkpoint_commit_height = height, "Making checkpoint");

            let seq = self.make_checkpoint_v2(pending).await?;

            // Advance the last-built watermark monotonically.
            self.last_built.send_if_modified(|cur| {
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            // Yield so a long backlog does not starve other tasks.
            tokio::task::yield_now().await;
        }

        Ok(())
    }
1494
    /// Builds checkpoint(s) from a group of pending checkpoints (v1 path),
    /// persists them, and returns the highest sequence number produced.
    ///
    /// Panics (via `unwrap`) if `pendings` is empty — callers always pass a
    /// non-empty group.
    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &mut self,
        pendings: Vec<PendingCheckpoint>,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        // Human-readable description of the input group, for the log below.
        let pending_ckpt_str = pendings
            .iter()
            .map(|p| {
                format!(
                    "height={}, commit={}",
                    p.details().checkpoint_height,
                    p.details().consensus_commit_ref
                )
            })
            .join("; ");

        let last_details = pendings.last().unwrap().details().clone();

        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        // Count polls of the resolution future so we can flag unexpected
        // blocking below.
        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &last_details,
                &all_roots,
            )
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        // If execution is already ahead of the builder, all effects should
        // have been immediately available; needing more than one poll
        // suggests we blocked when we should not have.
        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        let new_ckpt_str = new_checkpoints
            .iter()
            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
            .join("; ");

        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            "Made new checkpoint {} from pending checkpoint {}",
            new_ckpt_str, pending_ckpt_str
        );

        Ok(highest_sequence)
    }
1554
    /// Builds exactly one checkpoint from a v2 pending checkpoint, persists
    /// it, and returns its sequence number.
    #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
    async fn make_checkpoint_v2(
        &mut self,
        pending: PendingCheckpointV2,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        let details = pending.details.clone();

        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        // Count polls of the resolution future so we can flag unexpected
        // blocking below.
        let (poll_count, result) =
            poll_count(self.resolve_checkpoint_transactions_v2(pending)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &details,
                &all_roots,
            )
            .await?;
        // The v2 path never splits contents, so exactly one checkpoint results.
        assert_eq!(new_checkpoints.len(), 1, "Expected exactly one checkpoint");
        let sequence = *new_checkpoints.first().0.sequence_number();
        let digest = new_checkpoints.first().0.digest();
        // If execution is already ahead of the builder, all effects should
        // have been immediately available; needing more than one poll
        // suggests we blocked when we should not have.
        if sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        self.write_checkpoints(details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            seq = sequence,
            %digest,
            height = details.checkpoint_height,
            commit = %details.consensus_commit_ref,
            "Made new checkpoint"
        );

        Ok(sequence)
    }
1602
    /// Builds the accumulator settlement transactions for a checkpoint,
    /// hands them (and a trailing barrier transaction) to the epoch store
    /// for execution, waits for their effects, and forwards the resulting
    /// funds settlement to the execution scheduler.
    ///
    /// Returns the settlement `TransactionKey` together with the effects of
    /// the settlement transactions followed by the barrier's effects.
    async fn construct_and_execute_settlement_transactions(
        &self,
        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> (TransactionKey, Vec<TransactionEffects>) {
        let _scope =
            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");

        // Key under which consumers look up this checkpoint's settlement.
        let tx_key =
            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);

        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.effects_store.as_ref()),
            sorted_tx_effects_included_in_checkpoint,
            checkpoint_seq,
            tx_index_offset,
        );

        // Capture funds changes before the builder is consumed by build_tx.
        let funds_changes = builder.collect_funds_changes();
        let num_updates = builder.num_updates();
        let settlement_txns = builder.build_tx(
            self.epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            checkpoint_seq,
        );

        // Wrap as verified system transactions for this epoch.
        let settlement_txns: Vec<_> = settlement_txns
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    self.epoch_store.epoch(),
                )
            })
            .collect();

        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();

        debug!(
            ?settlement_digests,
            ?tx_key,
            "created settlement transactions with {num_updates} updates"
        );

        // Hand the transactions off for execution, then wait for effects.
        self.epoch_store
            .notify_settlement_transactions_ready(tx_key, settlement_txns);

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_settlement_effects",
            &settlement_digests,
            tx_key,
        )
        .await;

        // The barrier transaction is derived from the settlement effects and
        // executed after them.
        let barrier_tx = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            &settlement_effects,
        );

        let barrier_tx = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier_tx),
            self.epoch_store.epoch(),
        );
        let barrier_digest = *barrier_tx.digest();

        self.epoch_store
            .notify_barrier_transaction_ready(tx_key, barrier_tx);

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_barrier_effects",
            &[barrier_digest],
            tx_key,
        )
        .await;

        // Settlement effects first, barrier effects last.
        let settlement_effects: Vec<_> = settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect();

        // Find the single mutation of the accumulator root object; its new
        // version drives downstream funds-withdraw scheduling.
        let mut next_accumulator_version = None;
        for fx in settlement_effects.iter() {
            assert!(
                fx.status().is_ok(),
                "settlement transaction cannot fail (digest: {:?}) {:#?}",
                fx.transaction_digest(),
                fx
            );
            if let Some(version) = fx
                .mutated()
                .iter()
                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
            {
                assert!(
                    next_accumulator_version.is_none(),
                    "Only one settlement transaction should mutate the accumulator root object"
                );
                next_accumulator_version = Some(version);
            }
        }
        let settlements = FundsSettlement {
            next_accumulator_version: next_accumulator_version
                .expect("Accumulator root object should be mutated in the settlement transactions"),
            funds_changes,
        };

        self.state
            .execution_scheduler()
            .settle_address_funds(settlements);

        (tx_key, settlement_effects)
    }
1730
    /// Resolves the full, causally ordered effects list for a group of
    /// pending checkpoints (v1 path).
    ///
    /// For each pending checkpoint this waits for its root transactions'
    /// effects, pulls the consensus commit prologue to the front, causally
    /// sorts the rest, and (when accumulators are enabled) executes and
    /// appends the settlement/barrier effects. Returns the concatenated
    /// effects plus the set of all root digests.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        pending_checkpoints: Vec<PendingCheckpoint>,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        // Digests already included, shared across the whole group so a
        // dependency is only pulled into the checkpoint once.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        let mut tx_effects = Vec::new();
        let mut tx_roots = HashSet::new();

        for pending_checkpoint in pending_checkpoints.into_iter() {
            let mut pending = pending_checkpoint;
            debug!(
                checkpoint_commit_height = pending.details.checkpoint_height,
                "Resolving checkpoint transactions for pending checkpoint.",
            );

            trace!(
                "roots for pending checkpoint {:?}: {:?}",
                pending.details.checkpoint_height, pending.roots,
            );

            // With accumulators enabled, the settlement root must be the
            // final root; pop it off so it is handled separately below.
            let settlement_root = if self.epoch_store.accumulators_enabled() {
                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
                    pending.roots.pop()
                else {
                    fatal!("No settlement root found");
                };
                Some(settlement_root)
            } else {
                None
            };

            let roots = &pending.roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(roots.len() as u64);

            // Resolve keys to digests, then wait for each root's effects.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;
            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;

            assert!(
                self.epoch_store
                    .protocol_config()
                    .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
            );

            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            // Complete the prologue's effects first so it cannot acquire
            // extra dependencies, then verify it stayed alone.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    &mut effects_in_current_checkpoint,
                )?;

                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }

            let unsorted =
                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;

            // The prologue goes first; everything else is causally sorted
            // after it.
            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
                if cfg!(debug_assertions) {
                    for tx in unsorted.iter() {
                        assert!(tx.transaction_digest() != &ccp_digest);
                    }
                }
                sorted.push(ccp_effects);
            }
            sorted.extend(CausalOrder::causal_sort(unsorted));

            // Execute settlement transactions over the sorted effects and
            // append their effects (plus the barrier) at the end.
            if let Some(settlement_root) = settlement_root {
                let last_checkpoint =
                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
                let next_checkpoint_seq = last_checkpoint
                    .as_ref()
                    .map(|(seq, _)| *seq)
                    .unwrap_or_default()
                    + 1;
                let tx_index_offset = tx_effects.len() as u64;

                let (tx_key, settlement_effects) = self
                    .construct_and_execute_settlement_transactions(
                        &sorted,
                        pending.details.checkpoint_height,
                        next_checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                debug!(?tx_key, "executed settlement transactions");

                assert_eq!(settlement_root, tx_key);

                effects_in_current_checkpoint
                    .extend(settlement_effects.iter().map(|e| *e.transaction_digest()));
                sorted.extend(settlement_effects);
            }

            #[cfg(msim)]
            {
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            tx_effects.extend(sorted);
            tx_roots.extend(root_digests);
        }

        Ok((tx_effects, tx_roots))
    }
1895
    /// Resolves the causally ordered effects for a v2 pending checkpoint.
    ///
    /// Unlike the v1 path, dependency completion happened upstream, so each
    /// roots group is only causally sorted (with the consensus commit
    /// prologue pinned first) and settlement effects, when present, are read
    /// back rather than scheduled here.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions_v2(
        &self,
        pending: PendingCheckpointV2,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        debug!(
            checkpoint_commit_height = pending.details.checkpoint_height,
            "Resolving checkpoint transactions for pending checkpoint.",
        );

        trace!(
            "roots for pending checkpoint {:?}: {:?}",
            pending.details.checkpoint_height, pending.roots,
        );

        assert!(
            self.epoch_store
                .protocol_config()
                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
        );

        let mut all_effects: Vec<TransactionEffects> = Vec::new();
        let mut all_root_digests: Vec<TransactionDigest> = Vec::new();

        for checkpoint_roots in &pending.roots {
            let tx_roots = &checkpoint_roots.tx_roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(tx_roots.len() as u64);

            // Resolve keys to digests, then wait for each root's effects.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(tx_roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;

            all_root_digests.extend(root_digests.iter().cloned());

            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            // Causal sort, keeping the prologue (if any) at the front.
            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let ccp_digest = consensus_commit_prologue.map(|(d, _)| d);
            let mut sorted = CausalOrder::causal_sort_with_ccp(root_effects, ccp_digest);

            // Append settlement + barrier effects, if this group has a
            // settlement root.
            if let Some(settlement_key) = &checkpoint_roots.settlement_root {
                let checkpoint_seq = pending
                    .details
                    .checkpoint_seq
                    .expect("checkpoint_seq must be set");
                let tx_index_offset = all_effects.len() as u64;
                let effects = self
                    .resolve_settlement_effects(
                        *settlement_key,
                        &sorted,
                        checkpoint_roots.height,
                        checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                sorted.extend(effects);
            }

            #[cfg(msim)]
            {
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            all_effects.extend(sorted);
        }
        Ok((all_effects, all_root_digests.into_iter().collect()))
    }
1981
    /// Recomputes the settlement and barrier transaction digests for a
    /// checkpoint and waits for their effects via the effects store.
    ///
    /// Unlike `construct_and_execute_settlement_transactions`, nothing is
    /// scheduled here — the transactions are rebuilt deterministically only
    /// to derive their digests.
    async fn resolve_settlement_effects(
        &self,
        settlement_key: TransactionKey,
        sorted_root_effects: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> Vec<TransactionEffects> {
        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        // Rebuild the settlement transactions purely to learn their digests.
        // No effects store is passed (None): no scheduling happens here.
        let builder = AccumulatorSettlementTxBuilder::new(
            None,
            sorted_root_effects,
            checkpoint_seq,
            tx_index_offset,
        );

        let settlement_digests: Vec<_> = builder
            .build_tx(
                self.epoch_store.protocol_config(),
                epoch,
                accumulator_root_obj_initial_shared_version,
                checkpoint_height,
                checkpoint_seq,
            )
            .into_iter()
            .map(|tx| *VerifiedTransaction::new_system_transaction(tx).digest())
            .collect();

        debug!(
            ?settlement_digests,
            ?settlement_key,
            "fallback: reading settlement effects from cache"
        );

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::fallback_settlement_effects",
            &settlement_digests,
            settlement_key,
        )
        .await;

        // The barrier transaction is derived from the settlement effects.
        let barrier_digest = *VerifiedTransaction::new_system_transaction(
            accumulators::build_accumulator_barrier_tx(
                epoch,
                accumulator_root_obj_initial_shared_version,
                checkpoint_height,
                &settlement_effects,
            ),
        )
        .digest();

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::fallback_barrier_effects",
            &[barrier_digest],
            settlement_key,
        )
        .await;

        // Settlement effects first, barrier effects last.
        settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect()
    }
2057
2058 fn extract_consensus_commit_prologue(
2061 &self,
2062 root_digests: &[TransactionDigest],
2063 root_effects: &[TransactionEffects],
2064 ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
2065 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
2066 if root_digests.is_empty() {
2067 return Ok(None);
2068 }
2069
2070 let first_tx = self
2074 .state
2075 .get_transaction_cache_reader()
2076 .get_transaction_block(&root_digests[0])
2077 .expect("Transaction block must exist");
2078
2079 Ok(first_tx
2080 .transaction_data()
2081 .is_consensus_commit_prologue()
2082 .then(|| {
2083 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
2084 (*first_tx.digest(), root_effects[0].clone())
2085 }))
2086 }
2087
    /// Persists newly built checkpoints, emits them to the configured
    /// output, checks for forks against any already-certified checkpoints,
    /// and notifies the signature aggregator.
    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &mut self,
        height: CheckpointHeight,
        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
    ) -> SuiResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.store.tables.checkpoint_content.batch();
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );

            // Rebuilding the same sequence number must be deterministic; a
            // digest mismatch with a previously computed summary is fatal.
            if let Some(previously_computed_summary) = self
                .store
                .tables
                .locally_computed_checkpoints
                .get(&summary.sequence_number)?
                && previously_computed_summary.digest() != summary.digest()
            {
                fatal!(
                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
                    summary.sequence_number,
                    previously_computed_summary.digest(),
                    summary.digest()
                );
            }

            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            batch.insert_batch(
                &self.store.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.store.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        // Persist everything atomically before emitting to the output.
        batch.write()?;

        for (summary, contents) in &new_checkpoints {
            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
                .await?;
        }

        // If a certified checkpoint for the same sequence already exists
        // (e.g. obtained via state sync), check it against our local build.
        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .store
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.store
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_constructed_checkpoint(height, new_checkpoints);
        Ok(())
    }
2169
    /// Splits a checkpoint's transactions into chunks respecting the
    /// per-checkpoint transaction-count and byte-size limits.
    ///
    /// When splitting already happened in the consensus handler, everything
    /// goes into a single chunk. Always returns at least one (possibly
    /// empty) chunk so that an empty checkpoint can still be produced.
    #[allow(clippy::type_complexity)]
    fn split_checkpoint_chunks(
        &self,
        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
        signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
    ) -> CheckpointBuilderResult<
        Vec<
            Vec<(
                TransactionEffects,
                Vec<(GenericSignature, Option<SequenceNumber>)>,
            )>,
        >,
    > {
        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");

        if self
            .epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler()
        {
            // V2: no size-based splitting here; one chunk holds everything.
            let chunk: Vec<_> = effects_and_transaction_sizes
                .into_iter()
                .zip(signatures)
                .map(|((effects, _size), sigs)| (effects, sigs))
                .collect();
            return Ok(vec![chunk]);
        }
        let mut chunks = Vec::new();
        let mut chunk = Vec::new();
        let mut chunk_size: usize = 0;
        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
            .into_iter()
            .zip(signatures.into_iter())
        {
            // With address aliases enabled, the sequence numbers are part of
            // the serialized signatures and must be counted; otherwise only
            // the signatures themselves are serialized.
            let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
                bcs::serialized_size(&signatures)?
            } else {
                let signatures: Vec<&GenericSignature> =
                    signatures.iter().map(|(s, _)| s).collect();
                bcs::serialized_size(&signatures)?
            };
            let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
            if chunk.len() == self.max_transactions_per_checkpoint
                || (chunk_size + size) > self.max_checkpoint_size_bytes
            {
                if chunk.is_empty() {
                    // A single oversized transaction cannot be split further;
                    // let it through rather than stalling the builder.
                    warn!(
                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
                        self.max_checkpoint_size_bytes
                    );
                } else {
                    chunks.push(chunk);
                    chunk = Vec::new();
                    chunk_size = 0;
                }
            }

            chunk.push((effects, signatures));
            chunk_size += size;
        }

        // Flush the trailing chunk, and guarantee at least one chunk even
        // when there were no transactions (empty checkpoints are valid).
        if !chunk.is_empty() || chunks.is_empty() {
            chunks.push(chunk);
        }
        Ok(chunks)
    }
2250
2251 fn load_last_built_checkpoint_summary(
2252 epoch_store: &AuthorityPerEpochStore,
2253 store: &CheckpointStore,
2254 ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
2255 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
2256 if last_checkpoint.is_none() {
2257 let epoch = epoch_store.epoch();
2258 if epoch > 0 {
2259 let previous_epoch = epoch - 1;
2260 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
2261 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
2262 if let Some((ref seq, _)) = last_checkpoint {
2263 debug!(
2264 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
2265 );
2266 } else {
2267 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
2269 }
2270 }
2271 }
2272 Ok(last_checkpoint)
2273 }
2274
    /// Builds one or more (summary, contents) checkpoint pairs from the given executed
    /// effects.
    ///
    /// High-level flow:
    /// 1. Load the last built checkpoint (for sequence numbering, rolling gas totals,
    ///    previous digest, and network tx count).
    /// 2. Fetch the transactions for all effects, classify them by kind, and wait until
    ///    all non-root user certificates have been processed by consensus.
    /// 3. Collect user signatures and split everything into size-limited chunks.
    /// 4. Emit one checkpoint per chunk; the final chunk of the epoch's last checkpoint
    ///    additionally runs the advance-epoch transaction and computes end-of-epoch data.
    #[instrument(level = "debug", skip_all)]
    async fn create_checkpoints(
        &self,
        all_effects: Vec<TransactionEffects>,
        details: &PendingCheckpointInfo,
        all_roots: &HashSet<TransactionDigest>,
    ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");

        let total = all_effects.len();
        let mut last_checkpoint =
            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
        debug!(
            checkpoint_commit_height = details.checkpoint_height,
            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
            checkpoint_timestamp = details.timestamp_ms,
            "Creating checkpoint(s) for {} transactions",
            all_effects.len(),
        );

        let all_digests: Vec<_> = all_effects
            .iter()
            .map(|effect| *effect.transaction_digest())
            .collect();
        let transactions_and_sizes = self
            .state
            .get_transaction_cache_reader()
            .get_transactions_and_serialized_sizes(&all_digests)?;
        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
        let mut transactions = Vec::with_capacity(all_effects.len());
        // Consensus keys of user certificates we must wait on before building.
        let mut transaction_keys = Vec::with_capacity(all_effects.len());
        // digest -> randomness round, harvested from RandomnessStateUpdate transactions.
        let mut randomness_rounds = BTreeMap::new();
        {
            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
            debug!(
                ?last_checkpoint_seq,
                "Waiting for {:?} certificates to appear in consensus",
                all_effects.len()
            );

            for (effects, transaction_and_size) in all_effects
                .into_iter()
                .zip(transactions_and_sizes.into_iter())
            {
                // Every effect here corresponds to an executed transaction, so the
                // transaction must be present in the cache.
                let (transaction, size) = transaction_and_size
                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
                match transaction.inner().transaction_data().kind() {
                    // System transactions that require no waiting and no extra handling.
                    TransactionKind::ConsensusCommitPrologue(_)
                    | TransactionKind::ConsensusCommitPrologueV2(_)
                    | TransactionKind::ConsensusCommitPrologueV3(_)
                    | TransactionKind::ConsensusCommitPrologueV4(_)
                    | TransactionKind::AuthenticatorStateUpdate(_) => {
                    }
                    TransactionKind::ProgrammableSystemTransaction(_) => {
                    }
                    // These kinds must never appear among checkpoint effects at this
                    // point; seeing one indicates a serious invariant violation.
                    TransactionKind::ChangeEpoch(_)
                    | TransactionKind::Genesis(_)
                    | TransactionKind::EndOfEpochTransaction(_) => {
                        fatal!(
                            "unexpected transaction in checkpoint effects: {:?}",
                            transaction
                        );
                    }
                    TransactionKind::RandomnessStateUpdate(rsu) => {
                        // Remember the randomness round for inclusion in the summary.
                        randomness_rounds
                            .insert(*effects.transaction_digest(), rsu.randomness_round);
                    }
                    TransactionKind::ProgrammableTransaction(_) => {
                        // Non-root user transactions must be observed by consensus
                        // before we can include them in a checkpoint.
                        let digest = *effects.transaction_digest();
                        if !all_roots.contains(&digest) {
                            transaction_keys.push(SequencedConsensusTransactionKey::External(
                                ConsensusTransactionKey::Certificate(digest),
                            ));
                        }
                    }
                }
                transactions.push(transaction);
                all_effects_and_transaction_sizes.push((effects, size));
            }

            // Block until consensus has processed every collected certificate key.
            self.epoch_store
                .consensus_messages_processed_notify(transaction_keys)
                .await?;
        }

        let signatures = self
            .epoch_store
            .user_signatures_for_checkpoint(&transactions, &all_digests);
        debug!(
            ?last_checkpoint_seq,
            "Received {} checkpoint user signatures from consensus",
            signatures.len()
        );

        // Only needed for the advance-epoch transaction at the end of the epoch:
        // execution-time observation keys from every PTB command seen in this batch.
        let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
            Some(
                transactions
                    .iter()
                    .flat_map(|tx| {
                        if let TransactionKind::ProgrammableTransaction(ptb) =
                            tx.transaction_data().kind()
                        {
                            itertools::Either::Left(
                                ptb.commands
                                    .iter()
                                    .map(ExecutionTimeObservationKey::from_command),
                            )
                        } else {
                            itertools::Either::Right(std::iter::empty())
                        }
                    })
                    .collect(),
            )
        } else {
            None
        };

        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
        let chunks_count = chunks.len();

        let mut checkpoints = Vec::with_capacity(chunks_count);
        debug!(
            ?last_checkpoint_seq,
            "Creating {} checkpoints with {} transactions", chunks_count, total,
        );

        let epoch = self.epoch_store.epoch();
        for (index, transactions) in chunks.into_iter().enumerate() {
            // First checkpoint of the epoch: no prior checkpoint, or the prior one
            // belongs to the previous epoch.
            let first_checkpoint_of_epoch = index == 0
                && last_checkpoint
                    .as_ref()
                    .map(|(_, c)| c.epoch != epoch)
                    .unwrap_or(true);
            if first_checkpoint_of_epoch {
                self.epoch_store
                    .record_epoch_first_checkpoint_creation_time_metric();
            }
            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;

            let sequence_number = if let Some(preassigned_seq) = details.checkpoint_seq {
                preassigned_seq
            } else {
                last_checkpoint
                    .as_ref()
                    .map(|(_, c)| c.sequence_number + 1)
                    .unwrap_or_default()
            };
            let mut timestamp_ms = details.timestamp_ms;
            // Checkpoint timestamps may regress (e.g. across an epoch change); when
            // the protocol enforces monotonicity, clamp to the previous timestamp.
            if let Some((_, last_checkpoint)) = &last_checkpoint
                && last_checkpoint.timestamp_ms > timestamp_ms
            {
                debug!(
                    "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
                    sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
                );
                if self
                    .epoch_store
                    .protocol_config()
                    .enforce_checkpoint_timestamp_monotonicity()
                {
                    timestamp_ms = last_checkpoint.timestamp_ms;
                }
            }

            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
            let epoch_rolling_gas_cost_summary =
                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);

            let end_of_epoch_data = if last_checkpoint_of_epoch {
                // Execute the advance-epoch transaction and append its effects to
                // this (final) checkpoint.
                let system_state_obj = self
                    .augment_epoch_last_checkpoint(
                        &epoch_rolling_gas_cost_summary,
                        timestamp_ms,
                        &mut effects,
                        &mut signatures,
                        sequence_number,
                        std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
                        last_checkpoint_seq.unwrap_or_default(),
                    )
                    .await?;

                let committee = system_state_obj
                    .get_current_epoch_committee()
                    .committee()
                    .clone();

                // Fold this checkpoint into the running root accumulator and produce
                // the final root state digest for the epoch.
                let root_state_digest = {
                    let state_acc = self
                        .global_state_hasher
                        .upgrade()
                        .expect("No checkpoints should be getting built after local configuration");
                    let acc = state_acc.accumulate_checkpoint(
                        &effects,
                        sequence_number,
                        &self.epoch_store,
                    )?;

                    state_acc
                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
                        .await?;

                    state_acc.accumulate_running_root(
                        &self.epoch_store,
                        sequence_number,
                        Some(acc),
                    )?;
                    state_acc
                        .digest_epoch(self.epoch_store.clone(), sequence_number)
                        .await?
                };
                self.metrics.highest_accumulated_epoch.set(epoch as i64);
                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

                let epoch_commitments = if self
                    .epoch_store
                    .protocol_config()
                    .check_commit_root_state_digest_supported()
                {
                    vec![root_state_digest.into()]
                } else {
                    vec![]
                };

                Some(EndOfEpochData {
                    next_epoch_committee: committee.voting_rights,
                    next_epoch_protocol_version: ProtocolVersion::new(
                        system_state_obj.protocol_version(),
                    ),
                    epoch_commitments,
                })
            } else {
                // Mid-epoch checkpoints are accumulated asynchronously by the hasher task.
                self.send_to_hasher
                    .send((sequence_number, effects.clone()))
                    .await?;

                None
            };
            // Contents format depends on whether address aliases are enabled.
            let contents = if self.epoch_store.protocol_config().address_aliases() {
                CheckpointContents::new_v2(&effects, signatures)
            } else {
                CheckpointContents::new_with_digests_and_signatures(
                    effects.iter().map(TransactionEffects::execution_digests),
                    signatures
                        .into_iter()
                        .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
                        .collect(),
                )
            };

            let num_txns = contents.size() as u64;

            let network_total_transactions = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.network_total_transactions + num_txns)
                .unwrap_or(num_txns);

            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());

            // Randomness rounds for the transactions that landed in this chunk.
            let matching_randomness_rounds: Vec<_> = effects
                .iter()
                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
                .copied()
                .collect();

            let checkpoint_commitments = if self
                .epoch_store
                .protocol_config()
                .include_checkpoint_artifacts_digest_in_summary()
            {
                let artifacts = CheckpointArtifacts::from(&effects[..]);
                let artifacts_digest = artifacts.digest()?;
                vec![artifacts_digest.into()]
            } else {
                Default::default()
            };

            let summary = CheckpointSummary::new(
                self.epoch_store.protocol_config(),
                epoch,
                sequence_number,
                network_total_transactions,
                &contents,
                previous_digest,
                epoch_rolling_gas_cost_summary,
                end_of_epoch_data,
                timestamp_ms,
                matching_randomness_rounds,
                checkpoint_commitments,
            );
            summary.report_checkpoint_age(
                &self.metrics.last_created_checkpoint_age,
                &self.metrics.last_created_checkpoint_age_ms,
            );
            if last_checkpoint_of_epoch {
                info!(
                    checkpoint_seq = sequence_number,
                    "creating last checkpoint of epoch {}", epoch
                );
                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
                    self.epoch_store
                        .report_epoch_metrics_at_last_checkpoint(stats);
                }
            }
            // This chunk's summary becomes the predecessor for the next chunk.
            last_checkpoint = Some((sequence_number, summary.clone()));
            checkpoints.push((summary, contents));
        }

        // split_checkpoint_chunks always yields at least one chunk.
        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
    }
2594
2595 fn get_epoch_total_gas_cost(
2596 &self,
2597 last_checkpoint: Option<&CheckpointSummary>,
2598 cur_checkpoint_effects: &[TransactionEffects],
2599 ) -> GasCostSummary {
2600 let (previous_epoch, previous_gas_costs) = last_checkpoint
2601 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2602 .unwrap_or_default();
2603 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2604 if previous_epoch == self.epoch_store.epoch() {
2605 GasCostSummary::new(
2607 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2608 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2609 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2610 previous_gas_costs.non_refundable_storage_fee
2611 + current_gas_costs.non_refundable_storage_fee,
2612 )
2613 } else {
2614 current_gas_costs
2615 }
2616 }
2617
2618 #[instrument(level = "error", skip_all)]
2619 async fn augment_epoch_last_checkpoint(
2620 &self,
2621 epoch_total_gas_cost: &GasCostSummary,
2622 epoch_start_timestamp_ms: CheckpointTimestamp,
2623 checkpoint_effects: &mut Vec<TransactionEffects>,
2624 signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2625 checkpoint: CheckpointSequenceNumber,
2626 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2627 last_checkpoint: CheckpointSequenceNumber,
2630 ) -> CheckpointBuilderResult<SuiSystemState> {
2631 let (system_state, effects) = self
2632 .state
2633 .create_and_execute_advance_epoch_tx(
2634 &self.epoch_store,
2635 epoch_total_gas_cost,
2636 checkpoint,
2637 epoch_start_timestamp_ms,
2638 end_of_epoch_observation_keys,
2639 last_checkpoint,
2640 )
2641 .await?;
2642 checkpoint_effects.push(effects);
2643 signatures.push(vec![]);
2644 Ok(system_state)
2645 }
2646
    /// Expands the given root effects into the full set of effects that must be
    /// included in the checkpoint, by transitively pulling in dependencies that were
    /// executed in the current epoch but not yet included in any checkpoint.
    ///
    /// Operates as a worklist fixpoint: each round filters out already-included or
    /// prior-epoch roots, queues their not-yet-included dependencies, and loads those
    /// dependencies' effects for the next round. On return,
    /// `existing_tx_digests_in_checkpoint` is extended with everything collected.
    /// Note: results are NOT causally ordered here; ordering is done elsewhere.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> SuiResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        let mut results = vec![];
        // Digests already visited in this call, to avoid re-queueing dependencies.
        let mut seen = HashSet::new();
        loop {
            // Dependencies discovered this round that still need their effects loaded.
            let mut pending = HashSet::new();

            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                seen.insert(*digest);

                // Already part of the checkpoint being built in this pass.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Skip transactions included in an earlier checkpoint or executed in
                // a previous epoch.
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    // Dependencies not executed in the current epoch need no inclusion.
                    if !effects_signature_exists {
                        continue;
                    }
                    // Queue each dependency at most once across all rounds.
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            if pending.is_empty() {
                break;
            }
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self.effects_store.multi_get_executed_effects(&pending);
            // Every queued dependency was observed as executed, so its effects must exist.
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
                        digest
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
2728
2729 #[cfg(msim)]
2732 fn expensive_consensus_commit_prologue_invariants_check(
2733 &self,
2734 root_digests: &[TransactionDigest],
2735 sorted: &[TransactionEffects],
2736 ) {
2737 let root_txs = self
2739 .state
2740 .get_transaction_cache_reader()
2741 .multi_get_transaction_blocks(root_digests);
2742 let ccps = root_txs
2743 .iter()
2744 .filter_map(|tx| {
2745 if let Some(tx) = tx {
2746 if tx.transaction_data().is_consensus_commit_prologue() {
2747 Some(tx)
2748 } else {
2749 None
2750 }
2751 } else {
2752 None
2753 }
2754 })
2755 .collect::<Vec<_>>();
2756
2757 assert!(ccps.len() <= 1);
2759
2760 let txs = self
2762 .state
2763 .get_transaction_cache_reader()
2764 .multi_get_transaction_blocks(
2765 &sorted
2766 .iter()
2767 .map(|tx| tx.transaction_digest().clone())
2768 .collect::<Vec<_>>(),
2769 );
2770
2771 if ccps.len() == 0 {
2772 for tx in txs.iter() {
2775 if let Some(tx) = tx {
2776 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2777 }
2778 }
2779 } else {
2780 assert!(
2782 txs[0]
2783 .as_ref()
2784 .unwrap()
2785 .transaction_data()
2786 .is_consensus_commit_prologue()
2787 );
2788
2789 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2790
2791 for tx in txs.iter().skip(1) {
2792 if let Some(tx) = tx {
2793 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2794 }
2795 }
2796 }
2797 }
2798}
2799
2800async fn wait_for_effects_with_retry(
2801 effects_store: &dyn TransactionCacheRead,
2802 task_name: &'static str,
2803 digests: &[TransactionDigest],
2804 tx_key: TransactionKey,
2805) -> Vec<TransactionEffects> {
2806 let delay = if in_antithesis() {
2807 15
2809 } else {
2810 5
2811 };
2812 loop {
2813 match tokio::time::timeout(Duration::from_secs(delay), async {
2814 effects_store
2815 .notify_read_executed_effects(task_name, digests)
2816 .await
2817 })
2818 .await
2819 {
2820 Ok(effects) => break effects,
2821 Err(_) => {
2822 debug_fatal!(
2823 "Timeout waiting for transactions to be executed {:?}, retrying...",
2824 tx_key
2825 );
2826 }
2827 }
2828 }
2829}
2830
impl CheckpointAggregator {
    /// Creates a new aggregator that collects checkpoint signature messages from
    /// `receiver`, aggregates them per sequence number, and forwards certified
    /// checkpoints to `output`.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        Self {
            store: tables,
            epoch_store,
            notify,
            receiver,
            pending: BTreeMap::new(),
            current: None,
            output,
            state,
            metrics,
        }
    }

    /// Main loop: drain queued signature messages into `pending`, run one
    /// aggregation pass, then sleep until new signatures arrive, a notification
    /// fires, or a 1s tick elapses. Errors are logged and retried after 1s.
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            // Drain everything currently buffered in the channel without blocking.
            while let Ok(sig) = self.receiver.try_recv() {
                self.pending
                    .entry(sig.summary.sequence_number)
                    .or_default()
                    .push(sig);
            }

            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            // Wake on the next signature, an explicit notification, or a 1s timer.
            tokio::select! {
                Some(sig) = self.receiver.recv() => {
                    self.pending
                        .entry(sig.summary.sequence_number)
                        .or_default()
                        .push(sig);
                }
                _ = self.notify.notified() => {}
                _ = tokio::time::sleep(Duration::from_secs(1)) => {}
            }
        }
    }

    /// Runs one aggregation pass and delivers each newly certified checkpoint to
    /// the configured output.
    async fn run_and_notify(&mut self) -> SuiResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Certifies as many consecutive checkpoints as the collected signatures allow,
    /// returning the newly certified summaries in order. Returns early (without
    /// error) when the next checkpoint to certify has not been built yet or does
    /// not have enough signatures.
    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            // Drop buffered signatures for checkpoints that are already certified.
            self.pending.retain(|&seq, _| seq >= next_to_certify);
            let current = if let Some(current) = &mut self.current {
                // The in-progress aggregation may be stale if certification advanced
                // elsewhere (e.g. via state sync); discard it and start over.
                if current.summary.sequence_number < next_to_certify {
                    assert_reachable!("skip checkpoint certification");
                    self.current = None;
                    continue;
                }
                current
            } else {
                // Start aggregating for the next checkpoint, if it has been built.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let seq = current.summary.sequence_number;
            let sigs = self.pending.remove(&seq).unwrap_or_default();
            if sigs.is_empty() {
                trace!(
                    checkpoint_seq =? seq,
                    "Not enough checkpoint signatures",
                );
                // Nothing buffered for this sequence; wait for more signatures.
                return Ok(result);
            }
            for data in sigs {
                trace!(
                    checkpoint_seq = seq,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = seq,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics.last_certified_checkpoint.set(seq as i64);
                    current.summary.report_checkpoint_age(
                        &self.metrics.last_certified_checkpoint_age,
                        &self.metrics.last_certified_checkpoint_age_ms,
                    );
                    result.push(summary.into_inner());
                    // Move on to the next sequence number.
                    self.current = None;
                    continue 'outer;
                }
            }
            break;
        }
        Ok(result)
    }

    /// Returns the sequence number following the highest certified checkpoint in
    /// the store (0 when none has been certified yet).
    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
3000
impl CheckpointSignatureAggregator {
    /// Folds one validator's checkpoint signature into the stake aggregator.
    ///
    /// Returns `Ok(quorum_signature)` once a quorum of stake has signed OUR digest.
    /// Returns `Err(())` in every other case: duplicate (non-conflicting) signer,
    /// insertion failure, not enough votes yet, or a quorum reached on a digest
    /// that differs from ours (a remote fork, which is logged and counted).
    #[allow(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            // A repeated signer with an identical signature is benign noise; drop it
            // without logging or split-brain checks.
            InsertResult::Failed { error }
                if matches!(
                    error.as_inner(),
                    SuiErrorKind::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
                ) =>
            {
                Err(())
            }
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                // Quorum on a digest that is not ours means a remote fork.
                if their_digest != self.digest {
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }

    /// Detects a split-brain condition: if no digest can still reach quorum, logs
    /// the competing digests by stake, fires the msim fail-point for fork-recovery
    /// tests, raises a `debug_fatal!`, bumps the fork metric, and spawns an async
    /// task to diff this validator's checkpoint against the disagreeing ones.
    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            // Human-readable list of competing digests, highest stake first.
            let digests_by_stake_messages = all_unique_values
                .iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{:?} (total stake: {})", digest, total_stake)
                })
                .collect::<Vec<String>>();
            // Simulation-only hook: record the "correct" digest (one signed by a
            // non-forked authority, else the highest-stake one) so the fork-recovery
            // test can restart the forked node against it.
            fail_point_arg!("kill_split_brain_node", |(
                checkpoint_overrides,
                forked_authorities,
            ): (
                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
            )| {
                #[cfg(msim)]
                {
                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
                        (checkpoint_overrides.lock(), forked_authorities.lock())
                    {
                        let correct_digest = all_unique_values
                            .iter()
                            .find(|(_, (authorities, _))| {
                                authorities
                                    .iter()
                                    .any(|auth| !forked_authorities_set.contains(auth))
                            })
                            .map(|(digest, _)| digest.to_string())
                            .unwrap_or_else(|| {
                                all_unique_values
                                    .iter()
                                    .max_by_key(|(_, (_, stake))| *stake)
                                    .map(|(digest, _)| digest.to_string())
                                    .unwrap_or_else(|| self.digest.to_string())
                            });

                        overrides.insert(self.summary.sequence_number, correct_digest.clone());

                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
                            self.summary.sequence_number,
                            correct_digest
                        );
                    }
                }
            });

            debug_fatal!(
                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
                self.summary.sequence_number,
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.store.clone();

            // Diagnostics involve network calls; run them off the aggregation path.
            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
}
3145
/// Produces a fork diagnostic dump after a split-brain was detected.
///
/// Picks one random validator per disagreeing digest, fetches their pending
/// checkpoint summary and contents over the network, and writes a diff report
/// (summary, contents, transactions, and effects patches against the local
/// checkpoint) to `checkpoint_fork_dump.txt` in a temp directory, also emitting
/// it at debug level.
async fn diagnose_split_brain(
    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
    local_summary: CheckpointSummary,
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
) {
    debug!(
        checkpoint_seq = local_summary.sequence_number,
        "Running split brain diagnostics..."
    );
    let time = SystemTime::now();
    // For each digest that disagrees with ours, pick one random validator to query.
    let digest_to_validator = all_unique_values
        .iter()
        .filter_map(|(digest, (validators, _))| {
            if *digest != local_summary.digest() {
                let random_validator = validators.choose(&mut get_rng()).unwrap();
                Some((*digest, *random_validator))
            } else {
                None
            }
        })
        .collect::<HashMap<_, _>>();
    if digest_to_validator.is_empty() {
        panic!(
            "Given split brain condition, there should be at \
                least one validator that disagrees with local signature"
        );
    }

    let epoch_store = state.load_epoch_store_one_call_per_task();
    let committee = epoch_store
        .epoch_start_state()
        .get_sui_committee_with_network_metadata();
    let network_config = default_mysten_network_config();
    let network_clients =
        make_network_authority_clients_with_network_config(&committee, &network_config);

    // Request the (pending, with contents) checkpoint from each chosen validator.
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequestV2 {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint_v2(request)
        })
        .collect::<Vec<_>>();

    // NOTE(review): relies on HashMap iteration order being identical across the
    // two iterations above/below to pair responses with digests — confirm.
    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponseV2 {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponseV2 {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    // Build a textual diff (summary/contents/transactions/effects) per responder.
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {:?}",
        time
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    // Persist the dump in a kept temp directory and mirror it to the debug log.
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .keep()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{}", fork_logs_text).unwrap();
    debug!("{}", fork_logs_text);
}
3325
/// Notification interface for feeding the checkpoint service from the outside.
pub trait CheckpointServiceNotify {
    /// Submits a validator's checkpoint signature message for aggregation.
    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult;

    /// Wakes the checkpoint builder to process pending work.
    fn notify_checkpoint(&self) -> SuiResult;
}
3331
/// Lifecycle of the checkpoint service's background tasks.
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    /// Holds the builder, aggregator, and state-hasher tasks before they are
    /// spawned; consumed exactly once via `take_unstarted`.
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    /// The tasks have been taken and started; taking again panics.
    Started,
}
3343
3344impl CheckpointServiceState {
3345 fn take_unstarted(
3346 &mut self,
3347 ) -> (
3348 CheckpointBuilder,
3349 CheckpointAggregator,
3350 CheckpointStateHasher,
3351 ) {
3352 let mut state = CheckpointServiceState::Started;
3353 std::mem::swap(self, &mut state);
3354
3355 match state {
3356 CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3357 (builder, aggregator, hasher)
3358 }
3359 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3360 }
3361 }
3362}
3363
/// Drives checkpoint creation for an epoch: owns the channels used to wake the
/// builder and feed signatures to the aggregator, and holds the component tasks
/// until `spawn` is called.
pub struct CheckpointService {
    tables: Arc<CheckpointStore>,
    // Wakes the CheckpointBuilder (see notify_checkpoint()).
    notify_builder: Arc<Notify>,
    // Forwards received checkpoint signatures to the CheckpointAggregator.
    signature_sender: mpsc::UnboundedSender<CheckpointSignatureMessage>,
    // Publishes builder progress; subscribed to by wait_for_rebuilt_checkpoints().
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // Highest checkpoint sequence that had been built locally before this service
    // instance was constructed.
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    // Holds the builder/aggregator/hasher until spawn() moves them into tasks.
    state: Mutex<CheckpointServiceState>,
}
3376
impl CheckpointService {
    /// Constructs the checkpoint service, wiring together the builder, aggregator
    /// and state hasher, but does not start any of them; call [`Self::spawn`] to
    /// begin processing.
    ///
    /// `max_transactions_per_checkpoint` and `max_checkpoint_size_bytes` bound the
    /// checkpoints produced by the builder.
    #[allow(clippy::disallowed_methods)]
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // Highest checkpoint this validator had built locally before this (re)start;
        // compared against builder progress in wait_for_rebuilt_checkpoints().
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        // Watch channel through which the builder publishes its progress.
        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let (signature_sender, signature_receiver) = mpsc::unbounded_channel();

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            signature_receiver,
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        // Bounded channel from the builder to the state hasher.
        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);

        let ckpt_state_hasher = CheckpointStateHasher::new(
            epoch_store.clone(),
            global_state_hasher.clone(),
            receive_from_builder,
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            global_state_hasher,
            send_to_hasher,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            signature_sender,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted((
                builder,
                aggregator,
                ckpt_state_hasher,
            ))),
        })
    }

    /// Starts the builder, aggregator and state-hasher tasks for the given epoch,
    /// then waits (with a timeout) for the builder to either finish or catch back
    /// up to the previously built checkpoint sequence.
    ///
    /// Panics if called more than once (see `take_unstarted`).
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        let last_persisted_builder_seq = epoch_store
            .last_persisted_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number);

        let last_executed_seq = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number());

        // Clear state hashes above the highest committed checkpoint so the ECMH
        // computation stays consistent when the builder re-processes them (a
        // failure here is logged but not fatal).
        if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        // Supervisor task: run the builder until the epoch ends (or it finishes on
        // its own), then wait for the hasher and abort the aggregator.
        spawn_monitored_task!(async move {
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            // The aggregator is stopped explicitly once builder and hasher are done.
            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        // Report (via debug_fatal!) if rebuild takes longer than two minutes.
        if tokio::time::timeout(Duration::from_secs(120), async move {
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
}
3562
3563impl CheckpointService {
3564 pub async fn wait_for_rebuilt_checkpoints(&self) {
3570 let highest_previously_built_seq = self.highest_previously_built_seq;
3571 let mut rx = self.highest_currently_built_seq_tx.subscribe();
3572 let mut highest_currently_built_seq = *rx.borrow_and_update();
3573 info!(
3574 "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3575 );
3576 loop {
3577 if highest_currently_built_seq >= highest_previously_built_seq {
3578 info!("Checkpoint rebuild complete");
3579 break;
3580 }
3581 rx.changed().await.unwrap();
3582 highest_currently_built_seq = *rx.borrow_and_update();
3583 }
3584 }
3585
3586 #[cfg(test)]
3587 fn write_and_notify_checkpoint_for_testing(
3588 &self,
3589 epoch_store: &AuthorityPerEpochStore,
3590 checkpoint: PendingCheckpoint,
3591 ) -> SuiResult {
3592 use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3593
3594 let mut output = ConsensusCommitOutput::new(0);
3595 epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3596 output.set_default_commit_stats_for_testing();
3597 epoch_store.push_consensus_output_for_tests(output);
3598 self.notify_checkpoint()?;
3599 Ok(())
3600 }
3601}
3602
3603impl CheckpointServiceNotify for CheckpointService {
3604 fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult {
3605 let sequence = info.summary.sequence_number;
3606 let signer = info.summary.auth_sig().authority.concise();
3607
3608 if let Some(highest_verified_checkpoint) = self
3609 .tables
3610 .get_highest_verified_checkpoint()?
3611 .map(|x| *x.sequence_number())
3612 && sequence <= highest_verified_checkpoint
3613 {
3614 trace!(
3615 checkpoint_seq = sequence,
3616 "Ignore checkpoint signature from {} - already certified", signer,
3617 );
3618 self.metrics
3619 .last_ignored_checkpoint_signature_received
3620 .set(sequence as i64);
3621 return Ok(());
3622 }
3623 trace!(
3624 checkpoint_seq = sequence,
3625 "Received checkpoint signature, digest {} from {}",
3626 info.summary.digest(),
3627 signer,
3628 );
3629 self.metrics
3630 .last_received_checkpoint_signatures
3631 .with_label_values(&[&signer.to_string()])
3632 .set(sequence as i64);
3633 self.signature_sender.send(info.clone()).ok();
3634 Ok(())
3635 }
3636
3637 fn notify_checkpoint(&self) -> SuiResult {
3638 self.notify_builder.notify_one();
3639 Ok(())
3640 }
3641}
3642
/// [`CheckpointServiceNotify`] implementation that discards all notifications.
pub struct CheckpointServiceNoop {}
impl CheckpointServiceNotify for CheckpointServiceNoop {
    /// Drops the signature without recording it.
    fn notify_checkpoint_signature(&self, _: &CheckpointSignatureMessage) -> SuiResult {
        Ok(())
    }

    /// No-op; there is no builder to wake.
    fn notify_checkpoint(&self) -> SuiResult {
        Ok(())
    }
}
3654
impl PendingCheckpoint {
    /// The checkpoint height recorded in this pending checkpoint's details.
    pub fn height(&self) -> CheckpointHeight {
        self.details.checkpoint_height
    }

    /// The transaction roots included in this pending checkpoint.
    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.roots
    }

    /// The metadata (timestamp, height, commit ref, ...) for this pending checkpoint.
    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.details
    }
}
3668
3669impl PendingCheckpointV2 {
3670 pub fn height(&self) -> CheckpointHeight {
3671 self.details.checkpoint_height
3672 }
3673
3674 pub(crate) fn num_roots(&self) -> usize {
3675 self.roots.iter().map(|r| r.tx_roots.len()).sum()
3676 }
3677}
3678
pin_project! {
    /// Future adapter that counts how many times the inner future is polled.
    pub struct PollCounter<Fut> {
        #[pin]
        future: Fut,
        // Number of times `poll` has been called so far.
        count: usize,
    }
}
3686
impl<Fut> PollCounter<Fut> {
    /// Wraps `future` with a poll counter starting at zero.
    pub fn new(future: Fut) -> Self {
        Self { future, count: 0 }
    }

    /// Number of times the wrapped future has been polled so far.
    pub fn count(&self) -> usize {
        self.count
    }
}
3696
3697impl<Fut: Future> Future for PollCounter<Fut> {
3698 type Output = (usize, Fut::Output);
3699
3700 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3701 let this = self.project();
3702 *this.count += 1;
3703 match this.future.poll(cx) {
3704 Poll::Ready(output) => Poll::Ready((*this.count, output)),
3705 Poll::Pending => Poll::Pending,
3706 }
3707 }
3708}
3709
/// Convenience wrapper: instruments `future` so its output is returned together
/// with the number of times it was polled.
fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
    PollCounter::new(future)
}
3713
3714#[cfg(test)]
3715mod tests {
3716 use super::*;
3717 use crate::authority::test_authority_builder::TestAuthorityBuilder;
3718 use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3719 use futures::FutureExt as _;
3720 use futures::future::BoxFuture;
3721 use std::collections::HashMap;
3722 use std::ops::Deref;
3723 use sui_macros::sim_test;
3724 use sui_protocol_config::{Chain, ProtocolConfig};
3725 use sui_types::accumulator_event::AccumulatorEvent;
3726 use sui_types::authenticator_state::ActiveJwk;
3727 use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3728 use sui_types::crypto::Signature;
3729 use sui_types::effects::{TransactionEffects, TransactionEvents};
3730 use sui_types::messages_checkpoint::SignedCheckpointSummary;
3731 use sui_types::transaction::VerifiedTransaction;
3732 use tokio::sync::mpsc;
3733
    #[tokio::test]
    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
        let store = CheckpointStore::new_for_tests();
        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
        // Populate locally computed checkpoints for sequence numbers 70..=80,
        // each with a single random transaction digest.
        for seq in 70u64..=80u64 {
            let contents =
                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
                    [sui_types::base_types::ExecutionDigests::new(
                        sui_types::digests::TransactionDigest::random(),
                        sui_types::digests::TransactionEffectsDigest::ZERO,
                    )],
                );
            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
                &protocol,
                0,
                seq,
                0,
                &contents,
                None,
                sui_types::gas::GasCostSummary::default(),
                None,
                0,
                Vec::new(),
                Vec::new(),
            );
            store
                .tables
                .locally_computed_checkpoints
                .insert(&seq, &summary)
                .unwrap();
        }

        // Clearing from 76 should delete 76..=80 (start of range inclusive)
        // while leaving 70..=75 intact.
        store
            .clear_locally_computed_checkpoints_from(76)
            .expect("clear should succeed");

        // Boundary checks: 75 survives, 76 is gone.
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&75)
                .unwrap()
                .is_some()
        );
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&76)
                .unwrap()
                .is_none()
        );

        // Everything below the cutoff is retained...
        for seq in 70u64..76u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_some()
            );
        }
        // ...and everything at or above it is deleted.
        for seq in 76u64..=80u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_none()
            );
        }
    }
3809
    #[tokio::test]
    async fn test_fork_detection_storage() {
        let store = CheckpointStore::new_for_tests();
        let seq_num = 42;
        let digest = CheckpointDigest::random();

        // Checkpoint-fork marker: initially absent, round-trips, and can be cleared.
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let retrieved = store.get_checkpoint_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
        assert_eq!(retrieved_seq, seq_num);
        assert_eq!(retrieved_digest, digest);

        store.clear_checkpoint_fork_detected().unwrap();
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        // Transaction-fork marker: same lifecycle, recording the transaction digest
        // together with the expected and actual effects digests.
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();

        assert!(store.get_transaction_fork_detected().unwrap().is_none());

        store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let retrieved = store.get_transaction_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
        assert_eq!(retrieved_tx, tx_digest);
        assert_eq!(retrieved_expected, expected_effects);
        assert_eq!(retrieved_actual, actual_effects);

        store.clear_transaction_fork_detected().unwrap();
        assert!(store.get_transaction_fork_detected().unwrap().is_none());
    }
3853
    // End-to-end test of the checkpoint builder and aggregator: feeds pending
    // checkpoints through the service and verifies ordering, splitting, gas
    // accounting and signature aggregation.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.disable_accumulators_for_testing();
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(false);
        // Shorten the minimum checkpoint interval so the test completes quickly.
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // Small transaction used for digests d(0)..d(14).
        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
            0,
            0,
            vec![],
            SequenceNumber::new(),
        );

        // Build a JWK list whose serialized size exceeds 40KB, so transactions
        // carrying it are large (used below for digests d(15)..d(19)).
        let jwks = {
            let mut jwks = Vec::new();
            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
                jwks.push(ActiveJwk {
                    jwk_id: JwkId::new(
                        "https://accounts.google.com".to_string(),
                        "1234567890".to_string(),
                    ),
                    jwk: JWK {
                        kty: "RSA".to_string(),
                        e: "AQAB".to_string(),
                        n: "1234567890".to_string(),
                        alg: "RS256".to_string(),
                    },
                    epoch: 0,
                });
            }
            jwks
        };

        let dummy_tx_with_data =
            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());

        // Register the raw transactions so the builder can look them up by digest.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Record effects (with dependencies and gas) for the transactions used as roots.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 52, 51, 1),
            );
        }
        // Every committed transaction gets a (dummy) user signature.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![(signature, None)]);
        }

        // Channels through which built and certified checkpoints are observed.
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

        // max_transactions_per_checkpoint = 3, max_checkpoint_size_bytes = 100_000.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

        // Enqueue pending checkpoints at increasing heights/timestamps.
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        // Checkpoint 1: just the root d(4), which has no dependencies.
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        // Checkpoint 2: roots d(1), d(3) plus dependency d(2), in causal order;
        // rolling gas accumulates across checkpoints 1 and 2.
        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 108, 104, 4)
        );

        // Checkpoints 3/4: four small transactions split 3 + 1 by the
        // max_transactions_per_checkpoint limit of 3.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Checkpoints 5/6: three large (>40KB) transactions split 2 + 1 —
        // presumably by the 100_000-byte size limit, since the count limit
        // alone would have allowed all three together.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Checkpoint 7: heights 4 and 5 land in a single checkpoint containing
        // d(5) and d(6) — NOTE(review): apparently combined by the builder
        // given the 100ms min checkpoint interval set above; confirm.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        // Sign the first two checkpoints (submitted out of order) and verify the
        // aggregator emits certified checkpoints in sequence order.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c2ss })
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c1ss })
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
4087
    /// Test double: a plain `HashMap` from transaction digest to effects stands in
    /// for the transaction cache. Only the lookups exercised in this module are
    /// implemented; everything else is `unimplemented!()`.
    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        /// Resolves effects synchronously from the map; panics if a digest is missing.
        fn notify_read_executed_effects_may_fail(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, SuiResult<Vec<TransactionEffects>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| self.get(d).expect("effects not found").clone())
                .collect()))
            .boxed()
        }

        /// Resolves effects digests synchronously; panics if a digest is missing.
        fn notify_read_executed_effects_digests(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
            std::future::ready(
                digests
                    .iter()
                    .map(|d| {
                        self.get(d)
                            .map(|fx| fx.digest())
                            .expect("effects not found")
                    })
                    .collect(),
            )
            .boxed()
        }

        /// Returns `None` for digests not present in the map.
        fn multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffects>> {
            digests.iter().map(|d| self.get(d).cloned()).collect()
        }

        fn multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
            unimplemented!()
        }

        fn multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffectsDigest>> {
            unimplemented!()
        }

        fn multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> Vec<Option<TransactionEffects>> {
            unimplemented!()
        }

        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
            unimplemented!()
        }

        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
            unimplemented!()
        }

        fn get_unchanged_loaded_runtime_objects(
            &self,
            _digest: &TransactionDigest,
        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
            unimplemented!()
        }

        fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
            unimplemented!()
        }
    }
4171
    /// Test sink: forwards each built checkpoint (contents + summary) to the
    /// test's receiving channel. Panics if the channel is full or closed.
    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> SuiResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }
4185
    /// Test sink: forwards each certified checkpoint summary to the test's
    /// receiving channel. Panics if the channel is full or closed.
    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> SuiResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }
4196
4197 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
4198 PendingCheckpoint {
4199 roots: t
4200 .into_iter()
4201 .map(|t| TransactionKey::Digest(d(t)))
4202 .collect(),
4203 details: PendingCheckpointInfo {
4204 timestamp_ms,
4205 last_of_epoch: false,
4206 checkpoint_height: i,
4207 consensus_commit_ref: CommitRef::default(),
4208 rejected_transactions_digest: Digest::default(),
4209 checkpoint_seq: None,
4210 },
4211 }
4212 }
4213
4214 fn d(i: u8) -> TransactionDigest {
4215 let mut bytes: [u8; 32] = Default::default();
4216 bytes[0] = i;
4217 TransactionDigest::new(bytes)
4218 }
4219
4220 fn e(
4221 transaction_digest: TransactionDigest,
4222 dependencies: Vec<TransactionDigest>,
4223 gas_used: GasCostSummary,
4224 ) -> TransactionEffects {
4225 let mut effects = TransactionEffects::default();
4226 *effects.transaction_digest_mut_for_testing() = transaction_digest;
4227 *effects.dependencies_mut_for_testing() = dependencies;
4228 *effects.gas_cost_summary_mut_for_testing() = gas_used;
4229 effects
4230 }
4231
4232 fn commit_cert_for_test(
4233 store: &mut HashMap<TransactionDigest, TransactionEffects>,
4234 state: Arc<AuthorityState>,
4235 digest: TransactionDigest,
4236 dependencies: Vec<TransactionDigest>,
4237 gas_used: GasCostSummary,
4238 ) {
4239 let epoch_store = state.epoch_store_for_testing();
4240 let effects = e(digest, dependencies, gas_used);
4241 store.insert(digest, effects.clone());
4242 epoch_store.insert_executed_in_epoch(&digest);
4243 }
4244}