1pub(crate) mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16 LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::ZipDebugEqIteratorExt;
29use mysten_common::random::get_rng;
30use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
31use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
32use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
33use nonempty::NonEmpty;
34use parking_lot::Mutex;
35use pin_project_lite::pin_project;
36use serde::{Deserialize, Serialize};
37use sui_macros::fail_point_arg;
38use sui_network::default_mysten_network_config;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43 CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
47use tokio::sync::{mpsc, watch};
48use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
49
50use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
51use crate::authority::authority_store_pruner::PrunerWatermarks;
52use crate::consensus_handler::SequencedConsensusTransactionKey;
53use rand::seq::SliceRandom;
54use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
55use std::fs::File;
56use std::future::Future;
57use std::io::Write;
58use std::path::Path;
59use std::pin::Pin;
60use std::sync::Arc;
61use std::sync::Weak;
62use std::task::{Context, Poll};
63use std::time::{Duration, SystemTime};
64use sui_protocol_config::ProtocolVersion;
65use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
66use sui_types::committee::StakeUnit;
67use sui_types::crypto::AuthorityStrongQuorumSignInfo;
68use sui_types::digests::{
69 CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
70};
71use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
72use sui_types::error::{SuiErrorKind, SuiResult};
73use sui_types::gas::GasCostSummary;
74use sui_types::message_envelope::Message;
75use sui_types::messages_checkpoint::{
76 CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
77 CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
78 EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
79 VerifiedCheckpointContents,
80};
81use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
82use sui_types::messages_consensus::ConsensusTransactionKey;
83use sui_types::signature::GenericSignature;
84use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
85use sui_types::transaction::{
86 TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
87};
88use tokio::sync::Notify;
89use tracing::{debug, error, info, instrument, trace, warn};
90use typed_store::DBMapUtils;
91use typed_store::Map;
92use typed_store::{
93 TypedStoreError,
94 rocks::{DBMap, MetricConf},
95};
96
// Fixed key for the single-row `transaction_fork_detected` table (the table
// stores at most one fork record under this key).
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

/// Height assigned to a pending checkpoint during consensus handling.
pub type CheckpointHeight = u64;

/// Aggregate statistics for one epoch, derived from its first and last
/// checkpoint summaries (see `CheckpointStore::get_epoch_stats`).
pub struct EpochStats {
    /// Number of checkpoints produced during the epoch.
    pub checkpoint_count: u64,
    /// Number of transactions included in the epoch's checkpoints.
    pub transaction_count: u64,
    /// Sum of computation costs from the epoch's rolling gas cost summary.
    pub total_gas_reward: u64,
}
106
/// Metadata for a checkpoint that has been scheduled for construction but not
/// yet built.
#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    /// Timestamp to stamp on the resulting checkpoint summary.
    pub timestamp_ms: CheckpointTimestamp,
    /// True if this will be the final checkpoint of the current epoch.
    pub last_of_epoch: bool,
    /// Height at which this pending checkpoint was scheduled.
    pub checkpoint_height: CheckpointHeight,
    /// Reference to the consensus commit this checkpoint derives from.
    pub consensus_commit_ref: CommitRef,
    // NOTE(review): presumably a digest summarizing the commit's rejected
    // transactions — confirm semantics at the producer.
    pub rejected_transactions_digest: Digest,
    /// Pre-assigned sequence number, when known at scheduling time.
    pub checkpoint_seq: Option<CheckpointSequenceNumber>,
}

/// A scheduled checkpoint: the transaction roots to include plus its
/// scheduling metadata.
#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    pub roots: Vec<TransactionKey>,
    pub details: PendingCheckpointInfo,
}
127
/// Roots contributed to a checkpoint at a single height.
#[derive(Clone, Debug, Default)]
pub struct CheckpointRoots {
    /// Transaction roots to include at this height.
    pub tx_roots: Vec<TransactionKey>,
    /// Optional settlement transaction root for this height.
    pub settlement_root: Option<TransactionKey>,
    /// The height these roots were collected at.
    pub height: CheckpointHeight,
}

/// V2 pending checkpoint: roots are grouped per height rather than flattened
/// into a single list (compare `PendingCheckpoint`).
#[derive(Clone, Debug)]
pub struct PendingCheckpointV2 {
    pub roots: Vec<CheckpointRoots>,
    pub details: PendingCheckpointInfo,
}
146
/// A checkpoint summary as persisted by the checkpoint builder, together with
/// the builder bookkeeping needed to resume after a restart.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    /// Height this summary was built from, when known.
    pub checkpoint_height: Option<CheckpointHeight>,
    /// Position of this checkpoint within its originating commit.
    pub position_in_commit: usize,
}
154
/// Column families backing the checkpoint store. Opened via the generated
/// `open_tables_read_write` (RocksDB) or the tidehunter configuration in
/// `CheckpointStoreTables::new`.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    /// Checkpoint contents keyed by their digest.
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Reverse index: contents digest -> checkpoint sequence number.
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    // Older full-contents table; still deleted from in
    // `delete_full_checkpoint_contents`, superseded by
    // `full_checkpoint_content_v2` for reads/writes in this file.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Certified checkpoints keyed by sequence number.
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Certified checkpoints keyed by checkpoint digest.
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Summaries this node computed itself, used for fork detection against
    /// certified checkpoints.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// Last checkpoint sequence number of each epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Progress watermarks (verified / synced / executed / pruned / fork),
    /// each storing a (sequence number, digest) pair.
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    /// Single-row record of a detected transaction-effects fork:
    /// (tx digest, expected effects digest, actual effects digest).
    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
    /// Versioned full checkpoint contents keyed by sequence number.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
}
202
203fn full_checkpoint_content_table_default_config() -> DBOptions {
204 DBOptions {
205 options: default_db_options().options,
206 rw_options: ReadWriteOptions::default().set_log_value_hash(true),
210 }
211}
212
impl CheckpointStoreTables {
    /// Opens the RocksDB-backed tables at `path` (non-tidehunter builds).
    /// The pruner watermarks are unused by this backend.
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    /// Opens the tidehunter-backed tables at `path`, wiring per-column-family
    /// key layouts, caches, and pruner-driven relocation filters.
    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count();
        // Keys are 8-byte sequence numbers; the prefix covers the first
        // 6 bytes (6 * 8 bits). NOTE(review): confirm this matches the
        // big-endian key encoding used by typed_store.
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(16_000)
            .with_value_cache_size(default_value_cache_size());
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        // 32-byte digest keys; the serialized 8-byte length prefix
        // (0,0,0,0,0,0,0,32) is stripped from stored keys.
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        // Watermarks are tiny and hot: cache them and never unload.
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(100);
        // Per-column-family configs. Relocation filters either drop entries
        // outright (Decision::Remove) or prune relative to a pruner watermark
        // (checkpoint_id / epoch_id), keyed off each table's value or key.
        let configs = vec![
            (
                "checkpoint_content",
                digest_config.clone().with_config(
                    KeySpaceConfig::new().with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
            ),
            (
                "transaction_fork_detected",
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "full_checkpoint_content_v2",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    /// Opens a read-only handle to the tables (RocksDB builds).
    #[cfg(not(tidehunter))]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }

    // NOTE(review): tidehunter has no separate read-only handle here; this
    // opens the tables read-write with default (no-op) pruner watermarks.
    #[cfg(tidehunter)]
    pub fn open_readonly(path: &Path) -> Self {
        Self::new(path, "checkpoint", Arc::new(PrunerWatermarks::default()))
    }
}
346
/// Persistent checkpoint store plus in-memory notification channels that wake
/// tasks waiting for a checkpoint to become synced or executed.
pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    // Notified by `update_highest_synced_checkpoint`.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    // Notified by `update_highest_executed_checkpoint`.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
352
353impl CheckpointStore {
354 pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
355 let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
356 Arc::new(Self {
357 tables,
358 synced_checkpoint_notify_read: NotifyRead::new(),
359 executed_checkpoint_notify_read: NotifyRead::new(),
360 })
361 }
362
363 pub fn new_for_tests() -> Arc<Self> {
364 let ckpt_dir = mysten_common::tempdir().unwrap();
365 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
366 }
367
368 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
369 let tables = CheckpointStoreTables::new(
370 path,
371 "db_checkpoint",
372 Arc::new(PrunerWatermarks::default()),
373 );
374 Arc::new(Self {
375 tables,
376 synced_checkpoint_notify_read: NotifyRead::new(),
377 executed_checkpoint_notify_read: NotifyRead::new(),
378 })
379 }
380
    /// Opens a read-only view of the tables (RocksDB builds).
    #[cfg(not(tidehunter))]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }

    // Tidehunter builds return the regular tables type; see the note on
    // `CheckpointStoreTables::open_readonly`.
    #[cfg(tidehunter)]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTables {
        CheckpointStoreTables::open_readonly(path)
    }
390
    /// Seeds the store with the epoch-0 genesis checkpoint and its contents.
    ///
    /// Panics (via `assert_eq!`) unless `checkpoint` is sequence number 0 of
    /// epoch 0. If checkpoint 0 already exists, only asserts the digests
    /// match; otherwise writes contents, the certified checkpoint, and the
    /// highest-synced watermark, and — when the epoch store is still in the
    /// genesis epoch — primes the checkpoint builder with the genesis data.
    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        // If a genesis checkpoint is already recorded it must be identical.
        match self.get_checkpoint_by_sequence_number(0).unwrap() {
            Some(existing_checkpoint) => {
                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
            }
            None => {
                // Builder state is only relevant while the validator is still
                // in the genesis epoch.
                if epoch_store.epoch() == checkpoint.epoch {
                    epoch_store
                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                        .unwrap();
                } else {
                    debug!(
                        validator_epoch =% epoch_store.epoch(),
                        genesis_epoch =% checkpoint.epoch(),
                        "Not inserting checkpoint builder data for genesis checkpoint",
                    );
                }
                self.insert_checkpoint_contents(contents).unwrap();
                self.insert_verified_checkpoint(&checkpoint).unwrap();
                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
            }
        }
    }
432
433 pub fn get_checkpoint_by_digest(
434 &self,
435 digest: &CheckpointDigest,
436 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
437 self.tables
438 .checkpoint_by_digest
439 .get(digest)
440 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
441 }
442
443 pub fn get_checkpoint_by_sequence_number(
444 &self,
445 sequence_number: CheckpointSequenceNumber,
446 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
447 self.tables
448 .certified_checkpoints
449 .get(&sequence_number)
450 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
451 }
452
    /// Returns the summary this node computed locally at `sequence_number`,
    /// if any (as opposed to a certified checkpoint from the network).
    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.tables
            .locally_computed_checkpoints
            .get(&sequence_number)
    }
461
462 pub fn multi_get_locally_computed_checkpoints(
463 &self,
464 sequence_numbers: &[CheckpointSequenceNumber],
465 ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
466 let checkpoints = self
467 .tables
468 .locally_computed_checkpoints
469 .multi_get(sequence_numbers)?;
470
471 Ok(checkpoints)
472 }
473
    /// Resolves a checkpoint contents digest to its sequence number, if the
    /// mapping has been recorded.
    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }
482
    /// Removes the contents-digest -> sequence-number mapping for `digest`.
    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }
491
492 pub fn get_latest_certified_checkpoint(
493 &self,
494 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
495 Ok(self
496 .tables
497 .certified_checkpoints
498 .reversed_safe_iter_with_bounds(None, None)?
499 .next()
500 .transpose()?
501 .map(|(_, v)| v.into()))
502 }
503
504 pub fn get_latest_locally_computed_checkpoint(
505 &self,
506 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
507 Ok(self
508 .tables
509 .locally_computed_checkpoints
510 .reversed_safe_iter_with_bounds(None, None)?
511 .next()
512 .transpose()?
513 .map(|(_, v)| v))
514 }
515
516 pub fn multi_get_checkpoint_by_sequence_number(
517 &self,
518 sequence_numbers: &[CheckpointSequenceNumber],
519 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
520 let checkpoints = self
521 .tables
522 .certified_checkpoints
523 .multi_get(sequence_numbers)?
524 .into_iter()
525 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
526 .collect();
527
528 Ok(checkpoints)
529 }
530
    /// Batch lookup of checkpoint contents by their digests; one entry per
    /// requested digest, in order.
    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }
537
538 pub fn get_highest_verified_checkpoint(
539 &self,
540 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
541 let highest_verified = if let Some(highest_verified) = self
542 .tables
543 .watermarks
544 .get(&CheckpointWatermark::HighestVerified)?
545 {
546 highest_verified
547 } else {
548 return Ok(None);
549 };
550 self.get_checkpoint_by_digest(&highest_verified.1)
551 }
552
553 pub fn get_highest_synced_checkpoint(
554 &self,
555 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
556 let highest_synced = if let Some(highest_synced) = self
557 .tables
558 .watermarks
559 .get(&CheckpointWatermark::HighestSynced)?
560 {
561 highest_synced
562 } else {
563 return Ok(None);
564 };
565 self.get_checkpoint_by_digest(&highest_synced.1)
566 }
567
568 pub fn get_highest_synced_checkpoint_seq_number(
569 &self,
570 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
571 if let Some(highest_synced) = self
572 .tables
573 .watermarks
574 .get(&CheckpointWatermark::HighestSynced)?
575 {
576 Ok(Some(highest_synced.0))
577 } else {
578 Ok(None)
579 }
580 }
581
582 pub fn get_highest_executed_checkpoint_seq_number(
583 &self,
584 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
585 if let Some(highest_executed) = self
586 .tables
587 .watermarks
588 .get(&CheckpointWatermark::HighestExecuted)?
589 {
590 Ok(Some(highest_executed.0))
591 } else {
592 Ok(None)
593 }
594 }
595
596 pub fn get_highest_executed_checkpoint(
597 &self,
598 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
599 let highest_executed = if let Some(highest_executed) = self
600 .tables
601 .watermarks
602 .get(&CheckpointWatermark::HighestExecuted)?
603 {
604 highest_executed
605 } else {
606 return Ok(None);
607 };
608 self.get_checkpoint_by_digest(&highest_executed.1)
609 }
610
611 pub fn get_highest_pruned_checkpoint_seq_number(
612 &self,
613 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
614 self.tables
615 .watermarks
616 .get(&CheckpointWatermark::HighestPruned)
617 .map(|watermark| watermark.map(|w| w.0))
618 }
619
    /// Fetches checkpoint contents by their digest.
    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }
626
    /// Fetches the versioned full contents for checkpoint `seq` from the
    /// v2 table.
    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content_v2.get(&seq)
    }
633
    /// Deletes locally computed summaries below the most recent one.
    /// No-op when the table is empty.
    fn prune_local_summaries(&self) -> SuiResult {
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            // NOTE(review): assumes `schedule_delete_range`'s upper bound is
            // exclusive so the latest summary itself is retained — confirm
            // against typed_store's range semantics.
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }
653
    /// Deletes every locally computed summary with sequence number >=
    /// `from_seq`. Logs (at warn) the inclusive range actually removed;
    /// no-op when nothing is at or above `from_seq`.
    pub fn clear_locally_computed_checkpoints_from(
        &self,
        from_seq: CheckpointSequenceNumber,
    ) -> SuiResult {
        // Materialize the keys first so the last deleted key can be reported.
        let keys: Vec<_> = self
            .tables
            .locally_computed_checkpoints
            .safe_iter_with_bounds(Some(from_seq), None)
            .map(|r| r.map(|(k, _)| k))
            .collect::<Result<_, _>>()?;
        if let Some(&last_local_summary) = keys.last() {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            // Deletion failures here are treated as unrecoverable.
            batch
                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
                .expect("Failed to delete locally computed checkpoints");
            batch
                .write()
                .expect("Failed to delete locally computed checkpoints");
            warn!(
                from_seq,
                last_local_summary,
                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
                from_seq,
                last_local_summary
            );
        }
        Ok(())
    }
682
    /// Compares a locally computed summary against the certified checkpoint
    /// for the same sequence number. On mismatch: logs full details of both
    /// sides, records the fork in the watermarks table, and terminates the
    /// node via `fatal!`. Does nothing when the summaries are equal.
    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            // Best-effort fetch of both contents purely for the log message;
            // lookup failures are folded into the message text rather than
            // aborting the fork report.
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );

            // Persist the fork marker before dying so the node can detect it
            // on restart; a failure here is logged but does not stop the
            // fatal path below.
            if let Err(e) = self.record_checkpoint_fork_detected(
                *local_checkpoint.sequence_number(),
                local_checkpoint.digest(),
            ) {
                error!("Failed to record checkpoint fork in database: {:?}", e);
            }

            // Test-only failpoint: under msim, record the verified digest for
            // fork-recovery tests and shut down this simulated node.
            fail_point_arg!(
                "kill_checkpoint_fork_node",
                |checkpoint_overrides: std::sync::Arc<
                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
                >| {
                    #[cfg(msim)]
                    {
                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
                            overrides.insert(
                                local_checkpoint.sequence_number,
                                verified_checkpoint.digest().to_string(),
                            );
                        }
                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
                            local_checkpoint.sequence_number(),
                            verified_checkpoint.digest()
                        );
                        sui_simulator::task::shutdown_current_node();
                    }
                }
            );

            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }
776
    /// Atomically stores a certified checkpoint, indexed both by sequence
    /// number and by digest; a checkpoint carrying a next-epoch committee is
    /// also recorded as its epoch's last checkpoint. After the write, any
    /// locally computed summary at the same sequence number is cross-checked
    /// for forks (which is fatal on mismatch).
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        // Only end-of-epoch checkpoints carry a next-epoch committee.
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }
818
    /// Stores a certified checkpoint and then advances the highest-verified
    /// watermark if this checkpoint is newer than the current watermark.
    #[instrument(level = "debug", skip_all)]
    pub fn insert_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.insert_certified_checkpoint(checkpoint)?;
        self.update_highest_verified_checkpoint(checkpoint)
    }
829
830 pub fn update_highest_verified_checkpoint(
831 &self,
832 checkpoint: &VerifiedCheckpoint,
833 ) -> Result<(), TypedStoreError> {
834 if Some(*checkpoint.sequence_number())
835 > self
836 .get_highest_verified_checkpoint()?
837 .map(|x| *x.sequence_number())
838 {
839 debug!(
840 checkpoint_seq = checkpoint.sequence_number(),
841 "Updating highest verified checkpoint",
842 );
843 self.tables.watermarks.insert(
844 &CheckpointWatermark::HighestVerified,
845 &(*checkpoint.sequence_number(), *checkpoint.digest()),
846 )?;
847 }
848
849 Ok(())
850 }
851
    /// Sets the highest-synced watermark to `checkpoint` and wakes tasks
    /// waiting on this sequence number.
    ///
    /// NOTE(review): unlike the verified watermark, there is no monotonicity
    /// check here — callers appear responsible for ordering; confirm.
    pub fn update_highest_synced_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
        // Persist the watermark before notifying waiters.
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestSynced,
            &(seq, *checkpoint.digest()),
        )?;
        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
        Ok(())
    }
865
    /// Waits until the watermark returned by `get_watermark` reaches `seq`,
    /// then returns the certified checkpoint at `seq`.
    ///
    /// The closure given to `NotifyRead::read` is the "already available?"
    /// probe: it yields `vec![None]` (keep waiting) while the watermark is
    /// absent or below `seq`, and loads the checkpoint once it has caught up.
    /// Panics on DB errors or if the checkpoint is missing despite the
    /// watermark covering it.
    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        notify_read
            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
                let seq = seqs[0];
                let Some(highest) = get_watermark() else {
                    return vec![None];
                };
                if highest < seq {
                    return vec![None];
                }
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                vec![Some(checkpoint)]
            })
            .await
            .into_iter()
            .next()
            .unwrap()
    }
895
    /// Waits until checkpoint `seq` is synced, then returns it.
    pub async fn notify_read_synced_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
            self.get_highest_synced_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }
906
    /// Waits until checkpoint `seq` is executed, then returns it.
    pub async fn notify_read_executed_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
            self.get_highest_executed_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }
917
    /// Advances the highest-executed watermark to `checkpoint` and notifies
    /// waiters.
    ///
    /// No-ops when the watermark is already at or past this checkpoint, and
    /// asserts that execution otherwise advances by exactly one sequence
    /// number at a time (no gaps).
    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
        // Persist the watermark before notifying waiters.
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(seq, *checkpoint.digest()),
        )?;
        self.executed_checkpoint_notify_read
            .notify(&seq, checkpoint);
        Ok(())
    }
944
    /// Unconditionally sets the highest-pruned watermark to `checkpoint`.
    pub fn update_highest_pruned_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestPruned,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }
954
    /// Directly overwrites the highest-executed watermark, skipping the
    /// monotonicity/contiguity checks of `update_highest_executed_checkpoint`
    /// and without notifying waiters — hence "subtle"; use with care.
    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }
968
969 pub fn insert_checkpoint_contents(
970 &self,
971 contents: CheckpointContents,
972 ) -> Result<(), TypedStoreError> {
973 debug!(
974 checkpoint_seq = ?contents.digest(),
975 "Inserting checkpoint contents",
976 );
977 self.tables
978 .checkpoint_content
979 .insert(contents.digest(), &contents)
980 }
981
    /// Atomically records the full contents of a verified checkpoint in three
    /// tables: the contents-digest -> sequence-number index, the versioned
    /// full contents (v2), and the plain `CheckpointContents` keyed by
    /// digest. Asserts the contents match the checkpoint's declared digest.
    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content_v2.batch();
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content_v2,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        // Derive the plain contents and verify they hash to the digest the
        // checkpoint summary committed to.
        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        // Single atomic write for all three tables.
        batch.write()
    }
1008
    /// Removes the full contents at `seq` from both the original
    /// `full_checkpoint_content` table and the v2 table.
    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.tables.full_checkpoint_content.remove(&seq)?;
        self.tables.full_checkpoint_content_v2.remove(&seq)
    }
1016
1017 pub fn get_epoch_last_checkpoint(
1018 &self,
1019 epoch_id: EpochId,
1020 ) -> SuiResult<Option<VerifiedCheckpoint>> {
1021 let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1022 let checkpoint = match seq {
1023 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1024 None => None,
1025 };
1026 Ok(checkpoint)
1027 }
1028
1029 pub fn get_epoch_last_checkpoint_seq_number(
1030 &self,
1031 epoch_id: EpochId,
1032 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1033 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1034 Ok(seq)
1035 }
1036
    /// Records `checkpoint` as the last checkpoint of `epoch_id`.
    pub fn insert_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
        checkpoint: &VerifiedCheckpoint,
    ) -> SuiResult {
        self.tables
            .epoch_last_checkpoint_map
            .insert(&epoch_id, checkpoint.sequence_number())?;
        Ok(())
    }
1047
1048 pub fn get_epoch_state_commitments(
1049 &self,
1050 epoch: EpochId,
1051 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1052 let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1053 checkpoint
1054 .end_of_epoch_data
1055 .as_ref()
1056 .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1057 .epoch_commitments
1058 .clone()
1059 });
1060 Ok(commitments)
1061 }
1062
1063 pub fn get_epoch_stats(
1065 &self,
1066 epoch: EpochId,
1067 last_checkpoint: &CheckpointSummary,
1068 ) -> Option<EpochStats> {
1069 let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1070 (0, 0)
1071 } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1072 (
1073 checkpoint.sequence_number + 1,
1074 checkpoint.network_total_transactions,
1075 )
1076 } else {
1077 return None;
1078 };
1079 Some(EpochStats {
1080 checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1081 transaction_count: last_checkpoint.network_total_transactions
1082 - prev_epoch_network_transactions,
1083 total_gas_reward: last_checkpoint
1084 .epoch_rolling_gas_cost_summary
1085 .computation_cost,
1086 })
1087 }
1088
    /// Creates an on-disk snapshot of the checkpoint database at `path`.
    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
        // NOTE(review): invoked through the `checkpoint_content` table
        // handle, but presumably snapshots the whole database — confirm
        // typed_store's `checkpoint_db` semantics.
        self.tables
            .checkpoint_content
            .checkpoint_db(path)
            .map_err(Into::into)
    }
