1pub(crate) mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16 LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::random::get_rng;
29use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
30use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
31use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
32use nonempty::NonEmpty;
33use parking_lot::Mutex;
34use pin_project_lite::pin_project;
35use serde::{Deserialize, Serialize};
36use sui_macros::fail_point_arg;
37use sui_network::default_mysten_network_config;
38use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43 CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use tokio::sync::{mpsc, watch};
47use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
48
49use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
50use crate::authority::authority_store_pruner::PrunerWatermarks;
51use crate::consensus_handler::SequencedConsensusTransactionKey;
52use rand::seq::SliceRandom;
53use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
54use std::fs::File;
55use std::future::Future;
56use std::io::Write;
57use std::path::Path;
58use std::pin::Pin;
59use std::sync::Arc;
60use std::sync::Weak;
61use std::task::{Context, Poll};
62use std::time::{Duration, SystemTime};
63use sui_protocol_config::ProtocolVersion;
64use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
65use sui_types::committee::StakeUnit;
66use sui_types::crypto::AuthorityStrongQuorumSignInfo;
67use sui_types::digests::{
68 CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
69};
70use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
71use sui_types::error::{SuiErrorKind, SuiResult};
72use sui_types::gas::GasCostSummary;
73use sui_types::message_envelope::Message;
74use sui_types::messages_checkpoint::{
75 CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
76 CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
77 EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
78 VerifiedCheckpointContents,
79};
80use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
81use sui_types::messages_consensus::ConsensusTransactionKey;
82use sui_types::signature::GenericSignature;
83use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
84use sui_types::transaction::{
85 TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
86};
87use tokio::sync::Notify;
88use tracing::{debug, error, info, instrument, trace, warn};
89use typed_store::DBMapUtils;
90use typed_store::Map;
91use typed_store::{
92 TypedStoreError,
93 rocks::{DBMap, MetricConf},
94};
95
/// Fixed key of the single row kept in the `transaction_fork_detected` table
/// (used by `CheckpointStore::record_transaction_fork_detected`).
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

