1pub(crate) mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16 LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::random::get_rng;
29use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
30use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
31use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
32use nonempty::NonEmpty;
33use parking_lot::Mutex;
34use pin_project_lite::pin_project;
35use serde::{Deserialize, Serialize};
36use sui_macros::fail_point_arg;
37use sui_network::default_mysten_network_config;
38use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43 CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use tokio::sync::{mpsc, watch};
47use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
48
49use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
50use crate::authority::authority_store_pruner::PrunerWatermarks;
51use crate::consensus_handler::SequencedConsensusTransactionKey;
52use rand::seq::SliceRandom;
53use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
54use std::fs::File;
55use std::future::Future;
56use std::io::Write;
57use std::path::Path;
58use std::pin::Pin;
59use std::sync::Arc;
60use std::sync::Weak;
61use std::task::{Context, Poll};
62use std::time::{Duration, SystemTime};
63use sui_protocol_config::ProtocolVersion;
64use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
65use sui_types::committee::StakeUnit;
66use sui_types::crypto::AuthorityStrongQuorumSignInfo;
67use sui_types::digests::{
68 CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
69};
70use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
71use sui_types::error::{SuiErrorKind, SuiResult};
72use sui_types::gas::GasCostSummary;
73use sui_types::message_envelope::Message;
74use sui_types::messages_checkpoint::{
75 CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
76 CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
77 EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
78 VerifiedCheckpointContents,
79};
80use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
81use sui_types::messages_consensus::ConsensusTransactionKey;
82use sui_types::signature::GenericSignature;
83use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
84use sui_types::transaction::{
85 TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
86};
87use tokio::sync::Notify;
88use tracing::{debug, error, info, instrument, trace, warn};
89use typed_store::DBMapUtils;
90use typed_store::Map;
91use typed_store::{
92 TypedStoreError,
93 rocks::{DBMap, MetricConf},
94};
95
// Fixed key for the single-entry `transaction_fork_detected` table (it holds at
// most one recorded fork at a time).
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

/// Height of a pending checkpoint in the builder; distinct from the final
/// `CheckpointSequenceNumber` assigned once the checkpoint is certified.
pub type CheckpointHeight = u64;
99
/// Aggregate statistics for a single epoch, computed in `get_epoch_stats`.
pub struct EpochStats {
    /// Number of checkpoints produced during the epoch.
    pub checkpoint_count: u64,
    /// Number of transactions included during the epoch.
    pub transaction_count: u64,
    /// Total computation cost accumulated over the epoch (taken from the last
    /// checkpoint's `epoch_rolling_gas_cost_summary.computation_cost`).
    pub total_gas_reward: u64,
}
105
/// Metadata attached to a checkpoint that is pending construction.
#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    // Checkpoint timestamp, milliseconds.
    pub timestamp_ms: CheckpointTimestamp,
    // True when this will be the last checkpoint of the epoch.
    pub last_of_epoch: bool,
    // Builder height of this pending checkpoint (not the final sequence number).
    pub checkpoint_height: CheckpointHeight,
    // Consensus commit this pending checkpoint was derived from.
    pub consensus_commit_ref: CommitRef,
    // Digest over rejected transactions — assumed; TODO confirm with the producer.
    pub rejected_transactions_digest: Digest,
    // Final checkpoint sequence number, when already determined.
    pub checkpoint_seq: Option<CheckpointSequenceNumber>,
}
120
/// A pending checkpoint: the root transactions to include plus its metadata.
#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    // Root transactions to include in the checkpoint.
    pub roots: Vec<TransactionKey>,
    pub details: PendingCheckpointInfo,
}
126
/// Roots contributed at one builder height.
#[derive(Clone, Debug, Default)]
pub struct CheckpointRoots {
    // Transaction roots at this height.
    pub tx_roots: Vec<TransactionKey>,
    // Optional settlement root — presumably an accumulator settlement
    // transaction; verify against the producer.
    pub settlement_root: Option<TransactionKey>,
    // Builder height these roots belong to.
    pub height: CheckpointHeight,
}
133
/// Pending-checkpoint variant that carries roots grouped per builder height
/// (see [`CheckpointRoots`]) rather than a flat list.
#[derive(Clone, Debug)]
pub struct PendingCheckpointV2 {
    pub roots: Vec<CheckpointRoots>,
    pub details: PendingCheckpointInfo,
}
145
/// A checkpoint summary as produced by the checkpoint builder, together with
/// the builder-side bookkeeping needed to resume after a restart.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    // Builder height the summary was built from, when known.
    pub checkpoint_height: Option<CheckpointHeight>,
    // Position of this checkpoint within its consensus commit.
    pub position_in_commit: usize,
}
153
/// Column families backing the checkpoint store.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    // Checkpoint contents keyed by their contents digest.
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    // Reverse index: contents digest -> checkpoint sequence number.
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    // Legacy full-contents table; superseded by `full_checkpoint_content_v2`
    // (still removed on delete, see `delete_full_checkpoint_contents`).
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    // Certified checkpoints keyed by sequence number.
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    // Certified checkpoints keyed by their digest.
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    // Summaries this validator computed itself, used for fork detection
    // against certified checkpoints.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    // Epoch id -> sequence number of that epoch's last checkpoint.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    // Progress watermarks (highest verified/synced/executed/pruned, fork
    // markers), each stored as (sequence number, digest).
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    // Single-entry table recording a detected transaction-effects fork:
    // (tx digest, expected effects digest, actual effects digest).
    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
    // Current full-contents table, versioned format.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
}
201
202fn full_checkpoint_content_table_default_config() -> DBOptions {
203 DBOptions {
204 options: default_db_options().options,
205 rw_options: ReadWriteOptions::default().set_log_value_hash(true),
209 }
210}
211
impl CheckpointStoreTables {
    /// Opens the tables read-write (RocksDB builds). The pruner watermarks are
    /// unused here — relocation filtering is a tidehunter-only concept.
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    /// Opens the tables read-write on tidehunter, configuring each keyspace
    /// (key layout, caches, dirty-key limits) and attaching relocation filters
    /// driven by the pruner watermarks so pruned data can be dropped.
    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 4;
        // u64 sequence-number keys: shard by the first 6 bytes of the key.
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(64_000)
            .with_value_cache_size(default_value_cache_size());
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        // 32-byte digest keys.
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        // Watermarks are tiny and hot: keep them cached and never unloaded.
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
        // Per-column-family configuration; names must match the struct fields.
        let configs = vec![
            (
                "checkpoint_content",
                digest_config.clone().with_config(
                    lru_config
                        .clone()
                        .with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
            ),
            (
                "transaction_fork_detected",
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "full_checkpoint_content_v2",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    /// Opens a read-only handle over the same tables (e.g. for tooling).
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }
}
341
/// Persistent checkpoint store plus notification channels that let callers
/// await a checkpoint reaching the synced or executed watermark.
pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    // Notified when the HighestSynced watermark covers a waited-on sequence.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    // Notified when the HighestExecuted watermark covers a waited-on sequence.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
347
348impl CheckpointStore {
349 pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
350 let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
351 Arc::new(Self {
352 tables,
353 synced_checkpoint_notify_read: NotifyRead::new(),
354 executed_checkpoint_notify_read: NotifyRead::new(),
355 })
356 }
357
358 pub fn new_for_tests() -> Arc<Self> {
359 let ckpt_dir = mysten_common::tempdir().unwrap();
360 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
361 }
362
363 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
364 let tables = CheckpointStoreTables::new(
365 path,
366 "db_checkpoint",
367 Arc::new(PrunerWatermarks::default()),
368 );
369 Arc::new(Self {
370 tables,
371 synced_checkpoint_notify_read: NotifyRead::new(),
372 executed_checkpoint_notify_read: NotifyRead::new(),
373 })
374 }
375
    /// Opens a read-only view of the tables at `path` (delegates to
    /// `CheckpointStoreTables::open_readonly`).
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }
379
    /// Installs the genesis checkpoint (epoch 0, sequence 0).
    ///
    /// Idempotent: if a checkpoint at sequence 0 already exists it must have
    /// the same digest, otherwise this asserts. On first insert it also seeds
    /// the builder state (only when the epoch store is still at epoch 0),
    /// stores contents, certifies/verifies the checkpoint, and marks it synced.
    /// Panics on any storage error — genesis insertion must not fail.
    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        match self.get_checkpoint_by_sequence_number(0).unwrap() {
            // Already inserted: just sanity-check it is the same checkpoint.
            Some(existing_checkpoint) => {
                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
            }
            None => {
                // Only seed builder state if this node is still in epoch 0;
                // later epochs have no use for genesis builder data.
                if epoch_store.epoch() == checkpoint.epoch {
                    epoch_store
                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                        .unwrap();
                } else {
                    debug!(
                        validator_epoch =% epoch_store.epoch(),
                        genesis_epoch =% checkpoint.epoch(),
                        "Not inserting checkpoint builder data for genesis checkpoint",
                    );
                }
                self.insert_checkpoint_contents(contents).unwrap();
                self.insert_verified_checkpoint(&checkpoint).unwrap();
                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
            }
        }
    }
421
422 pub fn get_checkpoint_by_digest(
423 &self,
424 digest: &CheckpointDigest,
425 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
426 self.tables
427 .checkpoint_by_digest
428 .get(digest)
429 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
430 }
431
432 pub fn get_checkpoint_by_sequence_number(
433 &self,
434 sequence_number: CheckpointSequenceNumber,
435 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
436 self.tables
437 .certified_checkpoints
438 .get(&sequence_number)
439 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
440 }
441
    /// Returns the summary this validator computed itself for the given
    /// sequence number, if one was recorded.
    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.tables
            .locally_computed_checkpoints
            .get(&sequence_number)
    }
450
451 pub fn multi_get_locally_computed_checkpoints(
452 &self,
453 sequence_numbers: &[CheckpointSequenceNumber],
454 ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
455 let checkpoints = self
456 .tables
457 .locally_computed_checkpoints
458 .multi_get(sequence_numbers)?;
459
460 Ok(checkpoints)
461 }
462
    /// Maps a contents digest back to the sequence number of the checkpoint
    /// that carries those contents, if recorded.
    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }
471
    /// Removes the contents-digest → sequence-number mapping (used when
    /// pruning); a no-op if the mapping is absent.
    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }
480
481 pub fn get_latest_certified_checkpoint(
482 &self,
483 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
484 Ok(self
485 .tables
486 .certified_checkpoints
487 .reversed_safe_iter_with_bounds(None, None)?
488 .next()
489 .transpose()?
490 .map(|(_, v)| v.into()))
491 }
492
493 pub fn get_latest_locally_computed_checkpoint(
494 &self,
495 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
496 Ok(self
497 .tables
498 .locally_computed_checkpoints
499 .reversed_safe_iter_with_bounds(None, None)?
500 .next()
501 .transpose()?
502 .map(|(_, v)| v))
503 }
504
505 pub fn multi_get_checkpoint_by_sequence_number(
506 &self,
507 sequence_numbers: &[CheckpointSequenceNumber],
508 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
509 let checkpoints = self
510 .tables
511 .certified_checkpoints
512 .multi_get(sequence_numbers)?
513 .into_iter()
514 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
515 .collect();
516
517 Ok(checkpoints)
518 }
519
    /// Batched lookup of checkpoint contents by contents digest.
    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }
526
527 pub fn get_highest_verified_checkpoint(
528 &self,
529 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
530 let highest_verified = if let Some(highest_verified) = self
531 .tables
532 .watermarks
533 .get(&CheckpointWatermark::HighestVerified)?
534 {
535 highest_verified
536 } else {
537 return Ok(None);
538 };
539 self.get_checkpoint_by_digest(&highest_verified.1)
540 }
541
542 pub fn get_highest_synced_checkpoint(
543 &self,
544 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
545 let highest_synced = if let Some(highest_synced) = self
546 .tables
547 .watermarks
548 .get(&CheckpointWatermark::HighestSynced)?
549 {
550 highest_synced
551 } else {
552 return Ok(None);
553 };
554 self.get_checkpoint_by_digest(&highest_synced.1)
555 }
556
557 pub fn get_highest_synced_checkpoint_seq_number(
558 &self,
559 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
560 if let Some(highest_synced) = self
561 .tables
562 .watermarks
563 .get(&CheckpointWatermark::HighestSynced)?
564 {
565 Ok(Some(highest_synced.0))
566 } else {
567 Ok(None)
568 }
569 }
570
571 pub fn get_highest_executed_checkpoint_seq_number(
572 &self,
573 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
574 if let Some(highest_executed) = self
575 .tables
576 .watermarks
577 .get(&CheckpointWatermark::HighestExecuted)?
578 {
579 Ok(Some(highest_executed.0))
580 } else {
581 Ok(None)
582 }
583 }
584
585 pub fn get_highest_executed_checkpoint(
586 &self,
587 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
588 let highest_executed = if let Some(highest_executed) = self
589 .tables
590 .watermarks
591 .get(&CheckpointWatermark::HighestExecuted)?
592 {
593 highest_executed
594 } else {
595 return Ok(None);
596 };
597 self.get_checkpoint_by_digest(&highest_executed.1)
598 }
599
600 pub fn get_highest_pruned_checkpoint_seq_number(
601 &self,
602 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
603 self.tables
604 .watermarks
605 .get(&CheckpointWatermark::HighestPruned)
606 .map(|watermark| watermark.map(|w| w.0))
607 }
608
    /// Returns checkpoint contents by contents digest.
    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }
615
    /// Returns versioned full checkpoint contents (v2 table) by sequence number.
    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content_v2.get(&seq)
    }
622
    /// Deletes locally computed summaries older than the latest one via a
    /// scheduled range delete from 0 up to the latest key.
    /// NOTE(review): whether the range end is inclusive depends on
    /// `schedule_delete_range` semantics — the log message says "up to",
    /// confirm against typed_store.
    fn prune_local_summaries(&self) -> SuiResult {
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }
642
    /// Deletes all locally computed summaries with sequence number >= `from_seq`
    /// (e.g. to re-build them after a fork). Collects the keys first, then
    /// deletes them in one batch; panics on write failure.
    pub fn clear_locally_computed_checkpoints_from(
        &self,
        from_seq: CheckpointSequenceNumber,
    ) -> SuiResult {
        let keys: Vec<_> = self
            .tables
            .locally_computed_checkpoints
            .safe_iter_with_bounds(Some(from_seq), None)
            .map(|r| r.map(|(k, _)| k))
            .collect::<Result<_, _>>()?;
        if let Some(&last_local_summary) = keys.last() {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch
                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
                .expect("Failed to delete locally computed checkpoints");
            batch
                .write()
                .expect("Failed to delete locally computed checkpoints");
            warn!(
                from_seq,
                last_local_summary,
                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
                from_seq,
                last_local_summary
            );
        }
        Ok(())
    }
671
    /// Compares a locally built summary against the certified checkpoint at the
    /// same sequence number. On mismatch: logs both summaries and (best-effort)
    /// both contents, persists a fork marker so the fork survives restart,
    /// fires the `kill_checkpoint_fork_node` fail point (sim-test recovery
    /// path), and finally aborts the node via `fatal!`.
    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            // Render the certified side's contents for the log; fall back to a
            // descriptive message if the lookup fails or the contents are gone.
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // Same best-effort rendering for the local side.
            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );

            // Persist the fork marker; a failure here must not mask the fork,
            // so it is only logged.
            if let Err(e) = self.record_checkpoint_fork_detected(
                *local_checkpoint.sequence_number(),
                local_checkpoint.digest(),
            ) {
                error!("Failed to record checkpoint fork in database: {:?}", e);
            }

            // Sim-test hook: record the verified digest as the recovery target
            // and kill the node inside the simulator.
            fail_point_arg!(
                "kill_checkpoint_fork_node",
                |checkpoint_overrides: std::sync::Arc<
                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
                >| {
                    #[cfg(msim)]
                    {
                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
                            overrides.insert(
                                local_checkpoint.sequence_number,
                                verified_checkpoint.digest().to_string(),
                            );
                        }
                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
                            local_checkpoint.sequence_number(),
                            verified_checkpoint.digest()
                        );
                        sui_simulator::task::shutdown_current_node();
                    }
                }
            );

            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }
765
    /// Atomically stores a certified checkpoint under both its sequence number
    /// and its digest; when the summary carries a next-epoch committee (i.e. it
    /// is the last checkpoint of its epoch) the epoch→last-checkpoint mapping
    /// is written in the same batch. After the write, cross-checks any locally
    /// computed summary at this sequence for forks (which aborts on mismatch).
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        // Fork check runs after the certified write so the certified side is
        // durable even if we abort below.
        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }
807
    /// Stores a certified checkpoint and then advances the HighestVerified
    /// watermark to it (if it is newer than the current watermark).
    #[instrument(level = "debug", skip_all)]
    pub fn insert_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.insert_certified_checkpoint(checkpoint)?;
        self.update_highest_verified_checkpoint(checkpoint)
    }
818
819 pub fn update_highest_verified_checkpoint(
820 &self,
821 checkpoint: &VerifiedCheckpoint,
822 ) -> Result<(), TypedStoreError> {
823 if Some(*checkpoint.sequence_number())
824 > self
825 .get_highest_verified_checkpoint()?
826 .map(|x| *x.sequence_number())
827 {
828 debug!(
829 checkpoint_seq = checkpoint.sequence_number(),
830 "Updating highest verified checkpoint",
831 );
832 self.tables.watermarks.insert(
833 &CheckpointWatermark::HighestVerified,
834 &(*checkpoint.sequence_number(), *checkpoint.digest()),
835 )?;
836 }
837
838 Ok(())
839 }
840
841 pub fn update_highest_synced_checkpoint(
842 &self,
843 checkpoint: &VerifiedCheckpoint,
844 ) -> Result<(), TypedStoreError> {
845 let seq = *checkpoint.sequence_number();
846 debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
847 self.tables.watermarks.insert(
848 &CheckpointWatermark::HighestSynced,
849 &(seq, *checkpoint.digest()),
850 )?;
851 self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
852 Ok(())
853 }
854
    /// Waits until `get_watermark()` reaches `seq`, then returns the certified
    /// checkpoint at `seq`. The closure passed to `NotifyRead::read` is the
    /// "already satisfied?" probe: it returns `vec![None]` while the watermark
    /// is absent or below `seq`, which keeps the caller registered for a later
    /// `notify`. Panics if the DB errs or the checkpoint is missing despite
    /// the watermark covering it (an invariant violation).
    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        notify_read
            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
                let seq = seqs[0];
                let Some(highest) = get_watermark() else {
                    return vec![None];
                };
                if highest < seq {
                    return vec![None];
                }
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                vec![Some(checkpoint)]
            })
            .await
            .into_iter()
            .next()
            .unwrap()
    }
884
    /// Waits until checkpoint `seq` has been synced, then returns it.
    pub async fn notify_read_synced_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
            self.get_highest_synced_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }
895
    /// Waits until checkpoint `seq` has been executed, then returns it.
    pub async fn notify_read_executed_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
            self.get_highest_executed_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }
906
    /// Advances the HighestExecuted watermark. Checkpoints must execute in
    /// order: once a watermark exists, the new checkpoint must be exactly the
    /// next sequence number (asserted); an older-or-equal checkpoint is a
    /// silent no-op. Waiters on this sequence number are notified afterwards.
    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            // Already at or past this checkpoint: nothing to do.
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(seq, *checkpoint.digest()),
        )?;
        self.executed_checkpoint_notify_read
            .notify(&seq, checkpoint);
        Ok(())
    }
933
934 pub fn update_highest_pruned_checkpoint(
935 &self,
936 checkpoint: &VerifiedCheckpoint,
937 ) -> Result<(), TypedStoreError> {
938 self.tables.watermarks.insert(
939 &CheckpointWatermark::HighestPruned,
940 &(*checkpoint.sequence_number(), *checkpoint.digest()),
941 )
942 }
943
    /// Sets the HighestExecuted watermark unconditionally — without the
    /// advance-by-exactly-one assertion or the waiter notification performed by
    /// `update_highest_executed_checkpoint`. Hence "subtle": callers must know
    /// they are allowed to move the watermark arbitrarily.
    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }
957
958 pub fn insert_checkpoint_contents(
959 &self,
960 contents: CheckpointContents,
961 ) -> Result<(), TypedStoreError> {
962 debug!(
963 checkpoint_seq = ?contents.digest(),
964 "Inserting checkpoint contents",
965 );
966 self.tables
967 .checkpoint_content
968 .insert(contents.digest(), &contents)
969 }
970
    /// Atomically stores everything derived from a checkpoint's full contents:
    /// the contents-digest → sequence mapping, the versioned full contents
    /// (v2 table), and the plain contents keyed by digest. Asserts that the
    /// contents actually hash to the digest declared in the checkpoint summary.
    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content_v2.batch();
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content_v2,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        // Derive the plain contents and verify they match the summary's digest
        // before committing anything.
        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        batch.write()
    }
997
    /// Removes full checkpoint contents for `seq` from both the legacy table
    /// and the v2 table (two separate, non-batched removes).
    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.tables.full_checkpoint_content.remove(&seq)?;
        self.tables.full_checkpoint_content_v2.remove(&seq)
    }
1005
1006 pub fn get_epoch_last_checkpoint(
1007 &self,
1008 epoch_id: EpochId,
1009 ) -> SuiResult<Option<VerifiedCheckpoint>> {
1010 let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1011 let checkpoint = match seq {
1012 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1013 None => None,
1014 };
1015 Ok(checkpoint)
1016 }
1017
1018 pub fn get_epoch_last_checkpoint_seq_number(
1019 &self,
1020 epoch_id: EpochId,
1021 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1022 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1023 Ok(seq)
1024 }
1025
    /// Records `checkpoint` as the last checkpoint of `epoch_id`.
    pub fn insert_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
        checkpoint: &VerifiedCheckpoint,
    ) -> SuiResult {
        self.tables
            .epoch_last_checkpoint_map
            .insert(&epoch_id, checkpoint.sequence_number())?;
        Ok(())
    }
1036
    /// Returns the state commitments recorded in the end-of-epoch data of the
    /// epoch's last checkpoint. Panics (expect) if that checkpoint exists but
    /// lacks `EndOfEpochData` — an invariant for epoch-final checkpoints.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
            checkpoint
                .end_of_epoch_data
                .as_ref()
                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
                .epoch_commitments
                .clone()
        });
        Ok(commitments)
    }
1051
    /// Computes per-epoch statistics from the epoch's last checkpoint summary.
    /// Returns `None` when the previous epoch's last checkpoint is unavailable,
    /// since the baseline counts cannot be established in that case.
    pub fn get_epoch_stats(
        &self,
        epoch: EpochId,
        last_checkpoint: &CheckpointSummary,
    ) -> Option<EpochStats> {
        // Baseline: the first sequence number belonging to this epoch, and the
        // network-wide transaction total as of the end of the previous epoch.
        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
            (0, 0)
        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
            (
                checkpoint.sequence_number + 1,
                checkpoint.network_total_transactions,
            )
        } else {
            return None;
        };
        Some(EpochStats {
            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
            transaction_count: last_checkpoint.network_total_transactions
                - prev_epoch_network_transactions,
            total_gas_reward: last_checkpoint
                .epoch_rolling_gas_cost_summary
                .computation_cost,
        })
    }