1096
1097 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1098 let mut wb = self.tables.watermarks.batch();
1099 wb.delete_batch(
1100 &self.tables.watermarks,
1101 std::iter::once(CheckpointWatermark::HighestExecuted),
1102 )?;
1103 wb.write()?;
1104 Ok(())
1105 }
1106
1107 pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1108 self.delete_highest_executed_checkpoint_test_only()?;
1109 Ok(())
1110 }
1111
1112 pub fn record_checkpoint_fork_detected(
1113 &self,
1114 checkpoint_seq: CheckpointSequenceNumber,
1115 checkpoint_digest: CheckpointDigest,
1116 ) -> Result<(), TypedStoreError> {
1117 info!(
1118 checkpoint_seq = checkpoint_seq,
1119 checkpoint_digest = ?checkpoint_digest,
1120 "Recording checkpoint fork detection in database"
1121 );
1122 self.tables.watermarks.insert(
1123 &CheckpointWatermark::CheckpointForkDetected,
1124 &(checkpoint_seq, checkpoint_digest),
1125 )
1126 }
1127
1128 pub fn get_checkpoint_fork_detected(
1129 &self,
1130 ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1131 self.tables
1132 .watermarks
1133 .get(&CheckpointWatermark::CheckpointForkDetected)
1134 }
1135
1136 pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1137 self.tables
1138 .watermarks
1139 .remove(&CheckpointWatermark::CheckpointForkDetected)
1140 }
1141
1142 pub fn record_transaction_fork_detected(
1143 &self,
1144 tx_digest: TransactionDigest,
1145 expected_effects_digest: TransactionEffectsDigest,
1146 actual_effects_digest: TransactionEffectsDigest,
1147 ) -> Result<(), TypedStoreError> {
1148 info!(
1149 tx_digest = ?tx_digest,
1150 expected_effects_digest = ?expected_effects_digest,
1151 actual_effects_digest = ?actual_effects_digest,
1152 "Recording transaction fork detection in database"
1153 );
1154 self.tables.transaction_fork_detected.insert(
1155 &TRANSACTION_FORK_DETECTED_KEY,
1156 &(tx_digest, expected_effects_digest, actual_effects_digest),
1157 )
1158 }
1159
1160 pub fn get_transaction_fork_detected(
1161 &self,
1162 ) -> Result<
1163 Option<(
1164 TransactionDigest,
1165 TransactionEffectsDigest,
1166 TransactionEffectsDigest,
1167 )>,
1168 TypedStoreError,
1169 > {
1170 self.tables
1171 .transaction_fork_detected
1172 .get(&TRANSACTION_FORK_DETECTED_KEY)
1173 }
1174
1175 pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1176 self.tables
1177 .transaction_fork_detected
1178 .remove(&TRANSACTION_FORK_DETECTED_KEY)
1179 }
1180}
1181
/// Keys of the `watermarks` table; each variant names one persisted
/// watermark entry.
///
/// NOTE(review): values of this enum are serialized (Serde) and used as DB
/// keys — do not rename or reorder variants without considering stored data.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    // Highest checkpoint that has been verified.
    HighestVerified,
    // Highest checkpoint synced from peers.
    HighestSynced,
    // Highest checkpoint whose transactions have been executed locally.
    HighestExecuted,
    // Highest checkpoint pruned from local storage.
    HighestPruned,
    // Marker recording that a checkpoint fork was detected (see
    // `record_checkpoint_fork_detected`).
    CheckpointForkDetected,
}
1190
/// Background worker that feeds per-checkpoint transaction effects from the
/// checkpoint builder into the global state hasher.
struct CheckpointStateHasher {
    // Epoch context passed through to `GlobalStateHasher::accumulate_checkpoint`.
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Weak handle: if the hasher is dropped elsewhere, `run` stops instead of
    // keeping it alive.
    hasher: Weak<GlobalStateHasher>,
    // Receives (checkpoint sequence number, effects) batches from the builder.
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
1196
1197impl CheckpointStateHasher {
1198 fn new(
1199 epoch_store: Arc<AuthorityPerEpochStore>,
1200 hasher: Weak<GlobalStateHasher>,
1201 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1202 ) -> Self {
1203 Self {
1204 epoch_store,
1205 hasher,
1206 receive_from_builder,
1207 }
1208 }
1209
1210 async fn run(self) {
1211 let Self {
1212 epoch_store,
1213 hasher,
1214 mut receive_from_builder,
1215 } = self;
1216 while let Some((seq, effects)) = receive_from_builder.recv().await {
1217 let Some(hasher) = hasher.upgrade() else {
1218 info!("Object state hasher was dropped, stopping checkpoint accumulation");
1219 break;
1220 };
1221 hasher
1222 .accumulate_checkpoint(&effects, seq, &epoch_store)
1223 .expect("epoch ended while accumulating checkpoint");
1224 }
1225 }
1226}
1227
/// Errors produced by the checkpoint builder. The first two variants cause
/// the builder loop to stop permanently; `Retry` triggers a delayed retry
/// (see `CheckpointBuilder::run`).
#[derive(Debug)]
pub enum CheckpointBuilderError {
    // The end-of-epoch transaction was already executed; no more checkpoints
    // can be built in this epoch.
    ChangeEpochTxAlreadyExecuted,
    // Required system packages are not available locally.
    SystemPackagesMissing,
    // Transient failure; the wrapped error is logged and the build retried.
    Retry(anyhow::Error),
}
1234
// Blanket conversion: any std error becomes the retryable variant, so `?`
// can be used freely inside builder code. The generic parameter is named
// `SuiError` to signal the primary intended source type, but it matches any
// `Error + Send + Sync + 'static`.
impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
    for CheckpointBuilderError
{
    fn from(e: SuiError) -> Self {
        Self::Retry(e.into())
    }
}
1242
/// Result alias for checkpoint-builder operations; the success type defaults
/// to `()`.
pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1244
/// Builds checkpoints from pending checkpoint roots produced by consensus:
/// resolves transaction effects, creates summaries/contents, and persists
/// them (see `make_checkpoint` / `write_checkpoints`).
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Signaled when new pending checkpoints may be available.
    notify: Arc<Notify>,
    // Wakes the aggregator after new checkpoints are written.
    notify_aggregator: Arc<Notify>,
    // Broadcasts the highest checkpoint sequence number built so far.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    // Weak so the builder does not keep the hasher alive.
    global_state_hasher: Weak<GlobalStateHasher>,
    // Sends (sequence, effects) batches to the CheckpointStateHasher task.
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    // Chunking limits used by `split_checkpoint_chunks`.
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
1260
/// Aggregates checkpoint signatures from validators until a quorum certifies
/// each locally built checkpoint.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Signaled when there may be new work (new checkpoints or signatures).
    notify: Arc<Notify>,
    // Incoming signature messages from consensus.
    receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
    // Signatures received ahead of the checkpoint currently being aggregated,
    // keyed by sequence number.
    pending: BTreeMap<CheckpointSequenceNumber, Vec<CheckpointSignatureMessage>>,
    // Aggregation state for the checkpoint currently awaiting a quorum.
    current: Option<CheckpointSignatureAggregator>,
    // Sink for fully certified checkpoints.
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1272
/// Per-checkpoint signature aggregation state: collects validator signatures
/// over one checkpoint summary until a stake quorum is reached.
pub struct CheckpointSignatureAggregator {
    summary: CheckpointSummary,
    // Digest of `summary`; signatures over other digests indicate a split.
    digest: CheckpointDigest,
    // Stake-weighted aggregation keyed by the digest each validator signed.
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1283
1284impl CheckpointBuilder {
    /// Wires together the builder's inputs and outputs. Pure field
    /// assignment; no work is started until `run` is invoked.
    fn new(
        state: Arc<AuthorityState>,
        store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            store,
            epoch_store,
            notify,
            effects_store,
            global_state_hasher,
            send_to_hasher,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }
1318
    /// Main builder loop: optionally waits for consensus replay to finish,
    /// then repeatedly builds checkpoints, classifying failures as either
    /// permanent (stop) or transient (log, sleep 1s, retry).
    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
        // Building before consensus replay completes could observe a partial
        // commit history, so block until replay is done when a waiter is given.
        if let Some(replay_waiter) = consensus_replay_waiter {
            info!("Waiting for consensus commits to replay ...");
            replay_waiter.wait_for_replay().await;
            info!("Consensus commits finished replaying");
        }
        info!("Starting CheckpointBuilder");
        loop {
            match self.maybe_build_checkpoints().await {
                Ok(()) => {}
                // Permanent conditions: nothing more to build this epoch.
                err @ Err(
                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
                    | CheckpointBuilderError::SystemPackagesMissing,
                ) => {
                    info!("CheckpointBuilder stopping: {:?}", err);
                    return;
                }
                // Transient errors: record, back off briefly, and retry
                // without waiting for a new notification.
                Err(CheckpointBuilderError::Retry(inner)) => {
                    let msg = format!("{:?}", inner);
                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    continue;
                }
            }

            // Sleep until new pending checkpoints are signaled.
            self.notify.notified().await;
        }
    }
1355
1356 async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1357 if self
1358 .epoch_store
1359 .protocol_config()
1360 .split_checkpoints_in_consensus_handler()
1361 {
1362 self.maybe_build_checkpoints_v2().await
1363 } else {
1364 self.maybe_build_checkpoints_v1().await
1365 }
1366 }
1367
    /// V1 build path: groups consecutive pending checkpoints until the
    /// minimum checkpoint interval has elapsed (or an end-of-epoch checkpoint
    /// is reached), then builds one checkpoint per group.
    async fn maybe_build_checkpoints_v1(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        // Resume from the last summary this builder produced (if any).
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        // Pending checkpoints accumulated since the last build.
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            let current_timestamp = pending.details().timestamp_ms;
            // Build now if the interval has elapsed, or if this (or the next)
            // pending checkpoint is the last of the epoch — end-of-epoch
            // checkpoints must not be delayed or merged past the boundary.
            let can_build = match last_timestamp {
                Some(last_timestamp) => {
                    current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                }
                None => true,
            } || checkpoints_iter
                .peek()
                .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height_from = grouped_pending_checkpoints
                    .first()
                    .unwrap()
                    .details()
                    .checkpoint_height,
                checkpoint_commit_height_to = last_height,
                "Making checkpoint with commit height range"
            );

            // Build from the accumulated group, emptying it for the next round.
            let seq = self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await?;