/// Height of a checkpoint, represented as a plain `u64`.
pub type CheckpointHeight = u64;
99
/// Per-epoch summary figures, as produced by `CheckpointStore::get_epoch_stats`.
pub struct EpochStats {
    /// Number of checkpoints in the epoch (last sequence number minus first, plus one).
    pub checkpoint_count: u64,
    /// Transactions in the epoch: the network-wide total at the epoch's last checkpoint
    /// minus the total at the previous epoch's last checkpoint.
    pub transaction_count: u64,
    /// Computation cost taken from the last checkpoint's rolling gas cost summary.
    pub total_gas_reward: u64,
}
105
106#[derive(Clone, Debug)]
107pub struct PendingCheckpointInfo {
108 pub timestamp_ms: CheckpointTimestamp,
109 pub last_of_epoch: bool,
110 pub checkpoint_height: CheckpointHeight,
113 pub consensus_commit_ref: CommitRef,
115 pub rejected_transactions_digest: Digest,
116 pub checkpoint_seq: Option<CheckpointSequenceNumber>,
119}
120
121#[derive(Clone, Debug)]
122pub struct PendingCheckpoint {
123 pub roots: Vec<TransactionKey>,
124 pub details: PendingCheckpointInfo,
125}
126
127#[derive(Clone, Debug, Default)]
128pub struct CheckpointRoots {
129 pub tx_roots: Vec<TransactionKey>,
130 pub settlement_root: Option<TransactionKey>,
131 pub height: CheckpointHeight,
132}
133
134#[derive(Clone, Debug)]
141pub struct PendingCheckpointV2 {
142 pub roots: Vec<CheckpointRoots>,
143 pub details: PendingCheckpointInfo,
144}
145
146#[derive(Clone, Debug, Serialize, Deserialize)]
147pub struct BuilderCheckpointSummary {
148 pub summary: CheckpointSummary,
149 pub checkpoint_height: Option<CheckpointHeight>,
151 pub position_in_commit: usize,
152}
153
154#[derive(DBMapUtils)]
155#[cfg_attr(tidehunter, tidehunter)]
156pub struct CheckpointStoreTables {
157 pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,
159
160 pub(crate) checkpoint_sequence_by_contents_digest:
162 DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,
163
164 #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
168 full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
171
172 pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
174 pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,
176
177 pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,
181
182 epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,
184
185 pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
188
189 pub(crate) transaction_fork_detected: DBMap<
191 u8,
192 (
193 TransactionDigest,
194 TransactionEffectsDigest,
195 TransactionEffectsDigest,
196 ),
197 >,
198 #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
199 full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
200}
201
202fn full_checkpoint_content_table_default_config() -> DBOptions {
203 DBOptions {
204 options: default_db_options().options,
205 rw_options: ReadWriteOptions::default().set_log_value_hash(true),
209 }
210}
211
212impl CheckpointStoreTables {
213 #[cfg(not(tidehunter))]
214 pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
215 Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
216 }
217
218 #[cfg(tidehunter)]
219 pub fn new(
220 path: &Path,
221 metric_name: &'static str,
222 pruner_watermarks: Arc<PrunerWatermarks>,
223 ) -> Self {
224 tracing::warn!("Checkpoint DB using tidehunter");
225 use crate::authority::authority_store_pruner::apply_relocation_filter;
226 use typed_store::tidehunter_util::{
227 Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
228 default_mutex_count, default_value_cache_size,
229 };
230 let mutexes = default_mutex_count();
231 let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
232 let override_dirty_keys_config = KeySpaceConfig::new()
233 .with_max_dirty_keys(16_000)
234 .with_value_cache_size(default_value_cache_size());
235 let config_u64 = ThConfig::new_with_config(
236 8,
237 mutexes,
238 u64_sequence_key,
239 override_dirty_keys_config.clone(),
240 );
241 let digest_config = ThConfig::new_with_rm_prefix(
242 32,
243 mutexes,
244 KeyType::uniform(default_cells_per_mutex()),
245 KeySpaceConfig::default(),
246 vec![0, 0, 0, 0, 0, 0, 0, 32],
247 );
248 let watermarks_config = KeySpaceConfig::new()
249 .with_value_cache_size(10)
250 .disable_unload();
251 let lru_config = KeySpaceConfig::new().with_value_cache_size(100);
252 let configs = vec![
253 (
254 "checkpoint_content",
255 digest_config.clone().with_config(
256 KeySpaceConfig::new().with_relocation_filter(|_, _| Decision::Remove),
257 ),
258 ),
259 (
260 "checkpoint_sequence_by_contents_digest",
261 digest_config.clone().with_config(apply_relocation_filter(
262 KeySpaceConfig::default(),
263 pruner_watermarks.checkpoint_id.clone(),
264 |sequence_number: CheckpointSequenceNumber| sequence_number,
265 false,
266 )),
267 ),
268 (
269 "full_checkpoint_content",
270 config_u64.clone().with_config(apply_relocation_filter(
271 override_dirty_keys_config.clone(),
272 pruner_watermarks.checkpoint_id.clone(),
273 |sequence_number: CheckpointSequenceNumber| sequence_number,
274 true,
275 )),
276 ),
277 ("certified_checkpoints", config_u64.clone()),
278 (
279 "checkpoint_by_digest",
280 digest_config.clone().with_config(apply_relocation_filter(
281 lru_config,
282 pruner_watermarks.epoch_id.clone(),
283 |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
284 false,
285 )),
286 ),
287 (
288 "locally_computed_checkpoints",
289 config_u64.clone().with_config(apply_relocation_filter(
290 override_dirty_keys_config.clone(),
291 pruner_watermarks.checkpoint_id.clone(),
292 |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
293 true,
294 )),
295 ),
296 ("epoch_last_checkpoint_map", config_u64.clone()),
297 (
298 "watermarks",
299 ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
300 ),
301 (
302 "transaction_fork_detected",
303 ThConfig::new_with_config(
304 1,
305 1,
306 KeyType::uniform(1),
307 watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
308 ),
309 ),
310 (
311 "full_checkpoint_content_v2",
312 config_u64.clone().with_config(apply_relocation_filter(
313 override_dirty_keys_config.clone(),
314 pruner_watermarks.checkpoint_id.clone(),
315 |sequence_number: CheckpointSequenceNumber| sequence_number,
316 true,
317 )),
318 ),
319 ];
320 Self::open_tables_read_write(
321 path.to_path_buf(),
322 MetricConf::new(metric_name),
323 configs
324 .into_iter()
325 .map(|(cf, config)| (cf.to_string(), config))
326 .collect(),
327 )
328 }
329
330 #[cfg(not(tidehunter))]
331 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
332 Self::get_read_only_handle(
333 path.to_path_buf(),
334 None,
335 None,
336 MetricConf::new("checkpoint_readonly"),
337 )
338 }
339
340 #[cfg(tidehunter)]
341 pub fn open_readonly(path: &Path) -> Self {
342 Self::new(path, "checkpoint", Arc::new(PrunerWatermarks::default()))
343 }
344}
345
346pub struct CheckpointStore {
347 pub(crate) tables: CheckpointStoreTables,
348 synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
349 executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
350}
351
352impl CheckpointStore {
353 pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
354 let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
355 Arc::new(Self {
356 tables,
357 synced_checkpoint_notify_read: NotifyRead::new(),
358 executed_checkpoint_notify_read: NotifyRead::new(),
359 })
360 }
361
362 pub fn new_for_tests() -> Arc<Self> {
363 let ckpt_dir = mysten_common::tempdir().unwrap();
364 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
365 }
366
367 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
368 let tables = CheckpointStoreTables::new(
369 path,
370 "db_checkpoint",
371 Arc::new(PrunerWatermarks::default()),
372 );
373 Arc::new(Self {
374 tables,
375 synced_checkpoint_notify_read: NotifyRead::new(),
376 executed_checkpoint_notify_read: NotifyRead::new(),
377 })
378 }
379
380 #[cfg(not(tidehunter))]
381 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
382 CheckpointStoreTables::open_readonly(path)
383 }
384
/// Tidehunter build of `open_readonly`: forwards to
/// `CheckpointStoreTables::open_readonly`, which returns the regular table set.
#[cfg(tidehunter)]
pub fn open_readonly(path: &Path) -> CheckpointStoreTables {
    CheckpointStoreTables::open_readonly(path)
}
389
390 #[instrument(level = "info", skip_all)]
391 pub fn insert_genesis_checkpoint(
392 &self,
393 checkpoint: VerifiedCheckpoint,
394 contents: CheckpointContents,
395 epoch_store: &AuthorityPerEpochStore,
396 ) {
397 assert_eq!(
398 checkpoint.epoch(),
399 0,
400 "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
401 );
402 assert_eq!(
403 *checkpoint.sequence_number(),
404 0,
405 "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
406 );
407
408 match self.get_checkpoint_by_sequence_number(0).unwrap() {
410 Some(existing_checkpoint) => {
411 assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
412 }
413 None => {
414 if epoch_store.epoch() == checkpoint.epoch {
415 epoch_store
416 .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
417 .unwrap();
418 } else {
419 debug!(
420 validator_epoch =% epoch_store.epoch(),
421 genesis_epoch =% checkpoint.epoch(),
422 "Not inserting checkpoint builder data for genesis checkpoint",
423 );
424 }
425 self.insert_checkpoint_contents(contents).unwrap();
426 self.insert_verified_checkpoint(&checkpoint).unwrap();
427 self.update_highest_synced_checkpoint(&checkpoint).unwrap();
428 }
429 }
430 }
431
432 pub fn get_checkpoint_by_digest(
433 &self,
434 digest: &CheckpointDigest,
435 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
436 self.tables
437 .checkpoint_by_digest
438 .get(digest)
439 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
440 }
441
442 pub fn get_checkpoint_by_sequence_number(
443 &self,
444 sequence_number: CheckpointSequenceNumber,
445 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
446 self.tables
447 .certified_checkpoints
448 .get(&sequence_number)
449 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
450 }
451
452 pub fn get_locally_computed_checkpoint(
453 &self,
454 sequence_number: CheckpointSequenceNumber,
455 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
456 self.tables
457 .locally_computed_checkpoints
458 .get(&sequence_number)
459 }
460
461 pub fn multi_get_locally_computed_checkpoints(
462 &self,
463 sequence_numbers: &[CheckpointSequenceNumber],
464 ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
465 let checkpoints = self
466 .tables
467 .locally_computed_checkpoints
468 .multi_get(sequence_numbers)?;
469
470 Ok(checkpoints)
471 }
472
473 pub fn get_sequence_number_by_contents_digest(
474 &self,
475 digest: &CheckpointContentsDigest,
476 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
477 self.tables
478 .checkpoint_sequence_by_contents_digest
479 .get(digest)
480 }
481
482 pub fn delete_contents_digest_sequence_number_mapping(
483 &self,
484 digest: &CheckpointContentsDigest,
485 ) -> Result<(), TypedStoreError> {
486 self.tables
487 .checkpoint_sequence_by_contents_digest
488 .remove(digest)
489 }
490
491 pub fn get_latest_certified_checkpoint(
492 &self,
493 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
494 Ok(self
495 .tables
496 .certified_checkpoints
497 .reversed_safe_iter_with_bounds(None, None)?
498 .next()
499 .transpose()?
500 .map(|(_, v)| v.into()))
501 }
502
503 pub fn get_latest_locally_computed_checkpoint(
504 &self,
505 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
506 Ok(self
507 .tables
508 .locally_computed_checkpoints
509 .reversed_safe_iter_with_bounds(None, None)?
510 .next()
511 .transpose()?
512 .map(|(_, v)| v))
513 }
514
515 pub fn multi_get_checkpoint_by_sequence_number(
516 &self,
517 sequence_numbers: &[CheckpointSequenceNumber],
518 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
519 let checkpoints = self
520 .tables
521 .certified_checkpoints
522 .multi_get(sequence_numbers)?
523 .into_iter()
524 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
525 .collect();
526
527 Ok(checkpoints)
528 }
529
530 pub fn multi_get_checkpoint_content(
531 &self,
532 contents_digest: &[CheckpointContentsDigest],
533 ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
534 self.tables.checkpoint_content.multi_get(contents_digest)
535 }
536
537 pub fn get_highest_verified_checkpoint(
538 &self,
539 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
540 let highest_verified = if let Some(highest_verified) = self
541 .tables
542 .watermarks
543 .get(&CheckpointWatermark::HighestVerified)?
544 {
545 highest_verified
546 } else {
547 return Ok(None);
548 };
549 self.get_checkpoint_by_digest(&highest_verified.1)
550 }
551
552 pub fn get_highest_synced_checkpoint(
553 &self,
554 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
555 let highest_synced = if let Some(highest_synced) = self
556 .tables
557 .watermarks
558 .get(&CheckpointWatermark::HighestSynced)?
559 {
560 highest_synced
561 } else {
562 return Ok(None);
563 };
564 self.get_checkpoint_by_digest(&highest_synced.1)
565 }
566
567 pub fn get_highest_synced_checkpoint_seq_number(
568 &self,
569 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
570 if let Some(highest_synced) = self
571 .tables
572 .watermarks
573 .get(&CheckpointWatermark::HighestSynced)?
574 {
575 Ok(Some(highest_synced.0))
576 } else {
577 Ok(None)
578 }
579 }
580
581 pub fn get_highest_executed_checkpoint_seq_number(
582 &self,
583 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
584 if let Some(highest_executed) = self
585 .tables
586 .watermarks
587 .get(&CheckpointWatermark::HighestExecuted)?
588 {
589 Ok(Some(highest_executed.0))
590 } else {
591 Ok(None)
592 }
593 }
594
595 pub fn get_highest_executed_checkpoint(
596 &self,
597 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
598 let highest_executed = if let Some(highest_executed) = self
599 .tables
600 .watermarks
601 .get(&CheckpointWatermark::HighestExecuted)?
602 {
603 highest_executed
604 } else {
605 return Ok(None);
606 };
607 self.get_checkpoint_by_digest(&highest_executed.1)
608 }
609
610 pub fn get_highest_pruned_checkpoint_seq_number(
611 &self,
612 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
613 self.tables
614 .watermarks
615 .get(&CheckpointWatermark::HighestPruned)
616 .map(|watermark| watermark.map(|w| w.0))
617 }
618
619 pub fn get_checkpoint_contents(
620 &self,
621 digest: &CheckpointContentsDigest,
622 ) -> Result<Option<CheckpointContents>, TypedStoreError> {
623 self.tables.checkpoint_content.get(digest)
624 }
625
626 pub fn get_full_checkpoint_contents_by_sequence_number(
627 &self,
628 seq: CheckpointSequenceNumber,
629 ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
630 self.tables.full_checkpoint_content_v2.get(&seq)
631 }
632
633 fn prune_local_summaries(&self) -> SuiResult {
634 if let Some((last_local_summary, _)) = self
635 .tables
636 .locally_computed_checkpoints
637 .reversed_safe_iter_with_bounds(None, None)?
638 .next()
639 .transpose()?
640 {
641 let mut batch = self.tables.locally_computed_checkpoints.batch();
642 batch.schedule_delete_range(
643 &self.tables.locally_computed_checkpoints,
644 &0,
645 &last_local_summary,
646 )?;
647 batch.write()?;
648 info!("Pruned local summaries up to {:?}", last_local_summary);
649 }
650 Ok(())
651 }
652
653 pub fn clear_locally_computed_checkpoints_from(
654 &self,
655 from_seq: CheckpointSequenceNumber,
656 ) -> SuiResult {
657 let keys: Vec<_> = self
658 .tables
659 .locally_computed_checkpoints
660 .safe_iter_with_bounds(Some(from_seq), None)
661 .map(|r| r.map(|(k, _)| k))
662 .collect::<Result<_, _>>()?;
663 if let Some(&last_local_summary) = keys.last() {
664 let mut batch = self.tables.locally_computed_checkpoints.batch();
665 batch
666 .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
667 .expect("Failed to delete locally computed checkpoints");
668 batch
669 .write()
670 .expect("Failed to delete locally computed checkpoints");
671 warn!(
672 from_seq,
673 last_local_summary,
674 "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
675 from_seq,
676 last_local_summary
677 );
678 }
679 Ok(())
680 }
681
682 fn check_for_checkpoint_fork(
683 &self,
684 local_checkpoint: &CheckpointSummary,
685 verified_checkpoint: &VerifiedCheckpoint,
686 ) {
687 if local_checkpoint != verified_checkpoint.data() {
688 let verified_contents = self
689 .get_checkpoint_contents(&verified_checkpoint.content_digest)
690 .map(|opt_contents| {
691 opt_contents
692 .map(|contents| format!("{:?}", contents))
693 .unwrap_or_else(|| {
694 format!(
695 "Verified checkpoint contents not found, digest: {:?}",
696 verified_checkpoint.content_digest,
697 )
698 })
699 })
700 .map_err(|e| {
701 format!(
702 "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
703 verified_checkpoint.content_digest, e
704 )
705 })
706 .unwrap_or_else(|err_msg| err_msg);
707
708 let local_contents = self
709 .get_checkpoint_contents(&local_checkpoint.content_digest)
710 .map(|opt_contents| {
711 opt_contents
712 .map(|contents| format!("{:?}", contents))
713 .unwrap_or_else(|| {
714 format!(
715 "Local checkpoint contents not found, digest: {:?}",
716 local_checkpoint.content_digest
717 )
718 })
719 })
720 .map_err(|e| {
721 format!(
722 "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
723 local_checkpoint.content_digest, e
724 )
725 })
726 .unwrap_or_else(|err_msg| err_msg);
727
728 error!(
730 verified_checkpoint = ?verified_checkpoint.data(),
731 ?verified_contents,
732 ?local_checkpoint,
733 ?local_contents,
734 "Local checkpoint fork detected!",
735 );
736
737 if let Err(e) = self.record_checkpoint_fork_detected(
739 *local_checkpoint.sequence_number(),
740 local_checkpoint.digest(),
741 ) {
742 error!("Failed to record checkpoint fork in database: {:?}", e);
743 }
744
745 fail_point_arg!(
746 "kill_checkpoint_fork_node",
747 |checkpoint_overrides: std::sync::Arc<
748 std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
749 >| {
750 #[cfg(msim)]
751 {
752 if let Ok(mut overrides) = checkpoint_overrides.lock() {
753 overrides.insert(
754 local_checkpoint.sequence_number,
755 verified_checkpoint.digest().to_string(),
756 );
757 }
758 tracing::error!(
759 fatal = true,
760 "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
761 local_checkpoint.sequence_number(),
762 verified_checkpoint.digest()
763 );
764 sui_simulator::task::shutdown_current_node();
765 }
766 }
767 );
768
769 fatal!(
770 "Local checkpoint fork detected for sequence number: {}",
771 local_checkpoint.sequence_number()
772 );
773 }
774 }
775
776 pub fn insert_certified_checkpoint(
782 &self,
783 checkpoint: &VerifiedCheckpoint,
784 ) -> Result<(), TypedStoreError> {
785 debug!(
786 checkpoint_seq = checkpoint.sequence_number(),
787 "Inserting certified checkpoint",
788 );
789 let mut batch = self.tables.certified_checkpoints.batch();
790 batch
791 .insert_batch(
792 &self.tables.certified_checkpoints,
793 [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
794 )?
795 .insert_batch(
796 &self.tables.checkpoint_by_digest,
797 [(checkpoint.digest(), checkpoint.serializable_ref())],
798 )?;
799 if checkpoint.next_epoch_committee().is_some() {
800 batch.insert_batch(
801 &self.tables.epoch_last_checkpoint_map,
802 [(&checkpoint.epoch(), checkpoint.sequence_number())],
803 )?;
804 }
805 batch.write()?;
806
807 if let Some(local_checkpoint) = self
808 .tables
809 .locally_computed_checkpoints
810 .get(checkpoint.sequence_number())?
811 {
812 self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
813 }
814
815 Ok(())
816 }
817
818 #[instrument(level = "debug", skip_all)]
821 pub fn insert_verified_checkpoint(
822 &self,
823 checkpoint: &VerifiedCheckpoint,
824 ) -> Result<(), TypedStoreError> {
825 self.insert_certified_checkpoint(checkpoint)?;
826 self.update_highest_verified_checkpoint(checkpoint)
827 }
828
829 pub fn update_highest_verified_checkpoint(
830 &self,
831 checkpoint: &VerifiedCheckpoint,
832 ) -> Result<(), TypedStoreError> {
833 if Some(*checkpoint.sequence_number())
834 > self
835 .get_highest_verified_checkpoint()?
836 .map(|x| *x.sequence_number())
837 {
838 debug!(
839 checkpoint_seq = checkpoint.sequence_number(),
840 "Updating highest verified checkpoint",
841 );
842 self.tables.watermarks.insert(
843 &CheckpointWatermark::HighestVerified,
844 &(*checkpoint.sequence_number(), *checkpoint.digest()),
845 )?;
846 }
847
848 Ok(())
849 }
850
851 pub fn update_highest_synced_checkpoint(
852 &self,
853 checkpoint: &VerifiedCheckpoint,
854 ) -> Result<(), TypedStoreError> {
855 let seq = *checkpoint.sequence_number();
856 debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
857 self.tables.watermarks.insert(
858 &CheckpointWatermark::HighestSynced,
859 &(seq, *checkpoint.digest()),
860 )?;
861 self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
862 Ok(())
863 }
864
865 async fn notify_read_checkpoint_watermark<F>(
866 &self,
867 notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
868 seq: CheckpointSequenceNumber,
869 get_watermark: F,
870 ) -> VerifiedCheckpoint
871 where
872 F: Fn() -> Option<CheckpointSequenceNumber>,
873 {
874 notify_read
875 .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
876 let seq = seqs[0];
877 let Some(highest) = get_watermark() else {
878 return vec![None];
879 };
880 if highest < seq {
881 return vec![None];
882 }
883 let checkpoint = self
884 .get_checkpoint_by_sequence_number(seq)
885 .expect("db error")
886 .expect("checkpoint not found");
887 vec![Some(checkpoint)]
888 })
889 .await
890 .into_iter()
891 .next()
892 .unwrap()
893 }
894
895 pub async fn notify_read_synced_checkpoint(
896 &self,
897 seq: CheckpointSequenceNumber,
898 ) -> VerifiedCheckpoint {
899 self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
900 self.get_highest_synced_checkpoint_seq_number()
901 .expect("db error")
902 })
903 .await
904 }
905
906 pub async fn notify_read_executed_checkpoint(
907 &self,
908 seq: CheckpointSequenceNumber,
909 ) -> VerifiedCheckpoint {
910 self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
911 self.get_highest_executed_checkpoint_seq_number()
912 .expect("db error")
913 })
914 .await
915 }
916
917 pub fn update_highest_executed_checkpoint(
918 &self,
919 checkpoint: &VerifiedCheckpoint,
920 ) -> Result<(), TypedStoreError> {
921 if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
922 if seq_number >= *checkpoint.sequence_number() {
923 return Ok(());
924 }
925 assert_eq!(
926 seq_number + 1,
927 *checkpoint.sequence_number(),
928 "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
929 checkpoint.sequence_number(),
930 seq_number
931 );
932 }
933 let seq = *checkpoint.sequence_number();
934 debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
935 self.tables.watermarks.insert(
936 &CheckpointWatermark::HighestExecuted,
937 &(seq, *checkpoint.digest()),
938 )?;
939 self.executed_checkpoint_notify_read
940 .notify(&seq, checkpoint);
941 Ok(())
942 }
943
944 pub fn update_highest_pruned_checkpoint(
945 &self,
946 checkpoint: &VerifiedCheckpoint,
947 ) -> Result<(), TypedStoreError> {
948 self.tables.watermarks.insert(
949 &CheckpointWatermark::HighestPruned,
950 &(*checkpoint.sequence_number(), *checkpoint.digest()),
951 )
952 }
953
954 pub fn set_highest_executed_checkpoint_subtle(
959 &self,
960 checkpoint: &VerifiedCheckpoint,
961 ) -> Result<(), TypedStoreError> {
962 self.tables.watermarks.insert(
963 &CheckpointWatermark::HighestExecuted,
964 &(*checkpoint.sequence_number(), *checkpoint.digest()),
965 )
966 }
967
968 pub fn insert_checkpoint_contents(
969 &self,
970 contents: CheckpointContents,
971 ) -> Result<(), TypedStoreError> {
972 debug!(
973 checkpoint_seq = ?contents.digest(),
974 "Inserting checkpoint contents",
975 );
976 self.tables
977 .checkpoint_content
978 .insert(contents.digest(), &contents)
979 }
980
981 pub fn insert_verified_checkpoint_contents(
982 &self,
983 checkpoint: &VerifiedCheckpoint,
984 full_contents: VerifiedCheckpointContents,
985 ) -> Result<(), TypedStoreError> {
986 let mut batch = self.tables.full_checkpoint_content_v2.batch();
987 batch.insert_batch(
988 &self.tables.checkpoint_sequence_by_contents_digest,
989 [(&checkpoint.content_digest, checkpoint.sequence_number())],
990 )?;
991 let full_contents = full_contents.into_inner();
992 batch.insert_batch(
993 &self.tables.full_checkpoint_content_v2,
994 [(checkpoint.sequence_number(), &full_contents)],
995 )?;
996
997 let contents = full_contents.into_checkpoint_contents();
998 assert_eq!(&checkpoint.content_digest, contents.digest());
999
1000 batch.insert_batch(
1001 &self.tables.checkpoint_content,
1002 [(contents.digest(), &contents)],
1003 )?;
1004
1005 batch.write()
1006 }
1007
1008 pub fn delete_full_checkpoint_contents(
1009 &self,
1010 seq: CheckpointSequenceNumber,
1011 ) -> Result<(), TypedStoreError> {
1012 self.tables.full_checkpoint_content.remove(&seq)?;
1013 self.tables.full_checkpoint_content_v2.remove(&seq)
1014 }
1015
1016 pub fn get_epoch_last_checkpoint(
1017 &self,
1018 epoch_id: EpochId,
1019 ) -> SuiResult<Option<VerifiedCheckpoint>> {
1020 let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1021 let checkpoint = match seq {
1022 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1023 None => None,
1024 };
1025 Ok(checkpoint)
1026 }
1027
1028 pub fn get_epoch_last_checkpoint_seq_number(
1029 &self,
1030 epoch_id: EpochId,
1031 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1032 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1033 Ok(seq)
1034 }
1035
1036 pub fn insert_epoch_last_checkpoint(
1037 &self,
1038 epoch_id: EpochId,
1039 checkpoint: &VerifiedCheckpoint,
1040 ) -> SuiResult {
1041 self.tables
1042 .epoch_last_checkpoint_map
1043 .insert(&epoch_id, checkpoint.sequence_number())?;
1044 Ok(())
1045 }
1046
1047 pub fn get_epoch_state_commitments(
1048 &self,
1049 epoch: EpochId,
1050 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1051 let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1052 checkpoint
1053 .end_of_epoch_data
1054 .as_ref()
1055 .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1056 .epoch_commitments
1057 .clone()
1058 });
1059 Ok(commitments)
1060 }
1061
1062 pub fn get_epoch_stats(
1064 &self,
1065 epoch: EpochId,
1066 last_checkpoint: &CheckpointSummary,
1067 ) -> Option<EpochStats> {
1068 let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1069 (0, 0)
1070 } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1071 (
1072 checkpoint.sequence_number + 1,
1073 checkpoint.network_total_transactions,
1074 )
1075 } else {
1076 return None;
1077 };
1078 Some(EpochStats {
1079 checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1080 transaction_count: last_checkpoint.network_total_transactions
1081 - prev_epoch_network_transactions,
1082 total_gas_reward: last_checkpoint
1083 .epoch_rolling_gas_cost_summary
1084 .computation_cost,
1085 })
1086 }
1087
1088 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1089 self.tables
1091 .checkpoint_content
1092 .checkpoint_db(path)
1093 .map_err(Into::into)
1094 }
1095
1096 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1097 let mut wb = self.tables.watermarks.batch();
1098 wb.delete_batch(
1099 &self.tables.watermarks,
1100 std::iter::once(CheckpointWatermark::HighestExecuted),
1101 )?;
1102 wb.write()?;
1103 Ok(())
1104 }
1105
1106 pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1107 self.delete_highest_executed_checkpoint_test_only()?;
1108 Ok(())
1109 }
1110
1111 pub fn record_checkpoint_fork_detected(
1112 &self,
1113 checkpoint_seq: CheckpointSequenceNumber,
1114 checkpoint_digest: CheckpointDigest,
1115 ) -> Result<(), TypedStoreError> {
1116 info!(
1117 checkpoint_seq = checkpoint_seq,
1118 checkpoint_digest = ?checkpoint_digest,
1119 "Recording checkpoint fork detection in database"
1120 );
1121 self.tables.watermarks.insert(
1122 &CheckpointWatermark::CheckpointForkDetected,
1123 &(checkpoint_seq, checkpoint_digest),
1124 )
1125 }
1126
1127 pub fn get_checkpoint_fork_detected(
1128 &self,
1129 ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1130 self.tables
1131 .watermarks
1132 .get(&CheckpointWatermark::CheckpointForkDetected)
1133 }
1134
1135 pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1136 self.tables
1137 .watermarks
1138 .remove(&CheckpointWatermark::CheckpointForkDetected)
1139 }
1140
1141 pub fn record_transaction_fork_detected(
1142 &self,
1143 tx_digest: TransactionDigest,
1144 expected_effects_digest: TransactionEffectsDigest,
1145 actual_effects_digest: TransactionEffectsDigest,
1146 ) -> Result<(), TypedStoreError> {
1147 info!(
1148 tx_digest = ?tx_digest,
1149 expected_effects_digest = ?expected_effects_digest,
1150 actual_effects_digest = ?actual_effects_digest,
1151 "Recording transaction fork detection in database"
1152 );
1153 self.tables.transaction_fork_detected.insert(
1154 &TRANSACTION_FORK_DETECTED_KEY,
1155 &(tx_digest, expected_effects_digest, actual_effects_digest),
1156 )
1157 }
1158
1159 pub fn get_transaction_fork_detected(
1160 &self,
1161 ) -> Result<
1162 Option<(
1163 TransactionDigest,
1164 TransactionEffectsDigest,
1165 TransactionEffectsDigest,
1166 )>,
1167 TypedStoreError,
1168 > {
1169 self.tables
1170 .transaction_fork_detected
1171 .get(&TRANSACTION_FORK_DETECTED_KEY)
1172 }
1173
1174 pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1175 self.tables
1176 .transaction_fork_detected
1177 .remove(&TRANSACTION_FORK_DETECTED_KEY)
1178 }
1179}
1180
/// Keys of the entries kept in the checkpoint store's `watermarks` table.
///
/// NOTE(review): this enum derives `Serialize`/`Deserialize` and is used as a table key, so
/// reordering or removing variants presumably changes the stored encoding — confirm before
/// editing.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    /// Presumably the highest checkpoint that has been verified — TODO confirm against users.
    HighestVerified,
    /// Presumably the highest checkpoint that has been synced — TODO confirm against users.
    HighestSynced,
    /// Highest executed checkpoint; this is the entry removed by
    /// `delete_highest_executed_checkpoint_test_only`.
    HighestExecuted,
    /// Presumably the highest checkpoint that has been pruned — TODO confirm against users.
    HighestPruned,
    /// Key under which a detected checkpoint fork is recorded as a
    /// `(sequence number, digest)` pair.
    CheckpointForkDetected,
}
1189
/// State of the task that forwards checkpoint effects to the `GlobalStateHasher`.
struct CheckpointStateHasher {
    /// Epoch store passed to `accumulate_checkpoint` for every received checkpoint.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Weak handle to the hasher; `run` stops as soon as it can no longer be upgraded.
    hasher: Weak<GlobalStateHasher>,
    /// Incoming `(checkpoint sequence number, effects)` pairs. The name suggests the sender is
    /// the checkpoint builder — confirm at the construction site.
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
1195
1196impl CheckpointStateHasher {
1197 fn new(
1198 epoch_store: Arc<AuthorityPerEpochStore>,
1199 hasher: Weak<GlobalStateHasher>,
1200 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1201 ) -> Self {
1202 Self {
1203 epoch_store,
1204 hasher,
1205 receive_from_builder,
1206 }
1207 }
1208
1209 async fn run(self) {
1210 let Self {
1211 epoch_store,
1212 hasher,
1213 mut receive_from_builder,
1214 } = self;
1215 while let Some((seq, effects)) = receive_from_builder.recv().await {
1216 let Some(hasher) = hasher.upgrade() else {
1217 info!("Object state hasher was dropped, stopping checkpoint accumulation");
1218 break;
1219 };
1220 hasher
1221 .accumulate_checkpoint(&effects, seq, &epoch_store)
1222 .expect("epoch ended while accumulating checkpoint");
1223 }
1224 }
1225}
1226
/// Errors surfaced while building checkpoints.
///
/// `CheckpointBuilder::run` stops the builder on `ChangeEpochTxAlreadyExecuted` and
/// `SystemPackagesMissing`, and retries after a one-second sleep on `Retry`.
#[derive(Debug)]
pub enum CheckpointBuilderError {
    /// Terminal for the builder loop. Presumably raised when the change-epoch transaction
    /// has already been executed — confirm at the construction site.
    ChangeEpochTxAlreadyExecuted,
    /// Terminal for the builder loop. Presumably raised when required system packages cannot
    /// be found — confirm at the construction site.
    SystemPackagesMissing,
    /// Any other failure, wrapped in an `anyhow::Error`; the builder loop retries on it.
    Retry(anyhow::Error),
}
1233
1234impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1235 for CheckpointBuilderError
1236{
1237 fn from(e: SuiError) -> Self {
1238 Self::Retry(e.into())
1239 }
1240}
1241
/// Result alias for checkpoint-builder operations: the error type is always
/// `CheckpointBuilderError` and the success type defaults to `()`.
pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1243
/// Builds checkpoints from the pending checkpoints stored in the per-epoch store and writes
/// them to the checkpoint store.
pub struct CheckpointBuilder {
    /// Authority state; used for the transaction cache reader and the execution scheduler.
    state: Arc<AuthorityState>,
    /// Checkpoint store that receives the built checkpoint contents and summaries.
    store: Arc<CheckpointStore>,
    /// Per-epoch store supplying pending checkpoints and the protocol config.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Awaited by `run` between build attempts.
    notify: Arc<Notify>,
    /// Signalled with `notify_one` after new checkpoints have been written.
    notify_aggregator: Arc<Notify>,
    /// Watch channel advanced to the sequence number of the most recently built checkpoint.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    /// Cache used to wait for and read executed transaction effects.
    effects_store: Arc<dyn TransactionCacheRead>,
    /// Weak handle to the global state hasher.
    global_state_hasher: Weak<GlobalStateHasher>,
    /// Sender of `(checkpoint sequence number, effects)` pairs; presumably consumed by
    /// `CheckpointStateHasher` — confirm at the wiring site.
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    /// Sink whose `checkpoint_created` is called for every newly written checkpoint.
    output: Box<dyn CheckpointOutput>,
    /// Checkpoint metrics.
    metrics: Arc<CheckpointMetrics>,
    /// Upper bound on transactions per chunk in `split_checkpoint_chunks`.
    max_transactions_per_checkpoint: usize,
    /// Upper bound on the serialized byte size per chunk in `split_checkpoint_chunks`.
    max_checkpoint_size_bytes: usize,
}
1259
/// State of the checkpoint signature aggregation task.
///
/// NOTE(review): the field descriptions below are inferred from names and types only; the
/// code that uses them is outside this section — confirm against the `impl`.
pub struct CheckpointAggregator {
    /// Checkpoint store.
    store: Arc<CheckpointStore>,
    /// Per-epoch store.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Notification handle; presumably the one the builder signals after writing checkpoints.
    notify: Arc<Notify>,
    /// Unbounded channel of incoming checkpoint signature messages.
    receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
    /// Signature messages keyed by checkpoint sequence number.
    pending: BTreeMap<CheckpointSequenceNumber, Vec<CheckpointSignatureMessage>>,
    /// Aggregator for the checkpoint currently being processed, if any.
    current: Option<CheckpointSignatureAggregator>,
    /// Sink for certified checkpoints.
    output: Box<dyn CertifiedCheckpointOutput>,
    /// Authority state.
    state: Arc<AuthorityState>,
    /// Checkpoint metrics.
    metrics: Arc<CheckpointMetrics>,
}
1271
/// Collects signatures for a single checkpoint summary.
///
/// NOTE(review): the field descriptions below are inferred from names and types only; the
/// code that uses them is outside this section — confirm against the `impl`.
pub struct CheckpointSignatureAggregator {
    /// Summary of the checkpoint being signed.
    summary: CheckpointSummary,
    /// Digest associated with `summary`.
    digest: CheckpointDigest,
    /// Stake-weighted signature aggregation keyed by checkpoint digest.
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    /// Checkpoint store.
    store: Arc<CheckpointStore>,
    /// Authority state.
    state: Arc<AuthorityState>,
    /// Checkpoint metrics.
    metrics: Arc<CheckpointMetrics>,
}
1282
1283impl CheckpointBuilder {
1284 fn new(
1285 state: Arc<AuthorityState>,
1286 store: Arc<CheckpointStore>,
1287 epoch_store: Arc<AuthorityPerEpochStore>,
1288 notify: Arc<Notify>,
1289 effects_store: Arc<dyn TransactionCacheRead>,
1290 global_state_hasher: Weak<GlobalStateHasher>,
1292 send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1294 output: Box<dyn CheckpointOutput>,
1295 notify_aggregator: Arc<Notify>,
1296 last_built: watch::Sender<CheckpointSequenceNumber>,
1297 metrics: Arc<CheckpointMetrics>,
1298 max_transactions_per_checkpoint: usize,
1299 max_checkpoint_size_bytes: usize,
1300 ) -> Self {
1301 Self {
1302 state,
1303 store,
1304 epoch_store,
1305 notify,
1306 effects_store,
1307 global_state_hasher,
1308 send_to_hasher,
1309 output,
1310 notify_aggregator,
1311 last_built,
1312 metrics,
1313 max_transactions_per_checkpoint,
1314 max_checkpoint_size_bytes,
1315 }
1316 }
1317
1318 async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
1326 if let Some(replay_waiter) = consensus_replay_waiter {
1327 info!("Waiting for consensus commits to replay ...");
1328 replay_waiter.wait_for_replay().await;
1329 info!("Consensus commits finished replaying");
1330 }
1331 info!("Starting CheckpointBuilder");
1332 loop {
1333 match self.maybe_build_checkpoints().await {
1334 Ok(()) => {}
1335 err @ Err(
1336 CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
1337 | CheckpointBuilderError::SystemPackagesMissing,
1338 ) => {
1339 info!("CheckpointBuilder stopping: {:?}", err);
1340 return;
1341 }
1342 Err(CheckpointBuilderError::Retry(inner)) => {
1343 let msg = format!("{:?}", inner);
1344 debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
1345 tokio::time::sleep(Duration::from_secs(1)).await;
1346 self.metrics.checkpoint_errors.inc();
1347 continue;
1348 }
1349 }
1350
1351 self.notify.notified().await;
1352 }
1353 }
1354
1355 async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1356 if self
1357 .epoch_store
1358 .protocol_config()
1359 .split_checkpoints_in_consensus_handler()
1360 {
1361 self.maybe_build_checkpoints_v2().await
1362 } else {
1363 self.maybe_build_checkpoints_v1().await
1364 }
1365 }
1366
    /// Builds checkpoints from the pending checkpoints after the last built height (v1 path).
    ///
    /// Pending checkpoints are accumulated into a group until the group may be built, then
    /// the whole group is turned into checkpoint(s) by `make_checkpoint`. Groups that are
    /// still waiting when the iterator is exhausted are left for a later call.
    ///
    /// # Errors
    /// Propagates any error from `make_checkpoint`.
    ///
    /// # Panics
    /// Panics if the epoch store cannot return the last built summary or the pending
    /// checkpoints.
    async fn maybe_build_checkpoints_v1(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        // Resume from the height and timestamp of the last checkpoint this builder produced.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        // A missing config value falls back to the default (0), i.e. no minimum interval.
        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        // Peekable so the loop can look one pending checkpoint ahead.
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            let current_timestamp = pending.details().timestamp_ms;
            // Build when any of the following holds:
            //  * nothing was built yet, or the minimum interval has elapsed;
            //  * the next pending checkpoint is the last of the epoch;
            //  * this pending checkpoint is the last of the epoch.
            let can_build = match last_timestamp {
                Some(last_timestamp) => {
                    current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                }
                None => true,
            } || checkpoints_iter
                .peek()
                .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height_from = grouped_pending_checkpoints
                    .first()
                    .unwrap()
                    .details()
                    .checkpoint_height,
                checkpoint_commit_height_to = last_height,
                "Making checkpoint with commit height range"
            );

            // `mem::take` hands the group over and leaves an empty vector for the next group.
            let seq = self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await?;

            // Only ever move the watch value forward, and only notify when it changed.
            self.last_built.send_if_modified(|cur| {
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            // Yield to the runtime between checkpoints.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );

        Ok(())
    }
1455
1456 async fn maybe_build_checkpoints_v2(&mut self) -> CheckpointBuilderResult {
1457 let _scope = monitored_scope("BuildCheckpoints");
1458
1459 let last_height = self
1461 .epoch_store
1462 .last_built_checkpoint_builder_summary()
1463 .expect("epoch should not have ended")
1464 .and_then(|s| s.checkpoint_height);
1465
1466 for (height, pending) in self
1467 .epoch_store
1468 .get_pending_checkpoints_v2(last_height)
1469 .expect("unexpected epoch store error")
1470 {
1471 debug!(checkpoint_commit_height = height, "Making checkpoint");
1472
1473 let seq = self.make_checkpoint_v2(pending).await?;
1474
1475 self.last_built.send_if_modified(|cur| {
1476 if seq > *cur {
1478 *cur = seq;
1479 true
1480 } else {
1481 false
1482 }
1483 });
1484
1485 tokio::task::yield_now().await;
1488 }
1489
1490 Ok(())
1491 }
1492
1493 #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1494 async fn make_checkpoint(
1495 &mut self,
1496 pendings: Vec<PendingCheckpoint>,
1497 ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1498 let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1499
1500 let pending_ckpt_str = pendings
1501 .iter()
1502 .map(|p| {
1503 format!(
1504 "height={}, commit={}",
1505 p.details().checkpoint_height,
1506 p.details().consensus_commit_ref
1507 )
1508 })
1509 .join("; ");
1510
1511 let last_details = pendings.last().unwrap().details().clone();
1512
1513 let highest_executed_sequence = self
1516 .store
1517 .get_highest_executed_checkpoint_seq_number()
1518 .expect("db error")
1519 .unwrap_or(0);
1520
1521 let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
1522 let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1523
1524 let new_checkpoints = self
1525 .create_checkpoints(
1526 sorted_tx_effects_included_in_checkpoint,
1527 &last_details,
1528 &all_roots,
1529 )
1530 .await?;
1531 let highest_sequence = *new_checkpoints.last().0.sequence_number();
1532 if highest_sequence <= highest_executed_sequence && poll_count > 1 {
1533 debug_fatal!(
1534 "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1535 );
1536 }
1537
1538 let new_ckpt_str = new_checkpoints
1539 .iter()
1540 .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
1541 .join("; ");
1542
1543 self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1544 .await?;
1545 info!(
1546 "Made new checkpoint {} from pending checkpoint {}",
1547 new_ckpt_str, pending_ckpt_str
1548 );
1549
1550 Ok(highest_sequence)
1551 }
1552
1553 #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
1554 async fn make_checkpoint_v2(
1555 &mut self,
1556 pending: PendingCheckpointV2,
1557 ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1558 let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1559
1560 let details = pending.details.clone();
1561
1562 let highest_executed_sequence = self
1563 .store
1564 .get_highest_executed_checkpoint_seq_number()
1565 .expect("db error")
1566 .unwrap_or(0);
1567
1568 let (poll_count, result) =
1569 poll_count(self.resolve_checkpoint_transactions_v2(pending)).await;
1570 let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1571
1572 let new_checkpoints = self
1573 .create_checkpoints(
1574 sorted_tx_effects_included_in_checkpoint,
1575 &details,
1576 &all_roots,
1577 )
1578 .await?;
1579 assert_eq!(new_checkpoints.len(), 1, "Expected exactly one checkpoint");
1580 let sequence = *new_checkpoints.first().0.sequence_number();
1581 let digest = new_checkpoints.first().0.digest();
1582 if sequence <= highest_executed_sequence && poll_count > 1 {
1583 debug_fatal!(
1584 "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1585 );
1586 }
1587
1588 self.write_checkpoints(details.checkpoint_height, new_checkpoints)
1589 .await?;
1590 info!(
1591 seq = sequence,
1592 %digest,
1593 height = details.checkpoint_height,
1594 commit = %details.consensus_commit_ref,
1595 "Made new checkpoint"
1596 );
1597
1598 Ok(sequence)
1599 }
1600
    /// Builds the accumulator settlement transactions and the barrier transaction for a
    /// checkpoint, hands them to the epoch store, and waits for their effects.
    ///
    /// Returns the `AccumulatorSettlement` transaction key for `checkpoint_height` together
    /// with the settlement effects followed by the barrier effects. Before returning, the
    /// resulting funds changes are passed to the execution scheduler.
    ///
    /// # Panics
    /// Panics if the accumulator root object's initial shared version is not configured, if
    /// any settlement or barrier transaction failed, if more than one of them mutated the
    /// accumulator root object, or if none did.
    async fn construct_and_execute_settlement_transactions(
        &self,
        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> (TransactionKey, Vec<TransactionEffects>) {
        let _scope =
            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");

        // Key under which the settlement and barrier transactions are announced below.
        let tx_key =
            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);

        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.effects_store.as_ref()),
            sorted_tx_effects_included_in_checkpoint,
            checkpoint_seq,
            tx_index_offset,
        );

        // Both values are read from the builder before `build_tx` is called on it.
        let funds_changes = builder.collect_funds_changes();
        let num_updates = builder.num_updates();
        let settlement_txns = builder.build_tx(
            self.epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            checkpoint_seq,
        );

        // Wrap each settlement transaction as an executable system transaction.
        let settlement_txns: Vec<_> = settlement_txns
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    self.epoch_store.epoch(),
                )
            })
            .collect();

        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();

        debug!(
            ?settlement_digests,
            ?tx_key,
            "created settlement transactions with {num_updates} updates"
        );

        // Announce the settlement transactions, then wait for their effects.
        self.epoch_store
            .notify_settlement_transactions_ready(tx_key, settlement_txns);

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_settlement_effects",
            &settlement_digests,
            tx_key,
        )
        .await;

        // The barrier transaction is built from the settlement effects, so it can only be
        // constructed once those are available.
        let barrier_tx = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            &settlement_effects,
        );

        let barrier_tx = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier_tx),
            self.epoch_store.epoch(),
        );
        let barrier_digest = *barrier_tx.digest();

        self.epoch_store
            .notify_barrier_transaction_ready(tx_key, barrier_tx);

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_barrier_effects",
            &[barrier_digest],
            tx_key,
        )
        .await;

        // Settlement effects first, barrier effects last.
        let settlement_effects: Vec<_> = settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect();

        // Every transaction must have succeeded, and exactly one of them must have mutated
        // the accumulator root object; its new version is recorded.
        let mut next_accumulator_version = None;
        for fx in settlement_effects.iter() {
            assert!(
                fx.status().is_ok(),
                "settlement transaction cannot fail (digest: {:?}) {:#?}",
                fx.transaction_digest(),
                fx
            );
            if let Some(version) = fx
                .mutated()
                .iter()
                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
            {
                assert!(
                    next_accumulator_version.is_none(),
                    "Only one settlement transaction should mutate the accumulator root object"
                );
                next_accumulator_version = Some(version);
            }
        }
        let settlements = FundsSettlement {
            next_accumulator_version: next_accumulator_version
                .expect("Accumulator root object should be mutated in the settlement transactions"),
            funds_changes,
        };

        // Hand the new accumulator version and the funds changes to the scheduler.
        self.state
            .execution_scheduler()
            .settle_address_funds(settlements);

        (tx_key, settlement_effects)
    }