1077
    /// Creates an on-disk backup ("db checkpoint") of the database at `path`.
    /// NOTE(review): invoked through the `checkpoint_content` table handle —
    /// presumably this snapshots the whole database, not just that column
    /// family; confirm against typed_store's `checkpoint_db` semantics.
    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
        self.tables
            .checkpoint_content
            .checkpoint_db(path)
            .map_err(Into::into)
    }
1085
1086 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1087 let mut wb = self.tables.watermarks.batch();
1088 wb.delete_batch(
1089 &self.tables.watermarks,
1090 std::iter::once(CheckpointWatermark::HighestExecuted),
1091 )?;
1092 wb.write()?;
1093 Ok(())
1094 }
1095
    /// Resets execution progress so the node re-executes from genesis; today
    /// this only clears the HighestExecuted watermark.
    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
        self.delete_highest_executed_checkpoint_test_only()?;
        Ok(())
    }
1100
    /// Persists a checkpoint-fork marker (sequence number and local digest) in
    /// the watermarks table so the fork is still visible after a restart.
    pub fn record_checkpoint_fork_detected(
        &self,
        checkpoint_seq: CheckpointSequenceNumber,
        checkpoint_digest: CheckpointDigest,
    ) -> Result<(), TypedStoreError> {
        info!(
            checkpoint_seq = checkpoint_seq,
            checkpoint_digest = ?checkpoint_digest,
            "Recording checkpoint fork detection in database"
        );
        self.tables.watermarks.insert(
            &CheckpointWatermark::CheckpointForkDetected,
            &(checkpoint_seq, checkpoint_digest),
        )
    }
1116
1117 pub fn get_checkpoint_fork_detected(
1118 &self,
1119 ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1120 self.tables
1121 .watermarks
1122 .get(&CheckpointWatermark::CheckpointForkDetected)
1123 }
1124
1125 pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1126 self.tables
1127 .watermarks
1128 .remove(&CheckpointWatermark::CheckpointForkDetected)
1129 }
1130
1131 pub fn record_transaction_fork_detected(
1132 &self,
1133 tx_digest: TransactionDigest,
1134 expected_effects_digest: TransactionEffectsDigest,
1135 actual_effects_digest: TransactionEffectsDigest,
1136 ) -> Result<(), TypedStoreError> {
1137 info!(
1138 tx_digest = ?tx_digest,
1139 expected_effects_digest = ?expected_effects_digest,
1140 actual_effects_digest = ?actual_effects_digest,
1141 "Recording transaction fork detection in database"
1142 );
1143 self.tables.transaction_fork_detected.insert(
1144 &TRANSACTION_FORK_DETECTED_KEY,
1145 &(tx_digest, expected_effects_digest, actual_effects_digest),
1146 )
1147 }
1148
1149 pub fn get_transaction_fork_detected(
1150 &self,
1151 ) -> Result<
1152 Option<(
1153 TransactionDigest,
1154 TransactionEffectsDigest,
1155 TransactionEffectsDigest,
1156 )>,
1157 TypedStoreError,
1158 > {
1159 self.tables
1160 .transaction_fork_detected
1161 .get(&TRANSACTION_FORK_DETECTED_KEY)
1162 }
1163
1164 pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1165 self.tables
1166 .transaction_fork_detected
1167 .remove(&TRANSACTION_FORK_DETECTED_KEY)
1168 }
1169}
1170
/// Keys of the `watermarks` table: progress markers persisted by the
/// checkpoint store.
///
/// NOTE(review): variants are serialized (Serde) as database keys, so their
/// identity presumably must remain stable across releases — confirm before
/// renaming or reordering.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
    /// Stores a (sequence number, digest) pair when a checkpoint fork is
    /// detected (see `record_checkpoint_fork_detected`).
    CheckpointForkDetected,
}
1179
/// Background task state: receives per-checkpoint transaction effects from
/// the checkpoint builder and feeds them to the `GlobalStateHasher`.
struct CheckpointStateHasher {
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Weak handle so the loop stops once the hasher is dropped elsewhere.
    hasher: Weak<GlobalStateHasher>,
    /// Channel of (checkpoint sequence number, effects in that checkpoint).
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
1185
1186impl CheckpointStateHasher {
1187 fn new(
1188 epoch_store: Arc<AuthorityPerEpochStore>,
1189 hasher: Weak<GlobalStateHasher>,
1190 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1191 ) -> Self {
1192 Self {
1193 epoch_store,
1194 hasher,
1195 receive_from_builder,
1196 }
1197 }
1198
1199 async fn run(self) {
1200 let Self {
1201 epoch_store,
1202 hasher,
1203 mut receive_from_builder,
1204 } = self;
1205 while let Some((seq, effects)) = receive_from_builder.recv().await {
1206 let Some(hasher) = hasher.upgrade() else {
1207 info!("Object state hasher was dropped, stopping checkpoint accumulation");
1208 break;
1209 };
1210 hasher
1211 .accumulate_checkpoint(&effects, seq, &epoch_store)
1212 .expect("epoch ended while accumulating checkpoint");
1213 }
1214 }
1215}
1216
/// Errors surfaced by the checkpoint builder loop.
#[derive(Debug)]
pub enum CheckpointBuilderError {
    /// Terminal for this epoch: the change-epoch transaction was already
    /// executed, so no further checkpoints can be built here.
    ChangeEpochTxAlreadyExecuted,
    /// Terminal: required system packages are unavailable.
    SystemPackagesMissing,
    /// Transient failure; the builder logs it, backs off, and retries.
    Retry(anyhow::Error),
}
1223
1224impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1225 for CheckpointBuilderError
1226{
1227 fn from(e: SuiError) -> Self {
1228 Self::Retry(e.into())
1229 }
1230}
1231
/// Result alias used throughout the checkpoint builder; the success type
/// defaults to `()`.
pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1233
/// Builds checkpoints from pending-checkpoint roots, persists them, and
/// hands them off for aggregation/signing.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Signaled when new pending checkpoints may be available to build.
    notify: Arc<Notify>,
    /// Signaled after checkpoints are written, to wake the aggregator.
    notify_aggregator: Arc<Notify>,
    /// Broadcasts the highest checkpoint sequence number built so far.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    global_state_hasher: Weak<GlobalStateHasher>,
    /// Sends (sequence, effects) of built checkpoints to the state-hasher
    /// task (the counterpart of `CheckpointStateHasher::receive_from_builder`).
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    /// Chunking limits used when splitting effects into checkpoints.
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
1249
/// Aggregates checkpoint signature messages from validators into certified
/// checkpoints (stake-weighted, via `CheckpointSignatureAggregator`).
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Signaled when new locally built checkpoints are available.
    notify: Arc<Notify>,
    /// Incoming checkpoint signature messages.
    receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
    /// Signature messages buffered by sequence number, ahead of the
    /// checkpoint currently being aggregated.
    pending: BTreeMap<CheckpointSequenceNumber, Vec<CheckpointSignatureMessage>>,
    /// Aggregation state for the checkpoint currently being certified.
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1261
/// Per-checkpoint signature aggregation state.
pub struct CheckpointSignatureAggregator {
    /// The locally computed summary being certified.
    summary: CheckpointSummary,
    /// Digest of `summary`.
    digest: CheckpointDigest,
    /// Stake-weighted signatures, grouped by the digest each validator
    /// actually signed (so disagreeing digests can be observed).
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1272
1273impl CheckpointBuilder {
    /// Constructs a builder from its dependencies; every argument is stored
    /// directly in the corresponding field.
    fn new(
        state: Arc<AuthorityState>,
        store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            store,
            epoch_store,
            notify,
            effects_store,
            global_state_hasher,
            send_to_hasher,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }
1307
    /// Main loop of the checkpoint builder task.
    ///
    /// Optionally waits for consensus commit replay to complete first, then
    /// repeatedly attempts to build checkpoints: terminal errors stop the
    /// task, transient errors back off one second and retry, and a clean
    /// pass waits for the next notification.
    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
        if let Some(replay_waiter) = consensus_replay_waiter {
            info!("Waiting for consensus commits to replay ...");
            replay_waiter.wait_for_replay().await;
            info!("Consensus commits finished replaying");
        }
        info!("Starting CheckpointBuilder");
        loop {
            match self.maybe_build_checkpoints().await {
                Ok(()) => {}
                // Terminal conditions for this epoch: no further progress is
                // possible, so the task exits cleanly.
                err @ Err(
                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
                    | CheckpointBuilderError::SystemPackagesMissing,
                ) => {
                    info!("CheckpointBuilder stopping: {:?}", err);
                    return;
                }
                // Transient failure: log, back off, bump the error metric,
                // and retry without waiting for a new notification.
                Err(CheckpointBuilderError::Retry(inner)) => {
                    let msg = format!("{:?}", inner);
                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    continue;
                }
            }

            // Idle until new pending checkpoints are signaled.
            self.notify.notified().await;
        }
    }
1344
1345 async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1346 if self
1347 .epoch_store
1348 .protocol_config()
1349 .split_checkpoints_in_consensus_handler()
1350 {
1351 self.maybe_build_checkpoints_v2().await
1352 } else {
1353 self.maybe_build_checkpoints_v1().await
1354 }
1355 }
1356
    /// Legacy build path: drains pending checkpoints from the epoch store,
    /// grouping consecutive ones until the minimum checkpoint interval has
    /// elapsed (or an end-of-epoch checkpoint forces a build), then builds
    /// one checkpoint batch per group.
    async fn maybe_build_checkpoints_v1(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        // Resume from the last checkpoint built by this builder this epoch.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            let current_timestamp = pending.details().timestamp_ms;
            // Build now if the minimum interval has elapsed, or if this (or
            // the upcoming) pending checkpoint is the last of the epoch.
            let can_build = match last_timestamp {
                Some(last_timestamp) => {
                    current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                }
                None => true,
            } || checkpoints_iter
                .peek()
                .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            // Interval satisfied (or epoch ending): build a checkpoint from
            // the accumulated group and reset the group buffer.
            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height_from = grouped_pending_checkpoints
                    .first()
                    .unwrap()
                    .details()
                    .checkpoint_height,
                checkpoint_commit_height_to = last_height,
                "Making checkpoint with commit height range"
            );

            let seq = self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await?;