            // Publish monotonically increasing progress to watchers.
            self.last_built.send_if_modified(|cur| {
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            // Yield so other tasks can run between checkpoint builds.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );

        Ok(())
    }
1456
    /// V2 build path: pending checkpoints are already split/grouped by the
    /// consensus handler, so each one is built individually with no interval
    /// grouping logic.
    async fn maybe_build_checkpoints_v2(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        // Resume after the last height this builder produced (if any).
        let last_height = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .and_then(|s| s.checkpoint_height);

        for (height, pending) in self
            .epoch_store
            .get_pending_checkpoints_v2(last_height)
            .expect("unexpected epoch store error")
        {
            debug!(checkpoint_commit_height = height, "Making checkpoint");

            let seq = self.make_checkpoint_v2(pending).await?;

            // Publish monotonically increasing progress to watchers.
            self.last_built.send_if_modified(|cur| {
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            // Yield so other tasks can run between checkpoint builds.
            tokio::task::yield_now().await;
        }

        Ok(())
    }
1493
    /// Builds and persists one or more checkpoints from a group of pending
    /// checkpoints (v1 path). Returns the highest checkpoint sequence number
    /// produced.
    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &mut self,
        pendings: Vec<PendingCheckpoint>,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        // Human-readable description of the inputs, used in the final log line.
        let pending_ckpt_str = pendings
            .iter()
            .map(|p| {
                format!(
                    "height={}, commit={}",
                    p.details().checkpoint_height,
                    p.details().consensus_commit_ref
                )
            })
            .join("; ");

        let last_details = pendings.last().unwrap().details().clone();

        // Snapshot taken before resolution: used below to sanity-check that
        // resolving effects did not block when execution is already ahead.
        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &last_details,
                &all_roots,
            )
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        // If execution already covered these checkpoints, the effects should
        // have been available immediately (a single poll); more polls suggest
        // the builder waited when it should not have.
        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        let new_ckpt_str = new_checkpoints
            .iter()
            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
            .join("; ");

        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            "Made new checkpoint {} from pending checkpoint {}",
            new_ckpt_str, pending_ckpt_str
        );

        Ok(highest_sequence)
    }
1553
    /// Builds and persists exactly one checkpoint from a v2 pending
    /// checkpoint. Returns its sequence number.
    #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
    async fn make_checkpoint_v2(
        &mut self,
        pending: PendingCheckpointV2,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        let details = pending.details.clone();

        // Snapshot taken before resolution: used below to sanity-check that
        // resolving effects did not block when execution is already ahead.
        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        let (poll_count, result) =
            poll_count(self.resolve_checkpoint_transactions_v2(pending)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &details,
                &all_roots,
            )
            .await?;
        // V2 pending checkpoints map 1:1 to built checkpoints (splitting
        // happened upstream in the consensus handler).
        assert_eq!(new_checkpoints.len(), 1, "Expected exactly one checkpoint");
        let sequence = *new_checkpoints.first().0.sequence_number();
        let digest = new_checkpoints.first().0.digest();
        // See make_checkpoint: waiting is unexpected when execution is ahead.
        if sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        self.write_checkpoints(details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            seq = sequence,
            %digest,
            height = details.checkpoint_height,
            commit = %details.consensus_commit_ref,
            "Made new checkpoint"
        );

        Ok(sequence)
    }
1601
    /// Builds accumulator settlement transactions for the checkpoint's
    /// effects, hands them (plus a trailing barrier transaction) to the
    /// execution scheduler, waits for their effects, and returns them so they
    /// can be appended to the checkpoint.
    ///
    /// Returns the settlement `TransactionKey` together with the combined
    /// settlement + barrier effects, in that order.
    async fn construct_and_execute_settlement_transactions(
        &self,
        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> (TransactionKey, Vec<TransactionEffects>) {
        let _scope =
            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");

        // Key under which waiters (see resolve_checkpoint_transactions) look
        // up this settlement.
        let tx_key =
            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);

        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.effects_store.as_ref()),
            sorted_tx_effects_included_in_checkpoint,
            checkpoint_seq,
            tx_index_offset,
        );

        // Captured before build_tx consumes the builder; reported to the
        // funds scheduler at the end.
        let funds_changes = builder.collect_funds_changes();
        let num_updates = builder.num_updates();
        let settlement_txns = builder.build_tx(
            self.epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            checkpoint_seq,
        );

        // Settlements are system transactions, executable without signatures.
        let settlement_txns: Vec<_> = settlement_txns
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    self.epoch_store.epoch(),
                )
            })
            .collect();

        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();

        debug!(
            ?settlement_digests,
            ?tx_key,
            "created settlement transactions with {num_updates} updates"
        );

        // Hand the transactions to execution, then block on their effects.
        self.epoch_store
            .notify_settlement_transactions_ready(tx_key, settlement_txns);

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_settlement_effects",
            &settlement_digests,
            tx_key,
        )
        .await;
        let (accounts_created, accounts_deleted) =
            accumulators::count_accumulator_object_changes(&settlement_effects);
        self.metrics
            .report_accumulator_account_changes(accounts_created, accounts_deleted);

        // The barrier depends on the settlement effects, so it can only be
        // constructed after they are known.
        let barrier_tx = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            &settlement_effects,
        );

        let barrier_tx = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier_tx),
            self.epoch_store.epoch(),
        );
        let barrier_digest = *barrier_tx.digest();

        self.epoch_store
            .notify_barrier_transaction_ready(tx_key, barrier_tx);

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_barrier_effects",
            &[barrier_digest],
            tx_key,
        )
        .await;

        let settlement_and_barrier_effects: Vec<_> = settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect();

        // Find the single new version of the accumulator root object; exactly
        // one of these transactions must have mutated it, and none may fail.
        let mut next_accumulator_version = None;
        for fx in settlement_and_barrier_effects.iter() {
            assert!(
                fx.status().is_ok(),
                "settlement transaction cannot fail (digest: {:?}) {:#?}",
                fx.transaction_digest(),
                fx
            );
            if let Some(version) = fx
                .mutated()
                .iter()
                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
            {
                assert!(
                    next_accumulator_version.is_none(),
                    "Only one settlement transaction should mutate the accumulator root object"
                );
                next_accumulator_version = Some(version);
            }
        }
        let settlements = FundsSettlement {
            next_accumulator_version: next_accumulator_version
                .expect("Accumulator root object should be mutated in the settlement transactions"),
            funds_changes,
        };

        // Inform the withdraw scheduler of the settled balances.
        self.state
            .execution_scheduler()
            .settle_address_funds(settlements);

        (tx_key, settlement_and_barrier_effects)
    }
1733
    /// Resolves each pending checkpoint's roots into a causally ordered list
    /// of transaction effects (v1 path): waits for root effects, pins the
    /// consensus commit prologue first, appends dependency closures, and (when
    /// accumulators are enabled) executes and appends settlement transactions.
    ///
    /// Returns the combined ordered effects plus the set of root digests.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        pending_checkpoints: Vec<PendingCheckpoint>,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        // Digests already placed in this checkpoint batch, used for
        // deduplication across the pending checkpoints.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        let mut tx_effects = Vec::new();
        let mut tx_roots = HashSet::new();

        for pending_checkpoint in pending_checkpoints.into_iter() {
            let mut pending = pending_checkpoint;
            debug!(
                checkpoint_commit_height = pending.details.checkpoint_height,
                "Resolving checkpoint transactions for pending checkpoint.",
            );

            trace!(
                "roots for pending checkpoint {:?}: {:?}",
                pending.details.checkpoint_height, pending.roots,
            );

            // With accumulators enabled, the settlement key is always the
            // final root; pop it so the remaining roots are plain transactions.
            let settlement_root = if self.epoch_store.accumulators_enabled() {
                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
                    pending.roots.pop()
                else {
                    fatal!("No settlement root found");
                };
                Some(settlement_root)
            } else {
                None
            };

            let roots = &pending.roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(roots.len() as u64);

            // Map keys to digests, then wait for all root effects to exist.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;
            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;

            assert!(
                self.epoch_store
                    .protocol_config()
                    .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
            );

            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            // The prologue must stand alone: completing its closure must add
            // exactly the prologue itself, no extra dependencies.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    &mut effects_in_current_checkpoint,
                )?;

                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }

            // Expand the roots to include unchecked dependencies.
            let unsorted =
                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
            // The prologue is always first in the checkpoint.
            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
                if cfg!(debug_assertions) {
                    for tx in unsorted.iter() {
                        assert!(tx.transaction_digest() != &ccp_digest);
                    }
                }
                sorted.push(ccp_effects);
            }
            sorted.extend(CausalOrder::causal_sort(unsorted));

            // Execute settlements over the sorted effects and append them last.
            if let Some(settlement_root) = settlement_root {
                let last_checkpoint =
                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
                let next_checkpoint_seq = last_checkpoint
                    .as_ref()
                    .map(|(seq, _)| *seq)
                    .unwrap_or_default()
                    + 1;
                let tx_index_offset = tx_effects.len() as u64;

                let (tx_key, settlement_effects) = self
                    .construct_and_execute_settlement_transactions(
                        &sorted,
                        pending.details.checkpoint_height,
                        next_checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                debug!(?tx_key, "executed settlement transactions");

                assert_eq!(settlement_root, tx_key);

                effects_in_current_checkpoint
                    .extend(settlement_effects.iter().map(|e| *e.transaction_digest()));
                sorted.extend(settlement_effects);
            }

            #[cfg(msim)]
            {
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            tx_effects.extend(sorted);
            tx_roots.extend(root_digests);
        }

        Ok((tx_effects, tx_roots))
    }
1898
    /// Resolves a v2 pending checkpoint's per-group roots into causally
    /// ordered effects. Unlike v1, dependency-closure completion is not
    /// performed here; roots are sorted directly (with the consensus commit
    /// prologue pinned first) and settlement effects are read back rather
    /// than constructed.
    ///
    /// Returns the combined ordered effects plus the set of root digests.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions_v2(
        &self,
        pending: PendingCheckpointV2,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        debug!(
            checkpoint_commit_height = pending.details.checkpoint_height,
            "Resolving checkpoint transactions for pending checkpoint.",
        );

        trace!(
            "roots for pending checkpoint {:?}: {:?}",
            pending.details.checkpoint_height, pending.roots,
        );

        assert!(
            self.epoch_store
                .protocol_config()
                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
        );

        let mut all_effects: Vec<TransactionEffects> = Vec::new();
        let mut all_root_digests: Vec<TransactionDigest> = Vec::new();

        for checkpoint_roots in &pending.roots {
            let tx_roots = &checkpoint_roots.tx_roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(tx_roots.len() as u64);

            // Map keys to digests, then wait for all root effects to exist.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(tx_roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;

            all_root_digests.extend(root_digests.iter().cloned());

            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            // Sort causally with the prologue (if any) forced to the front.
            let ccp_digest = consensus_commit_prologue.map(|(d, _)| d);
            let mut sorted = CausalOrder::causal_sort_with_ccp(root_effects, ccp_digest);

            // Settlement effects (if scheduled for this group) come last.
            if let Some(settlement_key) = &checkpoint_roots.settlement_root {
                let checkpoint_seq = pending
                    .details
                    .checkpoint_seq
                    .expect("checkpoint_seq must be set");
                let tx_index_offset = all_effects.len() as u64;
                let effects = self
                    .resolve_settlement_effects(
                        *settlement_key,
                        &sorted,
                        checkpoint_roots.height,
                        checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                sorted.extend(effects);
            }

            #[cfg(msim)]
            {
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            all_effects.extend(sorted);
        }
        Ok((all_effects, all_root_digests.into_iter().collect()))
    }
1984
    /// Reconstructs the digests of the settlement and barrier transactions
    /// deterministically from the sorted root effects, then waits for their
    /// already-scheduled executions to produce effects (v2 path: the
    /// transactions themselves were submitted elsewhere).
    ///
    /// Returns the settlement effects followed by the barrier effects.
    async fn resolve_settlement_effects(
        &self,
        settlement_key: TransactionKey,
        sorted_root_effects: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> Vec<TransactionEffects> {
        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        // Re-derive the exact settlement transactions (and hence digests)
        // from the same inputs used when they were originally built.
        let builder = AccumulatorSettlementTxBuilder::new(
            None,
            sorted_root_effects,
            checkpoint_seq,
            tx_index_offset,
        );

        let settlement_digests: Vec<_> = builder
            .build_tx(
                self.epoch_store.protocol_config(),
                epoch,
                accumulator_root_obj_initial_shared_version,
                checkpoint_height,
                checkpoint_seq,
            )
            .into_iter()
            .map(|tx| *VerifiedTransaction::new_system_transaction(tx).digest())
            .collect();

        debug!(
            ?settlement_digests,
            ?settlement_key,
            "fallback: reading settlement effects from cache"
        );

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::fallback_settlement_effects",
            &settlement_digests,
            settlement_key,
        )
        .await;

        // The barrier transaction is a function of the settlement effects,
        // so its digest can only be derived after those are known.
        let barrier_digest = *VerifiedTransaction::new_system_transaction(
            accumulators::build_accumulator_barrier_tx(
                epoch,
                accumulator_root_obj_initial_shared_version,
                checkpoint_height,
                &settlement_effects,
            ),
        )
        .digest();

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::fallback_barrier_effects",
            &[barrier_digest],
            settlement_key,
        )
        .await;

        settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect()
    }