1728
    /// Resolves a group of pending checkpoints into the ordered list of transaction effects
    /// to include, plus the set of root transaction digests (v1 path).
    ///
    /// For each pending checkpoint, in order:
    /// 1. when accumulators are enabled, pops the trailing settlement root key;
    /// 2. waits for the root digests and their executed effects;
    /// 3. completes the effect set via `complete_checkpoint_effects`, handling the consensus
    ///    commit prologue (if any) first;
    /// 4. emits the prologue first, followed by the causally sorted remaining effects;
    /// 5. when a settlement root exists, builds and executes the settlement transactions and
    ///    appends their effects.
    ///
    /// # Errors
    /// Propagates errors from digest resolution, prologue extraction, effect completion and
    /// loading the last built checkpoint summary.
    ///
    /// # Panics
    /// Panics if the protocol config does not enable
    /// `prepend_prologue_tx_in_consensus_commit_in_checkpoints`, or if the settlement key
    /// produced does not match the expected settlement root.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        pending_checkpoints: Vec<PendingCheckpoint>,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        // Shared across all pending checkpoints of the group; passed to every
        // `complete_checkpoint_effects` call and extended with settlement digests.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        let mut tx_effects = Vec::new();
        let mut tx_roots = HashSet::new();

        for pending_checkpoint in pending_checkpoints.into_iter() {
            let mut pending = pending_checkpoint;
            debug!(
                checkpoint_commit_height = pending.details.checkpoint_height,
                "Resolving checkpoint transactions for pending checkpoint.",
            );

            trace!(
                "roots for pending checkpoint {:?}: {:?}",
                pending.details.checkpoint_height, pending.roots,
            );

            // With accumulators enabled, the last root must be the settlement key; it is
            // removed here so that it is not resolved like an ordinary root.
            let settlement_root = if self.epoch_store.accumulators_enabled() {
                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
                    pending.roots.pop()
                else {
                    fatal!("No settlement root found");
                };
                Some(settlement_root)
            } else {
                None
            };

            let roots = &pending.roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(roots.len() as u64);

            // Wait until every root key maps to a digest, then until each has executed effects.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;
            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;

            assert!(
                self.epoch_store
                    .protocol_config()
                    .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
            );

            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            // Complete the prologue on its own first; it must resolve to exactly itself.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    &mut effects_in_current_checkpoint,
                )?;

                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }

            let unsorted =
                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            // `+ 1` leaves room for the prologue placed in front.
            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
                // Debug builds verify that the prologue is not also part of `unsorted`.
                if cfg!(debug_assertions) {
                    for tx in unsorted.iter() {
                        assert!(tx.transaction_digest() != &ccp_digest);
                    }
                }
                sorted.push(ccp_effects);
            }
            sorted.extend(CausalOrder::causal_sort(unsorted));

            if let Some(settlement_root) = settlement_root {
                let last_checkpoint =
                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
                // Sequence number following the last built checkpoint (1 if there is none).
                let next_checkpoint_seq = last_checkpoint
                    .as_ref()
                    .map(|(seq, _)| *seq)
                    .unwrap_or_default()
                    + 1;
                // Number of effects already collected from earlier pending checkpoints.
                let tx_index_offset = tx_effects.len() as u64;

                let (tx_key, settlement_effects) = self
                    .construct_and_execute_settlement_transactions(
                        &sorted,
                        pending.details.checkpoint_height,
                        next_checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                debug!(?tx_key, "executed settlement transactions");

                assert_eq!(settlement_root, tx_key);

                // Record the settlement digests in the shared set as well.
                effects_in_current_checkpoint
                    .extend(settlement_effects.iter().map(|e| *e.transaction_digest()));
                sorted.extend(settlement_effects);
            }

            #[cfg(msim)]
            {
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            tx_effects.extend(sorted);
            tx_roots.extend(root_digests);
        }

        Ok((tx_effects, tx_roots))
    }