            // Publish the highest built sequence number to watchers.
            self.last_built.send_if_modified(|cur| {
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            // Yield so a long backlog does not starve other tasks.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );

        Ok(())
    }
1445
    /// Newer build path (checkpoints already split in the consensus
    /// handler): each pending checkpoint maps to exactly one checkpoint.
    async fn maybe_build_checkpoints_v2(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        // Resume after the height of the last checkpoint built this epoch.
        let last_height = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .and_then(|s| s.checkpoint_height);

        for (height, pending) in self
            .epoch_store
            .get_pending_checkpoints_v2(last_height)
            .expect("unexpected epoch store error")
        {
            debug!(checkpoint_commit_height = height, "Making checkpoint");

            let seq = self.make_checkpoint_v2(pending).await?;

            // Publish the highest built sequence number to watchers.
            self.last_built.send_if_modified(|cur| {
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            // Yield so a long backlog does not starve other tasks.
            tokio::task::yield_now().await;
        }

        Ok(())
    }
1482
    /// Builds and persists checkpoint(s) from a group of pending
    /// checkpoints; returns the highest sequence number created.
    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &mut self,
        pendings: Vec<PendingCheckpoint>,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        // Human-readable description of the inputs for the log line below.
        let pending_ckpt_str = pendings
            .iter()
            .map(|p| {
                format!(
                    "height={}, commit={}",
                    p.details().checkpoint_height,
                    p.details().consensus_commit_ref
                )
            })
            .join("; ");

        let last_details = pendings.last().unwrap().details().clone();

        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        // Wait for all root transactions to have executed effects;
        // poll_count records whether we actually had to wait.
        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &last_details,
                &all_roots,
            )
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        // If state sync has already executed past this checkpoint, all root
        // effects should have been immediately available (a single poll).
        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        let new_ckpt_str = new_checkpoints
            .iter()
            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
            .join("; ");

        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            "Made new checkpoint {} from pending checkpoint {}",
            new_ckpt_str, pending_ckpt_str
        );

        Ok(highest_sequence)
    }
1542
    /// Builds and persists exactly one checkpoint from a single pending
    /// checkpoint (v2 path); returns its sequence number.
    #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
    async fn make_checkpoint_v2(
        &mut self,
        pending: PendingCheckpointV2,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        let details = pending.details.clone();

        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        // Wait for all root transactions to have executed effects;
        // poll_count records whether we actually had to wait.
        let (poll_count, result) =
            poll_count(self.resolve_checkpoint_transactions_v2(pending)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &details,
                &all_roots,
            )
            .await?;
        // The v2 path never splits: one pending checkpoint -> one checkpoint.
        assert_eq!(new_checkpoints.len(), 1, "Expected exactly one checkpoint");
        let sequence = *new_checkpoints.first().0.sequence_number();
        let digest = new_checkpoints.first().0.digest();
        // If state sync has already executed past this checkpoint, the root
        // effects should have been immediately available (a single poll).
        if sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        self.write_checkpoints(details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            seq = sequence,
            %digest,
            height = details.checkpoint_height,
            commit = %details.consensus_commit_ref,
            "Made new checkpoint"
        );

        Ok(sequence)
    }
1590
    /// Builds the accumulator settlement transactions for a checkpoint,
    /// hands them (and a trailing barrier transaction) to the execution
    /// pipeline via the epoch store, waits for their effects, and informs
    /// the execution scheduler of the resulting funds settlement.
    ///
    /// Returns the settlement `TransactionKey` and the settlement effects
    /// followed by the barrier effects.
    async fn construct_and_execute_settlement_transactions(
        &self,
        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> (TransactionKey, Vec<TransactionEffects>) {
        let _scope =
            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");

        let tx_key =
            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);

        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.effects_store.as_ref()),
            sorted_tx_effects_included_in_checkpoint,
            checkpoint_seq,
            tx_index_offset,
        );

        let funds_changes = builder.collect_funds_changes();
        let num_updates = builder.num_updates();
        let settlement_txns = builder.build_tx(
            self.epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            checkpoint_seq,
        );

        // Wrap the raw transactions as verified system transactions so they
        // can be scheduled for execution.
        let settlement_txns: Vec<_> = settlement_txns
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    self.epoch_store.epoch(),
                )
            })
            .collect();

        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();

        debug!(
            ?settlement_digests,
            ?tx_key,
            "created settlement transactions with {num_updates} updates"
        );

        // Schedule the settlements and wait for their executed effects.
        self.epoch_store
            .notify_settlement_transactions_ready(tx_key, settlement_txns);

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_settlement_effects",
            &settlement_digests,
            tx_key,
        )
        .await;

        // The barrier transaction is derived from the settlement effects,
        // so it is built and scheduled only after they are available.
        let barrier_tx = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            &settlement_effects,
        );

        let barrier_tx = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier_tx),
            self.epoch_store.epoch(),
        );
        let barrier_digest = *barrier_tx.digest();

        self.epoch_store
            .notify_barrier_transaction_ready(tx_key, barrier_tx);

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_barrier_effects",
            &[barrier_digest],
            tx_key,
        )
        .await;

        let settlement_effects: Vec<_> = settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect();

        // Settlements must never fail, and exactly one of them must mutate
        // the accumulator root object; its new version seeds the scheduler.
        let mut next_accumulator_version = None;
        for fx in settlement_effects.iter() {
            assert!(
                fx.status().is_ok(),
                "settlement transaction cannot fail (digest: {:?}) {:#?}",
                fx.transaction_digest(),
                fx
            );
            if let Some(version) = fx
                .mutated()
                .iter()
                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
            {
                assert!(
                    next_accumulator_version.is_none(),
                    "Only one settlement transaction should mutate the accumulator root object"
                );
                next_accumulator_version = Some(version);
            }
        }
        let settlements = FundsSettlement {
            next_accumulator_version: next_accumulator_version
                .expect("Accumulator root object should be mutated in the settlement transactions"),
            funds_changes,
        };

        self.state
            .execution_scheduler()
            .settle_address_funds(settlements);

        (tx_key, settlement_effects)
    }
1718
    /// Waits for the root transactions of each pending checkpoint to finish
    /// executing and returns their effects in deterministic order: consensus
    /// commit prologue first, causally sorted effects next, settlement and
    /// barrier effects last. Also returns the set of all root digests.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        pending_checkpoints: Vec<PendingCheckpoint>,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        // Digests already included, used to dedup dependencies across the
        // pending checkpoints processed in this call.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        let mut tx_effects = Vec::new();
        let mut tx_roots = HashSet::new();

        for pending_checkpoint in pending_checkpoints.into_iter() {
            let mut pending = pending_checkpoint;
            debug!(
                checkpoint_commit_height = pending.details.checkpoint_height,
                "Resolving checkpoint transactions for pending checkpoint.",
            );

            trace!(
                "roots for pending checkpoint {:?}: {:?}",
                pending.details.checkpoint_height, pending.roots,
            );

            // With accumulators enabled, the last root is expected to be the
            // settlement key; pop it so it can be handled separately after
            // causal sorting (its transactions are built from the sorted
            // effects below).
            let settlement_root = if self.epoch_store.accumulators_enabled() {
                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
                    pending.roots.pop()
                else {
                    fatal!("No settlement root found");
                };
                Some(settlement_root)
            } else {
                None
            };

            let roots = &pending.roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(roots.len() as u64);

            // Resolve root keys to digests, then wait for executed effects.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;
            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;

            assert!(
                self.epoch_store
                    .protocol_config()
                    .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
            );

            // The consensus commit prologue (if present) must end up as the
            // first transaction of the checkpoint.
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                // Sanity check: completing the prologue's dependencies must
                // add exactly the prologue itself.
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    &mut effects_in_current_checkpoint,
                )?;

                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }

            // Pull in not-yet-included dependencies of the roots, then sort
            // causally with the prologue pinned first.
            let unsorted =
                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
                if cfg!(debug_assertions) {
                    // The prologue must not also appear among the completed
                    // effects.
                    for tx in unsorted.iter() {
                        assert!(tx.transaction_digest() != &ccp_digest);
                    }
                }
                sorted.push(ccp_effects);
            }
            sorted.extend(CausalOrder::causal_sort(unsorted));

            // Execute the settlement (and barrier) transactions derived from
            // the sorted effects and append their effects at the end.
            if let Some(settlement_root) = settlement_root {
                let last_checkpoint =
                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
                let next_checkpoint_seq = last_checkpoint
                    .as_ref()
                    .map(|(seq, _)| *seq)
                    .unwrap_or_default()
                    + 1;
                let tx_index_offset = tx_effects.len() as u64;

                let (tx_key, settlement_effects) = self
                    .construct_and_execute_settlement_transactions(
                        &sorted,
                        pending.details.checkpoint_height,
                        next_checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                debug!(?tx_key, "executed settlement transactions");

                assert_eq!(settlement_root, tx_key);

                effects_in_current_checkpoint
                    .extend(settlement_effects.iter().map(|e| *e.transaction_digest()));
                sorted.extend(settlement_effects);
            }

            #[cfg(msim)]
            {
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            tx_effects.extend(sorted);
            tx_roots.extend(root_digests);
        }

        Ok((tx_effects, tx_roots))
    }
1883
    /// v2 counterpart of `resolve_checkpoint_transactions`: resolves and
    /// causally orders the roots of each per-commit group inside one
    /// `PendingCheckpointV2`. Settlement effects are obtained via
    /// `resolve_settlement_effects`, which waits for effects produced by
    /// execution rather than scheduling the transactions here.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions_v2(
        &self,
        pending: PendingCheckpointV2,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        debug!(
            checkpoint_commit_height = pending.details.checkpoint_height,
            "Resolving checkpoint transactions for pending checkpoint.",
        );

        trace!(
            "roots for pending checkpoint {:?}: {:?}",
            pending.details.checkpoint_height, pending.roots,
        );

        assert!(
            self.epoch_store
                .protocol_config()
                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
        );

        let mut all_effects: Vec<TransactionEffects> = Vec::new();
        let mut all_root_digests: Vec<TransactionDigest> = Vec::new();

        // Each entry in `pending.roots` is one group of roots with its own
        // optional settlement key and height.
        for checkpoint_roots in &pending.roots {
            let tx_roots = &checkpoint_roots.tx_roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(tx_roots.len() as u64);

            // Resolve root keys to digests, then wait for executed effects.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(tx_roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;

            all_root_digests.extend(root_digests.iter().cloned());

            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            // Causally order the effects, pinning the prologue (if any) first.
            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let ccp_digest = consensus_commit_prologue.map(|(d, _)| d);
            let mut sorted = CausalOrder::causal_sort_with_ccp(root_effects, ccp_digest);

            // Append settlement + barrier effects for this group, if any.
            if let Some(settlement_key) = &checkpoint_roots.settlement_root {
                let checkpoint_seq = pending
                    .details
                    .checkpoint_seq
                    .expect("checkpoint_seq must be set");
                let tx_index_offset = all_effects.len() as u64;
                let effects = self
                    .resolve_settlement_effects(
                        *settlement_key,
                        &sorted,
                        checkpoint_roots.height,
                        checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                sorted.extend(effects);
            }

            #[cfg(msim)]
            {
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            all_effects.extend(sorted);
        }
        Ok((all_effects, all_root_digests.into_iter().collect()))
    }
1969
    /// Deterministically recomputes the digests of the settlement and
    /// barrier transactions for a checkpoint and waits for their executed
    /// effects (produced elsewhere), returning settlement effects followed
    /// by the barrier effects.
    async fn resolve_settlement_effects(
        &self,
        settlement_key: TransactionKey,
        sorted_root_effects: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> Vec<TransactionEffects> {
        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        // Rebuild the settlement transactions only to learn their digests;
        // no effects store is supplied to the builder here.
        let builder = AccumulatorSettlementTxBuilder::new(
            None,
            sorted_root_effects,
            checkpoint_seq,
            tx_index_offset,
        );

        let settlement_digests: Vec<_> = builder
            .build_tx(
                self.epoch_store.protocol_config(),
                epoch,
                accumulator_root_obj_initial_shared_version,
                checkpoint_height,
                checkpoint_seq,
            )
            .into_iter()
            .map(|tx| *VerifiedTransaction::new_system_transaction(tx).digest())
            .collect();

        debug!(
            ?settlement_digests,
            ?settlement_key,
            "fallback: reading settlement effects from cache"
        );

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::fallback_settlement_effects",
            &settlement_digests,
            settlement_key,
        )
        .await;

        // The barrier transaction is derived from the settlement effects,
        // so its digest can only be computed once those are known.
        let barrier_digest = *VerifiedTransaction::new_system_transaction(
            accumulators::build_accumulator_barrier_tx(
                epoch,
                accumulator_root_obj_initial_shared_version,
                checkpoint_height,
                &settlement_effects,
            ),
        )
        .digest();

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::fallback_barrier_effects",
            &[barrier_digest],
            settlement_key,
        )
        .await;

        settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect()
    }