2060
    /// If the first root transaction is a consensus commit prologue, returns
    /// its digest and effects; otherwise returns `None`.
    ///
    /// Relies on the prologue (when present) always being the first root —
    /// see the `prepend_prologue_tx_in_consensus_commit_in_checkpoints`
    /// assertion at the call sites.
    fn extract_consensus_commit_prologue(
        &self,
        root_digests: &[TransactionDigest],
        root_effects: &[TransactionEffects],
    ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
        if root_digests.is_empty() {
            return Ok(None);
        }

        // Only the first root can be the prologue; fetch it to inspect its kind.
        let first_tx = self
            .state
            .get_transaction_cache_reader()
            .get_transaction_block(&root_digests[0])
            .expect("Transaction block must exist");

        Ok(first_tx
            .transaction_data()
            .is_consensus_commit_prologue()
            .then(|| {
                // root_effects[0] must correspond to root_digests[0].
                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
                (*first_tx.digest(), root_effects[0].clone())
            }))
    }
2090
    /// Persists newly built checkpoints (contents + locally computed
    /// summaries) in one batch, forwards them to the configured output,
    /// checks each against any already-certified checkpoint for forks, and
    /// finally notifies the aggregator and the epoch store.
    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &mut self,
        height: CheckpointHeight,
        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
    ) -> SuiResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.store.tables.checkpoint_content.batch();
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );

            // Rebuilding the same sequence number must be deterministic; a
            // different digest means local non-determinism — unrecoverable.
            if let Some(previously_computed_summary) = self
                .store
                .tables
                .locally_computed_checkpoints
                .get(&summary.sequence_number)?
                && previously_computed_summary.digest() != summary.digest()
            {
                fatal!(
                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
                    summary.sequence_number,
                    previously_computed_summary.digest(),
                    summary.digest()
                );
            }

            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            batch.insert_batch(
                &self.store.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.store.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        // Persist before notifying any downstream consumer.
        batch.write()?;

        for (summary, contents) in &new_checkpoints {
            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
                .await?;
        }

        // Compare against any checkpoint already certified by the network at
        // the same sequence number to detect forks.
        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .store
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.store
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_constructed_checkpoint(height, new_checkpoints);
        Ok(())
    }
2172
    /// Splits effects (with their user signatures) into checkpoint-sized
    /// chunks, bounded by `max_transactions_per_checkpoint` and
    /// `max_checkpoint_size_bytes`. When the consensus handler already splits
    /// checkpoints, everything goes into a single chunk unchanged.
    ///
    /// Always returns at least one chunk (possibly empty), so an empty
    /// checkpoint can still be produced.
    #[allow(clippy::type_complexity)]
    fn split_checkpoint_chunks(
        &self,
        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
        signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
    ) -> CheckpointBuilderResult<
        Vec<
            Vec<(
                TransactionEffects,
                Vec<(GenericSignature, Option<SequenceNumber>)>,
            )>,
        >,
    > {
        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");

        // V2: splitting happened upstream — pass everything through as one chunk.
        if self
            .epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler()
        {
            let chunk: Vec<_> = effects_and_transaction_sizes
                .into_iter()
                .zip_debug_eq(signatures)
                .map(|((effects, _size), sigs)| (effects, sigs))
                .collect();
            return Ok(vec![chunk]);
        }
        let mut chunks = Vec::new();
        let mut chunk = Vec::new();
        let mut chunk_size: usize = 0;
        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
            .into_iter()
            .zip_debug_eq(signatures.into_iter())
        {
            // Signature size accounting depends on whether aliases (the
            // Option<SequenceNumber> part) are serialized with the signatures.
            let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
                bcs::serialized_size(&signatures)?
            } else {
                let signatures: Vec<&GenericSignature> =
                    signatures.iter().map(|(s, _)| s).collect();
                bcs::serialized_size(&signatures)?
            };
            let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
            if chunk.len() == self.max_transactions_per_checkpoint
                || (chunk_size + size) > self.max_checkpoint_size_bytes
            {
                if chunk.is_empty() {
                    // A single transaction larger than the cap cannot be split
                    // further; let it through as an oversized checkpoint.
                    warn!(
                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
                        self.max_checkpoint_size_bytes
                    );
                } else {
                    chunks.push(chunk);
                    chunk = Vec::new();
                    chunk_size = 0;
                }
            }

            chunk.push((effects, signatures));
            chunk_size += size;
        }

        // Flush the trailing partial chunk; if no input produced any chunk,
        // still emit one empty chunk so an empty checkpoint is created.
        if !chunk.is_empty() || chunks.is_empty() {
            chunks.push(chunk);
        }
        Ok(chunks)
    }
2253
2254 fn load_last_built_checkpoint_summary(
2255 epoch_store: &AuthorityPerEpochStore,
2256 store: &CheckpointStore,
2257 ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
2258 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
2259 if last_checkpoint.is_none() {
2260 let epoch = epoch_store.epoch();
2261 if epoch > 0 {
2262 let previous_epoch = epoch - 1;
2263 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
2264 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
2265 if let Some((ref seq, _)) = last_checkpoint {
2266 debug!(
2267 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
2268 );
2269 } else {
2270 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
2272 }
2273 }
2274 }
2275 Ok(last_checkpoint)
2276 }
2277
    /// Builds one or more `(CheckpointSummary, CheckpointContents)` pairs from the
    /// effects of a single pending-checkpoint commit.
    ///
    /// Waits for the underlying user certificates to be processed by consensus, splits
    /// the effects into size-limited chunks, and, for the last checkpoint of an epoch,
    /// appends the advance-epoch transaction and computes the root state digest.
    /// Returns at least one checkpoint.
    #[instrument(level = "debug", skip_all)]
    async fn create_checkpoints(
        &self,
        all_effects: Vec<TransactionEffects>,
        details: &PendingCheckpointInfo,
        all_roots: &HashSet<TransactionDigest>,
    ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");

        let total = all_effects.len();
        let mut last_checkpoint =
            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
        debug!(
            checkpoint_commit_height = details.checkpoint_height,
            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
            checkpoint_timestamp = details.timestamp_ms,
            "Creating checkpoint(s) for {} transactions",
            all_effects.len(),
        );

        let all_digests: Vec<_> = all_effects
            .iter()
            .map(|effect| *effect.transaction_digest())
            .collect();
        let transactions_and_sizes = self
            .state
            .get_transaction_cache_reader()
            .get_transactions_and_serialized_sizes(&all_digests)?;
        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
        let mut transactions = Vec::with_capacity(all_effects.len());
        // Consensus keys of user transactions we must wait on before building.
        let mut transaction_keys = Vec::with_capacity(all_effects.len());
        // Digest -> randomness round, for RandomnessStateUpdate transactions.
        let mut randomness_rounds = BTreeMap::new();
        {
            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
            debug!(
                ?last_checkpoint_seq,
                "Waiting for {:?} certificates to appear in consensus",
                all_effects.len()
            );

            // Classify every transaction: collect randomness rounds, reject kinds that
            // must never appear here, and gather keys of user certificates that still
            // need to be observed in consensus.
            for (effects, transaction_and_size) in all_effects
                .into_iter()
                .zip_debug_eq(transactions_and_sizes.into_iter())
            {
                let (transaction, size) = transaction_and_size
                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
                match transaction.inner().transaction_data().kind() {
                    TransactionKind::ConsensusCommitPrologue(_)
                    | TransactionKind::ConsensusCommitPrologueV2(_)
                    | TransactionKind::ConsensusCommitPrologueV3(_)
                    | TransactionKind::ConsensusCommitPrologueV4(_)
                    | TransactionKind::AuthenticatorStateUpdate(_) => {
                        // System transactions originating from consensus itself:
                        // nothing to wait for.
                    }
                    TransactionKind::ProgrammableSystemTransaction(_) => {
                        // Likewise, no external certificate to await.
                    }
                    TransactionKind::ChangeEpoch(_)
                    | TransactionKind::Genesis(_)
                    | TransactionKind::EndOfEpochTransaction(_) => {
                        // These are created by the builder itself (or at genesis) and
                        // must never arrive via pending checkpoint effects.
                        fatal!(
                            "unexpected transaction in checkpoint effects: {:?}",
                            transaction
                        );
                    }
                    TransactionKind::RandomnessStateUpdate(rsu) => {
                        randomness_rounds
                            .insert(*effects.transaction_digest(), rsu.randomness_round);
                    }
                    TransactionKind::ProgrammableTransaction(_) => {
                        // Root transactions were part of this commit; anything else was
                        // pulled in as a dependency and must first be seen in consensus.
                        let digest = *effects.transaction_digest();
                        if !all_roots.contains(&digest) {
                            transaction_keys.push(SequencedConsensusTransactionKey::External(
                                ConsensusTransactionKey::Certificate(digest),
                            ));
                        }
                    }
                }
                transactions.push(transaction);
                all_effects_and_transaction_sizes.push((effects, size));
            }

            // Block until consensus has processed all collected certificates.
            self.epoch_store
                .consensus_messages_processed_notify(transaction_keys)
                .await?;
        }

        let signatures = self
            .epoch_store
            .user_signatures_for_checkpoint(&transactions, &all_digests);
        debug!(
            ?last_checkpoint_seq,
            "Received {} checkpoint user signatures from consensus",
            signatures.len()
        );

        // For the final checkpoint of the epoch, collect execution-time observation
        // keys from every PTB command; these are fed into the advance-epoch
        // transaction below.
        let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
            Some(
                transactions
                    .iter()
                    .flat_map(|tx| {
                        if let TransactionKind::ProgrammableTransaction(ptb) =
                            tx.transaction_data().kind()
                        {
                            itertools::Either::Left(
                                ptb.commands
                                    .iter()
                                    .map(ExecutionTimeObservationKey::from_command),
                            )
                        } else {
                            itertools::Either::Right(std::iter::empty())
                        }
                    })
                    .collect(),
            )
        } else {
            None
        };

        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
        let chunks_count = chunks.len();

        let mut checkpoints = Vec::with_capacity(chunks_count);
        debug!(
            ?last_checkpoint_seq,
            "Creating {} checkpoints with {} transactions", chunks_count, total,
        );

        let epoch = self.epoch_store.epoch();
        for (index, transactions) in chunks.into_iter().enumerate() {
            // First chunk, and the previous checkpoint (if any) is from another epoch.
            let first_checkpoint_of_epoch = index == 0
                && last_checkpoint
                    .as_ref()
                    .map(|(_, c)| c.epoch != epoch)
                    .unwrap_or(true);
            if first_checkpoint_of_epoch {
                self.epoch_store
                    .record_epoch_first_checkpoint_creation_time_metric();
            }
            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;

            // Use the preassigned sequence number when provided; otherwise continue
            // from the previous checkpoint (or start at 0).
            let sequence_number = if let Some(preassigned_seq) = details.checkpoint_seq {
                preassigned_seq
            } else {
                last_checkpoint
                    .as_ref()
                    .map(|(_, c)| c.sequence_number + 1)
                    .unwrap_or_default()
            };
            // Keep checkpoint timestamps monotonic (when the protocol enforces it) by
            // clamping to the previous checkpoint's timestamp.
            let mut timestamp_ms = details.timestamp_ms;
            if let Some((_, last_checkpoint)) = &last_checkpoint
                && last_checkpoint.timestamp_ms > timestamp_ms
            {
                debug!(
                    "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
                    sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
                );
                if self
                    .epoch_store
                    .protocol_config()
                    .enforce_checkpoint_timestamp_monotonicity()
                {
                    timestamp_ms = last_checkpoint.timestamp_ms;
                }
            }

            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
            let epoch_rolling_gas_cost_summary =
                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);

            let end_of_epoch_data = if last_checkpoint_of_epoch {
                // Execute the advance-epoch transaction and append its effects to this
                // (final) checkpoint.
                let system_state_obj = self
                    .augment_epoch_last_checkpoint(
                        &epoch_rolling_gas_cost_summary,
                        timestamp_ms,
                        &mut effects,
                        &mut signatures,
                        sequence_number,
                        std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
                        last_checkpoint_seq.unwrap_or_default(),
                    )
                    .await?;

                let committee = system_state_obj
                    .get_current_epoch_committee()
                    .committee()
                    .clone();

                // Fold this checkpoint into the running root accumulator (in order,
                // after the previous running root) and derive the epoch's root state
                // digest.
                let root_state_digest = {
                    let state_acc = self
                        .global_state_hasher
                        .upgrade()
                        .expect("No checkpoints should be getting built after local configuration");
                    let acc = state_acc.accumulate_checkpoint(
                        &effects,
                        sequence_number,
                        &self.epoch_store,
                    )?;

                    state_acc
                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
                        .await?;

                    state_acc.accumulate_running_root(
                        &self.epoch_store,
                        sequence_number,
                        Some(acc),
                    )?;
                    state_acc
                        .digest_epoch(self.epoch_store.clone(), sequence_number)
                        .await?
                };
                self.metrics.highest_accumulated_epoch.set(epoch as i64);
                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

                let epoch_commitments = if self
                    .epoch_store
                    .protocol_config()
                    .check_commit_root_state_digest_supported()
                {
                    vec![root_state_digest.into()]
                } else {
                    vec![]
                };

                Some(EndOfEpochData {
                    next_epoch_committee: committee.voting_rights,
                    next_epoch_protocol_version: ProtocolVersion::new(
                        system_state_obj.protocol_version(),
                    ),
                    epoch_commitments,
                })
            } else {
                // Not end of epoch: hand the effects to the state hasher task instead
                // of accumulating inline.
                self.send_to_hasher
                    .send((sequence_number, effects.clone()))
                    .await?;

                None
            };
            // Contents format depends on the protocol version (v2 carries aliases).
            let contents = if self.epoch_store.protocol_config().address_aliases() {
                CheckpointContents::new_v2(&effects, signatures)
            } else {
                CheckpointContents::new_with_digests_and_signatures(
                    effects.iter().map(TransactionEffects::execution_digests),
                    signatures
                        .into_iter()
                        .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
                        .collect(),
                )
            };

            let num_txns = contents.size() as u64;

            let network_total_transactions = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.network_total_transactions + num_txns)
                .unwrap_or(num_txns);

            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());

            // Randomness rounds for the transactions that ended up in this chunk.
            let matching_randomness_rounds: Vec<_> = effects
                .iter()
                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
                .copied()
                .collect();

            let checkpoint_commitments = if self
                .epoch_store
                .protocol_config()
                .include_checkpoint_artifacts_digest_in_summary()
            {
                let artifacts = CheckpointArtifacts::from(&effects[..]);
                let artifacts_digest = artifacts.digest()?;
                vec![artifacts_digest.into()]
            } else {
                Default::default()
            };

            let summary = CheckpointSummary::new(
                self.epoch_store.protocol_config(),
                epoch,
                sequence_number,
                network_total_transactions,
                &contents,
                previous_digest,
                epoch_rolling_gas_cost_summary,
                end_of_epoch_data,
                timestamp_ms,
                matching_randomness_rounds,
                checkpoint_commitments,
            );
            summary.report_checkpoint_age(
                &self.metrics.last_created_checkpoint_age,
                &self.metrics.last_created_checkpoint_age_ms,
            );
            if last_checkpoint_of_epoch {
                info!(
                    checkpoint_seq = sequence_number,
                    "creating last checkpoint of epoch {}", epoch
                );
                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
                    self.epoch_store
                        .report_epoch_metrics_at_last_checkpoint(stats);
                }
            }
            // Each chunk chains off the one just built.
            last_checkpoint = Some((sequence_number, summary.clone()));
            checkpoints.push((summary, contents));
        }

        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
    }