1893
    /// Resolves a v2 pending checkpoint into the ordered list of transaction effects to
    /// include, plus the set of root transaction digests.
    ///
    /// Each entry of `pending.roots` is handled in order: its root digests and effects are
    /// awaited, the effects are causally sorted with the consensus commit prologue (if any)
    /// passed to the sorter, and, when the entry has a settlement root, the settlement
    /// effects are read and appended.
    ///
    /// # Errors
    /// Propagates errors from digest resolution and prologue extraction.
    ///
    /// # Panics
    /// Panics if the protocol config does not enable
    /// `prepend_prologue_tx_in_consensus_commit_in_checkpoints`, or if a settlement root is
    /// present but `pending.details.checkpoint_seq` is unset.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions_v2(
        &self,
        pending: PendingCheckpointV2,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        debug!(
            checkpoint_commit_height = pending.details.checkpoint_height,
            "Resolving checkpoint transactions for pending checkpoint.",
        );

        trace!(
            "roots for pending checkpoint {:?}: {:?}",
            pending.details.checkpoint_height, pending.roots,
        );

        assert!(
            self.epoch_store
                .protocol_config()
                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
        );

        let mut all_effects: Vec<TransactionEffects> = Vec::new();
        let mut all_root_digests: Vec<TransactionDigest> = Vec::new();

        for checkpoint_roots in &pending.roots {
            let tx_roots = &checkpoint_roots.tx_roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(tx_roots.len() as u64);

            // Wait until every root key maps to a digest.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(tx_roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;

            all_root_digests.extend(root_digests.iter().cloned());

            // Wait until every root digest has executed effects.
            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            // Only the prologue digest is handed to the sorter.
            let ccp_digest = consensus_commit_prologue.map(|(d, _)| d);
            let mut sorted = CausalOrder::causal_sort_with_ccp(root_effects, ccp_digest);

            if let Some(settlement_key) = &checkpoint_roots.settlement_root {
                let checkpoint_seq = pending
                    .details
                    .checkpoint_seq
                    .expect("checkpoint_seq must be set");
                // Number of effects already collected from earlier entries.
                let tx_index_offset = all_effects.len() as u64;
                let effects = self
                    .resolve_settlement_effects(
                        *settlement_key,
                        &sorted,
                        checkpoint_roots.height,
                        checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                sorted.extend(effects);
            }

            #[cfg(msim)]
            {
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            all_effects.extend(sorted);
        }
        // Collecting into the `HashSet` return type de-duplicates the root digests.
        Ok((all_effects, all_root_digests.into_iter().collect()))
    }
1979
1980 async fn resolve_settlement_effects(
1985 &self,
1986 settlement_key: TransactionKey,
1987 sorted_root_effects: &[TransactionEffects],
1988 checkpoint_height: CheckpointHeight,
1989 checkpoint_seq: CheckpointSequenceNumber,
1990 tx_index_offset: u64,
1991 ) -> Vec<TransactionEffects> {
1992 let epoch = self.epoch_store.epoch();
1993 let accumulator_root_obj_initial_shared_version = self
1994 .epoch_store
1995 .epoch_start_config()
1996 .accumulator_root_obj_initial_shared_version()
1997 .expect("accumulator root object must exist");
1998
1999 let builder = AccumulatorSettlementTxBuilder::new(
2000 None,
2001 sorted_root_effects,
2002 checkpoint_seq,
2003 tx_index_offset,
2004 );
2005
2006 let settlement_digests: Vec<_> = builder
2007 .build_tx(
2008 self.epoch_store.protocol_config(),
2009 epoch,
2010 accumulator_root_obj_initial_shared_version,
2011 checkpoint_height,
2012 checkpoint_seq,
2013 )
2014 .into_iter()
2015 .map(|tx| *VerifiedTransaction::new_system_transaction(tx).digest())
2016 .collect();
2017
2018 debug!(
2019 ?settlement_digests,
2020 ?settlement_key,
2021 "fallback: reading settlement effects from cache"
2022 );
2023
2024 let settlement_effects = wait_for_effects_with_retry(
2025 self.effects_store.as_ref(),
2026 "CheckpointBuilder::fallback_settlement_effects",
2027 &settlement_digests,
2028 settlement_key,
2029 )
2030 .await;
2031
2032 let barrier_digest = *VerifiedTransaction::new_system_transaction(
2033 accumulators::build_accumulator_barrier_tx(
2034 epoch,
2035 accumulator_root_obj_initial_shared_version,
2036 checkpoint_height,
2037 &settlement_effects,
2038 ),
2039 )
2040 .digest();
2041
2042 let barrier_effects = wait_for_effects_with_retry(
2043 self.effects_store.as_ref(),
2044 "CheckpointBuilder::fallback_barrier_effects",
2045 &[barrier_digest],
2046 settlement_key,
2047 )
2048 .await;
2049
2050 settlement_effects
2051 .into_iter()
2052 .chain(barrier_effects)
2053 .collect()
2054 }
2055
2056 fn extract_consensus_commit_prologue(
2059 &self,
2060 root_digests: &[TransactionDigest],
2061 root_effects: &[TransactionEffects],
2062 ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
2063 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
2064 if root_digests.is_empty() {
2065 return Ok(None);
2066 }
2067
2068 let first_tx = self
2072 .state
2073 .get_transaction_cache_reader()
2074 .get_transaction_block(&root_digests[0])
2075 .expect("Transaction block must exist");
2076
2077 Ok(first_tx
2078 .transaction_data()
2079 .is_consensus_commit_prologue()
2080 .then(|| {
2081 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
2082 (*first_tx.digest(), root_effects[0].clone())
2083 }))
2084 }
2085
2086 #[instrument(level = "debug", skip_all)]
2087 async fn write_checkpoints(
2088 &mut self,
2089 height: CheckpointHeight,
2090 new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
2091 ) -> SuiResult {
2092 let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
2093 let mut batch = self.store.tables.checkpoint_content.batch();
2094 let mut all_tx_digests =
2095 Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
2096
2097 for (summary, contents) in &new_checkpoints {
2098 debug!(
2099 checkpoint_commit_height = height,
2100 checkpoint_seq = summary.sequence_number,
2101 contents_digest = ?contents.digest(),
2102 "writing checkpoint",
2103 );
2104
2105 if let Some(previously_computed_summary) = self
2106 .store
2107 .tables
2108 .locally_computed_checkpoints
2109 .get(&summary.sequence_number)?
2110 && previously_computed_summary.digest() != summary.digest()
2111 {
2112 fatal!(
2113 "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
2114 summary.sequence_number,
2115 previously_computed_summary.digest(),
2116 summary.digest()
2117 );
2118 }
2119
2120 all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
2121
2122 self.metrics
2123 .transactions_included_in_checkpoint
2124 .inc_by(contents.size() as u64);
2125 let sequence_number = summary.sequence_number;
2126 self.metrics
2127 .last_constructed_checkpoint
2128 .set(sequence_number as i64);
2129
2130 batch.insert_batch(
2131 &self.store.tables.checkpoint_content,
2132 [(contents.digest(), contents)],
2133 )?;
2134
2135 batch.insert_batch(
2136 &self.store.tables.locally_computed_checkpoints,
2137 [(sequence_number, summary)],
2138 )?;
2139 }
2140
2141 batch.write()?;
2142
2143 for (summary, contents) in &new_checkpoints {
2145 self.output
2146 .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
2147 .await?;
2148 }
2149
2150 for (local_checkpoint, _) in &new_checkpoints {
2151 if let Some(certified_checkpoint) = self
2152 .store
2153 .tables
2154 .certified_checkpoints
2155 .get(local_checkpoint.sequence_number())?
2156 {
2157 self.store
2158 .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
2159 }
2160 }
2161
2162 self.notify_aggregator.notify_one();
2163 self.epoch_store
2164 .process_constructed_checkpoint(height, new_checkpoints);
2165 Ok(())
2166 }
2167
2168 #[allow(clippy::type_complexity)]
2169 fn split_checkpoint_chunks(
2170 &self,
2171 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
2172 signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2173 ) -> CheckpointBuilderResult<
2174 Vec<
2175 Vec<(
2176 TransactionEffects,
2177 Vec<(GenericSignature, Option<SequenceNumber>)>,
2178 )>,
2179 >,
2180 > {
2181 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
2182
2183 if self
2185 .epoch_store
2186 .protocol_config()
2187 .split_checkpoints_in_consensus_handler()
2188 {
2189 let chunk: Vec<_> = effects_and_transaction_sizes
2190 .into_iter()
2191 .zip(signatures)
2192 .map(|((effects, _size), sigs)| (effects, sigs))
2193 .collect();
2194 return Ok(vec![chunk]);
2195 }
2196 let mut chunks = Vec::new();
2197 let mut chunk = Vec::new();
2198 let mut chunk_size: usize = 0;
2199 for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
2200 .into_iter()
2201 .zip(signatures.into_iter())
2202 {
2203 let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
2208 bcs::serialized_size(&signatures)?
2209 } else {
2210 let signatures: Vec<&GenericSignature> =
2211 signatures.iter().map(|(s, _)| s).collect();
2212 bcs::serialized_size(&signatures)?
2213 };
2214 let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
2215 if chunk.len() == self.max_transactions_per_checkpoint
2216 || (chunk_size + size) > self.max_checkpoint_size_bytes
2217 {
2218 if chunk.is_empty() {
2219 warn!(
2221 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
2222 self.max_checkpoint_size_bytes
2223 );
2224 } else {
2225 chunks.push(chunk);
2226 chunk = Vec::new();
2227 chunk_size = 0;
2228 }
2229 }
2230
2231 chunk.push((effects, signatures));
2232 chunk_size += size;
2233 }
2234
2235 if !chunk.is_empty() || chunks.is_empty() {
2236 chunks.push(chunk);
2241 }
2246 Ok(chunks)
2247 }
2248
2249 fn load_last_built_checkpoint_summary(
2250 epoch_store: &AuthorityPerEpochStore,
2251 store: &CheckpointStore,
2252 ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
2253 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
2254 if last_checkpoint.is_none() {
2255 let epoch = epoch_store.epoch();
2256 if epoch > 0 {
2257 let previous_epoch = epoch - 1;
2258 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
2259 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
2260 if let Some((ref seq, _)) = last_checkpoint {
2261 debug!(
2262 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
2263 );
2264 } else {
2265 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
2267 }
2268 }
2269 }
2270 Ok(last_checkpoint)
2271 }
2272
/// Builds one or more `(CheckpointSummary, CheckpointContents)` pairs out of `all_effects`.
///
/// Steps, in order:
/// 1. Load the last built checkpoint so new summaries can chain from it.
/// 2. Load every transaction (and its serialized size) and classify it by kind; for user
///    programmable transactions that are not in `all_roots`, wait until the epoch store
///    reports the corresponding consensus message as processed.
/// 3. Collect user signatures and, when `details.last_of_epoch` is set, the execution-time
///    observation keys of all programmable-transaction commands.
/// 4. Split the effects into chunks via `split_checkpoint_chunks` and build one checkpoint
///    per chunk, updating `last_checkpoint` after each so the next chunk chains from it.
///
/// The last chunk of an epoch-ending batch additionally executes the advance-epoch
/// transaction and carries `EndOfEpochData`.
///
/// # Panics
/// Panics if an executed transaction cannot be found in the cache, if the global state
/// hasher has been dropped while building the last checkpoint of the epoch, or if the
/// chunking step yields no chunk. `fatal!` is raised for transaction kinds that must never
/// appear in checkpoint effects at this stage.
#[instrument(level = "debug", skip_all)]
async fn create_checkpoints(
    &self,
    all_effects: Vec<TransactionEffects>,
    details: &PendingCheckpointInfo,
    all_roots: &HashSet<TransactionDigest>,
) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
    let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");

    let total = all_effects.len();
    let mut last_checkpoint =
        Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
    let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
    debug!(
        checkpoint_commit_height = details.checkpoint_height,
        next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
        checkpoint_timestamp = details.timestamp_ms,
        "Creating checkpoint(s) for {} transactions",
        all_effects.len(),
    );

    let all_digests: Vec<_> = all_effects
        .iter()
        .map(|effect| *effect.transaction_digest())
        .collect();
    let transactions_and_sizes = self
        .state
        .get_transaction_cache_reader()
        .get_transactions_and_serialized_sizes(&all_digests)?;
    let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
    let mut transactions = Vec::with_capacity(all_effects.len());
    let mut transaction_keys = Vec::with_capacity(all_effects.len());
    // Maps the digest of each randomness state update to the randomness round it carries.
    let mut randomness_rounds = BTreeMap::new();
    {
        let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
        debug!(
            ?last_checkpoint_seq,
            "Waiting for {:?} certificates to appear in consensus",
            all_effects.len()
        );

        for (effects, transaction_and_size) in all_effects
            .into_iter()
            .zip(transactions_and_sizes.into_iter())
        {
            let (transaction, size) = transaction_and_size
                .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
            match transaction.inner().transaction_data().kind() {
                // These kinds contribute no consensus key to wait on.
                TransactionKind::ConsensusCommitPrologue(_)
                | TransactionKind::ConsensusCommitPrologueV2(_)
                | TransactionKind::ConsensusCommitPrologueV3(_)
                | TransactionKind::ConsensusCommitPrologueV4(_)
                | TransactionKind::AuthenticatorStateUpdate(_) => {
                }
                // Likewise nothing to wait on for programmable system transactions.
                TransactionKind::ProgrammableSystemTransaction(_) => {
                }
                // These kinds are not expected among the effects handed to the builder.
                TransactionKind::ChangeEpoch(_)
                | TransactionKind::Genesis(_)
                | TransactionKind::EndOfEpochTransaction(_) => {
                    fatal!(
                        "unexpected transaction in checkpoint effects: {:?}",
                        transaction
                    );
                }
                TransactionKind::RandomnessStateUpdate(rsu) => {
                    randomness_rounds
                        .insert(*effects.transaction_digest(), rsu.randomness_round);
                }
                TransactionKind::ProgrammableTransaction(_) => {
                    let digest = *effects.transaction_digest();
                    // Only non-root transactions are added to the wait list below.
                    if !all_roots.contains(&digest) {
                        transaction_keys.push(SequencedConsensusTransactionKey::External(
                            ConsensusTransactionKey::Certificate(digest),
                        ));
                    }
                }
            }
            transactions.push(transaction);
            all_effects_and_transaction_sizes.push((effects, size));
        }

        // Block until the epoch store signals that every collected key has been processed.
        self.epoch_store
            .consensus_messages_processed_notify(transaction_keys)
            .await?;
    }

    let signatures = self
        .epoch_store
        .user_signatures_for_checkpoint(&transactions, &all_digests);
    debug!(
        ?last_checkpoint_seq,
        "Received {} checkpoint user signatures from consensus",
        signatures.len()
    );

    // Only populated for an epoch-ending batch; consumed (taken) by the last chunk below.
    let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
        Some(
            transactions
                .iter()
                .flat_map(|tx| {
                    if let TransactionKind::ProgrammableTransaction(ptb) =
                        tx.transaction_data().kind()
                    {
                        itertools::Either::Left(
                            ptb.commands
                                .iter()
                                .map(ExecutionTimeObservationKey::from_command),
                        )
                    } else {
                        itertools::Either::Right(std::iter::empty())
                    }
                })
                .collect(),
        )
    } else {
        None
    };

    let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
    let chunks_count = chunks.len();

    let mut checkpoints = Vec::with_capacity(chunks_count);
    debug!(
        ?last_checkpoint_seq,
        "Creating {} checkpoints with {} transactions", chunks_count, total,
    );

    let epoch = self.epoch_store.epoch();
    for (index, transactions) in chunks.into_iter().enumerate() {
        // First checkpoint of the epoch: first chunk, and the previous checkpoint (if any)
        // belongs to a different epoch.
        let first_checkpoint_of_epoch = index == 0
            && last_checkpoint
                .as_ref()
                .map(|(_, c)| c.epoch != epoch)
                .unwrap_or(true);
        if first_checkpoint_of_epoch {
            self.epoch_store
                .record_epoch_first_checkpoint_creation_time_metric();
        }
        // Only the final chunk of an epoch-ending batch closes the epoch.
        let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;

        // Use the pre-assigned sequence number when one is given; otherwise continue from
        // the previous checkpoint (or start at the default, 0, when there is none).
        let sequence_number = if let Some(preassigned_seq) = details.checkpoint_seq {
            preassigned_seq
        } else {
            last_checkpoint
                .as_ref()
                .map(|(_, c)| c.sequence_number + 1)
                .unwrap_or_default()
        };
        let mut timestamp_ms = details.timestamp_ms;
        if let Some((_, last_checkpoint)) = &last_checkpoint
            && last_checkpoint.timestamp_ms > timestamp_ms
        {
            debug!(
                "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
                sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
            );
            // When the protocol enforces monotonic timestamps, clamp to the previous value.
            if self
                .epoch_store
                .protocol_config()
                .enforce_checkpoint_timestamp_monotonicity()
            {
                timestamp_ms = last_checkpoint.timestamp_ms;
            }
        }

        let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
        let epoch_rolling_gas_cost_summary =
            self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);

        let end_of_epoch_data = if last_checkpoint_of_epoch {
            // Executes the advance-epoch transaction; its effects (and an empty signature
            // list) are appended to `effects` / `signatures` by the callee.
            let system_state_obj = self
                .augment_epoch_last_checkpoint(
                    &epoch_rolling_gas_cost_summary,
                    timestamp_ms,
                    &mut effects,
                    &mut signatures,
                    sequence_number,
                    std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
                    last_checkpoint_seq.unwrap_or_default(),
                )
                .await?;

            let committee = system_state_obj
                .get_current_epoch_committee()
                .committee()
                .clone();

            // For the last checkpoint the state hash is accumulated inline (instead of via
            // the hasher channel used for ordinary checkpoints) so that the epoch digest
            // can be computed right here.
            let root_state_digest = {
                let state_acc = self
                    .global_state_hasher
                    .upgrade()
                    .expect("No checkpoints should be getting built after local configuration");
                let acc = state_acc.accumulate_checkpoint(
                    &effects,
                    sequence_number,
                    &self.epoch_store,
                )?;

                state_acc
                    .wait_for_previous_running_root(&self.epoch_store, sequence_number)
                    .await?;

                state_acc.accumulate_running_root(
                    &self.epoch_store,
                    sequence_number,
                    Some(acc),
                )?;
                state_acc
                    .digest_epoch(self.epoch_store.clone(), sequence_number)
                    .await?
            };
            self.metrics.highest_accumulated_epoch.set(epoch as i64);
            info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

            // The root state digest is committed to only when the protocol supports it.
            let epoch_commitments = if self
                .epoch_store
                .protocol_config()
                .check_commit_root_state_digest_supported()
            {
                vec![root_state_digest.into()]
            } else {
                vec![]
            };

            Some(EndOfEpochData {
                next_epoch_committee: committee.voting_rights,
                next_epoch_protocol_version: ProtocolVersion::new(
                    system_state_obj.protocol_version(),
                ),
                epoch_commitments,
            })
        } else {
            // Ordinary checkpoint: hand the effects to the hasher task through its channel.
            self.send_to_hasher
                .send((sequence_number, effects.clone()))
                .await?;

            None
        };
        // With address aliases the contents keep the per-signature optional sequence
        // numbers; otherwise those are dropped and only digests + signatures are stored.
        let contents = if self.epoch_store.protocol_config().address_aliases() {
            CheckpointContents::new_v2(&effects, signatures)
        } else {
            CheckpointContents::new_with_digests_and_signatures(
                effects.iter().map(TransactionEffects::execution_digests),
                signatures
                    .into_iter()
                    .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
                    .collect(),
            )
        };

        let num_txns = contents.size() as u64;

        // Running total carried over from the previous summary, if there is one.
        let network_total_transactions = last_checkpoint
            .as_ref()
            .map(|(_, c)| c.network_total_transactions + num_txns)
            .unwrap_or(num_txns);

        let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());

        // Randomness rounds of exactly those randomness updates that landed in this chunk.
        let matching_randomness_rounds: Vec<_> = effects
            .iter()
            .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
            .copied()
            .collect();

        let checkpoint_commitments = if self
            .epoch_store
            .protocol_config()
            .include_checkpoint_artifacts_digest_in_summary()
        {
            let artifacts = CheckpointArtifacts::from(&effects[..]);
            let artifacts_digest = artifacts.digest()?;
            vec![artifacts_digest.into()]
        } else {
            Default::default()
        };

        let summary = CheckpointSummary::new(
            self.epoch_store.protocol_config(),
            epoch,
            sequence_number,
            network_total_transactions,
            &contents,
            previous_digest,
            epoch_rolling_gas_cost_summary,
            end_of_epoch_data,
            timestamp_ms,
            matching_randomness_rounds,
            checkpoint_commitments,
        );
        summary.report_checkpoint_age(
            &self.metrics.last_created_checkpoint_age,
            &self.metrics.last_created_checkpoint_age_ms,
        );
        if last_checkpoint_of_epoch {
            info!(
                checkpoint_seq = sequence_number,
                "creating last checkpoint of epoch {}", epoch
            );
            if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
                self.epoch_store
                    .report_epoch_metrics_at_last_checkpoint(stats);
            }
        }
        // The next chunk chains from the summary that was just built.
        last_checkpoint = Some((sequence_number, summary.clone()));
        checkpoints.push((summary, contents));
    }

    // `split_checkpoint_chunks` always returns at least one chunk, so this cannot be empty.
    Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
}
2592
2593 fn get_epoch_total_gas_cost(
2594 &self,
2595 last_checkpoint: Option<&CheckpointSummary>,
2596 cur_checkpoint_effects: &[TransactionEffects],
2597 ) -> GasCostSummary {
2598 let (previous_epoch, previous_gas_costs) = last_checkpoint
2599 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2600 .unwrap_or_default();
2601 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2602 if previous_epoch == self.epoch_store.epoch() {
2603 GasCostSummary::new(
2605 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2606 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2607 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2608 previous_gas_costs.non_refundable_storage_fee
2609 + current_gas_costs.non_refundable_storage_fee,
2610 )
2611 } else {
2612 current_gas_costs
2613 }
2614 }
2615
2616 #[instrument(level = "error", skip_all)]
2617 async fn augment_epoch_last_checkpoint(
2618 &self,
2619 epoch_total_gas_cost: &GasCostSummary,
2620 epoch_start_timestamp_ms: CheckpointTimestamp,
2621 checkpoint_effects: &mut Vec<TransactionEffects>,
2622 signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2623 checkpoint: CheckpointSequenceNumber,
2624 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2625 last_checkpoint: CheckpointSequenceNumber,
2628 ) -> CheckpointBuilderResult<SuiSystemState> {
2629 let (system_state, effects) = self
2630 .state
2631 .create_and_execute_advance_epoch_tx(
2632 &self.epoch_store,
2633 epoch_total_gas_cost,
2634 checkpoint,
2635 epoch_start_timestamp_ms,
2636 end_of_epoch_observation_keys,
2637 last_checkpoint,
2638 )
2639 .await?;
2640 checkpoint_effects.push(effects);
2641 signatures.push(vec![]);
2642 Ok(system_state)
2643 }
2644
/// Expands `roots` into the full set of effects to include in the checkpoint: the roots
/// themselves plus, transitively, their dependencies that were executed in the current epoch
/// and have not been included in a checkpoint yet.
///
/// An effect is left out of the result when any of the following holds:
/// - its digest is already in `existing_tx_digests_in_checkpoint`;
/// - the epoch store reports it as already included in a checkpoint by the builder;
/// - it was executed in an earlier epoch.
///
/// Dependencies of a left-out effect are not explored.
///
/// On success the digests of all returned effects are added to
/// `existing_tx_digests_in_checkpoint`.
///
/// NOTE(review): effects are returned in discovery order, and dependencies within one round
/// come out of a `HashSet`, so the order is not deterministic here — presumably the caller
/// sorts the result; confirm.
///
/// # Panics
/// Panics if the effects of a dependency reported as executed in the current epoch cannot be
/// found in the effects store.
#[instrument(level = "debug", skip_all)]
fn complete_checkpoint_effects(
    &self,
    mut roots: Vec<TransactionEffects>,
    existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
) -> SuiResult<Vec<TransactionEffects>> {
    let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
    let mut results = vec![];
    // Every digest that has been visited or queued, so nothing is processed twice.
    let mut seen = HashSet::new();
    loop {
        // Dependencies discovered in this round; they become the next round's `roots`.
        let mut pending = HashSet::new();

        let transactions_included = self
            .epoch_store
            .builder_included_transactions_in_checkpoint(
                roots.iter().map(|e| e.transaction_digest()),
            )?;

        for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
            let digest = effect.transaction_digest();
            // Mark as seen before any of the skip checks so a skipped digest is never
            // re-queued as somebody else's dependency.
            seen.insert(*digest);

            // Already part of the checkpoint(s) currently being assembled.
            if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                continue;
            }

            // Already checkpointed by the builder, or executed in a previous epoch.
            if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                continue;
            }

            let existing_effects = self
                .epoch_store
                .transactions_executed_in_cur_epoch(effect.dependencies())?;

            for (dependency, effects_signature_exists) in
                effect.dependencies().iter().zip(existing_effects.iter())
            {
                // Dependencies not executed in the current epoch are not pulled in.
                if !effects_signature_exists {
                    continue;
                }
                if seen.insert(*dependency) {
                    pending.insert(*dependency);
                }
            }
            results.push(effect);
        }
        if pending.is_empty() {
            break;
        }
        let pending = pending.into_iter().collect::<Vec<_>>();
        let effects = self.effects_store.multi_get_executed_effects(&pending);
        let effects = effects
            .into_iter()
            .zip(pending)
            .map(|(opt, digest)| match opt {
                Some(x) => x,
                None => panic!(
                    "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
                    digest
                ),
            })
            .collect::<Vec<_>>();
        roots = effects;
    }

    existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
    Ok(results)
}
2726
/// Simulator-only sanity check on the placement of the consensus commit prologue.
///
/// Verifies that:
/// - at most one of the root transactions is a consensus commit prologue;
/// - if there is none among the roots, none of the `sorted` transactions is one either;
/// - if there is one, it is the very first transaction of `sorted`, and no later transaction
///   in `sorted` is a consensus commit prologue.
///
/// Transactions that cannot be loaded from the cache are ignored, except for the first
/// entry of `sorted`, which must be present when a prologue exists.
///
/// # Panics
/// Panics (by assertion) when any of the invariants above is violated.
#[cfg(msim)]
fn expensive_consensus_commit_prologue_invariants_check(
    &self,
    root_digests: &[TransactionDigest],
    sorted: &[TransactionEffects],
) {
    let cache_reader = self.state.get_transaction_cache_reader();

    // Consensus commit prologues found among the roots.
    let root_txs = cache_reader.multi_get_transaction_blocks(root_digests);
    let ccps: Vec<_> = root_txs
        .iter()
        .flatten()
        .filter(|tx| tx.transaction_data().is_consensus_commit_prologue())
        .collect();
    assert!(
        ccps.len() <= 1,
        "expected at most one consensus commit prologue among checkpoint roots, found {}",
        ccps.len()
    );

    // Digests are `Copy`, so dereference instead of cloning.
    let sorted_digests: Vec<_> = sorted
        .iter()
        .map(|effects| *effects.transaction_digest())
        .collect();
    let txs = cache_reader.multi_get_transaction_blocks(&sorted_digests);

    let Some(prologue) = ccps.first() else {
        // No prologue among the roots: none may show up in the sorted output.
        for tx in txs.iter().flatten() {
            assert!(
                !tx.transaction_data().is_consensus_commit_prologue(),
                "found a consensus commit prologue in the checkpoint although none was a root"
            );
        }
        return;
    };

    let first = txs
        .first()
        .and_then(Option::as_ref)
        .expect("first sorted transaction must exist when a consensus commit prologue is a root");
    assert!(
        first.transaction_data().is_consensus_commit_prologue(),
        "consensus commit prologue must be the first transaction of the checkpoint"
    );
    assert_eq!(
        prologue.digest(),
        first.digest(),
        "first transaction of the checkpoint is not the root consensus commit prologue"
    );

    for tx in txs.iter().skip(1).flatten() {
        assert!(
            !tx.transaction_data().is_consensus_commit_prologue(),
            "found more than one consensus commit prologue in the checkpoint"
        );
    }
}
2796}
2797
2798async fn wait_for_effects_with_retry(
2799 effects_store: &dyn TransactionCacheRead,
2800 task_name: &'static str,
2801 digests: &[TransactionDigest],
2802 tx_key: TransactionKey,
2803) -> Vec<TransactionEffects> {
2804 let delay = if in_antithesis() {
2805 15
2807 } else {
2808 5
2809 };
2810 loop {
2811 match tokio::time::timeout(Duration::from_secs(delay), async {
2812 effects_store
2813 .notify_read_executed_effects(task_name, digests)
2814 .await
2815 })
2816 .await
2817 {
2818 Ok(effects) => break effects,
2819 Err(_) => {
2820 debug_fatal!(
2821 "Timeout waiting for transactions to be executed {:?}, retrying...",
2822 tx_key
2823 );
2824 }
2825 }
2826 }
2827}
2828
impl CheckpointAggregator {
    /// Creates an aggregator with no pending signatures and no checkpoint currently being
    /// aggregated.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        Self {
            store: tables,
            epoch_store,
            notify,
            receiver,
            pending: BTreeMap::new(),
            current: None,
            output,
            state,
            metrics,
        }
    }

    /// Main loop of the aggregator; never returns.
    ///
    /// Each iteration drains all queued signature messages into `pending`, runs one
    /// aggregation pass, and then waits for whichever comes first: a new signature message,
    /// a notification, or a one-second timer. A failed pass is logged, counted in
    /// `checkpoint_errors`, and retried after one second.
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            // Drain everything that is already queued, without blocking.
            while let Ok(sig) = self.receiver.try_recv() {
                self.pending
                    .entry(sig.summary.sequence_number)
                    .or_default()
                    .push(sig);
            }

            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            tokio::select! {
                Some(sig) = self.receiver.recv() => {
                    self.pending
                        .entry(sig.summary.sequence_number)
                        .or_default()
                        .push(sig);
                }
                _ = self.notify.notified() => {}
                // Periodic wake-up so a pass runs even without any new input.
                _ = tokio::time::sleep(Duration::from_secs(1)) => {}
            }
        }
    }

    /// Runs one aggregation pass and reports every newly certified checkpoint to `output`,
    /// in the order they were certified.
    async fn run_and_notify(&mut self) -> SuiResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Certifies as many consecutive checkpoints as the pending signatures allow.
    ///
    /// Starting from the next uncertified sequence number, signatures queued for that
    /// checkpoint are fed into the signature aggregator. When a quorum is reached the
    /// certified checkpoint is stored, added to the result, and the loop moves on to the
    /// next sequence number. The pass ends when the next checkpoint has not been built
    /// locally yet, when no signatures are queued for it, or when the queued signatures
    /// did not produce a certificate.
    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            // Signatures for already certified checkpoints are of no use any more.
            self.pending.retain(|&seq, _| seq >= next_to_certify);
            let current = if let Some(current) = &mut self.current {
                if current.summary.sequence_number < next_to_certify {
                    // A certified checkpoint for this sequence number is already in the
                    // store (NOTE(review): presumably inserted by another component such
                    // as state sync), so abandon the in-progress aggregation.
                    assert_reachable!("skip checkpoint certification");
                    self.current = None;
                    continue;
                }
                current
            } else {
                // Nothing in progress: start aggregating for the next locally built
                // checkpoint, or stop if it has not been built yet.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let seq = current.summary.sequence_number;
            let sigs = self.pending.remove(&seq).unwrap_or_default();
            if sigs.is_empty() {
                trace!(
                    checkpoint_seq =? seq,
                    "Not enough checkpoint signatures",
                );
                return Ok(result);
            }
            for data in sigs {
                trace!(
                    checkpoint_seq = seq,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = seq,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics.last_certified_checkpoint.set(seq as i64);
                    current.summary.report_checkpoint_age(
                        &self.metrics.last_certified_checkpoint_age,
                        &self.metrics.last_certified_checkpoint_age_ms,
                    );
                    result.push(summary.into_inner());
                    // Certified: drop the aggregator (and any remaining signatures for
                    // this sequence number) and try the next checkpoint.
                    self.current = None;
                    continue 'outer;
                }
            }
            // All queued signatures were consumed without reaching a certificate.
            break;
        }
        Ok(result)
    }

    /// Returns the sequence number following the highest certified checkpoint in the store,
    /// or the default (0) when no certified checkpoint is stored.
    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
