mod causal_order;
pub mod checkpoint_executor;
mod checkpoint_output;
mod metrics;

use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
use crate::authority::AuthorityState;
use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
use crate::checkpoints::causal_order::CausalOrder;
use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
pub use crate::checkpoints::checkpoint_output::{
    LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
};
pub use crate::checkpoints::metrics::CheckpointMetrics;
use crate::consensus_manager::ReplayWaiter;
use crate::execution_cache::TransactionCacheRead;

use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
use crate::global_state_hasher::GlobalStateHasher;
use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
use consensus_core::CommitRef;
use diffy::create_patch;
use itertools::Itertools;
use mysten_common::random::get_rng;
use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
use nonempty::NonEmpty;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
use serde::{Deserialize, Serialize};
use sui_macros::fail_point_arg;
use sui_network::default_mysten_network_config;
use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
use sui_types::base_types::{ConciseableName, SequenceNumber};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::execution::ExecutionTimeObservationKey;
use sui_types::messages_checkpoint::{
    CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
};
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use tokio::sync::{mpsc, watch};
use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::authority_store_pruner::PrunerWatermarks;
use crate::consensus_handler::SequencedConsensusTransactionKey;
use rand::seq::SliceRandom;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fs::File;
use std::future::Future;
use std::io::Write;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Weak;
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};
use sui_protocol_config::ProtocolVersion;
use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
use sui_types::committee::StakeUnit;
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
use sui_types::digests::{
    CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
};
use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
use sui_types::error::{SuiErrorKind, SuiResult};
use sui_types::gas::GasCostSummary;
use sui_types::message_envelope::Message;
use sui_types::messages_checkpoint::{
    CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
    CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
    EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
    VerifiedCheckpointContents,
};
use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
use sui_types::messages_consensus::ConsensusTransactionKey;
use sui_types::signature::GenericSignature;
use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
use sui_types::transaction::{
    TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
};
use tokio::{sync::Notify, time::timeout};
use tracing::{debug, error, info, instrument, trace, warn};
use typed_store::DBMapUtils;
use typed_store::Map;
use typed_store::{
    TypedStoreError,
    rocks::{DBMap, MetricConf},
};

const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

pub type CheckpointHeight = u64;

pub struct EpochStats {
    pub checkpoint_count: u64,
    pub transaction_count: u64,
    pub total_gas_reward: u64,
}

#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    pub timestamp_ms: CheckpointTimestamp,
    pub last_of_epoch: bool,
    pub checkpoint_height: CheckpointHeight,
    pub consensus_commit_ref: CommitRef,
    pub rejected_transactions_digest: Digest,
}

#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    pub roots: Vec<TransactionKey>,
    pub details: PendingCheckpointInfo,
}

#[derive(Clone, Debug, Default)]
pub struct CheckpointRoots {
    pub tx_roots: Vec<TransactionKey>,
    pub settlement_root: Option<TransactionKey>,
    pub height: CheckpointHeight,
}

#[derive(Clone, Debug)]
pub struct PendingCheckpointV2 {
    pub roots: Vec<CheckpointRoots>,
    pub details: PendingCheckpointInfo,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    pub checkpoint_height: Option<CheckpointHeight>,
    pub position_in_commit: usize,
}

#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
}

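/// RocksDB options override used for the large full-checkpoint-content column
/// families; it keeps the default options and enables value hashing via
/// `set_log_value_hash(true)`.
///
/// Hedged sketch of how such an override function is attached to a column
/// family (mirrors the usage in `CheckpointStoreTables` above; the struct and
/// table names here are illustrative only):
///
/// ```ignore
/// #[derive(DBMapUtils)]
/// struct ExampleTables {
///     #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
///     big_table: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
/// }
/// ```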
fn full_checkpoint_content_table_default_config() -> DBOptions {
    DBOptions {
        options: default_db_options().options,
        rw_options: ReadWriteOptions::default().set_log_value_hash(true),
    }
}

impl CheckpointStoreTables {
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 4;
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(64_000)
            .with_value_cache_size(default_value_cache_size());
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
        let configs = vec![
            (
                "checkpoint_content",
                digest_config.clone().with_config(
                    lru_config
                        .clone()
                        .with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
            ),
            (
                "transaction_fork_detected",
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "full_checkpoint_content_v2",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }
}

pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}

impl CheckpointStore {
    pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
        let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
        Arc::new(Self {
            tables,
            synced_checkpoint_notify_read: NotifyRead::new(),
            executed_checkpoint_notify_read: NotifyRead::new(),
        })
    }

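    /// Convenience constructor for tests: opens a `CheckpointStore` in a fresh
    /// temporary directory with default pruner watermarks.
    ///
    /// Minimal usage sketch (hedged: the `checkpoint` value is assumed to come
    /// from a test fixture and is not constructed here):
    ///
    /// ```ignore
    /// let store = CheckpointStore::new_for_tests();
    /// assert!(store.get_highest_synced_checkpoint()?.is_none());
    /// // store.insert_verified_checkpoint(&checkpoint)?;
    /// // store.update_highest_synced_checkpoint(&checkpoint)?;
    /// ```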
    pub fn new_for_tests() -> Arc<Self> {
        let ckpt_dir = mysten_common::tempdir().unwrap();
        CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
    }

    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
        let tables = CheckpointStoreTables::new(
            path,
            "db_checkpoint",
            Arc::new(PrunerWatermarks::default()),
        );
        Arc::new(Self {
            tables,
            synced_checkpoint_notify_read: NotifyRead::new(),
            executed_checkpoint_notify_read: NotifyRead::new(),
        })
    }

    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }

    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        match self.get_checkpoint_by_sequence_number(0).unwrap() {
            Some(existing_checkpoint) => {
                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
            }
            None => {
                if epoch_store.epoch() == checkpoint.epoch {
                    epoch_store
                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                        .unwrap();
                } else {
                    debug!(
                        validator_epoch =% epoch_store.epoch(),
                        genesis_epoch =% checkpoint.epoch(),
                        "Not inserting checkpoint builder data for genesis checkpoint",
                    );
                }
                self.insert_checkpoint_contents(contents).unwrap();
                self.insert_verified_checkpoint(&checkpoint).unwrap();
                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
            }
        }
    }

    pub fn get_checkpoint_by_digest(
        &self,
        digest: &CheckpointDigest,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.tables
            .checkpoint_by_digest
            .get(digest)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.tables
            .certified_checkpoints
            .get(&sequence_number)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.tables
            .locally_computed_checkpoints
            .get(&sequence_number)
    }

    pub fn multi_get_locally_computed_checkpoints(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
        let checkpoints = self
            .tables
            .locally_computed_checkpoints
            .multi_get(sequence_numbers)?;

        Ok(checkpoints)
    }

    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }

    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }

    pub fn get_latest_certified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        Ok(self
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(_, v)| v.into()))
    }

    pub fn get_latest_locally_computed_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        Ok(self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(_, v)| v))
    }

    pub fn multi_get_checkpoint_by_sequence_number(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
        let checkpoints = self
            .tables
            .certified_checkpoints
            .multi_get(sequence_numbers)?
            .into_iter()
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
            .collect();

        Ok(checkpoints)
    }

    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }

    pub fn get_highest_verified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_verified = if let Some(highest_verified) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestVerified)?
        {
            highest_verified
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_verified.1)
    }

    pub fn get_highest_synced_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_synced = if let Some(highest_synced) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestSynced)?
        {
            highest_synced
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_synced.1)
    }

    pub fn get_highest_synced_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_synced) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestSynced)?
        {
            Ok(Some(highest_synced.0))
        } else {
            Ok(None)
        }
    }

    pub fn get_highest_executed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_executed) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestExecuted)?
        {
            Ok(Some(highest_executed.0))
        } else {
            Ok(None)
        }
    }

    pub fn get_highest_executed_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_executed = if let Some(highest_executed) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestExecuted)?
        {
            highest_executed
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_executed.1)
    }

    pub fn get_highest_pruned_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .watermarks
            .get(&CheckpointWatermark::HighestPruned)
            .map(|watermark| watermark.map(|w| w.0))
    }

    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }

    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content_v2.get(&seq)
    }

    fn prune_local_summaries(&self) -> SuiResult {
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }

    pub fn clear_locally_computed_checkpoints_from(
        &self,
        from_seq: CheckpointSequenceNumber,
    ) -> SuiResult {
        let keys: Vec<_> = self
            .tables
            .locally_computed_checkpoints
            .safe_iter_with_bounds(Some(from_seq), None)
            .map(|r| r.map(|(k, _)| k))
            .collect::<Result<_, _>>()?;
        if let Some(&last_local_summary) = keys.last() {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch
                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
                .expect("Failed to delete locally computed checkpoints");
            batch
                .write()
                .expect("Failed to delete locally computed checkpoints");
            warn!(
                from_seq,
                last_local_summary,
                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
                from_seq,
                last_local_summary
            );
        }
        Ok(())
    }

    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );

            if let Err(e) = self.record_checkpoint_fork_detected(
                *local_checkpoint.sequence_number(),
                local_checkpoint.digest(),
            ) {
                error!("Failed to record checkpoint fork in database: {:?}", e);
            }

            fail_point_arg!(
                "kill_checkpoint_fork_node",
                |checkpoint_overrides: std::sync::Arc<
                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
                >| {
                    #[cfg(msim)]
                    {
                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
                            overrides.insert(
                                local_checkpoint.sequence_number,
                                verified_checkpoint.digest().to_string(),
                            );
                        }
                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
                            local_checkpoint.sequence_number(),
                            verified_checkpoint.digest()
                        );
                        sui_simulator::task::shutdown_current_node();
                    }
                }
            );

            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }

    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }

    #[instrument(level = "debug", skip_all)]
    pub fn insert_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.insert_certified_checkpoint(checkpoint)?;
        self.update_highest_verified_checkpoint(checkpoint)
    }

    pub fn update_highest_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if Some(*checkpoint.sequence_number())
            > self
                .get_highest_verified_checkpoint()?
                .map(|x| *x.sequence_number())
        {
            debug!(
                checkpoint_seq = checkpoint.sequence_number(),
                "Updating highest verified checkpoint",
            );
            self.tables.watermarks.insert(
                &CheckpointWatermark::HighestVerified,
                &(*checkpoint.sequence_number(), *checkpoint.digest()),
            )?;
        }

        Ok(())
    }

    pub fn update_highest_synced_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestSynced,
            &(seq, *checkpoint.digest()),
        )?;
        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
        Ok(())
    }

    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        notify_read
            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
                let seq = seqs[0];
                let Some(highest) = get_watermark() else {
                    return vec![None];
                };
                if highest < seq {
                    return vec![None];
                }
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                vec![Some(checkpoint)]
            })
            .await
            .into_iter()
            .next()
            .unwrap()
    }

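    /// Waits until the checkpoint with sequence number `seq` has been marked as
    /// synced (i.e. the `HighestSynced` watermark has reached `seq`) and
    /// returns the corresponding `VerifiedCheckpoint`.
    ///
    /// Usage sketch (hedged: assumes an async context and a
    /// `store: Arc<CheckpointStore>` handle):
    ///
    /// ```ignore
    /// let checkpoint = store.notify_read_synced_checkpoint(42).await;
    /// assert_eq!(*checkpoint.sequence_number(), 42);
    /// ```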
    pub async fn notify_read_synced_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
            self.get_highest_synced_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }

    pub async fn notify_read_executed_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
            self.get_highest_executed_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }

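    /// Advances the `HighestExecuted` watermark to `checkpoint` and notifies
    /// any tasks waiting in `notify_read_executed_checkpoint`.
    ///
    /// Note the invariant enforced below: if the watermark already covers this
    /// checkpoint the call is a no-op; otherwise the new sequence number must
    /// be exactly one past the current watermark. A hedged sketch of the
    /// expected call pattern (checkpoints assumed to arrive from the executor
    /// in order):
    ///
    /// ```ignore
    /// store.update_highest_executed_checkpoint(&ckpt_n)?;        // watermark = n
    /// store.update_highest_executed_checkpoint(&ckpt_n)?;        // no-op
    /// store.update_highest_executed_checkpoint(&ckpt_n_plus_1)?; // watermark = n + 1
    /// ```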
    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(seq, *checkpoint.digest()),
        )?;
        self.executed_checkpoint_notify_read
            .notify(&seq, checkpoint);
        Ok(())
    }

    pub fn update_highest_pruned_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestPruned,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn insert_checkpoint_contents(
        &self,
        contents: CheckpointContents,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = ?contents.digest(),
            "Inserting checkpoint contents",
        );
        self.tables
            .checkpoint_content
            .insert(contents.digest(), &contents)
    }

    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content_v2.batch();
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content_v2,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        batch.write()
    }

    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.tables.full_checkpoint_content.remove(&seq)?;
        self.tables.full_checkpoint_content_v2.remove(&seq)
    }

    pub fn get_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
    ) -> SuiResult<Option<VerifiedCheckpoint>> {
        let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
        let checkpoint = match seq {
            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
            None => None,
        };
        Ok(checkpoint)
    }

    pub fn get_epoch_last_checkpoint_seq_number(
        &self,
        epoch_id: EpochId,
    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
        Ok(seq)
    }

    pub fn insert_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
        checkpoint: &VerifiedCheckpoint,
    ) -> SuiResult {
        self.tables
            .epoch_last_checkpoint_map
            .insert(&epoch_id, checkpoint.sequence_number())?;
        Ok(())
    }

    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
            checkpoint
                .end_of_epoch_data
                .as_ref()
                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
                .epoch_commitments
                .clone()
        });
        Ok(commitments)
    }

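    /// Computes aggregate statistics for `epoch` given the last checkpoint
    /// summary of that epoch. Returns `None` if the previous epoch's last
    /// checkpoint cannot be found (and the epoch is not epoch 0).
    ///
    /// Worked example with illustrative numbers: if epoch 4 ended at sequence
    /// number 1_000 with `network_total_transactions = 50_000`, and epoch 5's
    /// last checkpoint is sequence number 1_500 with
    /// `network_total_transactions = 80_000`, then for epoch 5:
    /// `checkpoint_count = 1_500 - 1_001 + 1 = 500` and
    /// `transaction_count = 80_000 - 50_000 = 30_000`, while `total_gas_reward`
    /// is taken from the epoch's rolling `computation_cost`.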
    pub fn get_epoch_stats(
        &self,
        epoch: EpochId,
        last_checkpoint: &CheckpointSummary,
    ) -> Option<EpochStats> {
        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
            (0, 0)
        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
            (
                checkpoint.sequence_number + 1,
                checkpoint.network_total_transactions,
            )
        } else {
            return None;
        };
        Some(EpochStats {
            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
            transaction_count: last_checkpoint.network_total_transactions
                - prev_epoch_network_transactions,
            total_gas_reward: last_checkpoint
                .epoch_rolling_gas_cost_summary
                .computation_cost,
        })
    }

    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
        self.tables
            .checkpoint_content
            .checkpoint_db(path)
            .map_err(Into::into)
    }

    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
        let mut wb = self.tables.watermarks.batch();
        wb.delete_batch(
            &self.tables.watermarks,
            std::iter::once(CheckpointWatermark::HighestExecuted),
        )?;
        wb.write()?;
        Ok(())
    }

    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
        self.delete_highest_executed_checkpoint_test_only()?;
        Ok(())
    }

    pub fn record_checkpoint_fork_detected(
        &self,
        checkpoint_seq: CheckpointSequenceNumber,
        checkpoint_digest: CheckpointDigest,
    ) -> Result<(), TypedStoreError> {
        info!(
            checkpoint_seq = checkpoint_seq,
            checkpoint_digest = ?checkpoint_digest,
            "Recording checkpoint fork detection in database"
        );
        self.tables.watermarks.insert(
            &CheckpointWatermark::CheckpointForkDetected,
            &(checkpoint_seq, checkpoint_digest),
        )
    }

    pub fn get_checkpoint_fork_detected(
        &self,
    ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
        self.tables
            .watermarks
            .get(&CheckpointWatermark::CheckpointForkDetected)
    }

    pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
        self.tables
            .watermarks
            .remove(&CheckpointWatermark::CheckpointForkDetected)
    }

    pub fn record_transaction_fork_detected(
        &self,
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
    ) -> Result<(), TypedStoreError> {
        info!(
            tx_digest = ?tx_digest,
            expected_effects_digest = ?expected_effects_digest,
            actual_effects_digest = ?actual_effects_digest,
            "Recording transaction fork detection in database"
        );
        self.tables.transaction_fork_detected.insert(
            &TRANSACTION_FORK_DETECTED_KEY,
            &(tx_digest, expected_effects_digest, actual_effects_digest),
        )
    }

    pub fn get_transaction_fork_detected(
        &self,
    ) -> Result<
        Option<(
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        )>,
        TypedStoreError,
    > {
        self.tables
            .transaction_fork_detected
            .get(&TRANSACTION_FORK_DETECTED_KEY)
    }

    pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
        self.tables
            .transaction_fork_detected
            .remove(&TRANSACTION_FORK_DETECTED_KEY)
    }
}

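/// Keys of the `watermarks` table. Each variant maps to a
/// `(CheckpointSequenceNumber, CheckpointDigest)` pair recording how far the
/// corresponding stage (verification, state sync, execution, pruning) has
/// progressed; `CheckpointForkDetected` records the locally computed
/// checkpoint at which a fork was observed.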
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
    CheckpointForkDetected,
}

struct CheckpointStateHasher {
    epoch_store: Arc<AuthorityPerEpochStore>,
    hasher: Weak<GlobalStateHasher>,
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}

impl CheckpointStateHasher {
    fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        hasher: Weak<GlobalStateHasher>,
        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    ) -> Self {
        Self {
            epoch_store,
            hasher,
            receive_from_builder,
        }
    }

    async fn run(self) {
        let Self {
            epoch_store,
            hasher,
            mut receive_from_builder,
        } = self;
        while let Some((seq, effects)) = receive_from_builder.recv().await {
            let Some(hasher) = hasher.upgrade() else {
                info!("Object state hasher was dropped, stopping checkpoint accumulation");
                break;
            };
            hasher
                .accumulate_checkpoint(&effects, seq, &epoch_store)
                .expect("epoch ended while accumulating checkpoint");
        }
    }
}

#[derive(Debug)]
pub enum CheckpointBuilderError {
    ChangeEpochTxAlreadyExecuted,
    SystemPackagesMissing,
    Retry(anyhow::Error),
}

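// Any error type implementing `std::error::Error + Send + Sync + 'static`
// converts into the `Retry` variant, so `?` can be used freely inside the
// builder. Hedged sketch of the resulting call pattern (the store call is
// illustrative only):
//
//     fn load(store: &CheckpointStore) -> CheckpointBuilderResult<()> {
//         let _ckpt = store.get_checkpoint_by_sequence_number(0)?; // TypedStoreError -> Retry
//         Ok(())
//     }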
impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
    for CheckpointBuilderError
{
    fn from(e: SuiError) -> Self {
        Self::Retry(e.into())
    }
}

pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;

pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    notify_aggregator: Arc<Notify>,
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    global_state_hasher: Weak<GlobalStateHasher>,
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}

pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}

pub struct CheckpointSignatureAggregator {
    next_index: u64,
    summary: CheckpointSummary,
    digest: CheckpointDigest,
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}

impl CheckpointBuilder {
    fn new(
        state: Arc<AuthorityState>,
        store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            store,
            epoch_store,
            notify,
            effects_store,
            global_state_hasher,
            send_to_hasher,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }

    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
        if let Some(replay_waiter) = consensus_replay_waiter {
            info!("Waiting for consensus commits to replay ...");
            replay_waiter.wait_for_replay().await;
            info!("Consensus commits finished replaying");
        }
        info!("Starting CheckpointBuilder");
        loop {
            match self.maybe_build_checkpoints().await {
                Ok(()) => {}
                err @ Err(
                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
                    | CheckpointBuilderError::SystemPackagesMissing,
                ) => {
                    info!("CheckpointBuilder stopping: {:?}", err);
                    return;
                }
                Err(CheckpointBuilderError::Retry(inner)) => {
                    let msg = format!("{:?}", inner);
                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    continue;
                }
            }

            self.notify.notified().await;
        }
    }

    async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
        if self
            .epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler()
        {
            self.maybe_build_checkpoints_v2().await
        } else {
            self.maybe_build_checkpoints_v1().await
        }
    }

    async fn maybe_build_checkpoints_v1(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                Some(last_timestamp) => {
                    current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                }
                None => true,
            } || checkpoints_iter
                .peek()
                .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height_from = grouped_pending_checkpoints
                    .first()
                    .unwrap()
                    .details()
                    .checkpoint_height,
                checkpoint_commit_height_to = last_height,
                "Making checkpoint with commit height range"
            );

            let seq = self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await?;

            self.last_built.send_if_modified(|cur| {
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );

        Ok(())
    }

    async fn maybe_build_checkpoints_v2(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        let last_height = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .and_then(|s| s.checkpoint_height);

        for (height, pending) in self
            .epoch_store
            .get_pending_checkpoints_v2(last_height)
            .expect("unexpected epoch store error")
        {
            debug!(checkpoint_commit_height = height, "Making checkpoint");

            let seq = self.make_checkpoint_v2(pending).await?;

            self.last_built.send_if_modified(|cur| {
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            tokio::task::yield_now().await;
        }

        Ok(())
    }

    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &mut self,
        pendings: Vec<PendingCheckpoint>,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        let pending_ckpt_str = pendings
            .iter()
            .map(|p| {
                format!(
                    "height={}, commit={}",
                    p.details().checkpoint_height,
                    p.details().consensus_commit_ref
                )
            })
            .join("; ");

        let last_details = pendings.last().unwrap().details().clone();

        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &last_details,
                &all_roots,
            )
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        let new_ckpt_str = new_checkpoints
            .iter()
            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
            .join("; ");

        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            "Made new checkpoint {} from pending checkpoint {}",
            new_ckpt_str, pending_ckpt_str
        );

        Ok(highest_sequence)
    }

    #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
    async fn make_checkpoint_v2(
        &mut self,
        pending: PendingCheckpointV2,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        let details = pending.details.clone();

        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        let (poll_count, result) =
            poll_count(self.resolve_checkpoint_transactions_v2(pending)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &details,
                &all_roots,
            )
            .await?;
        assert_eq!(new_checkpoints.len(), 1, "Expected exactly one checkpoint");
        let sequence = *new_checkpoints.first().0.sequence_number();
        let digest = new_checkpoints.first().0.digest();
        if sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        self.write_checkpoints(details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            seq = sequence,
            %digest,
            height = details.checkpoint_height,
            commit = %details.consensus_commit_ref,
            "Made new checkpoint"
        );

        Ok(sequence)
    }

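    /// Builds and drives execution of the accumulator settlement transactions
    /// for a checkpoint under construction, followed by a single barrier
    /// transaction, and returns their effects together with the shared
    /// `TransactionKey` used for scheduling. The high-level flow (mirroring
    /// the body below) is:
    ///
    /// 1. Build settlement transactions from the effects already chosen for
    ///    the checkpoint and hand them to the epoch store for execution.
    /// 2. Wait for their effects, then build and execute the barrier
    ///    transaction that depends on them.
    /// 3. Extract the new accumulator-root version from the settlement effects
    ///    and notify the execution scheduler via `settle_address_funds`.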
1587 async fn construct_and_execute_settlement_transactions(
1588 &self,
1589 sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
1590 checkpoint_height: CheckpointHeight,
1591 checkpoint_seq: CheckpointSequenceNumber,
1592 tx_index_offset: u64,
1593 ) -> (TransactionKey, Vec<TransactionEffects>) {
1594 let _scope =
1595 monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");
1596
1597 let tx_key =
1598 TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);
1599
1600 let epoch = self.epoch_store.epoch();
1601 let accumulator_root_obj_initial_shared_version = self
1602 .epoch_store
1603 .epoch_start_config()
1604 .accumulator_root_obj_initial_shared_version()
1605 .expect("accumulator root object must exist");
1606
1607 let builder = AccumulatorSettlementTxBuilder::new(
1608 Some(self.effects_store.as_ref()),
1609 sorted_tx_effects_included_in_checkpoint,
1610 checkpoint_seq,
1611 tx_index_offset,
1612 );
1613
1614 let funds_changes = builder.collect_funds_changes();
1615 let num_updates = builder.num_updates();
1616 let settlement_txns = builder.build_tx(
1617 self.epoch_store.protocol_config(),
1618 epoch,
1619 accumulator_root_obj_initial_shared_version,
1620 checkpoint_height,
1621 checkpoint_seq,
1622 );
1623
1624 let settlement_txns: Vec<_> = settlement_txns
1625 .into_iter()
1626 .map(|tx| {
1627 VerifiedExecutableTransaction::new_system(
1628 VerifiedTransaction::new_system_transaction(tx),
1629 self.epoch_store.epoch(),
1630 )
1631 })
1632 .collect();
1633
1634 let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();
1635
1636 debug!(
1637 ?settlement_digests,
1638 ?tx_key,
1639 "created settlement transactions with {num_updates} updates"
1640 );
1641
1642 self.epoch_store
1643 .notify_settlement_transactions_ready(tx_key, settlement_txns);
1644
1645 let settlement_effects = wait_for_effects_with_retry(
1646 self.effects_store.as_ref(),
1647 "CheckpointBuilder::notify_read_settlement_effects",
1648 &settlement_digests,
1649 tx_key,
1650 )
1651 .await;
1652
1653 let barrier_tx = accumulators::build_accumulator_barrier_tx(
1654 epoch,
1655 accumulator_root_obj_initial_shared_version,
1656 checkpoint_height,
1657 &settlement_effects,
1658 );
1659
1660 let barrier_tx = VerifiedExecutableTransaction::new_system(
1661 VerifiedTransaction::new_system_transaction(barrier_tx),
1662 self.epoch_store.epoch(),
1663 );
1664 let barrier_digest = *barrier_tx.digest();
1665
1666 self.epoch_store
1667 .notify_barrier_transaction_ready(tx_key, barrier_tx);
1668
1669 let barrier_effects = wait_for_effects_with_retry(
1670 self.effects_store.as_ref(),
1671 "CheckpointBuilder::notify_read_barrier_effects",
1672 &[barrier_digest],
1673 tx_key,
1674 )
1675 .await;
1676
1677 let settlement_effects: Vec<_> = settlement_effects
1678 .into_iter()
1679 .chain(barrier_effects)
1680 .collect();
1681
1682 let mut next_accumulator_version = None;
1683 for fx in settlement_effects.iter() {
1684 assert!(
1685 fx.status().is_ok(),
1686 "settlement transaction cannot fail (digest: {:?}) {:#?}",
1687 fx.transaction_digest(),
1688 fx
1689 );
1690 if let Some(version) = fx
1691 .mutated()
1692 .iter()
1693 .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
1694 {
1695 assert!(
1696 next_accumulator_version.is_none(),
1697 "Only one settlement transaction should mutate the accumulator root object"
1698 );
1699 next_accumulator_version = Some(version);
1700 }
1701 }
1702 let settlements = FundsSettlement {
1703 next_accumulator_version: next_accumulator_version
1704 .expect("Accumulator root object should be mutated in the settlement transactions"),
1705 funds_changes,
1706 };
1707
1708 self.state
1709 .execution_scheduler()
1710 .settle_address_funds(settlements);
1711
1712 (tx_key, settlement_effects)
1713 }
1714
1715 #[instrument(level = "debug", skip_all)]
1720 async fn resolve_checkpoint_transactions(
1721 &self,
1722 pending_checkpoints: Vec<PendingCheckpoint>,
1723 ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
1724 let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1725
1726 let mut effects_in_current_checkpoint = BTreeSet::new();
1731
1732 let mut tx_effects = Vec::new();
1733 let mut tx_roots = HashSet::new();
1734
1735 for pending_checkpoint in pending_checkpoints.into_iter() {
1736 let mut pending = pending_checkpoint;
1737 debug!(
1738 checkpoint_commit_height = pending.details.checkpoint_height,
1739 "Resolving checkpoint transactions for pending checkpoint.",
1740 );
1741
1742 trace!(
1743 "roots for pending checkpoint {:?}: {:?}",
1744 pending.details.checkpoint_height, pending.roots,
1745 );
1746
1747 let settlement_root = if self.epoch_store.accumulators_enabled() {
1748 let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
1749 pending.roots.pop()
1750 else {
1751 fatal!("No settlement root found");
1752 };
1753 Some(settlement_root)
1754 } else {
1755 None
1756 };
1757
1758 let roots = &pending.roots;
1759
1760 self.metrics
1761 .checkpoint_roots_count
1762 .inc_by(roots.len() as u64);
1763
1764 let root_digests = self
1765 .epoch_store
1766 .notify_read_tx_key_to_digest(roots)
1767 .in_monitored_scope("CheckpointNotifyDigests")
1768 .await?;
1769 let root_effects = self
1770 .effects_store
1771 .notify_read_executed_effects(
1772 CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
1773 &root_digests,
1774 )
1775 .in_monitored_scope("CheckpointNotifyRead")
1776 .await;
1777
1778 assert!(
1779 self.epoch_store
1780 .protocol_config()
1781 .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
1782 );
1783
1784 let consensus_commit_prologue =
1787 self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
1788
1789 if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1792 let unsorted_ccp = self.complete_checkpoint_effects(
1793 vec![ccp_effects.clone()],
1794 &mut effects_in_current_checkpoint,
1795 )?;
1796
1797 if unsorted_ccp.len() != 1 {
1800 fatal!(
1801 "Expected 1 consensus commit prologue, got {:?}",
1802 unsorted_ccp
1803 .iter()
1804 .map(|e| e.transaction_digest())
1805 .collect::<Vec<_>>()
1806 );
1807 }
1808 assert_eq!(unsorted_ccp.len(), 1);
1809 assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1810 }
1811
1812 let unsorted =
1813 self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;
1814
1815 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1816 let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1817 if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
1818 if cfg!(debug_assertions) {
1819 for tx in unsorted.iter() {
1821 assert!(tx.transaction_digest() != &ccp_digest);
1822 }
1823 }
1824 sorted.push(ccp_effects);
1825 }
1826 sorted.extend(CausalOrder::causal_sort(unsorted));
1827
1828 if let Some(settlement_root) = settlement_root {
1829 let last_checkpoint =
1832 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1833 let next_checkpoint_seq = last_checkpoint
1834 .as_ref()
1835 .map(|(seq, _)| *seq)
1836 .unwrap_or_default()
1837 + 1;
1838 let tx_index_offset = tx_effects.len() as u64;
1839
1840 let (tx_key, settlement_effects) = self
1841 .construct_and_execute_settlement_transactions(
1842 &sorted,
1843 pending.details.checkpoint_height,
1844 next_checkpoint_seq,
1845 tx_index_offset,
1846 )
1847 .await;
1848 debug!(?tx_key, "executed settlement transactions");
1849
1850 assert_eq!(settlement_root, tx_key);
1851
1852 sorted.extend(settlement_effects);
1859 }
1860
1861 #[cfg(msim)]
1862 {
1863 self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1865 }
1866
1867 tx_effects.extend(sorted);
1868 tx_roots.extend(root_digests);
1869 }
1870
1871 Ok((tx_effects, tx_roots))
1872 }
1873
1874 #[instrument(level = "debug", skip_all)]
1877 async fn resolve_checkpoint_transactions_v2(
1878 &self,
1879 pending: PendingCheckpointV2,
1880 ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
1881 let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1882
1883 let mut effects_in_current_checkpoint = BTreeSet::new();
1884 debug!(
1885 checkpoint_commit_height = pending.details.checkpoint_height,
1886 "Resolving checkpoint transactions for pending checkpoint.",
1887 );
1888
1889 trace!(
1890 "roots for pending checkpoint {:?}: {:?}",
1891 pending.details.checkpoint_height, pending.roots,
1892 );
1893
1894 assert!(
1895 self.epoch_store
1896 .protocol_config()
1897 .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
1898 );
1899
1900 let mut all_effects: Vec<TransactionEffects> = Vec::new();
1901 let mut all_root_digests: Vec<TransactionDigest> = Vec::new();
1902
1903 let last_checkpoint =
1904 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1905 let next_checkpoint_seq = last_checkpoint
1906 .as_ref()
1907 .map(|(seq, _)| *seq)
1908 .unwrap_or_default()
1909 + 1;
1910
1911 for checkpoint_roots in &pending.roots {
1912 let tx_roots = &checkpoint_roots.tx_roots;
1913
1914 self.metrics
1915 .checkpoint_roots_count
1916 .inc_by(tx_roots.len() as u64);
1917
1918 let root_digests = self
1919 .epoch_store
1920 .notify_read_tx_key_to_digest(tx_roots)
1921 .in_monitored_scope("CheckpointNotifyDigests")
1922 .await?;
1923
1924 all_root_digests.extend(root_digests.iter().cloned());
1925
1926 let root_effects = self
1927 .effects_store
1928 .notify_read_executed_effects(
1929 CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
1930 &root_digests,
1931 )
1932 .in_monitored_scope("CheckpointNotifyRead")
1933 .await;
1934
1935 let consensus_commit_prologue = {
1936 let ccp = self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
1937
1938 if let Some((ccp_digest, ccp_effects)) = &ccp {
1939 let unsorted_ccp = self.complete_checkpoint_effects(
1940 vec![ccp_effects.clone()],
1941 &mut effects_in_current_checkpoint,
1942 )?;
1943
1944 if unsorted_ccp.is_empty() {
1945 None
1949 } else if unsorted_ccp.len() != 1 {
1950 fatal!(
1951 "Expected 1 consensus commit prologue, got {:?}",
1952 unsorted_ccp
1953 .iter()
1954 .map(|e| e.transaction_digest())
1955 .collect::<Vec<_>>()
1956 );
1957 } else {
1958 assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1959 ccp.clone()
1960 }
1961 } else {
1962 None
1963 }
1964 };
1965
1966 let unsorted =
1967 self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;
1968
1969 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1970 let tx_index_offset = all_effects.len() as u64;
1971 let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1972
1973 if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
1974 if cfg!(debug_assertions) {
1975 for tx in unsorted.iter() {
1976 assert!(tx.transaction_digest() != &ccp_digest);
1977 }
1978 }
1979 sorted.push(ccp_effects);
1980 }
1981 sorted.extend(CausalOrder::causal_sort(unsorted));
1982
1983 if checkpoint_roots.settlement_root.is_some() {
1984 let (tx_key, settlement_effects) = self
1985 .construct_and_execute_settlement_transactions(
1986 &sorted,
1987 checkpoint_roots.height,
1988 next_checkpoint_seq,
1989 tx_index_offset,
1990 )
1991 .await;
1992 debug!(?tx_key, "executed settlement transactions");
1993
1994 sorted.extend(settlement_effects);
1995 }
1996
1997 all_effects.extend(sorted);
1998 }
1999
2000 #[cfg(msim)]
2001 {
2002 self.expensive_consensus_commit_prologue_invariants_check_v2(
2003 &all_root_digests,
2004 &all_effects,
2005 );
2006 }
2007 Ok((all_effects, all_root_digests.into_iter().collect()))
2008 }
2009
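// If the first root transaction of the commit is a consensus commit prologue, returns its
// digest and effects so the caller can place it at the front of the checkpoint. The
// prologue, when present, is always the first root.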
2010 fn extract_consensus_commit_prologue(
2013 &self,
2014 root_digests: &[TransactionDigest],
2015 root_effects: &[TransactionEffects],
2016 ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
2017 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
2018 if root_digests.is_empty() {
2019 return Ok(None);
2020 }
2021
2022 let first_tx = self
2026 .state
2027 .get_transaction_cache_reader()
2028 .get_transaction_block(&root_digests[0])
2029 .expect("Transaction block must exist");
2030
2031 Ok(first_tx
2032 .transaction_data()
2033 .is_consensus_commit_prologue()
2034 .then(|| {
2035 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
2036 (*first_tx.digest(), root_effects[0].clone())
2037 }))
2038 }
2039
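// Persists newly built checkpoints: writes contents and locally computed summaries in a
// single batch (fataling if the same sequence number was previously built with a different
// digest), forwards them to the configured checkpoint output, checks for forks against any
// already certified checkpoints, and notifies the aggregator.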
2040 #[instrument(level = "debug", skip_all)]
2041 async fn write_checkpoints(
2042 &mut self,
2043 height: CheckpointHeight,
2044 new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
2045 ) -> SuiResult {
2046 let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
2047 let mut batch = self.store.tables.checkpoint_content.batch();
2048 let mut all_tx_digests =
2049 Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
2050
2051 for (summary, contents) in &new_checkpoints {
2052 debug!(
2053 checkpoint_commit_height = height,
2054 checkpoint_seq = summary.sequence_number,
2055 contents_digest = ?contents.digest(),
2056 "writing checkpoint",
2057 );
2058
2059 if let Some(previously_computed_summary) = self
2060 .store
2061 .tables
2062 .locally_computed_checkpoints
2063 .get(&summary.sequence_number)?
2064 && previously_computed_summary.digest() != summary.digest()
2065 {
2066 fatal!(
2067 "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
2068 summary.sequence_number,
2069 previously_computed_summary.digest(),
2070 summary.digest()
2071 );
2072 }
2073
2074 all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
2075
2076 self.metrics
2077 .transactions_included_in_checkpoint
2078 .inc_by(contents.size() as u64);
2079 let sequence_number = summary.sequence_number;
2080 self.metrics
2081 .last_constructed_checkpoint
2082 .set(sequence_number as i64);
2083
2084 batch.insert_batch(
2085 &self.store.tables.checkpoint_content,
2086 [(contents.digest(), contents)],
2087 )?;
2088
2089 batch.insert_batch(
2090 &self.store.tables.locally_computed_checkpoints,
2091 [(sequence_number, summary)],
2092 )?;
2093 }
2094
2095 batch.write()?;
2096
2097 for (summary, contents) in &new_checkpoints {
2099 self.output
2100 .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
2101 .await?;
2102 }
2103
2104 for (local_checkpoint, _) in &new_checkpoints {
2105 if let Some(certified_checkpoint) = self
2106 .store
2107 .tables
2108 .certified_checkpoints
2109 .get(local_checkpoint.sequence_number())?
2110 {
2111 self.store
2112 .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
2113 }
2114 }
2115
2116 self.notify_aggregator.notify_one();
2117 self.epoch_store
2118 .process_constructed_checkpoint(height, new_checkpoints);
2119 Ok(())
2120 }
2121
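// Splits the effects into checkpoint-sized chunks bounded by max_transactions_per_checkpoint
// and max_checkpoint_size_bytes (transaction + effects + signature sizes). When checkpoints
// are already split in the consensus handler, everything goes into a single chunk.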
2122 #[allow(clippy::type_complexity)]
2123 fn split_checkpoint_chunks(
2124 &self,
2125 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
2126 signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2127 ) -> CheckpointBuilderResult<
2128 Vec<
2129 Vec<(
2130 TransactionEffects,
2131 Vec<(GenericSignature, Option<SequenceNumber>)>,
2132 )>,
2133 >,
2134 > {
2135 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
2136
2137 if self
2139 .epoch_store
2140 .protocol_config()
2141 .split_checkpoints_in_consensus_handler()
2142 {
2143 let chunk: Vec<_> = effects_and_transaction_sizes
2144 .into_iter()
2145 .zip(signatures)
2146 .map(|((effects, _size), sigs)| (effects, sigs))
2147 .collect();
2148 return Ok(vec![chunk]);
2149 }
2150 let mut chunks = Vec::new();
2151 let mut chunk = Vec::new();
2152 let mut chunk_size: usize = 0;
2153 for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
2154 .into_iter()
2155 .zip(signatures.into_iter())
2156 {
2157 let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
2162 bcs::serialized_size(&signatures)?
2163 } else {
2164 let signatures: Vec<&GenericSignature> =
2165 signatures.iter().map(|(s, _)| s).collect();
2166 bcs::serialized_size(&signatures)?
2167 };
2168 let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
2169 if chunk.len() == self.max_transactions_per_checkpoint
2170 || (chunk_size + size) > self.max_checkpoint_size_bytes
2171 {
2172 if chunk.is_empty() {
2173 warn!(
2175 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
2176 self.max_checkpoint_size_bytes
2177 );
2178 } else {
2179 chunks.push(chunk);
2180 chunk = Vec::new();
2181 chunk_size = 0;
2182 }
2183 }
2184
2185 chunk.push((effects, signatures));
2186 chunk_size += size;
2187 }
2188
2189 if !chunk.is_empty() || chunks.is_empty() {
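// Push the final partial chunk. An empty chunk is still produced when there were no
// transactions at all, so that a checkpoint (e.g. the end-of-epoch one) can be created.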
2190 chunks.push(chunk);
2195 }
2200 Ok(chunks)
2201 }
2202
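// Returns the last checkpoint summary built by this validator in the current epoch, falling
// back to the final checkpoint of the previous epoch when the builder DB is still empty.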
2203 fn load_last_built_checkpoint_summary(
2204 epoch_store: &AuthorityPerEpochStore,
2205 store: &CheckpointStore,
2206 ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
2207 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
2208 if last_checkpoint.is_none() {
2209 let epoch = epoch_store.epoch();
2210 if epoch > 0 {
2211 let previous_epoch = epoch - 1;
2212 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
2213 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
2214 if let Some((ref seq, _)) = last_checkpoint {
2215 debug!(
2216 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
2217 );
2218 } else {
2219 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
2221 }
2222 }
2223 }
2224 Ok(last_checkpoint)
2225 }
2226
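// Turns the resolved effects into one or more (summary, contents) pairs: waits for the
// corresponding certificates to be processed by consensus, collects user signatures, splits
// the effects into chunks, and, for the last checkpoint of the epoch, executes the
// advance-epoch transaction and attaches EndOfEpochData plus the root state digest.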
2227 #[instrument(level = "debug", skip_all)]
2228 async fn create_checkpoints(
2229 &self,
2230 all_effects: Vec<TransactionEffects>,
2231 details: &PendingCheckpointInfo,
2232 all_roots: &HashSet<TransactionDigest>,
2233 ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
2234 let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
2235
2236 let total = all_effects.len();
2237 let mut last_checkpoint =
2238 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
2239 let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
2240 debug!(
2241 checkpoint_commit_height = details.checkpoint_height,
2242 next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
2243 checkpoint_timestamp = details.timestamp_ms,
2244 "Creating checkpoint(s) for {} transactions",
2245 all_effects.len(),
2246 );
2247
2248 let all_digests: Vec<_> = all_effects
2249 .iter()
2250 .map(|effect| *effect.transaction_digest())
2251 .collect();
2252 let transactions_and_sizes = self
2253 .state
2254 .get_transaction_cache_reader()
2255 .get_transactions_and_serialized_sizes(&all_digests)?;
2256 let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
2257 let mut transactions = Vec::with_capacity(all_effects.len());
2258 let mut transaction_keys = Vec::with_capacity(all_effects.len());
2259 let mut randomness_rounds = BTreeMap::new();
2260 {
2261 let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
2262 debug!(
2263 ?last_checkpoint_seq,
2264 "Waiting for {:?} certificates to appear in consensus",
2265 all_effects.len()
2266 );
2267
2268 for (effects, transaction_and_size) in all_effects
2269 .into_iter()
2270 .zip(transactions_and_sizes.into_iter())
2271 {
2272 let (transaction, size) = transaction_and_size
2273 .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
2274 match transaction.inner().transaction_data().kind() {
2275 TransactionKind::ConsensusCommitPrologue(_)
2276 | TransactionKind::ConsensusCommitPrologueV2(_)
2277 | TransactionKind::ConsensusCommitPrologueV3(_)
2278 | TransactionKind::ConsensusCommitPrologueV4(_)
2279 | TransactionKind::AuthenticatorStateUpdate(_) => {
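// System transactions generated by this validator have no user certificate to
// wait for in consensus.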
2280 }
2283 TransactionKind::ProgrammableSystemTransaction(_) => {
2284 }
2286 TransactionKind::ChangeEpoch(_)
2287 | TransactionKind::Genesis(_)
2288 | TransactionKind::EndOfEpochTransaction(_) => {
2289 fatal!(
2290 "unexpected transaction in checkpoint effects: {:?}",
2291 transaction
2292 );
2293 }
2294 TransactionKind::RandomnessStateUpdate(rsu) => {
2295 randomness_rounds
2296 .insert(*effects.transaction_digest(), rsu.randomness_round);
2297 }
2298 TransactionKind::ProgrammableTransaction(_) => {
2299 let digest = *effects.transaction_digest();
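// Transactions pulled in as dependencies (i.e. not roots of this commit) may not
// have been processed by consensus yet; wait for their certificates so that user
// signatures are available below.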
2303 if !all_roots.contains(&digest) {
2304 transaction_keys.push(SequencedConsensusTransactionKey::External(
2305 ConsensusTransactionKey::Certificate(digest),
2306 ));
2307 }
2308 }
2309 }
2310 transactions.push(transaction);
2311 all_effects_and_transaction_sizes.push((effects, size));
2312 }
2313
2314 self.epoch_store
2315 .consensus_messages_processed_notify(transaction_keys)
2316 .await?;
2317 }
2318
2319 let signatures = self
2320 .epoch_store
2321 .user_signatures_for_checkpoint(&transactions, &all_digests);
2322 debug!(
2323 ?last_checkpoint_seq,
2324 "Received {} checkpoint user signatures from consensus",
2325 signatures.len()
2326 );
2327
2328 let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
2329 Some(
2330 transactions
2331 .iter()
2332 .flat_map(|tx| {
2333 if let TransactionKind::ProgrammableTransaction(ptb) =
2334 tx.transaction_data().kind()
2335 {
2336 itertools::Either::Left(
2337 ptb.commands
2338 .iter()
2339 .map(ExecutionTimeObservationKey::from_command),
2340 )
2341 } else {
2342 itertools::Either::Right(std::iter::empty())
2343 }
2344 })
2345 .collect(),
2346 )
2347 } else {
2348 None
2349 };
2350
2351 let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2352 let chunks_count = chunks.len();
2353
2354 let mut checkpoints = Vec::with_capacity(chunks_count);
2355 debug!(
2356 ?last_checkpoint_seq,
2357 "Creating {} checkpoints with {} transactions", chunks_count, total,
2358 );
2359
2360 let epoch = self.epoch_store.epoch();
2361 for (index, transactions) in chunks.into_iter().enumerate() {
2362 let first_checkpoint_of_epoch = index == 0
2363 && last_checkpoint
2364 .as_ref()
2365 .map(|(_, c)| c.epoch != epoch)
2366 .unwrap_or(true);
2367 if first_checkpoint_of_epoch {
2368 self.epoch_store
2369 .record_epoch_first_checkpoint_creation_time_metric();
2370 }
2371 let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2372
2373 let sequence_number = last_checkpoint
2374 .as_ref()
2375 .map(|(_, c)| c.sequence_number + 1)
2376 .unwrap_or_default();
2377 let mut timestamp_ms = details.timestamp_ms;
2378 if let Some((_, last_checkpoint)) = &last_checkpoint
2379 && last_checkpoint.timestamp_ms > timestamp_ms
2380 {
2381 debug!(
2383 "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2384 sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2385 );
2386 if self
2387 .epoch_store
2388 .protocol_config()
2389 .enforce_checkpoint_timestamp_monotonicity()
2390 {
2391 timestamp_ms = last_checkpoint.timestamp_ms;
2392 }
2393 }
2394
2395 let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
2396 let epoch_rolling_gas_cost_summary =
2397 self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2398
2399 let end_of_epoch_data = if last_checkpoint_of_epoch {
2400 let system_state_obj = self
2401 .augment_epoch_last_checkpoint(
2402 &epoch_rolling_gas_cost_summary,
2403 timestamp_ms,
2404 &mut effects,
2405 &mut signatures,
2406 sequence_number,
2407 std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2408 last_checkpoint_seq.unwrap_or_default(),
2409 )
2410 .await?;
2411
2412 let committee = system_state_obj
2413 .get_current_epoch_committee()
2414 .committee()
2415 .clone();
2416
2417 let root_state_digest = {
2420 let state_acc = self
2421 .global_state_hasher
2422 .upgrade()
2423 .expect("No checkpoints should be getting built after local configuration");
2424 let acc = state_acc.accumulate_checkpoint(
2425 &effects,
2426 sequence_number,
2427 &self.epoch_store,
2428 )?;
2429
2430 state_acc
2431 .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2432 .await?;
2433
2434 state_acc.accumulate_running_root(
2435 &self.epoch_store,
2436 sequence_number,
2437 Some(acc),
2438 )?;
2439 state_acc
2440 .digest_epoch(self.epoch_store.clone(), sequence_number)
2441 .await?
2442 };
2443 self.metrics.highest_accumulated_epoch.set(epoch as i64);
2444 info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2445
2446 let epoch_commitments = if self
2447 .epoch_store
2448 .protocol_config()
2449 .check_commit_root_state_digest_supported()
2450 {
2451 vec![root_state_digest.into()]
2452 } else {
2453 vec![]
2454 };
2455
2456 Some(EndOfEpochData {
2457 next_epoch_committee: committee.voting_rights,
2458 next_epoch_protocol_version: ProtocolVersion::new(
2459 system_state_obj.protocol_version(),
2460 ),
2461 epoch_commitments,
2462 })
2463 } else {
2464 self.send_to_hasher
2465 .send((sequence_number, effects.clone()))
2466 .await?;
2467
2468 None
2469 };
2470 let contents = if self.epoch_store.protocol_config().address_aliases() {
2471 CheckpointContents::new_v2(&effects, signatures)
2472 } else {
2473 CheckpointContents::new_with_digests_and_signatures(
2474 effects.iter().map(TransactionEffects::execution_digests),
2475 signatures
2476 .into_iter()
2477 .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
2478 .collect(),
2479 )
2480 };
2481
2482 let num_txns = contents.size() as u64;
2483
2484 let network_total_transactions = last_checkpoint
2485 .as_ref()
2486 .map(|(_, c)| c.network_total_transactions + num_txns)
2487 .unwrap_or(num_txns);
2488
2489 let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2490
2491 let matching_randomness_rounds: Vec<_> = effects
2492 .iter()
2493 .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2494 .copied()
2495 .collect();
2496
2497 let checkpoint_commitments = if self
2498 .epoch_store
2499 .protocol_config()
2500 .include_checkpoint_artifacts_digest_in_summary()
2501 {
2502 let artifacts = CheckpointArtifacts::from(&effects[..]);
2503 let artifacts_digest = artifacts.digest()?;
2504 vec![artifacts_digest.into()]
2505 } else {
2506 Default::default()
2507 };
2508
2509 let summary = CheckpointSummary::new(
2510 self.epoch_store.protocol_config(),
2511 epoch,
2512 sequence_number,
2513 network_total_transactions,
2514 &contents,
2515 previous_digest,
2516 epoch_rolling_gas_cost_summary,
2517 end_of_epoch_data,
2518 timestamp_ms,
2519 matching_randomness_rounds,
2520 checkpoint_commitments,
2521 );
2522 summary.report_checkpoint_age(
2523 &self.metrics.last_created_checkpoint_age,
2524 &self.metrics.last_created_checkpoint_age_ms,
2525 );
2526 if last_checkpoint_of_epoch {
2527 info!(
2528 checkpoint_seq = sequence_number,
2529 "creating last checkpoint of epoch {}", epoch
2530 );
2531 if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2532 self.epoch_store
2533 .report_epoch_metrics_at_last_checkpoint(stats);
2534 }
2535 }
2536 last_checkpoint = Some((sequence_number, summary.clone()));
2537 checkpoints.push((summary, contents));
2538 }
2539
2540 Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2541 }
2542
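// Rolls the gas costs of the current checkpoint's effects into the epoch-running
// GasCostSummary carried by the previous checkpoint, resetting at epoch boundaries.
// For example (hypothetical numbers): previous (computation 10, storage 5) plus
// current (3, 2) yields (13, 7) when both checkpoints belong to the same epoch.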
2543 fn get_epoch_total_gas_cost(
2544 &self,
2545 last_checkpoint: Option<&CheckpointSummary>,
2546 cur_checkpoint_effects: &[TransactionEffects],
2547 ) -> GasCostSummary {
2548 let (previous_epoch, previous_gas_costs) = last_checkpoint
2549 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2550 .unwrap_or_default();
2551 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2552 if previous_epoch == self.epoch_store.epoch() {
2553 GasCostSummary::new(
2555 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2556 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2557 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2558 previous_gas_costs.non_refundable_storage_fee
2559 + current_gas_costs.non_refundable_storage_fee,
2560 )
2561 } else {
2562 current_gas_costs
2563 }
2564 }
2565
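// Creates and executes the advance-epoch transaction, appends its effects (with an empty
// signature list, since it is a system transaction) to the checkpoint under construction,
// and returns the resulting SuiSystemState.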
2566 #[instrument(level = "error", skip_all)]
2567 async fn augment_epoch_last_checkpoint(
2568 &self,
2569 epoch_total_gas_cost: &GasCostSummary,
2570 epoch_start_timestamp_ms: CheckpointTimestamp,
2571 checkpoint_effects: &mut Vec<TransactionEffects>,
2572 signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2573 checkpoint: CheckpointSequenceNumber,
2574 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2575 last_checkpoint: CheckpointSequenceNumber,
2578 ) -> CheckpointBuilderResult<SuiSystemState> {
2579 let (system_state, effects) = self
2580 .state
2581 .create_and_execute_advance_epoch_tx(
2582 &self.epoch_store,
2583 epoch_total_gas_cost,
2584 checkpoint,
2585 epoch_start_timestamp_ms,
2586 end_of_epoch_observation_keys,
2587 last_checkpoint,
2588 )
2589 .await?;
2590 checkpoint_effects.push(effects);
2591 signatures.push(vec![]);
2592 Ok(system_state)
2593 }
2594
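// Expands the root effects into the full set for this checkpoint by transitively pulling in
// dependencies that executed in the current epoch and have not already been included in an
// earlier checkpoint (or earlier in this one).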
2595 #[instrument(level = "debug", skip_all)]
2602 fn complete_checkpoint_effects(
2603 &self,
2604 mut roots: Vec<TransactionEffects>,
2605 existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
2606 ) -> SuiResult<Vec<TransactionEffects>> {
2607 let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
2608 let mut results = vec![];
2609 let mut seen = HashSet::new();
2610 loop {
2611 let mut pending = HashSet::new();
2612
2613 let transactions_included = self
2614 .epoch_store
2615 .builder_included_transactions_in_checkpoint(
2616 roots.iter().map(|e| e.transaction_digest()),
2617 )?;
2618
2619 for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
2620 let digest = effect.transaction_digest();
2621 seen.insert(*digest);
2623
2624 if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
2626 continue;
2627 }
2628
2629 if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
2631 continue;
2632 }
2633
2634 let existing_effects = self
2635 .epoch_store
2636 .transactions_executed_in_cur_epoch(effect.dependencies())?;
2637
2638 for (dependency, effects_signature_exists) in
2639 effect.dependencies().iter().zip(existing_effects.iter())
2640 {
2641 if !effects_signature_exists {
2646 continue;
2647 }
2648 if seen.insert(*dependency) {
2649 pending.insert(*dependency);
2650 }
2651 }
2652 results.push(effect);
2653 }
2654 if pending.is_empty() {
2655 break;
2656 }
2657 let pending = pending.into_iter().collect::<Vec<_>>();
2658 let effects = self.effects_store.multi_get_executed_effects(&pending);
2659 let effects = effects
2660 .into_iter()
2661 .zip(pending)
2662 .map(|(opt, digest)| match opt {
2663 Some(x) => x,
2664 None => panic!(
2665 "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
2666 digest
2667 ),
2668 })
2669 .collect::<Vec<_>>();
2670 roots = effects;
2671 }
2672
2673 existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
2674 Ok(results)
2675 }
2676
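// msim-only sanity check: at most one consensus commit prologue may appear among the roots,
// and when present it must be the first transaction in the sorted checkpoint effects.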
2677 #[cfg(msim)]
2680 fn expensive_consensus_commit_prologue_invariants_check(
2681 &self,
2682 root_digests: &[TransactionDigest],
2683 sorted: &[TransactionEffects],
2684 ) {
2685 let root_txs = self
2687 .state
2688 .get_transaction_cache_reader()
2689 .multi_get_transaction_blocks(root_digests);
2690 let ccps = root_txs
2691 .iter()
2692 .filter_map(|tx| {
2693 if let Some(tx) = tx {
2694 if tx.transaction_data().is_consensus_commit_prologue() {
2695 Some(tx)
2696 } else {
2697 None
2698 }
2699 } else {
2700 None
2701 }
2702 })
2703 .collect::<Vec<_>>();
2704
2705 assert!(ccps.len() <= 1);
2707
2708 let txs = self
2710 .state
2711 .get_transaction_cache_reader()
2712 .multi_get_transaction_blocks(
2713 &sorted
2714 .iter()
2715 .map(|tx| *tx.transaction_digest())
2716 .collect::<Vec<_>>(),
2717 );
2718
2719 if ccps.is_empty() {
2720 for tx in txs.iter() {
2723 if let Some(tx) = tx {
2724 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2725 }
2726 }
2727 } else {
2728 assert!(
2730 txs[0]
2731 .as_ref()
2732 .unwrap()
2733 .transaction_data()
2734 .is_consensus_commit_prologue()
2735 );
2736
2737 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2738
2739 for tx in txs.iter().skip(1) {
2740 if let Some(tx) = tx {
2741 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2742 }
2743 }
2744 }
2745 }
2746
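// msim-only sanity check across the whole commit: every consensus commit prologue in the
// checkpoint must come from the roots, and if any is present the first transaction must be one.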
2747 #[cfg(msim)]
2748 fn expensive_consensus_commit_prologue_invariants_check_v2(
2749 &self,
2750 root_digests: &[TransactionDigest],
2751 sorted: &[TransactionEffects],
2752 ) {
2753 let root_txs = self
2755 .state
2756 .get_transaction_cache_reader()
2757 .multi_get_transaction_blocks(root_digests);
2758 let ccp_digests_from_roots: HashSet<_> = root_txs
2759 .iter()
2760 .filter_map(|tx| {
2761 if let Some(tx) = tx {
2762 if tx.transaction_data().is_consensus_commit_prologue() {
2763 Some(*tx.digest())
2764 } else {
2765 None
2766 }
2767 } else {
2768 None
2769 }
2770 })
2771 .collect();
2772
2773 let txs = self
2775 .state
2776 .get_transaction_cache_reader()
2777 .multi_get_transaction_blocks(
2778 &sorted
2779 .iter()
2780 .map(|tx| *tx.transaction_digest())
2781 .collect::<Vec<_>>(),
2782 );
2783
2784 let ccps_in_checkpoint: Vec<_> = txs
2787 .iter()
2788 .filter_map(|tx| {
2789 if let Some(tx) = tx {
2790 if tx.transaction_data().is_consensus_commit_prologue() {
2791 Some(*tx.digest())
2792 } else {
2793 None
2794 }
2795 } else {
2796 None
2797 }
2798 })
2799 .collect();
2800
2801 for ccp_digest in &ccps_in_checkpoint {
2803 assert!(
2804 ccp_digests_from_roots.contains(ccp_digest),
2805 "CCP in checkpoint not found in roots"
2806 );
2807 }
2808
2809 if !ccps_in_checkpoint.is_empty() {
2812 assert!(
2813 txs[0]
2814 .as_ref()
2815 .unwrap()
2816 .transaction_data()
2817 .is_consensus_commit_prologue(),
2818 "First transaction must be a CCP when CCPs are present"
2819 );
2820 }
2821 }
2822}
2823
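// Waits for the given transactions to finish executing, logging a debug_fatal and retrying
// if they have not executed within the timeout (which is longer under antithesis).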
2824async fn wait_for_effects_with_retry(
2825 effects_store: &dyn TransactionCacheRead,
2826 task_name: &'static str,
2827 digests: &[TransactionDigest],
2828 tx_key: TransactionKey,
2829) -> Vec<TransactionEffects> {
2830 let delay = if in_antithesis() {
2831 15
2833 } else {
2834 5
2835 };
2836 loop {
2837 match tokio::time::timeout(Duration::from_secs(delay), async {
2838 effects_store
2839 .notify_read_executed_effects(task_name, digests)
2840 .await
2841 })
2842 .await
2843 {
2844 Ok(effects) => break effects,
2845 Err(_) => {
2846 debug_fatal!(
2847 "Timeout waiting for transactions to be executed {:?}, retrying...",
2848 tx_key
2849 );
2850 }
2851 }
2852 }
2853}
2854
2855impl CheckpointAggregator {
2856 fn new(
2857 tables: Arc<CheckpointStore>,
2858 epoch_store: Arc<AuthorityPerEpochStore>,
2859 notify: Arc<Notify>,
2860 output: Box<dyn CertifiedCheckpointOutput>,
2861 state: Arc<AuthorityState>,
2862 metrics: Arc<CheckpointMetrics>,
2863 ) -> Self {
2864 let current = None;
2865 Self {
2866 store: tables,
2867 epoch_store,
2868 notify,
2869 current,
2870 output,
2871 state,
2872 metrics,
2873 }
2874 }
2875
2876 async fn run(mut self) {
2877 info!("Starting CheckpointAggregator");
2878 loop {
2879 if let Err(e) = self.run_and_notify().await {
2880 error!(
2881 "Error while aggregating checkpoint, will retry in 1s: {:?}",
2882 e
2883 );
2884 self.metrics.checkpoint_errors.inc();
2885 tokio::time::sleep(Duration::from_secs(1)).await;
2886 continue;
2887 }
2888
2889 let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
2890 }
2891 }
2892
2893 async fn run_and_notify(&mut self) -> SuiResult {
2894 let summaries = self.run_inner()?;
2895 for summary in summaries {
2896 self.output.certified_checkpoint_created(&summary).await?;
2897 }
2898 Ok(())
2899 }
2900
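// Drives certification: starting from the next uncertified sequence number, folds pending
// signature messages into a stake aggregator and, once a quorum agrees on our digest, stores
// and returns the certified checkpoint. Returns early when no summary has been built yet or
// the available signatures are exhausted.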
2901 fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
2902 let _scope = monitored_scope("CheckpointAggregator");
2903 let mut result = vec![];
2904 'outer: loop {
2905 let next_to_certify = self.next_checkpoint_to_certify()?;
2906 let current = if let Some(current) = &mut self.current {
2907 if current.summary.sequence_number < next_to_certify {
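// This checkpoint has already been certified through another path (e.g. state
// sync); drop the in-progress aggregation and move on.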
2913 assert_reachable!("skip checkpoint certification");
2914 self.current = None;
2915 continue;
2916 }
2917 current
2918 } else {
2919 let Some(summary) = self
2920 .epoch_store
2921 .get_built_checkpoint_summary(next_to_certify)?
2922 else {
2923 return Ok(result);
2924 };
2925 self.current = Some(CheckpointSignatureAggregator {
2926 next_index: 0,
2927 digest: summary.digest(),
2928 summary,
2929 signatures_by_digest: MultiStakeAggregator::new(
2930 self.epoch_store.committee().clone(),
2931 ),
2932 store: self.store.clone(),
2933 state: self.state.clone(),
2934 metrics: self.metrics.clone(),
2935 });
2936 self.current.as_mut().unwrap()
2937 };
2938
2939 let epoch_tables = self
2940 .epoch_store
2941 .tables()
2942 .expect("should not run past end of epoch");
2943 let iter = epoch_tables
2944 .pending_checkpoint_signatures
2945 .safe_iter_with_bounds(
2946 Some((current.summary.sequence_number, current.next_index)),
2947 None,
2948 );
2949 for item in iter {
2950 let ((seq, index), data) = item?;
2951 if seq != current.summary.sequence_number {
2952 trace!(
2953 checkpoint_seq =? current.summary.sequence_number,
2954 "Not enough checkpoint signatures",
2955 );
2956 return Ok(result);
2958 }
2959 trace!(
2960 checkpoint_seq = current.summary.sequence_number,
2961 "Processing signature for checkpoint (digest: {:?}) from {:?}",
2962 current.summary.digest(),
2963 data.summary.auth_sig().authority.concise()
2964 );
2965 self.metrics
2966 .checkpoint_participation
2967 .with_label_values(&[&format!(
2968 "{:?}",
2969 data.summary.auth_sig().authority.concise()
2970 )])
2971 .inc();
2972 if let Ok(auth_signature) = current.try_aggregate(data) {
2973 debug!(
2974 checkpoint_seq = current.summary.sequence_number,
2975 "Successfully aggregated signatures for checkpoint (digest: {:?})",
2976 current.summary.digest(),
2977 );
2978 let summary = VerifiedCheckpoint::new_unchecked(
2979 CertifiedCheckpointSummary::new_from_data_and_sig(
2980 current.summary.clone(),
2981 auth_signature,
2982 ),
2983 );
2984
2985 self.store.insert_certified_checkpoint(&summary)?;
2986 self.metrics
2987 .last_certified_checkpoint
2988 .set(current.summary.sequence_number as i64);
2989 current.summary.report_checkpoint_age(
2990 &self.metrics.last_certified_checkpoint_age,
2991 &self.metrics.last_certified_checkpoint_age_ms,
2992 );
2993 result.push(summary.into_inner());
2994 self.current = None;
2995 continue 'outer;
2996 } else {
2997 current.next_index = index + 1;
2998 }
2999 }
3000 break;
3001 }
3002 Ok(result)
3003 }
3004
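// The next sequence number to certify is one past the highest certified checkpoint in the
// store, or 0 if nothing has been certified yet.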
3005 fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
3006 Ok(self
3007 .store
3008 .tables
3009 .certified_checkpoints
3010 .reversed_safe_iter_with_bounds(None, None)?
3011 .next()
3012 .transpose()?
3013 .map(|(seq, _)| seq + 1)
3014 .unwrap_or_default())
3015 }
3016}
3017
3018impl CheckpointSignatureAggregator {
3019 #[allow(clippy::result_unit_err)]
3020 pub fn try_aggregate(
3021 &mut self,
3022 data: CheckpointSignatureMessage,
3023 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
3024 let their_digest = *data.summary.digest();
3025 let (_, signature) = data.summary.into_data_and_sig();
3026 let author = signature.authority;
3027 let envelope =
3028 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
3029 match self.signatures_by_digest.insert(their_digest, envelope) {
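// A repeated, non-conflicting signature from the same authority carries no new
// information; ignore it without logging a warning.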
3030 InsertResult::Failed { error }
3032 if matches!(
3033 error.as_inner(),
3034 SuiErrorKind::StakeAggregatorRepeatedSigner {
3035 conflicting_sig: false,
3036 ..
3037 },
3038 ) =>
3039 {
3040 Err(())
3041 }
3042 InsertResult::Failed { error } => {
3043 warn!(
3044 checkpoint_seq = self.summary.sequence_number,
3045 "Failed to aggregate new signature from validator {:?}: {:?}",
3046 author.concise(),
3047 error
3048 );
3049 self.check_for_split_brain();
3050 Err(())
3051 }
3052 InsertResult::QuorumReached(cert) => {
3053 if their_digest != self.digest {
3056 self.metrics.remote_checkpoint_forks.inc();
3057 warn!(
3058 checkpoint_seq = self.summary.sequence_number,
3059 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
3060 author.concise(),
3061 their_digest,
3062 self.digest
3063 );
3064 return Err(());
3065 }
3066 Ok(cert)
3067 }
3068 InsertResult::NotEnoughVotes {
3069 bad_votes: _,
3070 bad_authorities: _,
3071 } => {
3072 self.check_for_split_brain();
3073 Err(())
3074 }
3075 }
3076 }
3077
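// Called when signature aggregation is not making progress. If a quorum can no longer be
// reached on any single digest, logs the competing digests by stake, bumps the split-brain
// metrics / fail points, and spawns diagnose_split_brain to dump a diff against a
// disagreeing validator.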
3078 fn check_for_split_brain(&self) {
3082 debug!(
3083 checkpoint_seq = self.summary.sequence_number,
3084 "Checking for split brain condition"
3085 );
3086 if self.signatures_by_digest.quorum_unreachable() {
3087 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
3093 let digests_by_stake_messages = all_unique_values
3094 .iter()
3095 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
3096 .map(|(digest, (_authorities, total_stake))| {
3097 format!("{:?} (total stake: {})", digest, total_stake)
3098 })
3099 .collect::<Vec<String>>();
3100 fail_point_arg!("kill_split_brain_node", |(
3101 checkpoint_overrides,
3102 forked_authorities,
3103 ): (
3104 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
3105 std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
3106 )| {
3107 #[cfg(msim)]
3108 {
3109 if let (Ok(mut overrides), Ok(forked_authorities_set)) =
3110 (checkpoint_overrides.lock(), forked_authorities.lock())
3111 {
3112 let correct_digest = all_unique_values
3114 .iter()
3115 .find(|(_, (authorities, _))| {
3116 authorities
3118 .iter()
3119 .any(|auth| !forked_authorities_set.contains(auth))
3120 })
3121 .map(|(digest, _)| digest.to_string())
3122 .unwrap_or_else(|| {
3123 all_unique_values
3125 .iter()
3126 .max_by_key(|(_, (_, stake))| *stake)
3127 .map(|(digest, _)| digest.to_string())
3128 .unwrap_or_else(|| self.digest.to_string())
3129 });
3130
3131 overrides.insert(self.summary.sequence_number, correct_digest.clone());
3132
3133 tracing::error!(
3134 fatal = true,
3135 "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
3136 self.summary.sequence_number,
3137 correct_digest
3138 );
3139 }
3140 }
3141 });
3142
3143 debug_fatal!(
3144 "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
3145 self.summary.sequence_number,
3146 self.signatures_by_digest.uncommitted_stake(),
3147 digests_by_stake_messages
3148 );
3149 self.metrics.split_brain_checkpoint_forks.inc();
3150
3151 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
3152 let local_summary = self.summary.clone();
3153 let state = self.state.clone();
3154 let tables = self.store.clone();
3155
3156 tokio::spawn(async move {
3157 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
3158 });
3159 }
3160 }
3161}
3162
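// Fetches the conflicting (pending) checkpoint from one validator per disagreeing digest and
// writes a human-readable diff of the summaries, contents, transactions, and effects to a
// temporary file to aid debugging of checkpoint forks.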
3163async fn diagnose_split_brain(
3169 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
3170 local_summary: CheckpointSummary,
3171 state: Arc<AuthorityState>,
3172 tables: Arc<CheckpointStore>,
3173) {
3174 debug!(
3175 checkpoint_seq = local_summary.sequence_number,
3176 "Running split brain diagnostics..."
3177 );
3178 let time = SystemTime::now();
3179 let digest_to_validator = all_unique_values
3181 .iter()
3182 .filter_map(|(digest, (validators, _))| {
3183 if *digest != local_summary.digest() {
3184 let random_validator = validators.choose(&mut get_rng()).unwrap();
3185 Some((*digest, *random_validator))
3186 } else {
3187 None
3188 }
3189 })
3190 .collect::<HashMap<_, _>>();
3191 if digest_to_validator.is_empty() {
3192 panic!(
3193 "Given split brain condition, there should be at \
3194 least one validator that disagrees with local signature"
3195 );
3196 }
3197
3198 let epoch_store = state.load_epoch_store_one_call_per_task();
3199 let committee = epoch_store
3200 .epoch_start_state()
3201 .get_sui_committee_with_network_metadata();
3202 let network_config = default_mysten_network_config();
3203 let network_clients =
3204 make_network_authority_clients_with_network_config(&committee, &network_config);
3205
3206 let response_futures = digest_to_validator
3208 .values()
3209 .cloned()
3210 .map(|validator| {
3211 let client = network_clients
3212 .get(&validator)
3213 .expect("Failed to get network client");
3214 let request = CheckpointRequestV2 {
3215 sequence_number: Some(local_summary.sequence_number),
3216 request_content: true,
3217 certified: false,
3218 };
3219 client.handle_checkpoint_v2(request)
3220 })
3221 .collect::<Vec<_>>();
3222
3223 let digest_name_pair = digest_to_validator.iter();
3224 let response_data = futures::future::join_all(response_futures)
3225 .await
3226 .into_iter()
3227 .zip(digest_name_pair)
3228 .filter_map(|(response, (digest, name))| match response {
3229 Ok(response) => match response {
3230 CheckpointResponseV2 {
3231 checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
3232 contents: Some(contents),
3233 } => Some((*name, *digest, summary, contents)),
3234 CheckpointResponseV2 {
3235 checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
3236 contents: _,
3237 } => {
3238 panic!("Expected pending checkpoint, but got certified checkpoint");
3239 }
3240 CheckpointResponseV2 {
3241 checkpoint: None,
3242 contents: _,
3243 } => {
3244 error!(
3245 "Summary for checkpoint {:?} not found on validator {:?}",
3246 local_summary.sequence_number, name
3247 );
3248 None
3249 }
3250 CheckpointResponseV2 {
3251 checkpoint: _,
3252 contents: None,
3253 } => {
3254 error!(
3255 "Contents for checkpoint {:?} not found on validator {:?}",
3256 local_summary.sequence_number, name
3257 );
3258 None
3259 }
3260 },
3261 Err(e) => {
3262 error!(
3263 "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
3264 e
3265 );
3266 None
3267 }
3268 })
3269 .collect::<Vec<_>>();
3270
3271 let local_checkpoint_contents = tables
3272 .get_checkpoint_contents(&local_summary.content_digest)
3273 .unwrap_or_else(|_| {
3274 panic!(
3275 "Could not find checkpoint contents for digest {:?}",
3276 local_summary.digest()
3277 )
3278 })
3279 .unwrap_or_else(|| {
3280 panic!(
3281 "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
3282 local_summary.sequence_number,
3283 local_summary.digest()
3284 )
3285 });
3286 let local_contents_text = format!("{local_checkpoint_contents:?}");
3287
3288 let local_summary_text = format!("{local_summary:?}");
3289 let local_validator = state.name.concise();
3290 let diff_patches = response_data
3291 .iter()
3292 .map(|(name, other_digest, other_summary, contents)| {
3293 let other_contents_text = format!("{contents:?}");
3294 let other_summary_text = format!("{other_summary:?}");
3295 let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
3296 .enumerate_transactions(&local_summary)
3297 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
3298 .unzip();
3299 let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
3300 .enumerate_transactions(other_summary)
3301 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
3302 .unzip();
3303 let summary_patch = create_patch(&local_summary_text, &other_summary_text);
3304 let contents_patch = create_patch(&local_contents_text, &other_contents_text);
3305 let local_transactions_text = format!("{local_transactions:#?}");
3306 let other_transactions_text = format!("{other_transactions:#?}");
3307 let transactions_patch =
3308 create_patch(&local_transactions_text, &other_transactions_text);
3309 let local_effects_text = format!("{local_effects:#?}");
3310 let other_effects_text = format!("{other_effects:#?}");
3311 let effects_patch = create_patch(&local_effects_text, &other_effects_text);
3312 let seq_number = local_summary.sequence_number;
3313 let local_digest = local_summary.digest();
3314 let other_validator = name.concise();
3315 format!(
3316 "Checkpoint: {seq_number:?}\n\
3317 Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
3318 Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
3319 Summary Diff: \n{summary_patch}\n\n\
3320 Contents Diff: \n{contents_patch}\n\n\
3321 Transactions Diff: \n{transactions_patch}\n\n\
3322 Effects Diff: \n{effects_patch}",
3323 )
3324 })
3325 .collect::<Vec<_>>()
3326 .join("\n\n\n");
3327
3328 let header = format!(
3329 "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
3330 Datetime: {:?}",
3331 time
3332 );
3333 let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
3334 let path = tempfile::tempdir()
3335 .expect("Failed to create tempdir")
3336 .keep()
3337 .join(Path::new("checkpoint_fork_dump.txt"));
3338 let mut file = File::create(path).unwrap();
3339 write!(file, "{}", fork_logs_text).unwrap();
3340 debug!("{}", fork_logs_text);
3341}
3342
3343pub trait CheckpointServiceNotify {
3344 fn notify_checkpoint_signature(
3345 &self,
3346 epoch_store: &AuthorityPerEpochStore,
3347 info: &CheckpointSignatureMessage,
3348 ) -> SuiResult;
3349
3350 fn notify_checkpoint(&self) -> SuiResult;
3351}
3352
3353#[allow(clippy::large_enum_variant)]
3354enum CheckpointServiceState {
3355 Unstarted(
3356 (
3357 CheckpointBuilder,
3358 CheckpointAggregator,
3359 CheckpointStateHasher,
3360 ),
3361 ),
3362 Started,
3363}
3364
3365impl CheckpointServiceState {
3366 fn take_unstarted(
3367 &mut self,
3368 ) -> (
3369 CheckpointBuilder,
3370 CheckpointAggregator,
3371 CheckpointStateHasher,
3372 ) {
3373 let mut state = CheckpointServiceState::Started;
3374 std::mem::swap(self, &mut state);
3375
3376 match state {
3377 CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3378 (builder, aggregator, hasher)
3379 }
3380 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3381 }
3382 }
3383}
3384
3385pub struct CheckpointService {
3386 tables: Arc<CheckpointStore>,
3387 notify_builder: Arc<Notify>,
3388 notify_aggregator: Arc<Notify>,
3389 last_signature_index: Mutex<u64>,
3390 highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
3392 highest_previously_built_seq: CheckpointSequenceNumber,
3395 metrics: Arc<CheckpointMetrics>,
3396 state: Mutex<CheckpointServiceState>,
3397}
3398
3399impl CheckpointService {
3400 pub fn build(
3402 state: Arc<AuthorityState>,
3403 checkpoint_store: Arc<CheckpointStore>,
3404 epoch_store: Arc<AuthorityPerEpochStore>,
3405 effects_store: Arc<dyn TransactionCacheRead>,
3406 global_state_hasher: Weak<GlobalStateHasher>,
3407 checkpoint_output: Box<dyn CheckpointOutput>,
3408 certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
3409 metrics: Arc<CheckpointMetrics>,
3410 max_transactions_per_checkpoint: usize,
3411 max_checkpoint_size_bytes: usize,
3412 ) -> Arc<Self> {
3413 info!(
3414 "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
3415 );
3416 let notify_builder = Arc::new(Notify::new());
3417 let notify_aggregator = Arc::new(Notify::new());
3418
3419 let highest_previously_built_seq = checkpoint_store
3421 .get_latest_locally_computed_checkpoint()
3422 .expect("failed to get latest locally computed checkpoint")
3423 .map(|s| s.sequence_number)
3424 .unwrap_or(0);
3425
3426 let highest_currently_built_seq =
3427 CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
3428 .expect("epoch should not have ended")
3429 .map(|(seq, _)| seq)
3430 .unwrap_or(0);
3431
3432 let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
3433
3434 let aggregator = CheckpointAggregator::new(
3435 checkpoint_store.clone(),
3436 epoch_store.clone(),
3437 notify_aggregator.clone(),
3438 certified_checkpoint_output,
3439 state.clone(),
3440 metrics.clone(),
3441 );
3442
3443 let (send_to_hasher, receive_from_builder) = mpsc::channel(16);
3444
3445 let ckpt_state_hasher = CheckpointStateHasher::new(
3446 epoch_store.clone(),
3447 global_state_hasher.clone(),
3448 receive_from_builder,
3449 );
3450
3451 let builder = CheckpointBuilder::new(
3452 state.clone(),
3453 checkpoint_store.clone(),
3454 epoch_store.clone(),
3455 notify_builder.clone(),
3456 effects_store,
3457 global_state_hasher,
3458 send_to_hasher,
3459 checkpoint_output,
3460 notify_aggregator.clone(),
3461 highest_currently_built_seq_tx.clone(),
3462 metrics.clone(),
3463 max_transactions_per_checkpoint,
3464 max_checkpoint_size_bytes,
3465 );
3466
3467 let last_signature_index = epoch_store
3468 .get_last_checkpoint_signature_index()
3469 .expect("should not cross end of epoch");
3470 let last_signature_index = Mutex::new(last_signature_index);
3471
3472 Arc::new(Self {
3473 tables: checkpoint_store,
3474 notify_builder,
3475 notify_aggregator,
3476 last_signature_index,
3477 highest_currently_built_seq_tx,
3478 highest_previously_built_seq,
3479 metrics,
3480 state: Mutex::new(CheckpointServiceState::Unstarted((
3481 builder,
3482 aggregator,
3483 ckpt_state_hasher,
3484 ))),
3485 })
3486 }
3487
3488 pub async fn spawn(
3496 &self,
3497 epoch_store: Arc<AuthorityPerEpochStore>,
3498 consensus_replay_waiter: Option<ReplayWaiter>,
3499 ) {
3500 let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();
3501
3502 let last_persisted_builder_seq = epoch_store
3512 .last_persisted_checkpoint_builder_summary()
3513 .expect("epoch should not have ended")
3514 .map(|s| s.summary.sequence_number);
3515
3516 let last_executed_seq = self
3517 .tables
3518 .get_highest_executed_checkpoint()
3519 .expect("Failed to get highest executed checkpoint")
3520 .map(|checkpoint| *checkpoint.sequence_number());
3521
3522 if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
3523 if let Err(e) = builder
3524 .epoch_store
3525 .clear_state_hashes_after_checkpoint(last_committed_seq)
3526 {
3527 error!(
3528 "Failed to clear state hashes after checkpoint {}: {:?}",
3529 last_committed_seq, e
3530 );
3531 } else {
3532 info!(
3533 "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
3534 last_committed_seq
3535 );
3536 }
3537 }
3538
3539 let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();
3540
3541 let state_hasher_task = spawn_monitored_task!(state_hasher.run());
3542 let aggregator_task = spawn_monitored_task!(aggregator.run());
3543
3544 spawn_monitored_task!(async move {
3545 epoch_store
3546 .within_alive_epoch(async move {
3547 builder.run(consensus_replay_waiter).await;
3548 builder_finished_tx.send(()).ok();
3549 })
3550 .await
3551 .ok();
3552
3553 state_hasher_task
3555 .await
3556 .expect("state hasher should exit normally");
3557
3558 aggregator_task.abort();
3561 aggregator_task.await.ok();
3562 });
3563
3564 if tokio::time::timeout(Duration::from_secs(120), async move {
3570 tokio::select! {
3571 _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
3572 _ = self.wait_for_rebuilt_checkpoints() => (),
3573 }
3574 })
3575 .await
3576 .is_err()
3577 {
3578 debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
3579 }
3580 }
3581}
3582
3583impl CheckpointService {
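// Waits until the builder has re-built checkpoints up to the highest sequence number that
// had been locally computed before the restart.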
3584 pub async fn wait_for_rebuilt_checkpoints(&self) {
3590 let highest_previously_built_seq = self.highest_previously_built_seq;
3591 let mut rx = self.highest_currently_built_seq_tx.subscribe();
3592 let mut highest_currently_built_seq = *rx.borrow_and_update();
3593 info!(
3594 "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3595 );
3596 loop {
3597 if highest_currently_built_seq >= highest_previously_built_seq {
3598 info!("Checkpoint rebuild complete");
3599 break;
3600 }
3601 rx.changed().await.unwrap();
3602 highest_currently_built_seq = *rx.borrow_and_update();
3603 }
3604 }
3605
3606 #[cfg(test)]
3607 fn write_and_notify_checkpoint_for_testing(
3608 &self,
3609 epoch_store: &AuthorityPerEpochStore,
3610 checkpoint: PendingCheckpoint,
3611 ) -> SuiResult {
3612 use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3613
3614 let mut output = ConsensusCommitOutput::new(0);
3615 epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3616 output.set_default_commit_stats_for_testing();
3617 epoch_store.push_consensus_output_for_tests(output);
3618 self.notify_checkpoint()?;
3619 Ok(())
3620 }
3621}
3622
3623impl CheckpointServiceNotify for CheckpointService {
3624 fn notify_checkpoint_signature(
3625 &self,
3626 epoch_store: &AuthorityPerEpochStore,
3627 info: &CheckpointSignatureMessage,
3628 ) -> SuiResult {
3629 let sequence = info.summary.sequence_number;
3630 let signer = info.summary.auth_sig().authority.concise();
3631
3632 if let Some(highest_verified_checkpoint) = self
3633 .tables
3634 .get_highest_verified_checkpoint()?
3635 .map(|x| *x.sequence_number())
3636 && sequence <= highest_verified_checkpoint
3637 {
3638 trace!(
3639 checkpoint_seq = sequence,
3640 "Ignore checkpoint signature from {} - already certified", signer,
3641 );
3642 self.metrics
3643 .last_ignored_checkpoint_signature_received
3644 .set(sequence as i64);
3645 return Ok(());
3646 }
3647 trace!(
3648 checkpoint_seq = sequence,
3649 "Received checkpoint signature, digest {} from {}",
3650 info.summary.digest(),
3651 signer,
3652 );
3653 self.metrics
3654 .last_received_checkpoint_signatures
3655 .with_label_values(&[&signer.to_string()])
3656 .set(sequence as i64);
3657 let mut index = self.last_signature_index.lock();
3660 *index += 1;
3661 epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
3662 self.notify_aggregator.notify_one();
3663 Ok(())
3664 }
3665
3666 fn notify_checkpoint(&self) -> SuiResult {
3667 self.notify_builder.notify_one();
3668 Ok(())
3669 }
3670}
3671
3672pub struct CheckpointServiceNoop {}
3674impl CheckpointServiceNotify for CheckpointServiceNoop {
3675 fn notify_checkpoint_signature(
3676 &self,
3677 _: &AuthorityPerEpochStore,
3678 _: &CheckpointSignatureMessage,
3679 ) -> SuiResult {
3680 Ok(())
3681 }
3682
3683 fn notify_checkpoint(&self) -> SuiResult {
3684 Ok(())
3685 }
3686}
3687
3688impl PendingCheckpoint {
3689 pub fn height(&self) -> CheckpointHeight {
3690 self.details.checkpoint_height
3691 }
3692
3693 pub fn roots(&self) -> &Vec<TransactionKey> {
3694 &self.roots
3695 }
3696
3697 pub fn details(&self) -> &PendingCheckpointInfo {
3698 &self.details
3699 }
3700}
3701
3702impl PendingCheckpointV2 {
3703 pub fn height(&self) -> CheckpointHeight {
3704 self.details.checkpoint_height
3705 }
3706
3707 pub(crate) fn num_roots(&self) -> usize {
3708 self.roots.iter().map(|r| r.tx_roots.len()).sum()
3709 }
3710}
3711
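// Future adapter that counts how many times the wrapped future is polled and yields the
// count alongside the future's output. Usage sketch (with a hypothetical `work()` future):
//   let (polls, output) = poll_count(work()).await;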
3712pin_project! {
3713 pub struct PollCounter<Fut> {
3714 #[pin]
3715 future: Fut,
3716 count: usize,
3717 }
3718}
3719
3720impl<Fut> PollCounter<Fut> {
3721 pub fn new(future: Fut) -> Self {
3722 Self { future, count: 0 }
3723 }
3724
3725 pub fn count(&self) -> usize {
3726 self.count
3727 }
3728}
3729
3730impl<Fut: Future> Future for PollCounter<Fut> {
3731 type Output = (usize, Fut::Output);
3732
3733 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3734 let this = self.project();
3735 *this.count += 1;
3736 match this.future.poll(cx) {
3737 Poll::Ready(output) => Poll::Ready((*this.count, output)),
3738 Poll::Pending => Poll::Pending,
3739 }
3740 }
3741}
3742
3743fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
3744 PollCounter::new(future)
3745}
3746
3747#[cfg(test)]
3748mod tests {
3749 use super::*;
3750 use crate::authority::test_authority_builder::TestAuthorityBuilder;
3751 use crate::transaction_outputs::TransactionOutputs;
3752 use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3753 use futures::FutureExt as _;
3754 use futures::future::BoxFuture;
3755 use std::collections::HashMap;
3756 use std::ops::Deref;
3757 use sui_macros::sim_test;
3758 use sui_protocol_config::{Chain, ProtocolConfig};
3759 use sui_types::accumulator_event::AccumulatorEvent;
3760 use sui_types::authenticator_state::ActiveJwk;
3761 use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3762 use sui_types::crypto::Signature;
3763 use sui_types::effects::{TransactionEffects, TransactionEvents};
3764 use sui_types::messages_checkpoint::SignedCheckpointSummary;
3765 use sui_types::transaction::VerifiedTransaction;
3766 use tokio::sync::mpsc;
3767
3768 #[tokio::test]
3769 async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
3770 let store = CheckpointStore::new_for_tests();
3771 let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
3772 for seq in 70u64..=80u64 {
3773 let contents =
3774 sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
3775 [sui_types::base_types::ExecutionDigests::new(
3776 sui_types::digests::TransactionDigest::random(),
3777 sui_types::digests::TransactionEffectsDigest::ZERO,
3778 )],
3779 );
3780 let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
3781 &protocol,
3782 0,
3783 seq,
3784 0,
3785 &contents,
3786 None,
3787 sui_types::gas::GasCostSummary::default(),
3788 None,
3789 0,
3790 Vec::new(),
3791 Vec::new(),
3792 );
3793 store
3794 .tables
3795 .locally_computed_checkpoints
3796 .insert(&seq, &summary)
3797 .unwrap();
3798 }
3799
3800 store
3801 .clear_locally_computed_checkpoints_from(76)
3802 .expect("clear should succeed");
3803
3804 assert!(
3806 store
3807 .tables
3808 .locally_computed_checkpoints
3809 .get(&75)
3810 .unwrap()
3811 .is_some()
3812 );
3813 assert!(
3814 store
3815 .tables
3816 .locally_computed_checkpoints
3817 .get(&76)
3818 .unwrap()
3819 .is_none()
3820 );
3821
3822 for seq in 70u64..76u64 {
3823 assert!(
3824 store
3825 .tables
3826 .locally_computed_checkpoints
3827 .get(&seq)
3828 .unwrap()
3829 .is_some()
3830 );
3831 }
3832 for seq in 76u64..=80u64 {
3833 assert!(
3834 store
3835 .tables
3836 .locally_computed_checkpoints
3837 .get(&seq)
3838 .unwrap()
3839 .is_none()
3840 );
3841 }
3842 }
3843
3844 #[tokio::test]
3845 async fn test_fork_detection_storage() {
3846 let store = CheckpointStore::new_for_tests();
3847 let seq_num = 42;
3849 let digest = CheckpointDigest::random();
3850
3851 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3852
3853 store
3854 .record_checkpoint_fork_detected(seq_num, digest)
3855 .unwrap();
3856
3857 let retrieved = store.get_checkpoint_fork_detected().unwrap();
3858 assert!(retrieved.is_some());
3859 let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3860 assert_eq!(retrieved_seq, seq_num);
3861 assert_eq!(retrieved_digest, digest);
3862
3863 store.clear_checkpoint_fork_detected().unwrap();
3864 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3865
3866 let tx_digest = TransactionDigest::random();
3868 let expected_effects = TransactionEffectsDigest::random();
3869 let actual_effects = TransactionEffectsDigest::random();
3870
3871 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3872
3873 store
3874 .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3875 .unwrap();
3876
3877 let retrieved = store.get_transaction_fork_detected().unwrap();
3878 assert!(retrieved.is_some());
3879 let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3880 assert_eq!(retrieved_tx, tx_digest);
3881 assert_eq!(retrieved_expected, expected_effects);
3882 assert_eq!(retrieved_actual, actual_effects);
3883
3884 store.clear_transaction_fork_detected().unwrap();
3885 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3886 }
3887
3888 #[sim_test]
3889 pub async fn checkpoint_builder_test() {
3890 telemetry_subscribers::init_for_testing();
3891
3892 let mut protocol_config =
3893 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
3894 protocol_config.disable_accumulators_for_testing();
3895 protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(false);
3896 protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
3897 let state = TestAuthorityBuilder::new()
3898 .with_protocol_config(protocol_config)
3899 .build()
3900 .await;
3901
3902 let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
3903 0,
3904 0,
3905 vec![],
3906 SequenceNumber::new(),
3907 );
3908
3909 let jwks = {
3910 let mut jwks = Vec::new();
3911 while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
3912 jwks.push(ActiveJwk {
3913 jwk_id: JwkId::new(
3914 "https://accounts.google.com".to_string(),
3915 "1234567890".to_string(),
3916 ),
3917 jwk: JWK {
3918 kty: "RSA".to_string(),
3919 e: "AQAB".to_string(),
3920 n: "1234567890".to_string(),
3921 alg: "RS256".to_string(),
3922 },
3923 epoch: 0,
3924 });
3925 }
3926 jwks
3927 };
3928
3929 let dummy_tx_with_data =
3930 VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());
3931
3932 for i in 0..15 {
3933 state
3934 .database_for_testing()
3935 .perpetual_tables
3936 .transactions
3937 .insert(&d(i), dummy_tx.serializable_ref())
3938 .unwrap();
3939 }
3940 for i in 15..20 {
3941 state
3942 .database_for_testing()
3943 .perpetual_tables
3944 .transactions
3945 .insert(&d(i), dummy_tx_with_data.serializable_ref())
3946 .unwrap();
3947 }
3948
3949 let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
3950 commit_cert_for_test(
3951 &mut store,
3952 state.clone(),
3953 d(1),
3954 vec![d(2), d(3)],
3955 GasCostSummary::new(11, 12, 11, 1),
3956 );
3957 commit_cert_for_test(
3958 &mut store,
3959 state.clone(),
3960 d(2),
3961 vec![d(3), d(4)],
3962 GasCostSummary::new(21, 22, 21, 1),
3963 );
3964 commit_cert_for_test(
3965 &mut store,
3966 state.clone(),
3967 d(3),
3968 vec![],
3969 GasCostSummary::new(31, 32, 31, 1),
3970 );
3971 commit_cert_for_test(
3972 &mut store,
3973 state.clone(),
3974 d(4),
3975 vec![],
3976 GasCostSummary::new(41, 42, 41, 1),
3977 );
3978 for i in [5, 6, 7, 10, 11, 12, 13] {
3979 commit_cert_for_test(
3980 &mut store,
3981 state.clone(),
3982 d(i),
3983 vec![],
3984 GasCostSummary::new(41, 42, 41, 1),
3985 );
3986 }
3987 for i in [15, 16, 17] {
3988 commit_cert_for_test(
3989 &mut store,
3990 state.clone(),
3991 d(i),
3992 vec![],
3993 GasCostSummary::new(51, 52, 51, 1),
3994 );
3995 }
3996 let all_digests: Vec<_> = store.keys().copied().collect();
3997 for digest in all_digests {
3998 let signature = Signature::Ed25519SuiSignature(Default::default()).into();
3999 state
4000 .epoch_store_for_testing()
4001 .test_insert_user_signature(digest, vec![(signature, None)]);
4002 }
4003
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

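        // Build the service with a limit of 3 transactions per checkpoint and a
        // 100_000-byte checkpoint size budget; both limits are exercised below.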
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

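        // Enqueue six pending checkpoints (heights 0..=5) with their root digests and
        // timestamps; the builder expands each root set with its dependencies.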
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

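        // Checkpoint 0 holds only d(4) (a root with no dependencies). Checkpoint 1 is
        // rooted at d(1) and d(3), so it pulls in d(2) as a dependency and orders the
        // three causally; the rolling gas summary accumulates across both checkpoints.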
        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 108, 104, 4)
        );

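        // Height 2 has four roots but the builder caps checkpoints at three
        // transactions, so they are split across sequence numbers 2 and 3.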
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

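        // Height 3 carries the three oversized transactions; the 100_000-byte size
        // budget fits only two of them, so they land in checkpoints 4 and 5.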
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

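        // Height 4 arrives only 1ms after height 3, inside the 100ms minimum
        // checkpoint interval, so its root is expected to be folded together with
        // height 5 into a single checkpoint (sequence 6) containing d(5) and d(6).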
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

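        // Sign the first two summaries and deliver the signatures out of order; each
        // signature reaches quorum on the test committee, and certified checkpoints
        // are still emitted in sequence order.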
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }

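    // Minimal TransactionCacheRead test double backed by the effects map built above:
    // only the lookups the checkpoint builder needs are implemented; everything else
    // panics via unimplemented!().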
    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn notify_read_executed_effects(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, Vec<TransactionEffects>> {
            std::future::ready(
                digests
                    .iter()
                    .map(|d| self.get(d).expect("effects not found").clone())
                    .collect(),
            )
            .boxed()
        }

        fn notify_read_executed_effects_digests(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
            std::future::ready(
                digests
                    .iter()
                    .map(|d| {
                        self.get(d)
                            .map(|fx| fx.digest())
                            .expect("effects not found")
                    })
                    .collect(),
            )
            .boxed()
        }

        fn multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffects>> {
            digests.iter().map(|d| self.get(d).cloned()).collect()
        }

        fn multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
            unimplemented!()
        }

        fn multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffectsDigest>> {
            unimplemented!()
        }

        fn multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> Vec<Option<TransactionEffects>> {
            unimplemented!()
        }

        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
            unimplemented!()
        }

        fn get_mysticeti_fastpath_outputs(
            &self,
            _: &TransactionDigest,
        ) -> Option<Arc<TransactionOutputs>> {
            unimplemented!()
        }

        fn notify_read_fastpath_transaction_outputs<'a>(
            &'a self,
            _: &'a [TransactionDigest],
        ) -> BoxFuture<'a, Vec<Arc<crate::transaction_outputs::TransactionOutputs>>> {
            unimplemented!()
        }

        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
            unimplemented!()
        }

        fn get_unchanged_loaded_runtime_objects(
            &self,
            _digest: &TransactionDigest,
        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
            unimplemented!()
        }

        fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
            unimplemented!()
        }
    }

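    // Route built and certified checkpoints back to the test through the mpsc senders
    // passed into CheckpointService::build above.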
    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> SuiResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }

    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> SuiResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }

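    // Builds a PendingCheckpoint at height `i` whose roots are the digests d(t) for
    // the given byte values, stamped with `timestamp_ms`.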
    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
        PendingCheckpoint {
            roots: t
                .into_iter()
                .map(|t| TransactionKey::Digest(d(t)))
                .collect(),
            details: PendingCheckpointInfo {
                timestamp_ms,
                last_of_epoch: false,
                checkpoint_height: i,
                consensus_commit_ref: CommitRef::default(),
                rejected_transactions_digest: Digest::default(),
            },
        }
    }

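    // Deterministic test digest: a 32-byte digest whose first byte is `i`.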
    fn d(i: u8) -> TransactionDigest {
        let mut bytes: [u8; 32] = Default::default();
        bytes[0] = i;
        TransactionDigest::new(bytes)
    }

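    // Builds TransactionEffects for `transaction_digest` with the given dependencies
    // and gas usage.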
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }

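    // Stores synthetic effects for `digest` in the test effects map and marks the
    // transaction as executed in the current epoch.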
    fn commit_cert_for_test(
        store: &mut HashMap<TransactionDigest, TransactionEffects>,
        state: Arc<AuthorityState>,
        digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) {
        let epoch_store = state.epoch_store_for_testing();
        let effects = e(digest, dependencies, gas_used);
        store.insert(digest, effects.clone());
        epoch_store.insert_executed_in_epoch(&digest);
    }
}