2597
2598 fn get_epoch_total_gas_cost(
2599 &self,
2600 last_checkpoint: Option<&CheckpointSummary>,
2601 cur_checkpoint_effects: &[TransactionEffects],
2602 ) -> GasCostSummary {
2603 let (previous_epoch, previous_gas_costs) = last_checkpoint
2604 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2605 .unwrap_or_default();
2606 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2607 if previous_epoch == self.epoch_store.epoch() {
2608 GasCostSummary::new(
2610 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2611 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2612 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2613 previous_gas_costs.non_refundable_storage_fee
2614 + current_gas_costs.non_refundable_storage_fee,
2615 )
2616 } else {
2617 current_gas_costs
2618 }
2619 }
2620
    /// Creates and executes the advance-epoch transaction and appends its effects to
    /// the checkpoint under construction, returning the resulting system state object
    /// (used by the caller to derive next-epoch committee and protocol version).
    #[instrument(level = "error", skip_all)]
    async fn augment_epoch_last_checkpoint(
        &self,
        epoch_total_gas_cost: &GasCostSummary,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        checkpoint_effects: &mut Vec<TransactionEffects>,
        signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
        checkpoint: CheckpointSequenceNumber,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        last_checkpoint: CheckpointSequenceNumber,
    ) -> CheckpointBuilderResult<SuiSystemState> {
        let (system_state, effects) = self
            .state
            .create_and_execute_advance_epoch_tx(
                &self.epoch_store,
                epoch_total_gas_cost,
                checkpoint,
                epoch_start_timestamp_ms,
                end_of_epoch_observation_keys,
                last_checkpoint,
            )
            .await?;
        checkpoint_effects.push(effects);
        // The advance-epoch transaction carries no user signatures; push an empty
        // entry to keep the signature vector aligned with the effects vector.
        signatures.push(vec![]);
        Ok(system_state)
    }
2649
    /// Transitively closes `roots` over their dependencies executed in the current
    /// epoch, so the returned effects form a causally complete set for the checkpoint.
    ///
    /// Dependencies already covered — present in `existing_tx_digests_in_checkpoint`,
    /// recorded by the builder as included in an earlier checkpoint, or executed in a
    /// previous epoch — are skipped. The digests of all returned effects are added to
    /// `existing_tx_digests_in_checkpoint`.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> SuiResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        let mut results = vec![];
        // Digests already visited in this traversal (avoids revisiting shared deps).
        let mut seen = HashSet::new();
        loop {
            // Dependencies discovered this round whose effects still need loading.
            let mut pending = HashSet::new();

            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots
                .into_iter()
                .zip_debug_eq(transactions_included.into_iter())
            {
                let digest = effect.transaction_digest();
                seen.insert(*digest);

                // Already added to the checkpoint currently being built.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Already included in a previous checkpoint, or executed in a prior
                // epoch (hence covered by an earlier epoch's checkpoints).
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies())?;

                for (dependency, effects_signature_exists) in effect
                    .dependencies()
                    .iter()
                    .zip_debug_eq(existing_effects.iter())
                {
                    // Only dependencies executed in the current epoch need inclusion;
                    // others are assumed covered by prior checkpoints.
                    if !effects_signature_exists {
                        continue;
                    }
                    // Queue each dependency at most once.
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            if pending.is_empty() {
                break;
            }
            // Load effects for newly discovered dependencies and iterate on them as
            // the next set of roots.
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self.effects_store.multi_get_executed_effects(&pending);
            let effects = effects
                .into_iter()
                .zip_debug_eq(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
                        digest
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
2736
2737 #[cfg(msim)]
2740 fn expensive_consensus_commit_prologue_invariants_check(
2741 &self,
2742 root_digests: &[TransactionDigest],
2743 sorted: &[TransactionEffects],
2744 ) {
2745 let root_txs = self
2747 .state
2748 .get_transaction_cache_reader()
2749 .multi_get_transaction_blocks(root_digests);
2750 let ccps = root_txs
2751 .iter()
2752 .filter_map(|tx| {
2753 if let Some(tx) = tx {
2754 if tx.transaction_data().is_consensus_commit_prologue() {
2755 Some(tx)
2756 } else {
2757 None
2758 }
2759 } else {
2760 None
2761 }
2762 })
2763 .collect::<Vec<_>>();
2764
2765 assert!(ccps.len() <= 1);
2767
2768 let txs = self
2770 .state
2771 .get_transaction_cache_reader()
2772 .multi_get_transaction_blocks(
2773 &sorted
2774 .iter()
2775 .map(|tx| tx.transaction_digest().clone())
2776 .collect::<Vec<_>>(),
2777 );
2778
2779 if ccps.len() == 0 {
2780 for tx in txs.iter() {
2783 if let Some(tx) = tx {
2784 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2785 }
2786 }
2787 } else {
2788 assert!(
2790 txs[0]
2791 .as_ref()
2792 .unwrap()
2793 .transaction_data()
2794 .is_consensus_commit_prologue()
2795 );
2796
2797 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2798
2799 for tx in txs.iter().skip(1) {
2800 if let Some(tx) = tx {
2801 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2802 }
2803 }
2804 }
2805 }
2806}
2807
2808async fn wait_for_effects_with_retry(
2809 effects_store: &dyn TransactionCacheRead,
2810 task_name: &'static str,
2811 digests: &[TransactionDigest],
2812 tx_key: TransactionKey,
2813) -> Vec<TransactionEffects> {
2814 let delay = if in_antithesis() {
2815 15
2817 } else {
2818 5
2819 };
2820 loop {
2821 match tokio::time::timeout(Duration::from_secs(delay), async {
2822 effects_store
2823 .notify_read_executed_effects(task_name, digests)
2824 .await
2825 })
2826 .await
2827 {
2828 Ok(effects) => break effects,
2829 Err(_) => {
2830 debug_fatal!(
2831 "Timeout waiting for transactions to be executed {:?}, retrying...",
2832 tx_key
2833 );
2834 }
2835 }
2836 }
2837}
2838
impl CheckpointAggregator {
    /// Creates an aggregator that collects checkpoint signature messages from
    /// validators and emits certified checkpoints once a quorum of stake has signed.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        Self {
            store: tables,
            epoch_store,
            notify,
            receiver,
            pending: BTreeMap::new(),
            current: None,
            output,
            state,
            metrics,
        }
    }

    /// Main loop: drains queued signature messages into `pending` (keyed by sequence
    /// number), runs an aggregation pass, then sleeps until a new signature arrives,
    /// an explicit wake-up is signaled, or a 1s tick fires.
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            // Drain everything already queued before attempting aggregation.
            while let Ok(sig) = self.receiver.try_recv() {
                self.pending
                    .entry(sig.summary.sequence_number)
                    .or_default()
                    .push(sig);
            }

            // On error, back off for a second and retry rather than exiting the loop.
            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            // Wait for more work: a fresh signature, an explicit notify, or a
            // periodic retry tick.
            tokio::select! {
                Some(sig) = self.receiver.recv() => {
                    self.pending
                        .entry(sig.summary.sequence_number)
                        .or_default()
                        .push(sig);
                }
                _ = self.notify.notified() => {}
                _ = tokio::time::sleep(Duration::from_secs(1)) => {}
            }
        }
    }

    /// Runs one aggregation pass and forwards every newly certified checkpoint to the
    /// configured output.
    async fn run_and_notify(&mut self) -> SuiResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Certifies checkpoints in sequence order, returning every checkpoint that
    /// reached quorum during this pass. Stops when the next summary to certify is not
    /// built yet or no (more) signatures are pending for it.
    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            // Signatures for sequence numbers below the next-to-certify point can no
            // longer be useful; drop them.
            self.pending.retain(|&seq, _| seq >= next_to_certify);
            let current = if let Some(current) = &mut self.current {
                // The checkpoint we were aggregating has already been certified by
                // some other means; discard the stale aggregation and re-evaluate.
                if current.summary.sequence_number < next_to_certify {
                    assert_reachable!("skip checkpoint certification");
                    self.current = None;
                    continue;
                }
                current
            } else {
                // Begin aggregating the next locally built summary, if one exists.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let seq = current.summary.sequence_number;
            let sigs = self.pending.remove(&seq).unwrap_or_default();
            if sigs.is_empty() {
                trace!(
                    checkpoint_seq =? seq,
                    "Not enough checkpoint signatures",
                );
                return Ok(result);
            }
            for data in sigs {
                trace!(
                    checkpoint_seq = seq,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                // Quorum reached: persist the certified checkpoint and move on to the
                // next sequence number.
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = seq,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics.last_certified_checkpoint.set(seq as i64);
                    current.summary.report_checkpoint_age(
                        &self.metrics.last_certified_checkpoint_age,
                        &self.metrics.last_certified_checkpoint_age_ms,
                    );
                    result.push(summary.into_inner());
                    self.current = None;
                    continue 'outer;
                }
            }
            break;
        }
        Ok(result)
    }

    /// Returns the sequence number following the highest certified checkpoint in the
    /// store, or 0 when nothing has been certified yet.
    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