2998
2999impl CheckpointSignatureAggregator {
3000 #[allow(clippy::result_unit_err)]
3001 pub fn try_aggregate(
3002 &mut self,
3003 data: CheckpointSignatureMessage,
3004 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
3005 let their_digest = *data.summary.digest();
3006 let (_, signature) = data.summary.into_data_and_sig();
3007 let author = signature.authority;
3008 let envelope =
3009 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
3010 match self.signatures_by_digest.insert(their_digest, envelope) {
3011 InsertResult::Failed { error }
3013 if matches!(
3014 error.as_inner(),
3015 SuiErrorKind::StakeAggregatorRepeatedSigner {
3016 conflicting_sig: false,
3017 ..
3018 },
3019 ) =>
3020 {
3021 Err(())
3022 }
3023 InsertResult::Failed { error } => {
3024 warn!(
3025 checkpoint_seq = self.summary.sequence_number,
3026 "Failed to aggregate new signature from validator {:?}: {:?}",
3027 author.concise(),
3028 error
3029 );
3030 self.check_for_split_brain();
3031 Err(())
3032 }
3033 InsertResult::QuorumReached(cert) => {
3034 if their_digest != self.digest {
3037 self.metrics.remote_checkpoint_forks.inc();
3038 warn!(
3039 checkpoint_seq = self.summary.sequence_number,
3040 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
3041 author.concise(),
3042 their_digest,
3043 self.digest
3044 );
3045 return Err(());
3046 }
3047 Ok(cert)
3048 }
3049 InsertResult::NotEnoughVotes {
3050 bad_votes: _,
3051 bad_authorities: _,
3052 } => {
3053 self.check_for_split_brain();
3054 Err(())
3055 }
3056 }
3057 }
3058
3059 fn check_for_split_brain(&self) {
3063 debug!(
3064 checkpoint_seq = self.summary.sequence_number,
3065 "Checking for split brain condition"
3066 );
3067 if self.signatures_by_digest.quorum_unreachable() {
3068 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
3074 let digests_by_stake_messages = all_unique_values
3075 .iter()
3076 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
3077 .map(|(digest, (_authorities, total_stake))| {
3078 format!("{:?} (total stake: {})", digest, total_stake)
3079 })
3080 .collect::<Vec<String>>();
3081 fail_point_arg!("kill_split_brain_node", |(
3082 checkpoint_overrides,
3083 forked_authorities,
3084 ): (
3085 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
3086 std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
3087 )| {
3088 #[cfg(msim)]
3089 {
3090 if let (Ok(mut overrides), Ok(forked_authorities_set)) =
3091 (checkpoint_overrides.lock(), forked_authorities.lock())
3092 {
3093 let correct_digest = all_unique_values
3095 .iter()
3096 .find(|(_, (authorities, _))| {
3097 authorities
3099 .iter()
3100 .any(|auth| !forked_authorities_set.contains(auth))
3101 })
3102 .map(|(digest, _)| digest.to_string())
3103 .unwrap_or_else(|| {
3104 all_unique_values
3106 .iter()
3107 .max_by_key(|(_, (_, stake))| *stake)
3108 .map(|(digest, _)| digest.to_string())
3109 .unwrap_or_else(|| self.digest.to_string())
3110 });
3111
3112 overrides.insert(self.summary.sequence_number, correct_digest.clone());
3113
3114 tracing::error!(
3115 fatal = true,
3116 "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
3117 self.summary.sequence_number,
3118 correct_digest
3119 );
3120 }
3121 }
3122 });
3123
3124 debug_fatal!(
3125 "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
3126 self.summary.sequence_number,
3127 self.signatures_by_digest.uncommitted_stake(),
3128 digests_by_stake_messages
3129 );
3130 self.metrics.split_brain_checkpoint_forks.inc();
3131
3132 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
3133 let local_summary = self.summary.clone();
3134 let state = self.state.clone();
3135 let tables = self.store.clone();
3136
3137 tokio::spawn(async move {
3138 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
3139 });
3140 }
3141 }
3142}
3143
3144async fn diagnose_split_brain(
3150 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
3151 local_summary: CheckpointSummary,
3152 state: Arc<AuthorityState>,
3153 tables: Arc<CheckpointStore>,
3154) {
3155 debug!(
3156 checkpoint_seq = local_summary.sequence_number,
3157 "Running split brain diagnostics..."
3158 );
3159 let time = SystemTime::now();
3160 let digest_to_validator = all_unique_values
3162 .iter()
3163 .filter_map(|(digest, (validators, _))| {
3164 if *digest != local_summary.digest() {
3165 let random_validator = validators.choose(&mut get_rng()).unwrap();
3166 Some((*digest, *random_validator))
3167 } else {
3168 None
3169 }
3170 })
3171 .collect::<HashMap<_, _>>();
3172 if digest_to_validator.is_empty() {
3173 panic!(
3174 "Given split brain condition, there should be at \
3175 least one validator that disagrees with local signature"
3176 );
3177 }
3178
3179 let epoch_store = state.load_epoch_store_one_call_per_task();
3180 let committee = epoch_store
3181 .epoch_start_state()
3182 .get_sui_committee_with_network_metadata();
3183 let network_config = default_mysten_network_config();
3184 let network_clients =
3185 make_network_authority_clients_with_network_config(&committee, &network_config);
3186
3187 let response_futures = digest_to_validator
3189 .values()
3190 .cloned()
3191 .map(|validator| {
3192 let client = network_clients
3193 .get(&validator)
3194 .expect("Failed to get network client");
3195 let request = CheckpointRequestV2 {
3196 sequence_number: Some(local_summary.sequence_number),
3197 request_content: true,
3198 certified: false,
3199 };
3200 client.handle_checkpoint_v2(request)
3201 })
3202 .collect::<Vec<_>>();
3203
3204 let digest_name_pair = digest_to_validator.iter();
3205 let response_data = futures::future::join_all(response_futures)
3206 .await
3207 .into_iter()
3208 .zip(digest_name_pair)
3209 .filter_map(|(response, (digest, name))| match response {
3210 Ok(response) => match response {
3211 CheckpointResponseV2 {
3212 checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
3213 contents: Some(contents),
3214 } => Some((*name, *digest, summary, contents)),
3215 CheckpointResponseV2 {
3216 checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
3217 contents: _,
3218 } => {
3219 panic!("Expected pending checkpoint, but got certified checkpoint");
3220 }
3221 CheckpointResponseV2 {
3222 checkpoint: None,
3223 contents: _,
3224 } => {
3225 error!(
3226 "Summary for checkpoint {:?} not found on validator {:?}",
3227 local_summary.sequence_number, name
3228 );
3229 None
3230 }
3231 CheckpointResponseV2 {
3232 checkpoint: _,
3233 contents: None,
3234 } => {
3235 error!(
3236 "Contents for checkpoint {:?} not found on validator {:?}",
3237 local_summary.sequence_number, name
3238 );
3239 None
3240 }
3241 },
3242 Err(e) => {
3243 error!(
3244 "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
3245 e
3246 );
3247 None
3248 }
3249 })
3250 .collect::<Vec<_>>();
3251
3252 let local_checkpoint_contents = tables
3253 .get_checkpoint_contents(&local_summary.content_digest)
3254 .unwrap_or_else(|_| {
3255 panic!(
3256 "Could not find checkpoint contents for digest {:?}",
3257 local_summary.digest()
3258 )
3259 })
3260 .unwrap_or_else(|| {
3261 panic!(
3262 "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
3263 local_summary.sequence_number,
3264 local_summary.digest()
3265 )
3266 });
3267 let local_contents_text = format!("{local_checkpoint_contents:?}");
3268
3269 let local_summary_text = format!("{local_summary:?}");
3270 let local_validator = state.name.concise();
3271 let diff_patches = response_data
3272 .iter()
3273 .map(|(name, other_digest, other_summary, contents)| {
3274 let other_contents_text = format!("{contents:?}");
3275 let other_summary_text = format!("{other_summary:?}");
3276 let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
3277 .enumerate_transactions(&local_summary)
3278 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
3279 .unzip();
3280 let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
3281 .enumerate_transactions(other_summary)
3282 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
3283 .unzip();
3284 let summary_patch = create_patch(&local_summary_text, &other_summary_text);
3285 let contents_patch = create_patch(&local_contents_text, &other_contents_text);
3286 let local_transactions_text = format!("{local_transactions:#?}");
3287 let other_transactions_text = format!("{other_transactions:#?}");
3288 let transactions_patch =
3289 create_patch(&local_transactions_text, &other_transactions_text);
3290 let local_effects_text = format!("{local_effects:#?}");
3291 let other_effects_text = format!("{other_effects:#?}");
3292 let effects_patch = create_patch(&local_effects_text, &other_effects_text);
3293 let seq_number = local_summary.sequence_number;
3294 let local_digest = local_summary.digest();
3295 let other_validator = name.concise();
3296 format!(
3297 "Checkpoint: {seq_number:?}\n\
3298 Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
3299 Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
3300 Summary Diff: \n{summary_patch}\n\n\
3301 Contents Diff: \n{contents_patch}\n\n\
3302 Transactions Diff: \n{transactions_patch}\n\n\
3303 Effects Diff: \n{effects_patch}",
3304 )
3305 })
3306 .collect::<Vec<_>>()
3307 .join("\n\n\n");
3308
3309 let header = format!(
3310 "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
3311 Datetime: {:?}",
3312 time
3313 );
3314 let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
3315 let path = tempfile::tempdir()
3316 .expect("Failed to create tempdir")
3317 .keep()
3318 .join(Path::new("checkpoint_fork_dump.txt"));
3319 let mut file = File::create(path).unwrap();
3320 write!(file, "{}", fork_logs_text).unwrap();
3321 debug!("{}", fork_logs_text);
3322}
3323
/// Notification entry points of the checkpoint service.
///
/// Only the signatures are visible here; the notes below are assumptions to be confirmed
/// against the implementations.
pub trait CheckpointServiceNotify {
    /// Hands a checkpoint signature message to the service.
    ///
    /// NOTE(review): presumably the message ends up with the signature aggregator — confirm
    /// against the implementation.
    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult;