2045
2046 fn extract_consensus_commit_prologue(
2049 &self,
2050 root_digests: &[TransactionDigest],
2051 root_effects: &[TransactionEffects],
2052 ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
2053 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
2054 if root_digests.is_empty() {
2055 return Ok(None);
2056 }
2057
2058 let first_tx = self
2062 .state
2063 .get_transaction_cache_reader()
2064 .get_transaction_block(&root_digests[0])
2065 .expect("Transaction block must exist");
2066
2067 Ok(first_tx
2068 .transaction_data()
2069 .is_consensus_commit_prologue()
2070 .then(|| {
2071 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
2072 (*first_tx.digest(), root_effects[0].clone())
2073 }))
2074 }
2075
    /// Atomically persists the new checkpoint contents and locally computed
    /// summaries, emits them to the configured output, checks each against
    /// any already-certified checkpoint for forks, and notifies the
    /// aggregator and the epoch store.
    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &mut self,
        height: CheckpointHeight,
        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
    ) -> SuiResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.store.tables.checkpoint_content.batch();
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );

            // Building the same sequence number twice must be deterministic;
            // a differing digest indicates nondeterministic checkpoint
            // construction and is fatal.
            if let Some(previously_computed_summary) = self
                .store
                .tables
                .locally_computed_checkpoints
                .get(&summary.sequence_number)?
                && previously_computed_summary.digest() != summary.digest()
            {
                fatal!(
                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
                    summary.sequence_number,
                    previously_computed_summary.digest(),
                    summary.digest()
                );
            }

            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            batch.insert_batch(
                &self.store.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.store.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        // Persist contents and summaries in one atomic write.
        batch.write()?;

        // Emit each checkpoint to the output only after it is durable.
        for (summary, contents) in &new_checkpoints {
            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
                .await?;
        }

        // If a certified checkpoint already exists at any of these sequence
        // numbers, compare against it to detect a fork.
        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .store
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.store
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_constructed_checkpoint(height, new_checkpoints);
        Ok(())
    }
2157
    /// Splits (effects, signatures) pairs into chunks — each chunk becomes
    /// one checkpoint — bounded by `max_transactions_per_checkpoint` and
    /// `max_checkpoint_size_bytes` (sizes estimated via BCS serialization).
    ///
    /// Always returns at least one chunk, possibly empty; a single
    /// transaction larger than the size limit is allowed through in its own
    /// chunk. When checkpoints are already split in the consensus handler,
    /// everything goes into a single chunk.
    #[allow(clippy::type_complexity)]
    fn split_checkpoint_chunks(
        &self,
        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
        signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
    ) -> CheckpointBuilderResult<
        Vec<
            Vec<(
                TransactionEffects,
                Vec<(GenericSignature, Option<SequenceNumber>)>,
            )>,
        >,
    > {
        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");

        // No splitting needed: the consensus handler already sized the
        // checkpoint.
        if self
            .epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler()
        {
            let chunk: Vec<_> = effects_and_transaction_sizes
                .into_iter()
                .zip(signatures)
                .map(|((effects, _size), sigs)| (effects, sigs))
                .collect();
            return Ok(vec![chunk]);
        }
        let mut chunks = Vec::new();
        let mut chunk = Vec::new();
        let mut chunk_size: usize = 0;
        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
            .into_iter()
            .zip(signatures.into_iter())
        {
            // Estimate serialized signature size; without address aliases,
            // only the bare signatures (no alias sequence numbers) count.
            let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
                bcs::serialized_size(&signatures)?
            } else {
                let signatures: Vec<&GenericSignature> =
                    signatures.iter().map(|(s, _)| s).collect();
                bcs::serialized_size(&signatures)?
            };
            let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
            // Close the current chunk when either limit would be exceeded.
            if chunk.len() == self.max_transactions_per_checkpoint
                || (chunk_size + size) > self.max_checkpoint_size_bytes
            {
                if chunk.is_empty() {
                    // A single oversized transaction: let it through rather
                    // than produce an empty chunk and make no progress.
                    warn!(
                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
                        self.max_checkpoint_size_bytes
                    );
                } else {
                    chunks.push(chunk);
                    chunk = Vec::new();
                    chunk_size = 0;
                }
            }

            chunk.push((effects, signatures));
            chunk_size += size;
        }

        // Push the trailing chunk; when there were no transactions at all
        // this deliberately yields one empty chunk (an empty checkpoint).
        if !chunk.is_empty() || chunks.is_empty() {
            chunks.push(chunk);
        }
        Ok(chunks)
    }
2238
2239 fn load_last_built_checkpoint_summary(
2240 epoch_store: &AuthorityPerEpochStore,
2241 store: &CheckpointStore,
2242 ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
2243 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
2244 if last_checkpoint.is_none() {
2245 let epoch = epoch_store.epoch();
2246 if epoch > 0 {
2247 let previous_epoch = epoch - 1;
2248 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
2249 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
2250 if let Some((ref seq, _)) = last_checkpoint {
2251 debug!(
2252 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
2253 );
2254 } else {
2255 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
2257 }
2258 }
2259 }
2260 Ok(last_checkpoint)
2261 }
2262
    /// Builds one or more (summary, contents) checkpoint pairs from the effects
    /// of a pending checkpoint commit.
    ///
    /// High-level flow:
    /// 1. Load the last built checkpoint (possibly from the previous epoch).
    /// 2. Fetch the executed transactions for all effects and wait until the
    ///    relevant certificates have been processed by consensus.
    /// 3. Collect user signatures, split everything into size-bounded chunks,
    ///    and emit one checkpoint per chunk, chaining each to the previous one.
    /// 4. For the final checkpoint of an epoch, append the advance-epoch
    ///    transaction and compute the end-of-epoch data (committee, root state
    ///    digest commitments).
    ///
    /// Returns at least one checkpoint; panics if the chunking produced none
    /// (split_checkpoint_chunks always yields at least one chunk).
    #[instrument(level = "debug", skip_all)]
    async fn create_checkpoints(
        &self,
        all_effects: Vec<TransactionEffects>,
        details: &PendingCheckpointInfo,
        all_roots: &HashSet<TransactionDigest>,
    ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");

        let total = all_effects.len();
        let mut last_checkpoint =
            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
        debug!(
            checkpoint_commit_height = details.checkpoint_height,
            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
            checkpoint_timestamp = details.timestamp_ms,
            "Creating checkpoint(s) for {} transactions",
            all_effects.len(),
        );

        let all_digests: Vec<_> = all_effects
            .iter()
            .map(|effect| *effect.transaction_digest())
            .collect();
        let transactions_and_sizes = self
            .state
            .get_transaction_cache_reader()
            .get_transactions_and_serialized_sizes(&all_digests)?;
        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
        let mut transactions = Vec::with_capacity(all_effects.len());
        // Consensus keys of user transactions we still need to see processed by
        // consensus before the checkpoint may reference them.
        let mut transaction_keys = Vec::with_capacity(all_effects.len());
        // digest -> randomness round, recorded per included RandomnessStateUpdate.
        let mut randomness_rounds = BTreeMap::new();
        {
            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
            debug!(
                ?last_checkpoint_seq,
                "Waiting for {:?} certificates to appear in consensus",
                all_effects.len()
            );

            for (effects, transaction_and_size) in all_effects
                .into_iter()
                .zip(transactions_and_sizes.into_iter())
            {
                // Every effect here must correspond to an already-executed,
                // locally known transaction.
                let (transaction, size) = transaction_and_size
                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
                match transaction.inner().transaction_data().kind() {
                    // System transactions produced by consensus handling:
                    // nothing extra to wait for or record.
                    TransactionKind::ConsensusCommitPrologue(_)
                    | TransactionKind::ConsensusCommitPrologueV2(_)
                    | TransactionKind::ConsensusCommitPrologueV3(_)
                    | TransactionKind::ConsensusCommitPrologueV4(_)
                    | TransactionKind::AuthenticatorStateUpdate(_) => {
                    }
                    TransactionKind::ProgrammableSystemTransaction(_) => {
                    }
                    // Epoch-boundary transactions are appended explicitly via
                    // augment_epoch_last_checkpoint and must never appear here.
                    TransactionKind::ChangeEpoch(_)
                    | TransactionKind::Genesis(_)
                    | TransactionKind::EndOfEpochTransaction(_) => {
                        fatal!(
                            "unexpected transaction in checkpoint effects: {:?}",
                            transaction
                        );
                    }
                    TransactionKind::RandomnessStateUpdate(rsu) => {
                        // Remember the round so the summary can list the
                        // randomness rounds covered by this checkpoint.
                        randomness_rounds
                            .insert(*effects.transaction_digest(), rsu.randomness_round);
                    }
                    TransactionKind::ProgrammableTransaction(_) => {
                        // Non-root user transactions (pulled in as dependencies)
                        // must have been processed by consensus before we can
                        // reference them from a checkpoint.
                        let digest = *effects.transaction_digest();
                        if !all_roots.contains(&digest) {
                            transaction_keys.push(SequencedConsensusTransactionKey::External(
                                ConsensusTransactionKey::Certificate(digest),
                            ));
                        }
                    }
                }
                transactions.push(transaction);
                all_effects_and_transaction_sizes.push((effects, size));
            }

            self.epoch_store
                .consensus_messages_processed_notify(transaction_keys)
                .await?;
        }

        let signatures = self
            .epoch_store
            .user_signatures_for_checkpoint(&transactions, &all_digests);
        debug!(
            ?last_checkpoint_seq,
            "Received {} checkpoint user signatures from consensus",
            signatures.len()
        );

        // Execution-time observation keys, only needed for the end-of-epoch
        // transaction; taken (std::mem::take) when the last chunk is built.
        let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
            Some(
                transactions
                    .iter()
                    .flat_map(|tx| {
                        if let TransactionKind::ProgrammableTransaction(ptb) =
                            tx.transaction_data().kind()
                        {
                            itertools::Either::Left(
                                ptb.commands
                                    .iter()
                                    .map(ExecutionTimeObservationKey::from_command),
                            )
                        } else {
                            itertools::Either::Right(std::iter::empty())
                        }
                    })
                    .collect(),
            )
        } else {
            None
        };

        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
        let chunks_count = chunks.len();

        let mut checkpoints = Vec::with_capacity(chunks_count);
        debug!(
            ?last_checkpoint_seq,
            "Creating {} checkpoints with {} transactions", chunks_count, total,
        );

        let epoch = self.epoch_store.epoch();
        for (index, transactions) in chunks.into_iter().enumerate() {
            // First chunk of the run AND either no previous checkpoint at all
            // or the previous one belongs to an earlier epoch.
            let first_checkpoint_of_epoch = index == 0
                && last_checkpoint
                    .as_ref()
                    .map(|(_, c)| c.epoch != epoch)
                    .unwrap_or(true);
            if first_checkpoint_of_epoch {
                self.epoch_store
                    .record_epoch_first_checkpoint_creation_time_metric();
            }
            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;

            let sequence_number = if let Some(preassigned_seq) = details.checkpoint_seq {
                preassigned_seq
            } else {
                last_checkpoint
                    .as_ref()
                    .map(|(_, c)| c.sequence_number + 1)
                    .unwrap_or_default()
            };
            let mut timestamp_ms = details.timestamp_ms;
            // Checkpoint timestamps must not go backwards; clamp to the
            // previous checkpoint's timestamp when the protocol enforces
            // monotonicity.
            if let Some((_, last_checkpoint)) = &last_checkpoint
                && last_checkpoint.timestamp_ms > timestamp_ms
            {
                debug!(
                    "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
                    sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
                );
                if self
                    .epoch_store
                    .protocol_config()
                    .enforce_checkpoint_timestamp_monotonicity()
                {
                    timestamp_ms = last_checkpoint.timestamp_ms;
                }
            }

            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
            let epoch_rolling_gas_cost_summary =
                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);

            let end_of_epoch_data = if last_checkpoint_of_epoch {
                // Appends the advance-epoch transaction's effects/signatures to
                // this chunk and yields the next epoch's system state.
                let system_state_obj = self
                    .augment_epoch_last_checkpoint(
                        &epoch_rolling_gas_cost_summary,
                        timestamp_ms,
                        &mut effects,
                        &mut signatures,
                        sequence_number,
                        std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
                        last_checkpoint_seq.unwrap_or_default(),
                    )
                    .await?;

                let committee = system_state_obj
                    .get_current_epoch_committee()
                    .committee()
                    .clone();

                // Fold this checkpoint into the running root accumulator and
                // produce the epoch's root state digest.
                let root_state_digest = {
                    let state_acc = self
                        .global_state_hasher
                        .upgrade()
                        .expect("No checkpoints should be getting built after local configuration");
                    let acc = state_acc.accumulate_checkpoint(
                        &effects,
                        sequence_number,
                        &self.epoch_store,
                    )?;

                    state_acc
                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
                        .await?;

                    state_acc.accumulate_running_root(
                        &self.epoch_store,
                        sequence_number,
                        Some(acc),
                    )?;
                    state_acc
                        .digest_epoch(self.epoch_store.clone(), sequence_number)
                        .await?
                };
                self.metrics.highest_accumulated_epoch.set(epoch as i64);
                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

                let epoch_commitments = if self
                    .epoch_store
                    .protocol_config()
                    .check_commit_root_state_digest_supported()
                {
                    vec![root_state_digest.into()]
                } else {
                    vec![]
                };

                Some(EndOfEpochData {
                    next_epoch_committee: committee.voting_rights,
                    next_epoch_protocol_version: ProtocolVersion::new(
                        system_state_obj.protocol_version(),
                    ),
                    epoch_commitments,
                })
            } else {
                // Ordinary checkpoint: hand the effects to the state hasher
                // asynchronously instead of accumulating inline.
                self.send_to_hasher
                    .send((sequence_number, effects.clone()))
                    .await?;

                None
            };
            // Contents format depends on protocol version: v2 keeps the
            // (signature, alias) pairs, the legacy form strips the aliases.
            let contents = if self.epoch_store.protocol_config().address_aliases() {
                CheckpointContents::new_v2(&effects, signatures)
            } else {
                CheckpointContents::new_with_digests_and_signatures(
                    effects.iter().map(TransactionEffects::execution_digests),
                    signatures
                        .into_iter()
                        .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
                        .collect(),
                )
            };

            let num_txns = contents.size() as u64;

            let network_total_transactions = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.network_total_transactions + num_txns)
                .unwrap_or(num_txns);

            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());

            // Randomness rounds whose state-update transactions landed in this
            // particular chunk.
            let matching_randomness_rounds: Vec<_> = effects
                .iter()
                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
                .copied()
                .collect();

            let checkpoint_commitments = if self
                .epoch_store
                .protocol_config()
                .include_checkpoint_artifacts_digest_in_summary()
            {
                let artifacts = CheckpointArtifacts::from(&effects[..]);
                let artifacts_digest = artifacts.digest()?;
                vec![artifacts_digest.into()]
            } else {
                Default::default()
            };

            let summary = CheckpointSummary::new(
                self.epoch_store.protocol_config(),
                epoch,
                sequence_number,
                network_total_transactions,
                &contents,
                previous_digest,
                epoch_rolling_gas_cost_summary,
                end_of_epoch_data,
                timestamp_ms,
                matching_randomness_rounds,
                checkpoint_commitments,
            );
            summary.report_checkpoint_age(
                &self.metrics.last_created_checkpoint_age,
                &self.metrics.last_created_checkpoint_age_ms,
            );
            if last_checkpoint_of_epoch {
                info!(
                    checkpoint_seq = sequence_number,
                    "creating last checkpoint of epoch {}", epoch
                );
                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
                    self.epoch_store
                        .report_epoch_metrics_at_last_checkpoint(stats);
                }
            }
            // Chain the next chunk onto the checkpoint we just produced.
            last_checkpoint = Some((sequence_number, summary.clone()));
            checkpoints.push((summary, contents));
        }

        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
    }