3008
impl CheckpointSignatureAggregator {
    /// Adds one validator's signature to the aggregation. Returns the quorum signature
    /// once enough stake has signed our locally built summary; otherwise returns
    /// `Err(())` and records fork / split-brain symptoms where applicable.
    #[allow(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        // Wrap the incoming signature around our local summary; the aggregator buckets
        // signatures by the digest the sender actually signed.
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            // A repeated, non-conflicting signature from the same validator: drop
            // silently without logging or diagnostics.
            InsertResult::Failed { error }
                if matches!(
                    error.as_inner(),
                    SuiErrorKind::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
                ) =>
            {
                Err(())
            }
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                // Quorum formed on a digest different from ours: the network certified
                // a checkpoint we disagree with — record the fork and bail out.
                if their_digest != self.digest {
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }

    /// Checks whether the signatures received so far make a quorum on any single
    /// digest impossible (split-brain). If so: logs the digests by supporting stake,
    /// triggers the simulation fail-point hook, bumps metrics, and spawns an async
    /// task to fetch and diff the divergent checkpoints.
    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            // Render each observed digest with its supporting stake, highest first.
            let digests_by_stake_messages = all_unique_values
                .iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{:?} (total stake: {})", digest, total_stake)
                })
                .collect::<Vec<String>>();
            // Test-only hook (simulation builds): record which digest fork-recovery
            // testing should treat as correct for this sequence number.
            fail_point_arg!("kill_split_brain_node", |(
                checkpoint_overrides,
                forked_authorities,
            ): (
                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
            )| {
                #[cfg(msim)]
                {
                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
                        (checkpoint_overrides.lock(), forked_authorities.lock())
                    {
                        // Prefer a digest backed by at least one non-forked authority;
                        // otherwise fall back to the highest-stake digest, then ours.
                        let correct_digest = all_unique_values
                            .iter()
                            .find(|(_, (authorities, _))| {
                                authorities
                                    .iter()
                                    .any(|auth| !forked_authorities_set.contains(auth))
                            })
                            .map(|(digest, _)| digest.to_string())
                            .unwrap_or_else(|| {
                                all_unique_values
                                    .iter()
                                    .max_by_key(|(_, (_, stake))| *stake)
                                    .map(|(digest, _)| digest.to_string())
                                    .unwrap_or_else(|| self.digest.to_string())
                            });

                        overrides.insert(self.summary.sequence_number, correct_digest.clone());

                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
                            self.summary.sequence_number,
                            correct_digest
                        );
                    }
                }
            });

            debug_fatal!(
                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
                self.summary.sequence_number,
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.store.clone();

            // Diagnostics perform network calls; run them off the aggregation path.
            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
}
3153
/// Fetches the pending checkpoint from one validator per disagreeing digest, diffs
/// their summaries, contents, transactions, and effects against the local versions,
/// and writes the combined report to a temp file (also emitted via `debug!`) for
/// offline fork analysis.
async fn diagnose_split_brain(
    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
    local_summary: CheckpointSummary,
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
) {
    debug!(
        checkpoint_seq = local_summary.sequence_number,
        "Running split brain diagnostics..."
    );
    let time = SystemTime::now();
    // Sample one random validator per digest that differs from ours.
    let digest_to_validator = all_unique_values
        .iter()
        .filter_map(|(digest, (validators, _))| {
            if *digest != local_summary.digest() {
                let random_validator = validators.choose(&mut get_rng()).unwrap();
                Some((*digest, *random_validator))
            } else {
                None
            }
        })
        .collect::<HashMap<_, _>>();
    if digest_to_validator.is_empty() {
        panic!(
            "Given split brain condition, there should be at \
                least one validator that disagrees with local signature"
        );
    }

    let epoch_store = state.load_epoch_store_one_call_per_task();
    let committee = epoch_store
        .epoch_start_state()
        .get_sui_committee_with_network_metadata();
    let network_config = default_mysten_network_config();
    let network_clients =
        make_network_authority_clients_with_network_config(&committee, &network_config);

    // Ask each sampled validator for its *pending* (uncertified) checkpoint, with
    // full contents included.
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequestV2 {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint_v2(request)
        })
        .collect::<Vec<_>>();

    // Pair each response with the (digest, validator) it was requested from, keeping
    // only complete pending responses.
    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip_debug_eq(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponseV2 {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponseV2 {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    // Build one diff-report section per responding validator.
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {:?}",
        time
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    // Persist the dump in a kept (non-auto-deleted) tempdir so it survives the task.
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .keep()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{}", fork_logs_text).unwrap();
    debug!("{}", fork_logs_text);
}
3333
/// Notification interface through which other components signal the checkpoint
/// service. (Implementations are defined elsewhere in this file.)
pub trait CheckpointServiceNotify {
    /// Submits a validator's checkpoint signature message for aggregation.
    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult;

    /// Signals that new checkpoint-related work may be available.
    fn notify_checkpoint(&self) -> SuiResult;
}
3339
/// Lifecycle state of the checkpoint service: the worker components are built
/// eagerly in `CheckpointService::build` but only moved into running tasks
/// when `spawn` is called.
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    // Components created by `build`, parked until `spawn` takes them.
    // Stored as a single tuple so they are moved out together.
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    // `spawn` has been called; the components now live in their tasks.
    Started,
}
3351
3352impl CheckpointServiceState {
3353 fn take_unstarted(
3354 &mut self,
3355 ) -> (
3356 CheckpointBuilder,
3357 CheckpointAggregator,
3358 CheckpointStateHasher,
3359 ) {
3360 let mut state = CheckpointServiceState::Started;
3361 std::mem::swap(self, &mut state);
3362
3363 match state {
3364 CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3365 (builder, aggregator, hasher)
3366 }
3367 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3368 }
3369 }
3370}
3371
pub struct CheckpointService {
    // Checkpoint persistence (locally computed / verified / executed checkpoints).
    tables: Arc<CheckpointStore>,
    // Wakes the CheckpointBuilder task (see `notify_checkpoint`).
    notify_builder: Arc<Notify>,
    // Forwards received checkpoint signature messages to the aggregator task.
    signature_sender: mpsc::UnboundedSender<CheckpointSignatureMessage>,
    // Publishes the highest checkpoint sequence built in this process;
    // `wait_for_rebuilt_checkpoints` subscribes to this channel.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // Highest locally computed checkpoint found on disk when the service
    // was constructed (i.e. before this restart).
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    // Holds the unstarted worker components until `spawn` is called.
    state: Mutex<CheckpointServiceState>,
}
3384
impl CheckpointService {
    /// Constructs the checkpoint service and its worker components (builder,
    /// aggregator, state hasher) without starting any tasks; the components
    /// are parked in `CheckpointServiceState::Unstarted` until `spawn` runs.
    #[allow(clippy::disallowed_methods)]
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        Self::initialize_accumulator_account_metrics(&state, &epoch_store, &metrics);
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // Snapshot of the highest checkpoint this node had computed before
        // this (re)start; `wait_for_rebuilt_checkpoints` waits to catch up
        // to this sequence. Defaults to 0 when nothing was computed yet.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let (signature_sender, signature_receiver) = mpsc::unbounded_channel();

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            signature_receiver,
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        // Bounded channel carrying work from the builder to the state hasher.
        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);

        let ckpt_state_hasher = CheckpointStateHasher::new(
            epoch_store.clone(),
            global_state_hasher.clone(),
            receive_from_builder,
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            global_state_hasher,
            send_to_hasher,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            signature_sender,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted((
                builder,
                aggregator,
                ckpt_state_hasher,
            ))),
        })
    }

    /// Seeds the live accumulator-account gauge from the object store count.
    /// No-op when accumulators are disabled by the protocol config.
    fn initialize_accumulator_account_metrics(
        state: &AuthorityState,
        epoch_store: &AuthorityPerEpochStore,
        metrics: &CheckpointMetrics,
    ) {
        if !epoch_store.protocol_config().enable_accumulators() {
            return;
        }

        let object_store = state.get_object_store();
        match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
            Ok(Some(count)) => metrics.initialize_accumulator_accounts_live(count),
            // No count recorded yet; leave the gauge untouched.
            Ok(None) => {}
            Err(e) => fatal!("failed to initialize accumulator account metrics: {e}"),
        }
    }

    /// Starts the builder, aggregator and state-hasher tasks, then waits
    /// (with a 120s bound) for the builder either to finish or to re-build
    /// checkpoints up to the previously built sequence.
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        // Panics if `spawn` was already called (see `take_unstarted`).
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        let last_persisted_builder_seq = epoch_store
            .last_persisted_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number);

        let last_executed_seq = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number());

        // Drop state hashes past the last committed checkpoint so ECMH state
        // is recomputed consistently after restart. Failure here is logged
        // rather than treated as fatal.
        if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        spawn_monitored_task!(async move {
            // The builder only runs while the epoch is alive; either way the
            // oneshot signals (or is dropped) when the builder stops.
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            // Shutdown order: wait for the hasher to exit on its own, then
            // abort the aggregator and reap it.
            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        // Bound the rebuild wait so a stuck rebuild surfaces via debug_fatal!
        // instead of blocking startup forever.
        if tokio::time::timeout(Duration::from_secs(120), async move {
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
}
3588
impl CheckpointService {
    /// Waits until the builder has built checkpoints up to the highest
    /// sequence that had already been locally computed before this restart
    /// (`highest_previously_built_seq`). Returns immediately if that point
    /// has already been reached.
    pub async fn wait_for_rebuilt_checkpoints(&self) {
        let highest_previously_built_seq = self.highest_previously_built_seq;
        let mut rx = self.highest_currently_built_seq_tx.subscribe();
        let mut highest_currently_built_seq = *rx.borrow_and_update();
        info!(
            "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
        );
        loop {
            if highest_currently_built_seq >= highest_previously_built_seq {
                info!("Checkpoint rebuild complete");
                break;
            }
            // `changed()` cannot fail here: `self` holds the sender
            // (`highest_currently_built_seq_tx`) for the duration of this call.
            rx.changed().await.unwrap();
            highest_currently_built_seq = *rx.borrow_and_update();
        }
    }

    /// Test helper: persists a pending checkpoint through a synthetic
    /// consensus commit output, then wakes the builder.
    #[cfg(test)]
    fn write_and_notify_checkpoint_for_testing(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: PendingCheckpoint,
    ) -> SuiResult {
        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;

        let mut output = ConsensusCommitOutput::new(0);
        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
        output.set_default_commit_stats_for_testing();
        epoch_store.push_consensus_output_for_tests(output);
        self.notify_checkpoint()?;
        Ok(())
    }
}
3628
impl CheckpointServiceNotify for CheckpointService {
    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult {
        let sequence = info.summary.sequence_number;
        let signer = info.summary.auth_sig().authority.concise();

        // Signatures at or below the highest verified checkpoint are stale —
        // those checkpoints are already certified — so only record a metric
        // and drop the message.
        if let Some(highest_verified_checkpoint) = self
            .tables
            .get_highest_verified_checkpoint()?
            .map(|x| *x.sequence_number())
            && sequence <= highest_verified_checkpoint
        {
            trace!(
                checkpoint_seq = sequence,
                "Ignore checkpoint signature from {} - already certified", signer,
            );
            self.metrics
                .last_ignored_checkpoint_signature_received
                .set(sequence as i64);
            return Ok(());
        }
        trace!(
            checkpoint_seq = sequence,
            "Received checkpoint signature, digest {} from {}",
            info.summary.digest(),
            signer,
        );
        self.metrics
            .last_received_checkpoint_signatures
            .with_label_values(&[&signer.to_string()])
            .set(sequence as i64);
        // Forward to the aggregator; a send failure only means the receiver
        // has shut down, which is not an error for the caller.
        self.signature_sender.send(info.clone()).ok();
        Ok(())
    }

    fn notify_checkpoint(&self) -> SuiResult {
        // Wake the builder task; repeated notifications coalesce.
        self.notify_builder.notify_one();
        Ok(())
    }
}
3668
/// A `CheckpointServiceNotify` implementation that ignores all notifications.
pub struct CheckpointServiceNoop {}
impl CheckpointServiceNotify for CheckpointServiceNoop {
    /// No-op: the signature message is dropped without processing.
    fn notify_checkpoint_signature(&self, _: &CheckpointSignatureMessage) -> SuiResult {
        Ok(())
    }

    /// No-op: there is no builder to wake.
    fn notify_checkpoint(&self) -> SuiResult {
        Ok(())
    }
}
3680
impl PendingCheckpoint {
    /// Consensus checkpoint height at which this pending checkpoint was formed.
    pub fn height(&self) -> CheckpointHeight {
        self.details.checkpoint_height
    }

    /// Root transaction keys contained in this pending checkpoint.
    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.roots
    }

    /// Metadata recorded when the pending checkpoint was formed.
    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.details
    }
}
3694
3695impl PendingCheckpointV2 {
3696 pub fn height(&self) -> CheckpointHeight {
3697 self.details.checkpoint_height
3698 }
3699
3700 pub(crate) fn num_roots(&self) -> usize {
3701 self.roots.iter().map(|r| r.tx_roots.len()).sum()
3702 }
3703}
3704
pin_project! {
    /// Future adapter that counts how many times the inner future is polled;
    /// resolves to `(poll_count, inner_output)`.
    pub struct PollCounter<Fut> {
        #[pin]
        future: Fut,
        // Number of times `poll` has been invoked so far.
        count: usize,
    }
}
3712
impl<Fut> PollCounter<Fut> {
    /// Wraps `future`, starting the poll counter at zero.
    pub fn new(future: Fut) -> Self {
        Self { future, count: 0 }
    }

    /// Number of times the wrapped future has been polled so far.
    pub fn count(&self) -> usize {
        self.count
    }
}
3722
3723impl<Fut: Future> Future for PollCounter<Fut> {
3724 type Output = (usize, Fut::Output);
3725
3726 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3727 let this = self.project();
3728 *this.count += 1;
3729 match this.future.poll(cx) {
3730 Poll::Ready(output) => Poll::Ready((*this.count, output)),
3731 Poll::Pending => Poll::Pending,
3732 }
3733 }
3734}
3735
/// Convenience wrapper: instruments `future` so its output also reports how
/// many polls it took to complete.
fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
    PollCounter::new(future)
}
3739
3740#[cfg(test)]
3741mod tests {
3742 use super::*;
3743 use crate::authority::test_authority_builder::TestAuthorityBuilder;
3744 use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3745 use futures::FutureExt as _;
3746 use futures::future::BoxFuture;
3747 use std::collections::HashMap;
3748 use std::ops::Deref;
3749 use sui_macros::sim_test;
3750 use sui_protocol_config::{Chain, ProtocolConfig};
3751 use sui_types::accumulator_event::AccumulatorEvent;
3752 use sui_types::authenticator_state::ActiveJwk;
3753 use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3754 use sui_types::crypto::Signature;
3755 use sui_types::effects::{TransactionEffects, TransactionEvents};
3756 use sui_types::messages_checkpoint::SignedCheckpointSummary;
3757 use sui_types::transaction::VerifiedTransaction;
3758 use tokio::sync::mpsc;
3759
    // Verifies that `clear_locally_computed_checkpoints_from(seq)` deletes
    // checkpoints from `seq` (inclusive) to the end, leaving earlier ones.
    #[tokio::test]
    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
        let store = CheckpointStore::new_for_tests();
        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
        // Populate locally computed checkpoints for sequences 70..=80, each
        // with a single random transaction digest.
        for seq in 70u64..=80u64 {
            let contents =
                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
                    [sui_types::base_types::ExecutionDigests::new(
                        sui_types::digests::TransactionDigest::random(),
                        sui_types::digests::TransactionEffectsDigest::ZERO,
                    )],
                );
            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
                &protocol,
                0,
                seq,
                0,
                &contents,
                None,
                sui_types::gas::GasCostSummary::default(),
                None,
                0,
                Vec::new(),
                Vec::new(),
            );
            store
                .tables
                .locally_computed_checkpoints
                .insert(&seq, &summary)
                .unwrap();
        }

        // Clear from 76 onward; 76 itself must be removed (inclusive start).
        store
            .clear_locally_computed_checkpoints_from(76)
            .expect("clear should succeed");

        // Boundary checks: 75 survives, 76 is gone.
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&75)
                .unwrap()
                .is_some()
        );
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&76)
                .unwrap()
                .is_none()
        );

        // All earlier checkpoints remain...
        for seq in 70u64..76u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_some()
            );
        }
        // ...and everything from 76 through the end is deleted.
        for seq in 76u64..=80u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_none()
            );
        }
    }