    /// Signals the service about checkpoint work.
    ///
    /// NOTE(review): presumably wakes the checkpoint builder because a new pending
    /// checkpoint is available — confirm against the implementation.
    fn notify_checkpoint(&self) -> SuiResult;
}
3329
/// Lifecycle state of the checkpoint service's background components.
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    /// The components have been constructed but not handed out yet; they are held here
    /// until `take_unstarted` removes them.
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    /// The components have been taken out of the state.
    Started,
}
3341
3342impl CheckpointServiceState {
3343 fn take_unstarted(
3344 &mut self,
3345 ) -> (
3346 CheckpointBuilder,
3347 CheckpointAggregator,
3348 CheckpointStateHasher,
3349 ) {
3350 let mut state = CheckpointServiceState::Started;
3351 std::mem::swap(self, &mut state);
3352
3353 match state {
3354 CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3355 (builder, aggregator, hasher)
3356 }
3357 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3358 }
3359 }
3360}
3361
/// Entry point of the checkpoint subsystem. Holds the handles used to talk to
/// the checkpoint builder and aggregator, and parks those components until
/// `spawn` starts them.
pub struct CheckpointService {
    /// The checkpoint store (`build` stores its `checkpoint_store` argument here).
    tables: Arc<CheckpointStore>,
    /// Shared with the `CheckpointBuilder`; `notify_checkpoint` calls
    /// `notify_one` on it.
    notify_builder: Arc<Notify>,
    /// Sending half of the unbounded channel whose receiver is handed to the
    /// `CheckpointAggregator`.
    signature_sender: mpsc::UnboundedSender<CheckpointSignatureMessage>,
    /// Watch channel for the highest checkpoint sequence number built in the
    /// current run; a clone of this sender is given to the builder.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    /// Sequence number of the latest locally computed checkpoint found in the
    /// store when the service was built (0 if there was none).
    /// `wait_for_rebuilt_checkpoints` waits until the currently built sequence
    /// number reaches this value.
    highest_previously_built_seq: CheckpointSequenceNumber,
    /// Metrics updated when checkpoint signatures are received.
    metrics: Arc<CheckpointMetrics>,
    /// The parked components, or `Started` once `spawn` has taken them.
    state: Mutex<CheckpointServiceState>,
}
3374
3375impl CheckpointService {
    /// Constructs the checkpoint service together with its three internal
    /// components (builder, aggregator and state hasher) without starting any
    /// of them. The components are stored in `CheckpointServiceState::Unstarted`
    /// until `spawn` is called.
    ///
    /// `max_transactions_per_checkpoint` and `max_checkpoint_size_bytes` are
    /// forwarded to the `CheckpointBuilder`.
    ///
    /// # Panics
    ///
    /// Panics if the latest locally computed checkpoint or the last built
    /// checkpoint summary cannot be loaded.
    // NOTE(review): the lint allowance is presumably for the unbounded channel
    // created below — confirm.
    #[allow(clippy::disallowed_methods)]
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        // One wake-up handle per component: the service wakes the builder, and
        // the builder is given the aggregator's handle.
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // Highest checkpoint this node had computed locally before this
        // service instance was built; defaults to 0 when there is none.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        // Starting point for the "currently built" watermark; defaults to 0
        // when no built checkpoint summary is found.
        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        // The initial receiver is discarded; readers subscribe to the sender later.
        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        // Signatures flow from `notify_checkpoint_signature` to the aggregator.
        let (signature_sender, signature_receiver) = mpsc::unbounded_channel();

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            signature_receiver,
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        // Bounded channel (capacity 16) from the builder to the state hasher.
        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);

        let ckpt_state_hasher = CheckpointStateHasher::new(
            epoch_store.clone(),
            global_state_hasher.clone(),
            receive_from_builder,
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            global_state_hasher,
            send_to_hasher,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            signature_sender,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            // Nothing runs yet; `spawn` takes these out and starts them.
            state: Mutex::new(CheckpointServiceState::Unstarted((
                builder,
                aggregator,
                ckpt_state_hasher,
            ))),
        })
    }