2582
2583 fn get_epoch_total_gas_cost(
2584 &self,
2585 last_checkpoint: Option<&CheckpointSummary>,
2586 cur_checkpoint_effects: &[TransactionEffects],
2587 ) -> GasCostSummary {
2588 let (previous_epoch, previous_gas_costs) = last_checkpoint
2589 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2590 .unwrap_or_default();
2591 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2592 if previous_epoch == self.epoch_store.epoch() {
2593 GasCostSummary::new(
2595 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2596 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2597 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2598 previous_gas_costs.non_refundable_storage_fee
2599 + current_gas_costs.non_refundable_storage_fee,
2600 )
2601 } else {
2602 current_gas_costs
2603 }
2604 }
2605
2606 #[instrument(level = "error", skip_all)]
2607 async fn augment_epoch_last_checkpoint(
2608 &self,
2609 epoch_total_gas_cost: &GasCostSummary,
2610 epoch_start_timestamp_ms: CheckpointTimestamp,
2611 checkpoint_effects: &mut Vec<TransactionEffects>,
2612 signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2613 checkpoint: CheckpointSequenceNumber,
2614 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2615 last_checkpoint: CheckpointSequenceNumber,
2618 ) -> CheckpointBuilderResult<SuiSystemState> {
2619 let (system_state, effects) = self
2620 .state
2621 .create_and_execute_advance_epoch_tx(
2622 &self.epoch_store,
2623 epoch_total_gas_cost,
2624 checkpoint,
2625 epoch_start_timestamp_ms,
2626 end_of_epoch_observation_keys,
2627 last_checkpoint,
2628 )
2629 .await?;
2630 checkpoint_effects.push(effects);
2631 signatures.push(vec![]);
2632 Ok(system_state)
2633 }
2634
    /// Expands the root effects into the full set of effects that must be
    /// included in the checkpoint, by transitively pulling in dependencies.
    ///
    /// A dependency is pulled in only if it executed in the current epoch and
    /// has not already been included in a checkpoint (either a previously
    /// built one, per the epoch store, or the one currently being built, per
    /// `existing_tx_digests_in_checkpoint`). Iterates to a fixed point; the
    /// digests of all returned effects are appended to
    /// `existing_tx_digests_in_checkpoint`. The returned order is discovery
    /// order, not causal order.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> SuiResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        let mut results = vec![];
        // Digests already visited in this call, to avoid re-enqueueing.
        let mut seen = HashSet::new();
        loop {
            // Dependencies discovered this round that still need their effects
            // loaded.
            let mut pending = HashSet::new();

            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                seen.insert(*digest);

                // Already part of the checkpoint being built.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Already included in an earlier checkpoint, or executed in a
                // previous epoch (and thus checkpointed there).
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                // Which of this effect's dependencies executed in the current
                // epoch (have an effects signature recorded).
                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    // Dependencies without a current-epoch effects signature
                    // were handled in a prior epoch; skip them.
                    if !effects_signature_exists {
                        continue;
                    }
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            if pending.is_empty() {
                break;
            }
            // Load effects for the newly discovered dependencies and make them
            // the roots of the next iteration.
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self.effects_store.multi_get_executed_effects(&pending);
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
                        digest
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
2716
2717 #[cfg(msim)]
2720 fn expensive_consensus_commit_prologue_invariants_check(
2721 &self,
2722 root_digests: &[TransactionDigest],
2723 sorted: &[TransactionEffects],
2724 ) {
2725 let root_txs = self
2727 .state
2728 .get_transaction_cache_reader()
2729 .multi_get_transaction_blocks(root_digests);
2730 let ccps = root_txs
2731 .iter()
2732 .filter_map(|tx| {
2733 if let Some(tx) = tx {
2734 if tx.transaction_data().is_consensus_commit_prologue() {
2735 Some(tx)
2736 } else {
2737 None
2738 }
2739 } else {
2740 None
2741 }
2742 })
2743 .collect::<Vec<_>>();
2744
2745 assert!(ccps.len() <= 1);
2747
2748 let txs = self
2750 .state
2751 .get_transaction_cache_reader()
2752 .multi_get_transaction_blocks(
2753 &sorted
2754 .iter()
2755 .map(|tx| tx.transaction_digest().clone())
2756 .collect::<Vec<_>>(),
2757 );
2758
2759 if ccps.len() == 0 {
2760 for tx in txs.iter() {
2763 if let Some(tx) = tx {
2764 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2765 }
2766 }
2767 } else {
2768 assert!(
2770 txs[0]
2771 .as_ref()
2772 .unwrap()
2773 .transaction_data()
2774 .is_consensus_commit_prologue()
2775 );
2776
2777 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2778
2779 for tx in txs.iter().skip(1) {
2780 if let Some(tx) = tx {
2781 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2782 }
2783 }
2784 }
2785 }
2786}
2787
2788async fn wait_for_effects_with_retry(
2789 effects_store: &dyn TransactionCacheRead,
2790 task_name: &'static str,
2791 digests: &[TransactionDigest],
2792 tx_key: TransactionKey,
2793) -> Vec<TransactionEffects> {
2794 let delay = if in_antithesis() {
2795 15
2797 } else {
2798 5
2799 };
2800 loop {
2801 match tokio::time::timeout(Duration::from_secs(delay), async {
2802 effects_store
2803 .notify_read_executed_effects(task_name, digests)
2804 .await
2805 })
2806 .await
2807 {
2808 Ok(effects) => break effects,
2809 Err(_) => {
2810 debug_fatal!(
2811 "Timeout waiting for transactions to be executed {:?}, retrying...",
2812 tx_key
2813 );
2814 }
2815 }
2816 }
2817}
2818
impl CheckpointAggregator {
    /// Creates the aggregator with an empty signature buffer and no checkpoint
    /// currently being aggregated.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        Self {
            store: tables,
            epoch_store,
            notify,
            receiver,
            pending: BTreeMap::new(),
            current: None,
            output,
            state,
            metrics,
        }
    }

    /// Main loop: drains queued signature messages into `pending` (keyed by
    /// checkpoint sequence number), runs an aggregation pass, then waits for a
    /// new signature, an explicit notification, or a 1-second retry tick.
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            // Drain everything already queued without blocking.
            while let Ok(sig) = self.receiver.try_recv() {
                self.pending
                    .entry(sig.summary.sequence_number)
                    .or_default()
                    .push(sig);
            }

            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            // Block until something changes; the sleep arm bounds the wait so
            // transient conditions are retried at least once per second.
            tokio::select! {
                Some(sig) = self.receiver.recv() => {
                    self.pending
                        .entry(sig.summary.sequence_number)
                        .or_default()
                        .push(sig);
                }
                _ = self.notify.notified() => {}
                _ = tokio::time::sleep(Duration::from_secs(1)) => {}
            }
        }
    }

    /// Runs one aggregation pass and forwards every newly certified checkpoint
    /// to the configured output.
    async fn run_and_notify(&mut self) -> SuiResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Aggregates buffered signatures into certified checkpoints, strictly in
    /// sequence order. Returns the checkpoints certified during this pass;
    /// stops when the next checkpoint is not yet built locally or does not yet
    /// have signatures buffered.
    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            // Buffered signatures for already-certified sequences are useless.
            self.pending.retain(|&seq, _| seq >= next_to_certify);
            let current = if let Some(current) = &mut self.current {
                if current.summary.sequence_number < next_to_certify {
                    // The in-progress checkpoint was certified through some
                    // other path (NOTE(review): presumably state sync /
                    // insert_certified_checkpoint elsewhere — confirm); drop it
                    // and move on.
                    assert_reachable!("skip checkpoint certification");
                    self.current = None;
                    continue;
                }
                current
            } else {
                // Start aggregating the next locally built checkpoint; if it
                // has not been built yet there is nothing more to do.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let seq = current.summary.sequence_number;
            let sigs = self.pending.remove(&seq).unwrap_or_default();
            if sigs.is_empty() {
                trace!(
                    checkpoint_seq =? seq,
                    "Not enough checkpoint signatures",
                );
                // Nothing buffered for this sequence; wait for more input.
                return Ok(result);
            }
            for data in sigs {
                trace!(
                    checkpoint_seq = seq,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = seq,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    // Persist before reporting, so the certified checkpoint is
                    // durable when downstream consumers see it.
                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics.last_certified_checkpoint.set(seq as i64);
                    current.summary.report_checkpoint_age(
                        &self.metrics.last_certified_checkpoint_age,
                        &self.metrics.last_certified_checkpoint_age_ms,
                    );
                    result.push(summary.into_inner());
                    self.current = None;
                    // Move on to the next sequence number.
                    continue 'outer;
                }
            }
            break;
        }
        Ok(result)
    }

    /// Next sequence number that needs certification: one past the highest
    /// certified checkpoint in the store, or 0 if none has been certified yet.
    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