3835
    // Round-trips the checkpoint-fork and transaction-fork markers through
    // the store: record, read back, clear, read back empty.
    #[tokio::test]
    async fn test_fork_detection_storage() {
        let store = CheckpointStore::new_for_tests();
        let seq_num = 42;
        let digest = CheckpointDigest::random();

        // No fork recorded initially.
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        // The recorded (sequence, digest) pair reads back intact.
        let retrieved = store.get_checkpoint_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
        assert_eq!(retrieved_seq, seq_num);
        assert_eq!(retrieved_digest, digest);

        // Clearing removes the marker.
        store.clear_checkpoint_fork_detected().unwrap();
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        // Same round-trip for the transaction-level fork marker.
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();

        assert!(store.get_transaction_fork_detected().unwrap().is_none());

        store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let retrieved = store.get_transaction_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
        assert_eq!(retrieved_tx, tx_digest);
        assert_eq!(retrieved_expected, expected_effects);
        assert_eq!(retrieved_actual, actual_effects);

        store.clear_transaction_fork_detected().unwrap();
        assert!(store.get_transaction_fork_detected().unwrap().is_none());
    }
3879
    // End-to-end test of the checkpoint builder and aggregator: feeds pending
    // checkpoints through the service, then asserts the built checkpoints'
    // contents (causal order), linkage (previous_digest), splitting by
    // max_transactions_per_checkpoint/size, and certification via signatures.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.disable_accumulators_for_testing();
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(false);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // Small placeholder transaction used for most digests.
        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
            0,
            0,
            vec![],
            SequenceNumber::new(),
        );

        // Build a JWK list > 40KB (BCS-serialized) so transactions carrying it
        // exercise the checkpoint size limit.
        let jwks = {
            let mut jwks = Vec::new();
            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
                jwks.push(ActiveJwk {
                    jwk_id: JwkId::new(
                        "https://accounts.google.com".to_string(),
                        "1234567890".to_string(),
                    ),
                    jwk: JWK {
                        kty: "RSA".to_string(),
                        e: "AQAB".to_string(),
                        n: "1234567890".to_string(),
                        alg: "RS256".to_string(),
                    },
                    epoch: 0,
                });
            }
            jwks
        };

        let dummy_tx_with_data =
            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());

        // Digests 0..15 map to the small tx, 15..20 to the large one.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Effects store: d(1) depends on d(2), d(3); d(2) on d(3), d(4); this
        // dependency graph drives the causal ordering assertions below.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 52, 51, 1),
            );
        }
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![(signature, None)]);
        }

        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

        // max 3 transactions per checkpoint, 100KB size limit.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        // Checkpoint 0 contains only d(4); rolling gas equals its cost.
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        // Checkpoint 1: dependencies appear before dependents (3, 2, then 1).
        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 108, 104, 4)
        );

        // Four roots split over two checkpoints by the 3-tx limit.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Large (>40KB) transactions split by the 100KB size limit.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending checkpoints 4 and 5 merge into a single built checkpoint.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        // Signatures arrive out of order; the aggregator still certifies in
        // sequence order.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c2ss })
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c1ss })
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
4113
    // Test double: a plain digest->effects map standing in for the effects
    // store. Only the methods the checkpoint builder exercises are
    // implemented; everything else is unimplemented!().
    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn notify_read_executed_effects_may_fail(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, SuiResult<Vec<TransactionEffects>>> {
            // Resolves immediately; panics if any digest is missing from the map.
            std::future::ready(Ok(digests
                .iter()
                .map(|d| self.get(d).expect("effects not found").clone())
                .collect()))
            .boxed()
        }

        fn notify_read_executed_effects_digests(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
            std::future::ready(
                digests
                    .iter()
                    .map(|d| {
                        self.get(d)
                            .map(|fx| fx.digest())
                            .expect("effects not found")
                    })
                    .collect(),
            )
            .boxed()
        }

        fn multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffects>> {
            digests.iter().map(|d| self.get(d).cloned()).collect()
        }

        fn multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
            unimplemented!()
        }

        fn multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffectsDigest>> {
            unimplemented!()
        }

        fn multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> Vec<Option<TransactionEffects>> {
            unimplemented!()
        }

        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
            unimplemented!()
        }

        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
            unimplemented!()
        }

        fn get_unchanged_loaded_runtime_objects(
            &self,
            _digest: &TransactionDigest,
        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
            unimplemented!()
        }

        fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
            unimplemented!()
        }
    }
4197
    // Test double: built checkpoints are pushed onto an mpsc channel so the
    // test can receive and assert on them.
    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> SuiResult {
            // try_send so a full channel fails the test immediately instead
            // of blocking.
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }
4211
    // Test double: certified checkpoints are forwarded to the test over a
    // channel.
    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> SuiResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }
4222
4223 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
4224 PendingCheckpoint {
4225 roots: t
4226 .into_iter()
4227 .map(|t| TransactionKey::Digest(d(t)))
4228 .collect(),
4229 details: PendingCheckpointInfo {
4230 timestamp_ms,
4231 last_of_epoch: false,
4232 checkpoint_height: i,
4233 consensus_commit_ref: CommitRef::default(),
4234 rejected_transactions_digest: Digest::default(),
4235 checkpoint_seq: None,
4236 },
4237 }
4238 }
4239
4240 fn d(i: u8) -> TransactionDigest {
4241 let mut bytes: [u8; 32] = Default::default();
4242 bytes[0] = i;
4243 TransactionDigest::new(bytes)
4244 }
4245
4246 fn e(
4247 transaction_digest: TransactionDigest,
4248 dependencies: Vec<TransactionDigest>,
4249 gas_used: GasCostSummary,
4250 ) -> TransactionEffects {
4251 let mut effects = TransactionEffects::default();
4252 *effects.transaction_digest_mut_for_testing() = transaction_digest;
4253 *effects.dependencies_mut_for_testing() = dependencies;
4254 *effects.gas_cost_summary_mut_for_testing() = gas_used;
4255 effects
4256 }
4257
4258 fn commit_cert_for_test(
4259 store: &mut HashMap<TransactionDigest, TransactionEffects>,
4260 state: Arc<AuthorityState>,
4261 digest: TransactionDigest,
4262 dependencies: Vec<TransactionDigest>,
4263 gas_used: GasCostSummary,
4264 ) {
4265 let epoch_store = state.epoch_store_for_testing();
4266 let effects = e(digest, dependencies, gas_used);
4267 store.insert(digest, effects.clone());
4268 epoch_store.insert_executed_in_epoch(&digest);
4269 }
4270}