3465
    /// Starts the builder, aggregator and state hasher tasks, then waits (for
    /// at most 120 seconds) until either the builder has finished or the
    /// previously built checkpoints have been rebuilt.
    ///
    /// Before starting anything, state hashes recorded after the last committed
    /// checkpoint (the larger of the last persisted builder summary and the
    /// highest executed checkpoint) are cleared.
    ///
    /// # Panics
    ///
    /// Panics if called a second time on the same service (the components have
    /// already been taken), or if either sequence-number lookup below fails.
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        let last_persisted_builder_seq = epoch_store
            .last_persisted_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number);

        let last_executed_seq = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number());

        // `Option::max` orders `None` below any `Some`, so this picks the larger
        // of the two when both exist and skips the block when neither does.
        if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
            // A failure here is only logged; startup continues regardless.
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        // Lets the wait at the bottom observe the builder finishing.
        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        spawn_monitored_task!(async move {
            // Run the builder inside `within_alive_epoch`; its result is
            // deliberately ignored.
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            // Only after the builder future is done: wait for the state hasher
            // to finish on its own.
            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            // The aggregator is stopped explicitly; awaiting the aborted handle
            // waits for it to actually finish, and the resulting error is ignored.
            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        if tokio::time::timeout(Duration::from_secs(120), async move {
            // Whichever happens first ends the wait. The `_` pattern means the
            // first branch also fires if the sender was dropped without sending.
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
3559}
3560
3561impl CheckpointService {
3562 pub async fn wait_for_rebuilt_checkpoints(&self) {
3568 let highest_previously_built_seq = self.highest_previously_built_seq;
3569 let mut rx = self.highest_currently_built_seq_tx.subscribe();
3570 let mut highest_currently_built_seq = *rx.borrow_and_update();
3571 info!(
3572 "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3573 );
3574 loop {
3575 if highest_currently_built_seq >= highest_previously_built_seq {
3576 info!("Checkpoint rebuild complete");
3577 break;
3578 }
3579 rx.changed().await.unwrap();
3580 highest_currently_built_seq = *rx.borrow_and_update();
3581 }
3582 }
3583
3584 #[cfg(test)]
3585 fn write_and_notify_checkpoint_for_testing(
3586 &self,
3587 epoch_store: &AuthorityPerEpochStore,
3588 checkpoint: PendingCheckpoint,
3589 ) -> SuiResult {
3590 use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3591
3592 let mut output = ConsensusCommitOutput::new(0);
3593 epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3594 output.set_default_commit_stats_for_testing();
3595 epoch_store.push_consensus_output_for_tests(output);
3596 self.notify_checkpoint()?;
3597 Ok(())
3598 }
3599}
3600
3601impl CheckpointServiceNotify for CheckpointService {
3602 fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult {
3603 let sequence = info.summary.sequence_number;
3604 let signer = info.summary.auth_sig().authority.concise();
3605
3606 if let Some(highest_verified_checkpoint) = self
3607 .tables
3608 .get_highest_verified_checkpoint()?
3609 .map(|x| *x.sequence_number())
3610 && sequence <= highest_verified_checkpoint
3611 {
3612 trace!(
3613 checkpoint_seq = sequence,
3614 "Ignore checkpoint signature from {} - already certified", signer,
3615 );
3616 self.metrics
3617 .last_ignored_checkpoint_signature_received
3618 .set(sequence as i64);
3619 return Ok(());
3620 }
3621 trace!(
3622 checkpoint_seq = sequence,
3623 "Received checkpoint signature, digest {} from {}",
3624 info.summary.digest(),
3625 signer,
3626 );
3627 self.metrics
3628 .last_received_checkpoint_signatures
3629 .with_label_values(&[&signer.to_string()])
3630 .set(sequence as i64);
3631 self.signature_sender.send(info.clone()).ok();
3632 Ok(())
3633 }
3634
3635 fn notify_checkpoint(&self) -> SuiResult {
3636 self.notify_builder.notify_one();
3637 Ok(())
3638 }
3639}
3640
/// A `CheckpointServiceNotify` implementation that ignores every notification
/// and always reports success.
pub struct CheckpointServiceNoop {}
impl CheckpointServiceNotify for CheckpointServiceNoop {
    /// Does nothing with the signature.
    fn notify_checkpoint_signature(&self, _: &CheckpointSignatureMessage) -> SuiResult {
        Ok(())
    }

    /// Does nothing.
    fn notify_checkpoint(&self) -> SuiResult {
        Ok(())
    }
}
3652
/// Read-only accessors for `PendingCheckpoint`.
impl PendingCheckpoint {
    /// Returns the checkpoint height stored in this pending checkpoint's details.
    pub fn height(&self) -> CheckpointHeight {
        self.details.checkpoint_height
    }

    /// Returns the root transaction keys of this pending checkpoint.
    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.roots
    }

    /// Returns the details attached to this pending checkpoint.
    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.details
    }
}
3666
3667impl PendingCheckpointV2 {
3668 pub fn height(&self) -> CheckpointHeight {
3669 self.details.checkpoint_height
3670 }
3671
3672 pub(crate) fn num_roots(&self) -> usize {
3673 self.roots.iter().map(|r| r.tx_roots.len()).sum()
3674 }
3675}
3676
pin_project! {
    /// Future adapter that counts how many times the wrapped future is polled.
    pub struct PollCounter<Fut> {
        // The wrapped future; structurally pinned so it can be polled in place.
        #[pin]
        future: Fut,
        // Number of `poll` calls made so far.
        count: usize,
    }
}
3684
impl<Fut> PollCounter<Fut> {
    /// Wraps `future` with a poll count starting at zero.
    pub fn new(future: Fut) -> Self {
        Self { future, count: 0 }
    }

    /// Returns how many times this wrapper has been polled so far.
    pub fn count(&self) -> usize {
        self.count
    }
}
3694
3695impl<Fut: Future> Future for PollCounter<Fut> {
3696 type Output = (usize, Fut::Output);
3697
3698 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3699 let this = self.project();
3700 *this.count += 1;
3701 match this.future.poll(cx) {
3702 Poll::Ready(output) => Poll::Ready((*this.count, output)),
3703 Poll::Pending => Poll::Pending,
3704 }
3705 }
3706}
3707
/// Wraps `future` in a `PollCounter`, so that awaiting it yields the number of
/// polls together with the future's output.
fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
    PollCounter::new(future)
}
3711
3712#[cfg(test)]
3713mod tests {
3714 use super::*;
3715 use crate::authority::test_authority_builder::TestAuthorityBuilder;
3716 use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3717 use futures::FutureExt as _;
3718 use futures::future::BoxFuture;
3719 use std::collections::HashMap;
3720 use std::ops::Deref;
3721 use sui_macros::sim_test;
3722 use sui_protocol_config::{Chain, ProtocolConfig};
3723 use sui_types::accumulator_event::AccumulatorEvent;
3724 use sui_types::authenticator_state::ActiveJwk;
3725 use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3726 use sui_types::crypto::Signature;
3727 use sui_types::effects::{TransactionEffects, TransactionEvents};
3728 use sui_types::messages_checkpoint::SignedCheckpointSummary;
3729 use sui_types::transaction::VerifiedTransaction;
3730 use tokio::sync::mpsc;
3731
3732 #[tokio::test]
3733 async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
3734 let store = CheckpointStore::new_for_tests();
3735 let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
3736 for seq in 70u64..=80u64 {
3737 let contents =
3738 sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
3739 [sui_types::base_types::ExecutionDigests::new(
3740 sui_types::digests::TransactionDigest::random(),
3741 sui_types::digests::TransactionEffectsDigest::ZERO,
3742 )],
3743 );
3744 let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
3745 &protocol,
3746 0,
3747 seq,
3748 0,
3749 &contents,
3750 None,
3751 sui_types::gas::GasCostSummary::default(),
3752 None,
3753 0,
3754 Vec::new(),
3755 Vec::new(),
3756 );
3757 store
3758 .tables
3759 .locally_computed_checkpoints
3760 .insert(&seq, &summary)
3761 .unwrap();
3762 }
3763
3764 store
3765 .clear_locally_computed_checkpoints_from(76)
3766 .expect("clear should succeed");
3767
3768 assert!(
3770 store
3771 .tables
3772 .locally_computed_checkpoints
3773 .get(&75)
3774 .unwrap()
3775 .is_some()
3776 );
3777 assert!(
3778 store
3779 .tables
3780 .locally_computed_checkpoints
3781 .get(&76)
3782 .unwrap()
3783 .is_none()
3784 );
3785
3786 for seq in 70u64..76u64 {
3787 assert!(
3788 store
3789 .tables
3790 .locally_computed_checkpoints
3791 .get(&seq)
3792 .unwrap()
3793 .is_some()
3794 );
3795 }
3796 for seq in 76u64..=80u64 {
3797 assert!(
3798 store
3799 .tables
3800 .locally_computed_checkpoints
3801 .get(&seq)
3802 .unwrap()
3803 .is_none()
3804 );
3805 }
3806 }
3807
3808 #[tokio::test]
3809 async fn test_fork_detection_storage() {
3810 let store = CheckpointStore::new_for_tests();
3811 let seq_num = 42;
3813 let digest = CheckpointDigest::random();
3814
3815 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3816
3817 store
3818 .record_checkpoint_fork_detected(seq_num, digest)
3819 .unwrap();
3820
3821 let retrieved = store.get_checkpoint_fork_detected().unwrap();
3822 assert!(retrieved.is_some());
3823 let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3824 assert_eq!(retrieved_seq, seq_num);
3825 assert_eq!(retrieved_digest, digest);
3826
3827 store.clear_checkpoint_fork_detected().unwrap();
3828 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3829
3830 let tx_digest = TransactionDigest::random();
3832 let expected_effects = TransactionEffectsDigest::random();
3833 let actual_effects = TransactionEffectsDigest::random();
3834
3835 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3836
3837 store
3838 .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3839 .unwrap();
3840
3841 let retrieved = store.get_transaction_fork_detected().unwrap();
3842 assert!(retrieved.is_some());
3843 let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3844 assert_eq!(retrieved_tx, tx_digest);
3845 assert_eq!(retrieved_expected, expected_effects);
3846 assert_eq!(retrieved_actual, actual_effects);
3847
3848 store.clear_transaction_fork_detected().unwrap();
3849 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3850 }
3851
    /// End-to-end test of the checkpoint service: feeds six pending checkpoints
    /// to the builder, checks the seven checkpoints it emits (contents,
    /// sequence numbers, digest chaining and rolling gas totals), and then
    /// checks that signed summaries come back certified in sequence order.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.disable_accumulators_for_testing();
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(false);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // Small placeholder transaction stored under digests d(0)..d(14).
        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
            0,
            0,
            vec![],
            SequenceNumber::new(),
        );

        // Keep adding JWKs until the list serializes to at least 40_000 bytes,
        // to get a deliberately large transaction payload.
        let jwks = {
            let mut jwks = Vec::new();
            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
                jwks.push(ActiveJwk {
                    jwk_id: JwkId::new(
                        "https://accounts.google.com".to_string(),
                        "1234567890".to_string(),
                    ),
                    jwk: JWK {
                        kty: "RSA".to_string(),
                        e: "AQAB".to_string(),
                        n: "1234567890".to_string(),
                        alg: "RS256".to_string(),
                    },
                    epoch: 0,
                });
            }
            jwks
        };

        // Large placeholder transaction stored under digests d(15)..d(19).
        let dummy_tx_with_data =
            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());

        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // In-memory effects store: d(1) depends on d(2) and d(3), d(2) depends
        // on d(3) and d(4); every other transaction has no dependencies.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 52, 51, 1),
            );
        }
        // Register a default user signature for every transaction in the store.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![(signature, None)]);
        }

        // Channels standing in for the checkpoint outputs; the `mpsc::Sender`
        // impls further down in this module forward into them.
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

        // Limits under test: at most 3 transactions and 100_000 bytes per checkpoint.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

        // Six pending checkpoints: p(height, root digests, timestamp_ms).
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        // Roots were d(1) and d(3); d(2) is expected as well because d(1)
        // depends on it. d(4) is not repeated: it is already in checkpoint 0.
        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        // Rolling total: checkpoint 0's gas plus the gas of d(1), d(2) and d(3).
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 108, 104, 4)
        );

        // Pending checkpoint 2 has four roots but is expected as two
        // checkpoints — presumably split by the 3-transaction limit.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending checkpoint 3 holds three of the large transactions and is
        // expected as two checkpoints — presumably split by the 100_000-byte
        // size limit (each payload is at least 40_000 bytes).
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending checkpoints 4 and 5 are expected merged into one checkpoint —
        // presumably because checkpoint 4's timestamp (4001) is within the
        // 100 ms minimum interval of the previous one (4000). TODO confirm.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        // Signatures are submitted out of order (checkpoint 1 before checkpoint 0)...
        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c2ss })
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c1ss })
            .unwrap();

        // ...but certified checkpoints must still arrive in sequence order.
        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