2988
impl CheckpointSignatureAggregator {
    /// Folds one validator's checkpoint signature into the aggregator.
    ///
    /// Returns the quorum signature once enough stake has signed the locally
    /// built digest. Returns `Err(())` in every other case: duplicate
    /// non-conflicting signer, failed insertion, not enough votes yet, or a
    /// quorum that formed on a digest different from ours.
    #[allow(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        // Re-wrap the incoming signature around the locally built summary; the
        // aggregator buckets stake by the digest the sender claimed to sign.
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            // A repeated, non-conflicting signature from the same authority is
            // benign: drop it without logging or split-brain checks.
            InsertResult::Failed { error }
                if matches!(
                    error.as_inner(),
                    SuiErrorKind::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
                ) =>
            {
                Err(())
            }
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                // Quorum can form on a digest that differs from our local one
                // (a remote fork); that certificate is not usable here.
                if their_digest != self.digest {
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }

    /// Checks whether signature aggregation can still reach quorum on any
    /// single digest. If not (a split-brain), logs the competing digests by
    /// stake, bumps the fork metric, and spawns background diagnostics that
    /// fetch and diff the divergent checkpoints.
    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            // Human-readable "digest (total stake)" strings, highest stake first.
            let digests_by_stake_messages = all_unique_values
                .iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{:?} (total stake: {})", digest, total_stake)
                })
                .collect::<Vec<String>>();
            // Fork-recovery test hook (simulation only): record which digest a
            // non-forked authority signed so the test can steer recovery.
            fail_point_arg!("kill_split_brain_node", |(
                checkpoint_overrides,
                forked_authorities,
            ): (
                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
            )| {
                #[cfg(msim)]
                {
                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
                        (checkpoint_overrides.lock(), forked_authorities.lock())
                    {
                        // Prefer a digest signed by at least one authority that
                        // is not marked as forked; otherwise fall back to the
                        // highest-stake digest, then to our own.
                        let correct_digest = all_unique_values
                            .iter()
                            .find(|(_, (authorities, _))| {
                                authorities
                                    .iter()
                                    .any(|auth| !forked_authorities_set.contains(auth))
                            })
                            .map(|(digest, _)| digest.to_string())
                            .unwrap_or_else(|| {
                                all_unique_values
                                    .iter()
                                    .max_by_key(|(_, (_, stake))| *stake)
                                    .map(|(digest, _)| digest.to_string())
                                    .unwrap_or_else(|| self.digest.to_string())
                            });

                        overrides.insert(self.summary.sequence_number, correct_digest.clone());

                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
                            self.summary.sequence_number,
                            correct_digest
                        );
                    }
                }
            });

            debug_fatal!(
                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
                self.summary.sequence_number,
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            // Clone everything the diagnostics task needs so it can run
            // independently of this aggregator.
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.store.clone();

            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
}
3133
/// Background diagnostics for a checkpoint split-brain: for every digest that
/// disagrees with ours, fetches the pending checkpoint from one validator that
/// signed it, then writes diffs of the summaries, contents, transactions, and
/// effects against our local checkpoint to a temp file (and the debug log).
///
/// # Panics
/// Panics if no validator actually disagrees with the local digest, or if the
/// local checkpoint contents cannot be loaded from the store.
async fn diagnose_split_brain(
    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
    local_summary: CheckpointSummary,
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
) {
    debug!(
        checkpoint_seq = local_summary.sequence_number,
        "Running split brain diagnostics..."
    );
    let time = SystemTime::now();
    // Pick one random signer per digest that differs from our local one.
    let digest_to_validator = all_unique_values
        .iter()
        .filter_map(|(digest, (validators, _))| {
            if *digest != local_summary.digest() {
                let random_validator = validators.choose(&mut get_rng()).unwrap();
                Some((*digest, *random_validator))
            } else {
                None
            }
        })
        .collect::<HashMap<_, _>>();
    if digest_to_validator.is_empty() {
        panic!(
            "Given split brain condition, there should be at \
                least one validator that disagrees with local signature"
        );
    }

    let epoch_store = state.load_epoch_store_one_call_per_task();
    let committee = epoch_store
        .epoch_start_state()
        .get_sui_committee_with_network_metadata();
    let network_config = default_mysten_network_config();
    let network_clients =
        make_network_authority_clients_with_network_config(&committee, &network_config);

    // Request the (uncertified) checkpoint plus contents from each chosen
    // validator.
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequestV2 {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint_v2(request)
        })
        .collect::<Vec<_>>();

    // NOTE: zipping relies on the two iterations (`values()` above and
    // `iter()` here) of the same, unmodified HashMap yielding entries in the
    // same order, which Rust's HashMap guarantees for an unchanged map.
    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponseV2 {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponseV2 {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    // One textual diff bundle per disagreeing validator: summary, contents,
    // transaction digests, and effects digests.
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {:?}",
        time
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    // `keep()` detaches the tempdir so the dump outlives this function.
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .keep()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{}", fork_logs_text).unwrap();
    debug!("{}", fork_logs_text);
}
3313
/// Notification interface through which consensus handling feeds the
/// checkpoint service.
pub trait CheckpointServiceNotify {
    /// Handles a checkpoint signature message received from a validator.
    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult;

    /// Signals that new checkpoint work may be available.
    fn notify_checkpoint(&self) -> SuiResult;
}
3319
/// Tracks whether the checkpoint service's background components have been
/// started yet.
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    // Components constructed but not yet running; held here until taken via
    // `take_unstarted`.
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    // Components have been taken; taking them again is a programming error.
    Started,
}
3331
3332impl CheckpointServiceState {
3333 fn take_unstarted(
3334 &mut self,
3335 ) -> (
3336 CheckpointBuilder,
3337 CheckpointAggregator,
3338 CheckpointStateHasher,
3339 ) {
3340 let mut state = CheckpointServiceState::Started;
3341 std::mem::swap(self, &mut state);
3342
3343 match state {
3344 CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3345 (builder, aggregator, hasher)
3346 }
3347 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3348 }
3349 }
3350}
3351
pub struct CheckpointService {
    // Checkpoint store backing this service (summaries, watermarks, etc.).
    tables: Arc<CheckpointStore>,
    // Wakes the CheckpointBuilder task; see `notify_checkpoint`.
    notify_builder: Arc<Notify>,
    // Feeds received checkpoint signatures to the aggregator task.
    signature_sender: mpsc::UnboundedSender<CheckpointSignatureMessage>,
    // Watch channel updated by the builder as checkpoints are (re)built.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // Highest locally computed checkpoint seq observed at construction time;
    // used by `wait_for_rebuilt_checkpoints` as the rebuild target.
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    // Unstarted builder/aggregator/hasher until `spawn` is called.
    state: Mutex<CheckpointServiceState>,
}
3364
impl CheckpointService {
    /// Constructs the checkpoint service together with its (not yet running)
    /// builder, aggregator, and state-hasher components. Call `spawn` to
    /// actually start the background tasks.
    ///
    /// `max_transactions_per_checkpoint` and `max_checkpoint_size_bytes`
    /// bound how much the builder packs into a single checkpoint.
    #[allow(clippy::disallowed_methods)]
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // Remember how far we had built before (possibly in a prior run) so
        // `wait_for_rebuilt_checkpoints` knows when the rebuild has caught up.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        // Receiver side is created on demand via `subscribe()`; dropping the
        // initial receiver here is fine.
        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let (signature_sender, signature_receiver) = mpsc::unbounded_channel();

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            signature_receiver,
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        // Bounded channel from the builder to the state hasher.
        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);

        let ckpt_state_hasher = CheckpointStateHasher::new(
            epoch_store.clone(),
            global_state_hasher.clone(),
            receive_from_builder,
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            global_state_hasher,
            send_to_hasher,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            signature_sender,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted((
                builder,
                aggregator,
                ckpt_state_hasher,
            ))),
        })
    }

    /// Spawns the builder, aggregator, and state-hasher tasks, then waits (up
    /// to 120s) until the builder has either finished or rebuilt checkpoints
    /// up to the previously-built sequence number.
    ///
    /// Panics (via `take_unstarted`) if called more than once.
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        let last_persisted_builder_seq = epoch_store
            .last_persisted_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number);

        let last_executed_seq = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number());

        // Drop any state hashes recorded past the last durably committed
        // checkpoint (Option::max treats None as smaller than any Some), so
        // the rebuild recomputes them consistently. Failure here is logged
        // but not fatal.
        if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        // Supervisor task: run the builder until it finishes or the epoch
        // ends, then shut down the hasher and aggregator in that order.
        spawn_monitored_task!(async move {
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            // The builder dropped its sender, so the hasher drains and exits.
            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            // The aggregator loops forever; it must be aborted explicitly.
            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        // Don't return until the rebuild has caught up (or the builder exited
        // early); flag it loudly if that takes more than two minutes.
        if tokio::time::timeout(Duration::from_secs(120), async move {
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
}
3550
3551impl CheckpointService {
3552 pub async fn wait_for_rebuilt_checkpoints(&self) {
3558 let highest_previously_built_seq = self.highest_previously_built_seq;
3559 let mut rx = self.highest_currently_built_seq_tx.subscribe();
3560 let mut highest_currently_built_seq = *rx.borrow_and_update();
3561 info!(
3562 "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3563 );
3564 loop {
3565 if highest_currently_built_seq >= highest_previously_built_seq {
3566 info!("Checkpoint rebuild complete");
3567 break;
3568 }
3569 rx.changed().await.unwrap();
3570 highest_currently_built_seq = *rx.borrow_and_update();
3571 }
3572 }
3573
3574 #[cfg(test)]
3575 fn write_and_notify_checkpoint_for_testing(
3576 &self,
3577 epoch_store: &AuthorityPerEpochStore,
3578 checkpoint: PendingCheckpoint,
3579 ) -> SuiResult {
3580 use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3581
3582 let mut output = ConsensusCommitOutput::new(0);
3583 epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3584 output.set_default_commit_stats_for_testing();
3585 epoch_store.push_consensus_output_for_tests(output);
3586 self.notify_checkpoint()?;
3587 Ok(())
3588 }
3589}
3590
3591impl CheckpointServiceNotify for CheckpointService {
3592 fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult {
3593 let sequence = info.summary.sequence_number;
3594 let signer = info.summary.auth_sig().authority.concise();
3595
3596 if let Some(highest_verified_checkpoint) = self
3597 .tables
3598 .get_highest_verified_checkpoint()?
3599 .map(|x| *x.sequence_number())
3600 && sequence <= highest_verified_checkpoint
3601 {
3602 trace!(
3603 checkpoint_seq = sequence,
3604 "Ignore checkpoint signature from {} - already certified", signer,
3605 );
3606 self.metrics
3607 .last_ignored_checkpoint_signature_received
3608 .set(sequence as i64);
3609 return Ok(());
3610 }
3611 trace!(
3612 checkpoint_seq = sequence,
3613 "Received checkpoint signature, digest {} from {}",
3614 info.summary.digest(),
3615 signer,
3616 );
3617 self.metrics
3618 .last_received_checkpoint_signatures
3619 .with_label_values(&[&signer.to_string()])
3620 .set(sequence as i64);
3621 self.signature_sender.send(info.clone()).ok();
3622 Ok(())
3623 }
3624
3625 fn notify_checkpoint(&self) -> SuiResult {
3626 self.notify_builder.notify_one();
3627 Ok(())
3628 }
3629}
3630
/// A `CheckpointServiceNotify` implementation that ignores every
/// notification; useful where a checkpoint service is required but checkpoint
/// processing is not.
pub struct CheckpointServiceNoop {}
impl CheckpointServiceNotify for CheckpointServiceNoop {
    fn notify_checkpoint_signature(&self, _: &CheckpointSignatureMessage) -> SuiResult {
        Ok(())
    }

    fn notify_checkpoint(&self) -> SuiResult {
        Ok(())
    }
}
3642
impl PendingCheckpoint {
    /// Checkpoint height recorded in this pending checkpoint's details.
    pub fn height(&self) -> CheckpointHeight {
        self.details.checkpoint_height
    }

    /// Root transaction keys included in this pending checkpoint.
    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.roots
    }

    /// Metadata (timestamp, height, commit ref, etc.) for this checkpoint.
    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.details
    }
}
3656
3657impl PendingCheckpointV2 {
3658 pub fn height(&self) -> CheckpointHeight {
3659 self.details.checkpoint_height
3660 }
3661
3662 pub(crate) fn num_roots(&self) -> usize {
3663 self.roots.iter().map(|r| r.tx_roots.len()).sum()
3664 }
3665}
3666
pin_project! {
    /// Future adapter that counts how many times the wrapped future is
    /// polled before completing.
    pub struct PollCounter<Fut> {
        #[pin]
        future: Fut,
        // Number of times `poll` has been invoked so far.
        count: usize,
    }
}
3674
impl<Fut> PollCounter<Fut> {
    /// Wraps `future` with a poll counter starting at zero.
    pub fn new(future: Fut) -> Self {
        Self { future, count: 0 }
    }

    /// Number of times this future has been polled so far.
    pub fn count(&self) -> usize {
        self.count
    }
}
3684
3685impl<Fut: Future> Future for PollCounter<Fut> {
3686 type Output = (usize, Fut::Output);
3687
3688 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3689 let this = self.project();
3690 *this.count += 1;
3691 match this.future.poll(cx) {
3692 Poll::Ready(output) => Poll::Ready((*this.count, output)),
3693 Poll::Pending => Poll::Pending,
3694 }
3695 }
3696}
3697
/// Convenience wrapper: instruments `future` with a [`PollCounter`] so the
/// caller can observe how many polls it took to complete.
fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
    PollCounter::new(future)
}
3701
3702#[cfg(test)]
3703mod tests {
3704 use super::*;
3705 use crate::authority::test_authority_builder::TestAuthorityBuilder;
3706 use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3707 use futures::FutureExt as _;
3708 use futures::future::BoxFuture;
3709 use std::collections::HashMap;
3710 use std::ops::Deref;
3711 use sui_macros::sim_test;
3712 use sui_protocol_config::{Chain, ProtocolConfig};
3713 use sui_types::accumulator_event::AccumulatorEvent;
3714 use sui_types::authenticator_state::ActiveJwk;
3715 use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3716 use sui_types::crypto::Signature;
3717 use sui_types::effects::{TransactionEffects, TransactionEvents};
3718 use sui_types::messages_checkpoint::SignedCheckpointSummary;
3719 use sui_types::transaction::VerifiedTransaction;
3720 use tokio::sync::mpsc;
3721
    #[tokio::test]
    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
        let store = CheckpointStore::new_for_tests();
        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
        // Populate locally computed checkpoint summaries for seqs 70..=80.
        for seq in 70u64..=80u64 {
            let contents =
                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
                    [sui_types::base_types::ExecutionDigests::new(
                        sui_types::digests::TransactionDigest::random(),
                        sui_types::digests::TransactionEffectsDigest::ZERO,
                    )],
                );
            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
                &protocol,
                0,
                seq,
                0,
                &contents,
                None,
                sui_types::gas::GasCostSummary::default(),
                None,
                0,
                Vec::new(),
                Vec::new(),
            );
            store
                .tables
                .locally_computed_checkpoints
                .insert(&seq, &summary)
                .unwrap();
        }

        // Clearing from 76 should delete 76..=80 and keep 70..=75, i.e. the
        // start of the cleared range is inclusive.
        store
            .clear_locally_computed_checkpoints_from(76)
            .expect("clear should succeed");

        // Boundary check: 75 survives, 76 is gone.
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&75)
                .unwrap()
                .is_some()
        );
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&76)
                .unwrap()
                .is_none()
        );

        // Everything below the cut point is still present...
        for seq in 70u64..76u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_some()
            );
        }
        // ...and everything from the cut point up was removed.
        for seq in 76u64..=80u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_none()
            );
        }
    }