4085
4086 impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
4087 fn notify_read_executed_effects_may_fail(
4088 &self,
4089 _: &str,
4090 digests: &[TransactionDigest],
4091 ) -> BoxFuture<'_, SuiResult<Vec<TransactionEffects>>> {
4092 std::future::ready(Ok(digests
4093 .iter()
4094 .map(|d| self.get(d).expect("effects not found").clone())
4095 .collect()))
4096 .boxed()
4097 }
4098
4099 fn notify_read_executed_effects_digests(
4100 &self,
4101 _: &str,
4102 digests: &[TransactionDigest],
4103 ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
4104 std::future::ready(
4105 digests
4106 .iter()
4107 .map(|d| {
4108 self.get(d)
4109 .map(|fx| fx.digest())
4110 .expect("effects not found")
4111 })
4112 .collect(),
4113 )
4114 .boxed()
4115 }
4116
4117 fn multi_get_executed_effects(
4118 &self,
4119 digests: &[TransactionDigest],
4120 ) -> Vec<Option<TransactionEffects>> {
4121 digests.iter().map(|d| self.get(d).cloned()).collect()
4122 }
4123
4124 fn multi_get_transaction_blocks(
4130 &self,
4131 _: &[TransactionDigest],
4132 ) -> Vec<Option<Arc<VerifiedTransaction>>> {
4133 unimplemented!()
4134 }
4135
4136 fn multi_get_executed_effects_digests(
4137 &self,
4138 _: &[TransactionDigest],
4139 ) -> Vec<Option<TransactionEffectsDigest>> {
4140 unimplemented!()
4141 }
4142
4143 fn multi_get_effects(
4144 &self,
4145 _: &[TransactionEffectsDigest],
4146 ) -> Vec<Option<TransactionEffects>> {
4147 unimplemented!()
4148 }
4149
4150 fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
4151 unimplemented!()
4152 }
4153
4154 fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
4155 unimplemented!()
4156 }
4157
4158 fn get_unchanged_loaded_runtime_objects(
4159 &self,
4160 _digest: &TransactionDigest,
4161 ) -> Option<Vec<sui_types::storage::ObjectKey>> {
4162 unimplemented!()
4163 }
4164
4165 fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
4166 unimplemented!()
4167 }
4168 }
4169
    /// Test sink: forwards every created checkpoint over the channel as a
    /// `(contents, summary)` pair so the test can inspect it.
    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        /// Sends clones of `contents` and `summary` without waiting; panics if
        /// the channel cannot accept the message.
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> SuiResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }
4183
    /// Test sink: forwards every certified checkpoint summary over the channel
    /// so the test can inspect it.
    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        /// Sends a clone of `summary` without waiting; panics if the channel
        /// cannot accept the message.
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> SuiResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }
4194
4195 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
4196 PendingCheckpoint {
4197 roots: t
4198 .into_iter()
4199 .map(|t| TransactionKey::Digest(d(t)))
4200 .collect(),
4201 details: PendingCheckpointInfo {
4202 timestamp_ms,
4203 last_of_epoch: false,
4204 checkpoint_height: i,
4205 consensus_commit_ref: CommitRef::default(),
4206 rejected_transactions_digest: Digest::default(),
4207 checkpoint_seq: None,
4208 },
4209 }
4210 }
4211
4212 fn d(i: u8) -> TransactionDigest {
4213 let mut bytes: [u8; 32] = Default::default();
4214 bytes[0] = i;
4215 TransactionDigest::new(bytes)
4216 }
4217
    /// Builds test effects: starts from `TransactionEffects::default()` and
    /// overrides the transaction digest, the dependency list and the gas cost
    /// summary with the given values.
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }
4229
4230 fn commit_cert_for_test(
4231 store: &mut HashMap<TransactionDigest, TransactionEffects>,
4232 state: Arc<AuthorityState>,
4233 digest: TransactionDigest,
4234 dependencies: Vec<TransactionDigest>,
4235 gas_used: GasCostSummary,
4236 ) {
4237 let epoch_store = state.epoch_store_for_testing();
4238 let effects = e(digest, dependencies, gas_used);
4239 store.insert(digest, effects.clone());
4240 epoch_store.insert_executed_in_epoch(&digest);
4241 }
4242}