3797
    // Round-trips the checkpoint-fork and transaction-fork markers through
    // the store: record, read back, clear, and confirm they are gone.
    #[tokio::test]
    async fn test_fork_detection_storage() {
        let store = CheckpointStore::new_for_tests();
        let seq_num = 42;
        let digest = CheckpointDigest::random();

        // No checkpoint fork is recorded initially.
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        // The recorded (seq, digest) pair reads back unchanged.
        let retrieved = store.get_checkpoint_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
        assert_eq!(retrieved_seq, seq_num);
        assert_eq!(retrieved_digest, digest);

        store.clear_checkpoint_fork_detected().unwrap();
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        // Same round-trip for the transaction-level fork marker.
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();

        assert!(store.get_transaction_fork_detected().unwrap().is_none());

        store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let retrieved = store.get_transaction_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
        assert_eq!(retrieved_tx, tx_digest);
        assert_eq!(retrieved_expected, expected_effects);
        assert_eq!(retrieved_actual, actual_effects);

        store.clear_transaction_fork_detected().unwrap();
        assert!(store.get_transaction_fork_detected().unwrap().is_none());
    }
3841
    // End-to-end test of the checkpoint builder/aggregator pipeline: feeds
    // pending checkpoints through a real CheckpointService backed by an
    // in-memory effects store and checks causal ordering, splitting by
    // transaction count and by byte size, chaining of previous digests, and
    // certification from collected signatures.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.disable_accumulators_for_testing();
        // Split checkpoints in the builder (not consensus handler) so this
        // test exercises the builder's own splitting logic.
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(false);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
            0,
            0,
            vec![],
            SequenceNumber::new(),
        );

        // Build a JWK payload >40KB so transactions carrying it are large
        // enough to trigger size-based checkpoint splitting below.
        let jwks = {
            let mut jwks = Vec::new();
            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
                jwks.push(ActiveJwk {
                    jwk_id: JwkId::new(
                        "https://accounts.google.com".to_string(),
                        "1234567890".to_string(),
                    ),
                    jwk: JWK {
                        kty: "RSA".to_string(),
                        e: "AQAB".to_string(),
                        n: "1234567890".to_string(),
                        alg: "RS256".to_string(),
                    },
                    epoch: 0,
                });
            }
            jwks
        };

        let dummy_tx_with_data =
            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());

        // Digests 0..15 map to the small transaction, 15..20 to the large one.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Record effects with dependency edges 1 -> {2,3} and 2 -> {3,4} so
        // causal ordering is observable in the built checkpoints.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 52, 51, 1),
            );
        }
        // Every transaction needs a user signature for checkpoint contents.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![(signature, None)]);
        }

        // Channels capture built (contents, summary) pairs and certified
        // summaries emitted by the service.
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

        // max 3 transactions and 100_000 bytes per checkpoint.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        // Checkpoint 0: just tx 4; gas summary rolls up from its effects.
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        // Checkpoint 1: roots {1, 3} expand causally to 3, 2, 1 (deps first);
        // the rolling gas summary accumulates across checkpoints.
        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 108, 104, 4)
        );

        // Four small txs split into [10,11,12] + [13] by the 3-tx limit.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Three large txs split into [15,16] + [17] by the byte-size limit.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending checkpoints 4 and 5 (close timestamps) merge into one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        // Signatures delivered out of order (c2 before c1) still certify the
        // checkpoints in sequence order.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c2ss })
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c1ss })
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
4075
    // Test double for the effects store: serves effects straight out of a
    // HashMap. Only the read paths exercised by this test are implemented;
    // every other method panics via unimplemented!().
    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn notify_read_executed_effects_may_fail(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, SuiResult<Vec<TransactionEffects>>> {
            // All effects are pre-populated, so the future resolves at once;
            // a missing digest is a test bug and panics.
            std::future::ready(Ok(digests
                .iter()
                .map(|d| self.get(d).expect("effects not found").clone())
                .collect()))
            .boxed()
        }

        fn notify_read_executed_effects_digests(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
            std::future::ready(
                digests
                    .iter()
                    .map(|d| {
                        self.get(d)
                            .map(|fx| fx.digest())
                            .expect("effects not found")
                    })
                    .collect(),
            )
            .boxed()
        }

        fn multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffects>> {
            digests.iter().map(|d| self.get(d).cloned()).collect()
        }

        fn multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
            unimplemented!()
        }

        fn multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffectsDigest>> {
            unimplemented!()
        }

        fn multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> Vec<Option<TransactionEffects>> {
            unimplemented!()
        }

        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
            unimplemented!()
        }

        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
            unimplemented!()
        }

        fn get_unchanged_loaded_runtime_objects(
            &self,
            _digest: &TransactionDigest,
        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
            unimplemented!()
        }

        fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
            unimplemented!()
        }
    }
4159
    // Test sink: forwards each built checkpoint (contents + summary) into an
    // mpsc channel so the test can inspect them in order.
    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> SuiResult {
            // try_send + unwrap: the test's channel capacity must suffice.
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }
4173
    // Test sink: forwards each certified checkpoint summary into an mpsc
    // channel for inspection by the test.
    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> SuiResult {
            // try_send + unwrap: the test's channel capacity must suffice.
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }
4184
4185 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
4186 PendingCheckpoint {
4187 roots: t
4188 .into_iter()
4189 .map(|t| TransactionKey::Digest(d(t)))
4190 .collect(),
4191 details: PendingCheckpointInfo {
4192 timestamp_ms,
4193 last_of_epoch: false,
4194 checkpoint_height: i,
4195 consensus_commit_ref: CommitRef::default(),
4196 rejected_transactions_digest: Digest::default(),
4197 checkpoint_seq: None,
4198 },
4199 }
4200 }
4201
4202 fn d(i: u8) -> TransactionDigest {
4203 let mut bytes: [u8; 32] = Default::default();
4204 bytes[0] = i;
4205 TransactionDigest::new(bytes)
4206 }
4207
    /// Builds test effects for `transaction_digest` with the given dependency
    /// list and gas usage; all other fields are defaulted.
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }
4219
    /// Records synthetic effects for `digest` in the test effects map and
    /// marks the transaction as executed in the current epoch.
    fn commit_cert_for_test(
        store: &mut HashMap<TransactionDigest, TransactionEffects>,
        state: Arc<AuthorityState>,
        digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) {
        let epoch_store = state.epoch_store_for_testing();
        let effects = e(digest, dependencies, gas_used);
        store.insert(digest, effects.clone());
        epoch_store.insert_executed_in_epoch(&digest);
    }
4232}