1pub(crate) mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::CertifiedCheckpointOutput;
15pub use crate::checkpoints::checkpoint_output::{
16 CheckpointOutput, LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::ZipDebugEqIteratorExt;
29use mysten_common::random::get_rng;
30use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
31use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
32use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
33use nonempty::NonEmpty;
34use parking_lot::Mutex;
35use pin_project_lite::pin_project;
36use serde::{Deserialize, Serialize};
37use sui_macros::fail_point_arg;
38use sui_network::default_mysten_network_config;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43 CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
47use tokio::sync::{mpsc, watch};
48use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
49
50use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
51use crate::authority::authority_store_pruner::PrunerWatermarks;
52use crate::consensus_handler::SequencedConsensusTransactionKey;
53use rand::seq::SliceRandom;
54use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
55use std::fs::File;
56use std::future::Future;
57use std::io::Write;
58use std::path::Path;
59use std::pin::Pin;
60use std::sync::Arc;
61use std::sync::Weak;
62use std::task::{Context, Poll};
63use std::time::{Duration, SystemTime};
64use sui_protocol_config::ProtocolVersion;
65use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
66use sui_types::committee::StakeUnit;
67use sui_types::crypto::AuthorityStrongQuorumSignInfo;
68use sui_types::digests::{
69 CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
70};
71use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
72use sui_types::error::{SuiErrorKind, SuiResult};
73use sui_types::gas::GasCostSummary;
74use sui_types::message_envelope::Message;
75use sui_types::messages_checkpoint::{
76 CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
77 CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
78 EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
79 VerifiedCheckpointContents,
80};
81use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
82use sui_types::messages_consensus::ConsensusTransactionKey;
83use sui_types::signature::GenericSignature;
84use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
85use sui_types::transaction::{
86 TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
87};
88use tokio::sync::Notify;
89use tracing::{debug, error, info, instrument, trace, warn};
90use typed_store::DBMapUtils;
91use typed_store::Map;
92use typed_store::{
93 TypedStoreError,
94 rocks::{DBMap, MetricConf},
95};
96
97const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;
98
99pub type CheckpointHeight = u64;
100
101pub struct EpochStats {
102 pub checkpoint_count: u64,
103 pub transaction_count: u64,
104 pub total_gas_reward: u64,
105}
106
107#[derive(Clone, Debug)]
108pub struct PendingCheckpointInfo {
109 pub timestamp_ms: CheckpointTimestamp,
110 pub last_of_epoch: bool,
111 pub checkpoint_height: CheckpointHeight,
114 pub consensus_commit_ref: CommitRef,
116 pub rejected_transactions_digest: Digest,
117 pub checkpoint_seq: Option<CheckpointSequenceNumber>,
120}
121
122#[derive(Clone, Debug)]
123pub struct PendingCheckpoint {
124 pub roots: Vec<TransactionKey>,
125 pub details: PendingCheckpointInfo,
126}
127
128#[derive(Clone, Debug, Default)]
129pub struct CheckpointRoots {
130 pub tx_roots: Vec<TransactionKey>,
131 pub settlement_root: Option<TransactionKey>,
132 pub height: CheckpointHeight,
133}
134
135#[derive(Clone, Debug)]
142pub struct PendingCheckpointV2 {
143 pub roots: Vec<CheckpointRoots>,
144 pub details: PendingCheckpointInfo,
145}
146
147#[derive(Clone, Debug, Serialize, Deserialize)]
148pub struct BuilderCheckpointSummary {
149 pub summary: CheckpointSummary,
150 pub checkpoint_height: Option<CheckpointHeight>,
152 pub position_in_commit: usize,
153}
154
155#[derive(DBMapUtils)]
156#[cfg_attr(tidehunter, tidehunter)]
157pub struct CheckpointStoreTables {
158 pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,
160
161 pub(crate) checkpoint_sequence_by_contents_digest:
163 DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,
164
165 #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
169 full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
172
173 pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
175 pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,
177
178 pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,
182
183 epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,
185
186 pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
189
190 pub(crate) transaction_fork_detected: DBMap<
192 u8,
193 (
194 TransactionDigest,
195 TransactionEffectsDigest,
196 TransactionEffectsDigest,
197 ),
198 >,
199 #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
200 full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
201}
202
203fn full_checkpoint_content_table_default_config() -> DBOptions {
204 DBOptions {
205 options: default_db_options().options,
206 rw_options: ReadWriteOptions::default().set_log_value_hash(true),
210 }
211}
212
213impl CheckpointStoreTables {
214 #[cfg(not(tidehunter))]
215 pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
216 Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
217 }
218
219 #[cfg(tidehunter)]
220 pub fn new(
221 path: &Path,
222 metric_name: &'static str,
223 pruner_watermarks: Arc<PrunerWatermarks>,
224 ) -> Self {
225 tracing::warn!("Checkpoint DB using tidehunter");
226 use crate::authority::authority_store_pruner::apply_relocation_filter;
227 use typed_store::tidehunter_util::{
228 Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
229 default_max_dirty_keys, default_mutex_count, default_value_cache_size,
230 };
231 let mutexes = default_mutex_count();
232 let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
233 let override_dirty_keys_config = KeySpaceConfig::new()
234 .with_max_dirty_keys(16 * default_max_dirty_keys())
235 .with_value_cache_size(default_value_cache_size());
236 let config_u64 = ThConfig::new_with_config(
237 8,
238 mutexes,
239 u64_sequence_key,
240 override_dirty_keys_config.clone(),
241 );
242 let digest_config = ThConfig::new_with_rm_prefix(
243 32,
244 mutexes,
245 KeyType::uniform(default_cells_per_mutex()),
246 KeySpaceConfig::default(),
247 vec![0, 0, 0, 0, 0, 0, 0, 32],
248 );
249 let watermarks_config = KeySpaceConfig::new()
250 .with_value_cache_size(10)
251 .disable_unload();
252 let lru_config = KeySpaceConfig::new().with_value_cache_size(100);
253 let configs = vec![
254 (
255 "checkpoint_content",
256 digest_config.clone().with_config(
257 KeySpaceConfig::new().with_relocation_filter(|_, _| Decision::Remove),
258 ),
259 ),
260 (
261 "checkpoint_sequence_by_contents_digest",
262 digest_config.clone().with_config(apply_relocation_filter(
263 KeySpaceConfig::default(),
264 pruner_watermarks.checkpoint_id.clone(),
265 |sequence_number: CheckpointSequenceNumber| sequence_number,
266 false,
267 )),
268 ),
269 (
270 "full_checkpoint_content",
271 config_u64.clone().with_config(apply_relocation_filter(
272 override_dirty_keys_config.clone(),
273 pruner_watermarks.checkpoint_id.clone(),
274 |sequence_number: CheckpointSequenceNumber| sequence_number,
275 true,
276 )),
277 ),
278 ("certified_checkpoints", config_u64.clone()),
279 (
280 "checkpoint_by_digest",
281 digest_config.clone().with_config(apply_relocation_filter(
282 lru_config,
283 pruner_watermarks.epoch_id.clone(),
284 |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
285 false,
286 )),
287 ),
288 (
289 "locally_computed_checkpoints",
290 config_u64.clone().with_config(apply_relocation_filter(
291 override_dirty_keys_config.clone(),
292 pruner_watermarks.checkpoint_id.clone(),
293 |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
294 true,
295 )),
296 ),
297 ("epoch_last_checkpoint_map", config_u64.clone()),
298 (
299 "watermarks",
300 ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
301 ),
302 (
303 "transaction_fork_detected",
304 ThConfig::new_with_config(
305 1,
306 1,
307 KeyType::uniform(1),
308 watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
309 ),
310 ),
311 (
312 "full_checkpoint_content_v2",
313 config_u64.clone().with_config(apply_relocation_filter(
314 override_dirty_keys_config.clone(),
315 pruner_watermarks.checkpoint_id.clone(),
316 |sequence_number: CheckpointSequenceNumber| sequence_number,
317 true,
318 )),
319 ),
320 ];
321 Self::open_tables_read_write(
322 path.to_path_buf(),
323 MetricConf::new(metric_name),
324 configs
325 .into_iter()
326 .map(|(cf, config)| (cf.to_string(), config))
327 .collect(),
328 )
329 }
330
331 #[cfg(not(tidehunter))]
332 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
333 Self::get_read_only_handle(
334 path.to_path_buf(),
335 None,
336 None,
337 MetricConf::new("checkpoint_readonly"),
338 )
339 }
340
341 #[cfg(tidehunter)]
342 pub fn open_readonly(path: &Path) -> Self {
343 Self::new(path, "checkpoint", Arc::new(PrunerWatermarks::default()))
344 }
345}
346
347pub struct CheckpointStore {
348 pub(crate) tables: CheckpointStoreTables,
349 synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
350 executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
351}
352
353impl CheckpointStore {
354 pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
355 let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
356 Arc::new(Self {
357 tables,
358 synced_checkpoint_notify_read: NotifyRead::new(),
359 executed_checkpoint_notify_read: NotifyRead::new(),
360 })
361 }
362
363 pub fn new_for_tests() -> Arc<Self> {
364 let ckpt_dir = mysten_common::tempdir().unwrap();
365 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
366 }
367
368 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
369 let tables = CheckpointStoreTables::new(
370 path,
371 "db_checkpoint",
372 Arc::new(PrunerWatermarks::default()),
373 );
374 Arc::new(Self {
375 tables,
376 synced_checkpoint_notify_read: NotifyRead::new(),
377 executed_checkpoint_notify_read: NotifyRead::new(),
378 })
379 }
380
381 #[cfg(not(tidehunter))]
382 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
383 CheckpointStoreTables::open_readonly(path)
384 }
385
386 #[cfg(tidehunter)]
387 pub fn open_readonly(path: &Path) -> CheckpointStoreTables {
388 CheckpointStoreTables::open_readonly(path)
389 }
390
391 #[instrument(level = "info", skip_all)]
392 pub fn insert_genesis_checkpoint(
393 &self,
394 checkpoint: VerifiedCheckpoint,
395 contents: CheckpointContents,
396 epoch_store: &AuthorityPerEpochStore,
397 ) {
398 assert_eq!(
399 checkpoint.epoch(),
400 0,
401 "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
402 );
403 assert_eq!(
404 *checkpoint.sequence_number(),
405 0,
406 "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
407 );
408
409 match self.get_checkpoint_by_sequence_number(0).unwrap() {
411 Some(existing_checkpoint) => {
412 assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
413 }
414 None => {
415 if epoch_store.epoch() == checkpoint.epoch {
416 epoch_store
417 .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
418 .unwrap();
419 } else {
420 debug!(
421 validator_epoch =% epoch_store.epoch(),
422 genesis_epoch =% checkpoint.epoch(),
423 "Not inserting checkpoint builder data for genesis checkpoint",
424 );
425 }
426 self.insert_checkpoint_contents(contents).unwrap();
427 self.insert_verified_checkpoint(&checkpoint).unwrap();
428 self.update_highest_synced_checkpoint(&checkpoint).unwrap();
429 }
430 }
431 }
432
433 pub fn get_checkpoint_by_digest(
434 &self,
435 digest: &CheckpointDigest,
436 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
437 self.tables
438 .checkpoint_by_digest
439 .get(digest)
440 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
441 }
442
443 pub fn get_checkpoint_by_sequence_number(
444 &self,
445 sequence_number: CheckpointSequenceNumber,
446 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
447 self.tables
448 .certified_checkpoints
449 .get(&sequence_number)
450 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
451 }
452
453 pub fn get_locally_computed_checkpoint(
454 &self,
455 sequence_number: CheckpointSequenceNumber,
456 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
457 self.tables
458 .locally_computed_checkpoints
459 .get(&sequence_number)
460 }
461
462 pub fn multi_get_locally_computed_checkpoints(
463 &self,
464 sequence_numbers: &[CheckpointSequenceNumber],
465 ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
466 let checkpoints = self
467 .tables
468 .locally_computed_checkpoints
469 .multi_get(sequence_numbers)?;
470
471 Ok(checkpoints)
472 }
473
474 pub fn get_sequence_number_by_contents_digest(
475 &self,
476 digest: &CheckpointContentsDigest,
477 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
478 self.tables
479 .checkpoint_sequence_by_contents_digest
480 .get(digest)
481 }
482
483 pub fn delete_contents_digest_sequence_number_mapping(
484 &self,
485 digest: &CheckpointContentsDigest,
486 ) -> Result<(), TypedStoreError> {
487 self.tables
488 .checkpoint_sequence_by_contents_digest
489 .remove(digest)
490 }
491
492 pub fn get_latest_certified_checkpoint(
493 &self,
494 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
495 Ok(self
496 .tables
497 .certified_checkpoints
498 .reversed_safe_iter_with_bounds(None, None)?
499 .next()
500 .transpose()?
501 .map(|(_, v)| v.into()))
502 }
503
504 pub fn get_latest_locally_computed_checkpoint(
505 &self,
506 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
507 Ok(self
508 .tables
509 .locally_computed_checkpoints
510 .reversed_safe_iter_with_bounds(None, None)?
511 .next()
512 .transpose()?
513 .map(|(_, v)| v))
514 }
515
516 pub fn multi_get_checkpoint_by_sequence_number(
517 &self,
518 sequence_numbers: &[CheckpointSequenceNumber],
519 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
520 let checkpoints = self
521 .tables
522 .certified_checkpoints
523 .multi_get(sequence_numbers)?
524 .into_iter()
525 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
526 .collect();
527
528 Ok(checkpoints)
529 }
530
531 pub fn multi_get_checkpoint_content(
532 &self,
533 contents_digest: &[CheckpointContentsDigest],
534 ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
535 self.tables.checkpoint_content.multi_get(contents_digest)
536 }
537
538 pub fn get_highest_verified_checkpoint(
539 &self,
540 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
541 let highest_verified = if let Some(highest_verified) = self
542 .tables
543 .watermarks
544 .get(&CheckpointWatermark::HighestVerified)?
545 {
546 highest_verified
547 } else {
548 return Ok(None);
549 };
550 self.get_checkpoint_by_digest(&highest_verified.1)
551 }
552
553 pub fn get_highest_synced_checkpoint(
554 &self,
555 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
556 let highest_synced = if let Some(highest_synced) = self
557 .tables
558 .watermarks
559 .get(&CheckpointWatermark::HighestSynced)?
560 {
561 highest_synced
562 } else {
563 return Ok(None);
564 };
565 self.get_checkpoint_by_digest(&highest_synced.1)
566 }
567
568 pub fn get_highest_synced_checkpoint_seq_number(
569 &self,
570 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
571 if let Some(highest_synced) = self
572 .tables
573 .watermarks
574 .get(&CheckpointWatermark::HighestSynced)?
575 {
576 Ok(Some(highest_synced.0))
577 } else {
578 Ok(None)
579 }
580 }
581
582 pub fn get_highest_executed_checkpoint_seq_number(
583 &self,
584 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
585 if let Some(highest_executed) = self
586 .tables
587 .watermarks
588 .get(&CheckpointWatermark::HighestExecuted)?
589 {
590 Ok(Some(highest_executed.0))
591 } else {
592 Ok(None)
593 }
594 }
595
596 pub fn get_highest_executed_checkpoint(
597 &self,
598 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
599 let highest_executed = if let Some(highest_executed) = self
600 .tables
601 .watermarks
602 .get(&CheckpointWatermark::HighestExecuted)?
603 {
604 highest_executed
605 } else {
606 return Ok(None);
607 };
608 self.get_checkpoint_by_digest(&highest_executed.1)
609 }
610
611 pub fn get_highest_pruned_checkpoint_seq_number(
612 &self,
613 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
614 self.tables
615 .watermarks
616 .get(&CheckpointWatermark::HighestPruned)
617 .map(|watermark| watermark.map(|w| w.0))
618 }
619
620 pub fn get_checkpoint_contents(
621 &self,
622 digest: &CheckpointContentsDigest,
623 ) -> Result<Option<CheckpointContents>, TypedStoreError> {
624 self.tables.checkpoint_content.get(digest)
625 }
626
627 pub fn get_full_checkpoint_contents_by_sequence_number(
628 &self,
629 seq: CheckpointSequenceNumber,
630 ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
631 self.tables.full_checkpoint_content_v2.get(&seq)
632 }
633
634 fn prune_local_summaries(&self) -> SuiResult {
635 if let Some((last_local_summary, _)) = self
636 .tables
637 .locally_computed_checkpoints
638 .reversed_safe_iter_with_bounds(None, None)?
639 .next()
640 .transpose()?
641 {
642 let mut batch = self.tables.locally_computed_checkpoints.batch();
643 batch.schedule_delete_range(
644 &self.tables.locally_computed_checkpoints,
645 &0,
646 &last_local_summary,
647 )?;
648 batch.write()?;
649 info!("Pruned local summaries up to {:?}", last_local_summary);
650 }
651 Ok(())
652 }
653
654 pub fn clear_locally_computed_checkpoints_from(
655 &self,
656 from_seq: CheckpointSequenceNumber,
657 ) -> SuiResult {
658 let keys: Vec<_> = self
659 .tables
660 .locally_computed_checkpoints
661 .safe_iter_with_bounds(Some(from_seq), None)
662 .map(|r| r.map(|(k, _)| k))
663 .collect::<Result<_, _>>()?;
664 if let Some(&last_local_summary) = keys.last() {
665 let mut batch = self.tables.locally_computed_checkpoints.batch();
666 batch
667 .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
668 .expect("Failed to delete locally computed checkpoints");
669 batch
670 .write()
671 .expect("Failed to delete locally computed checkpoints");
672 warn!(
673 from_seq,
674 last_local_summary,
675 "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
676 from_seq,
677 last_local_summary
678 );
679 }
680 Ok(())
681 }
682
683 fn check_for_checkpoint_fork(
684 &self,
685 local_checkpoint: &CheckpointSummary,
686 verified_checkpoint: &VerifiedCheckpoint,
687 ) {
688 if local_checkpoint != verified_checkpoint.data() {
689 let verified_contents = self
690 .get_checkpoint_contents(&verified_checkpoint.content_digest)
691 .map(|opt_contents| {
692 opt_contents
693 .map(|contents| format!("{:?}", contents))
694 .unwrap_or_else(|| {
695 format!(
696 "Verified checkpoint contents not found, digest: {:?}",
697 verified_checkpoint.content_digest,
698 )
699 })
700 })
701 .map_err(|e| {
702 format!(
703 "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
704 verified_checkpoint.content_digest, e
705 )
706 })
707 .unwrap_or_else(|err_msg| err_msg);
708
709 let local_contents = self
710 .get_checkpoint_contents(&local_checkpoint.content_digest)
711 .map(|opt_contents| {
712 opt_contents
713 .map(|contents| format!("{:?}", contents))
714 .unwrap_or_else(|| {
715 format!(
716 "Local checkpoint contents not found, digest: {:?}",
717 local_checkpoint.content_digest
718 )
719 })
720 })
721 .map_err(|e| {
722 format!(
723 "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
724 local_checkpoint.content_digest, e
725 )
726 })
727 .unwrap_or_else(|err_msg| err_msg);
728
729 error!(
731 verified_checkpoint = ?verified_checkpoint.data(),
732 ?verified_contents,
733 ?local_checkpoint,
734 ?local_contents,
735 "Local checkpoint fork detected!",
736 );
737
738 if let Err(e) = self.record_checkpoint_fork_detected(
740 *local_checkpoint.sequence_number(),
741 local_checkpoint.digest(),
742 ) {
743 error!("Failed to record checkpoint fork in database: {:?}", e);
744 }
745
746 fail_point_arg!(
747 "kill_checkpoint_fork_node",
748 |checkpoint_overrides: std::sync::Arc<
749 std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
750 >| {
751 #[cfg(msim)]
752 {
753 if let Ok(mut overrides) = checkpoint_overrides.lock() {
754 overrides.insert(
755 local_checkpoint.sequence_number,
756 verified_checkpoint.digest().to_string(),
757 );
758 }
759 tracing::error!(
760 fatal = true,
761 "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
762 local_checkpoint.sequence_number(),
763 verified_checkpoint.digest()
764 );
765 sui_simulator::task::shutdown_current_node();
766 }
767 }
768 );
769
770 fatal!(
771 "Local checkpoint fork detected for sequence number: {}",
772 local_checkpoint.sequence_number()
773 );
774 }
775 }
776
777 pub fn insert_certified_checkpoint(
783 &self,
784 checkpoint: &VerifiedCheckpoint,
785 ) -> Result<(), TypedStoreError> {
786 debug!(
787 checkpoint_seq = checkpoint.sequence_number(),
788 "Inserting certified checkpoint",
789 );
790 let mut batch = self.tables.certified_checkpoints.batch();
791 batch
792 .insert_batch(
793 &self.tables.certified_checkpoints,
794 [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
795 )?
796 .insert_batch(
797 &self.tables.checkpoint_by_digest,
798 [(checkpoint.digest(), checkpoint.serializable_ref())],
799 )?;
800 if checkpoint.next_epoch_committee().is_some() {
801 batch.insert_batch(
802 &self.tables.epoch_last_checkpoint_map,
803 [(&checkpoint.epoch(), checkpoint.sequence_number())],
804 )?;
805 }
806 batch.write()?;
807
808 if let Some(local_checkpoint) = self
809 .tables
810 .locally_computed_checkpoints
811 .get(checkpoint.sequence_number())?
812 {
813 self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
814 }
815
816 Ok(())
817 }
818
819 #[instrument(level = "debug", skip_all)]
822 pub fn insert_verified_checkpoint(
823 &self,
824 checkpoint: &VerifiedCheckpoint,
825 ) -> Result<(), TypedStoreError> {
826 self.insert_certified_checkpoint(checkpoint)?;
827 self.update_highest_verified_checkpoint(checkpoint)
828 }
829
830 pub fn update_highest_verified_checkpoint(
831 &self,
832 checkpoint: &VerifiedCheckpoint,
833 ) -> Result<(), TypedStoreError> {
834 if Some(*checkpoint.sequence_number())
835 > self
836 .get_highest_verified_checkpoint()?
837 .map(|x| *x.sequence_number())
838 {
839 debug!(
840 checkpoint_seq = checkpoint.sequence_number(),
841 "Updating highest verified checkpoint",
842 );
843 self.tables.watermarks.insert(
844 &CheckpointWatermark::HighestVerified,
845 &(*checkpoint.sequence_number(), *checkpoint.digest()),
846 )?;
847 }
848
849 Ok(())
850 }
851
852 pub fn update_highest_synced_checkpoint(
853 &self,
854 checkpoint: &VerifiedCheckpoint,
855 ) -> Result<(), TypedStoreError> {
856 let seq = *checkpoint.sequence_number();
857 debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
858 self.tables.watermarks.insert(
859 &CheckpointWatermark::HighestSynced,
860 &(seq, *checkpoint.digest()),
861 )?;
862 self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
863 Ok(())
864 }
865
866 async fn notify_read_checkpoint_watermark<F>(
867 &self,
868 notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
869 seq: CheckpointSequenceNumber,
870 get_watermark: F,
871 ) -> VerifiedCheckpoint
872 where
873 F: Fn() -> Option<CheckpointSequenceNumber>,
874 {
875 notify_read
876 .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
877 let seq = seqs[0];
878 let Some(highest) = get_watermark() else {
879 return vec![None];
880 };
881 if highest < seq {
882 return vec![None];
883 }
884 let checkpoint = self
885 .get_checkpoint_by_sequence_number(seq)
886 .expect("db error")
887 .expect("checkpoint not found");
888 vec![Some(checkpoint)]
889 })
890 .await
891 .into_iter()
892 .next()
893 .unwrap()
894 }
895
896 pub async fn notify_read_synced_checkpoint(
897 &self,
898 seq: CheckpointSequenceNumber,
899 ) -> VerifiedCheckpoint {
900 self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
901 self.get_highest_synced_checkpoint_seq_number()
902 .expect("db error")
903 })
904 .await
905 }
906
907 pub async fn notify_read_executed_checkpoint(
908 &self,
909 seq: CheckpointSequenceNumber,
910 ) -> VerifiedCheckpoint {
911 self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
912 self.get_highest_executed_checkpoint_seq_number()
913 .expect("db error")
914 })
915 .await
916 }
917
918 pub fn update_highest_executed_checkpoint(
919 &self,
920 checkpoint: &VerifiedCheckpoint,
921 ) -> Result<(), TypedStoreError> {
922 if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
923 if seq_number >= *checkpoint.sequence_number() {
924 return Ok(());
925 }
926 assert_eq!(
927 seq_number + 1,
928 *checkpoint.sequence_number(),
929 "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
930 checkpoint.sequence_number(),
931 seq_number
932 );
933 }
934 let seq = *checkpoint.sequence_number();
935 debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
936 self.tables.watermarks.insert(
937 &CheckpointWatermark::HighestExecuted,
938 &(seq, *checkpoint.digest()),
939 )?;
940 self.executed_checkpoint_notify_read
941 .notify(&seq, checkpoint);
942 Ok(())
943 }
944
945 pub fn update_highest_pruned_checkpoint(
946 &self,
947 checkpoint: &VerifiedCheckpoint,
948 ) -> Result<(), TypedStoreError> {
949 self.tables.watermarks.insert(
950 &CheckpointWatermark::HighestPruned,
951 &(*checkpoint.sequence_number(), *checkpoint.digest()),
952 )
953 }
954
955 pub fn set_highest_executed_checkpoint_subtle(
960 &self,
961 checkpoint: &VerifiedCheckpoint,
962 ) -> Result<(), TypedStoreError> {
963 self.tables.watermarks.insert(
964 &CheckpointWatermark::HighestExecuted,
965 &(*checkpoint.sequence_number(), *checkpoint.digest()),
966 )
967 }
968
969 pub fn insert_checkpoint_contents(
970 &self,
971 contents: CheckpointContents,
972 ) -> Result<(), TypedStoreError> {
973 debug!(
974 checkpoint_seq = ?contents.digest(),
975 "Inserting checkpoint contents",
976 );
977 self.tables
978 .checkpoint_content
979 .insert(contents.digest(), &contents)
980 }
981
982 pub fn insert_verified_checkpoint_contents(
983 &self,
984 checkpoint: &VerifiedCheckpoint,
985 full_contents: VerifiedCheckpointContents,
986 ) -> Result<(), TypedStoreError> {
987 let mut batch = self.tables.full_checkpoint_content_v2.batch();
988 batch.insert_batch(
989 &self.tables.checkpoint_sequence_by_contents_digest,
990 [(&checkpoint.content_digest, checkpoint.sequence_number())],
991 )?;
992 let full_contents = full_contents.into_inner();
993 batch.insert_batch(
994 &self.tables.full_checkpoint_content_v2,
995 [(checkpoint.sequence_number(), &full_contents)],
996 )?;
997
998 let contents = full_contents.into_checkpoint_contents();
999 assert_eq!(&checkpoint.content_digest, contents.digest());
1000
1001 batch.insert_batch(
1002 &self.tables.checkpoint_content,
1003 [(contents.digest(), &contents)],
1004 )?;
1005
1006 batch.write()
1007 }
1008
1009 pub fn delete_full_checkpoint_contents(
1010 &self,
1011 seq: CheckpointSequenceNumber,
1012 ) -> Result<(), TypedStoreError> {
1013 self.tables.full_checkpoint_content.remove(&seq)?;
1014 self.tables.full_checkpoint_content_v2.remove(&seq)
1015 }
1016
1017 pub fn get_epoch_last_checkpoint(
1018 &self,
1019 epoch_id: EpochId,
1020 ) -> SuiResult<Option<VerifiedCheckpoint>> {
1021 let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1022 let checkpoint = match seq {
1023 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1024 None => None,
1025 };
1026 Ok(checkpoint)
1027 }
1028
1029 pub fn get_epoch_last_checkpoint_seq_number(
1030 &self,
1031 epoch_id: EpochId,
1032 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1033 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1034 Ok(seq)
1035 }
1036
1037 pub fn get_epoch_first_checkpoint_seq(
1040 &self,
1041 epoch: EpochId,
1042 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1043 if epoch == 0 {
1044 return Ok(Some(0));
1045 }
1046 Ok(self
1047 .tables
1048 .epoch_last_checkpoint_map
1049 .get(&(epoch - 1))?
1050 .map(|s| s + 1))
1051 }
1052
1053 pub fn list_checkpoints_from_seq(
1055 &self,
1056 start: Option<CheckpointSequenceNumber>,
1057 limit: usize,
1058 ) -> Result<Vec<(CheckpointSequenceNumber, VerifiedCheckpoint)>, TypedStoreError> {
1059 self.tables
1060 .certified_checkpoints
1061 .safe_iter_with_bounds(start, None)
1062 .take(limit)
1063 .map(|r| r.map(|(seq, cp)| (seq, cp.into())))
1064 .collect()
1065 }
1066
1067 pub fn list_epoch_last_checkpoints(
1069 &self,
1070 start: Option<EpochId>,
1071 limit: usize,
1072 ) -> Result<Vec<(EpochId, CheckpointSequenceNumber)>, TypedStoreError> {
1073 self.tables
1074 .epoch_last_checkpoint_map
1075 .safe_iter_with_bounds(start, None)
1076 .take(limit)
1077 .collect()
1078 }
1079
1080 pub fn list_checkpoint_digests(
1082 &self,
1083 start: Option<CheckpointDigest>,
1084 limit: usize,
1085 ) -> Result<Vec<CheckpointDigest>, TypedStoreError> {
1086 self.tables
1087 .checkpoint_by_digest
1088 .safe_iter_with_bounds(start, None)
1089 .take(limit)
1090 .map(|r| r.map(|(d, _)| d))
1091 .collect()
1092 }
1093
1094 pub fn list_checkpoint_contents_digests(
1096 &self,
1097 start: Option<CheckpointContentsDigest>,
1098 limit: usize,
1099 ) -> Result<Vec<CheckpointContentsDigest>, TypedStoreError> {
1100 self.tables
1101 .checkpoint_content
1102 .safe_iter_with_bounds(start, None)
1103 .take(limit)
1104 .map(|r| r.map(|(d, _)| d))
1105 .collect()
1106 }
1107
1108 pub fn list_epoch_checkpoints(
1110 &self,
1111 epoch: EpochId,
1112 start_seq: Option<CheckpointSequenceNumber>,
1113 limit: usize,
1114 ) -> Result<Vec<(CheckpointSequenceNumber, VerifiedCheckpoint)>, TypedStoreError> {
1115 let Some(last_seq) = self.tables.epoch_last_checkpoint_map.get(&epoch)? else {
1116 return Ok(vec![]);
1117 };
1118 let first_seq = if epoch == 0 {
1120 0
1121 } else {
1122 self.tables
1123 .epoch_last_checkpoint_map
1124 .get(&(epoch - 1))?
1125 .map(|s| s + 1)
1126 .unwrap_or(0)
1127 };
1128 let start = start_seq.map(|s| s.max(first_seq)).unwrap_or(first_seq);
1129 self.tables
1130 .certified_checkpoints
1131 .safe_iter_with_bounds(Some(start), Some(last_seq + 1))
1132 .take(limit)
1133 .map(|r| r.map(|(seq, cp)| (seq, cp.into())))
1134 .collect()
1135 }
1136
1137 pub fn insert_epoch_last_checkpoint(
1138 &self,
1139 epoch_id: EpochId,
1140 checkpoint: &VerifiedCheckpoint,
1141 ) -> SuiResult {
1142 self.tables
1143 .epoch_last_checkpoint_map
1144 .insert(&epoch_id, checkpoint.sequence_number())?;
1145 Ok(())
1146 }
1147
1148 pub fn get_epoch_state_commitments(
1149 &self,
1150 epoch: EpochId,
1151 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1152 let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1153 checkpoint
1154 .end_of_epoch_data
1155 .as_ref()
1156 .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1157 .epoch_commitments
1158 .clone()
1159 });
1160 Ok(commitments)
1161 }
1162
1163 pub fn get_epoch_stats(
1165 &self,
1166 epoch: EpochId,
1167 last_checkpoint: &CheckpointSummary,
1168 ) -> Option<EpochStats> {
1169 let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1170 (0, 0)
1171 } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1172 (
1173 checkpoint.sequence_number + 1,
1174 checkpoint.network_total_transactions,
1175 )
1176 } else {
1177 return None;
1178 };
1179 Some(EpochStats {
1180 checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1181 transaction_count: last_checkpoint.network_total_transactions
1182 - prev_epoch_network_transactions,
1183 total_gas_reward: last_checkpoint
1184 .epoch_rolling_gas_cost_summary
1185 .computation_cost,
1186 })
1187 }
1188
1189 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1190 self.tables
1192 .checkpoint_content
1193 .checkpoint_db(path)
1194 .map_err(Into::into)
1195 }
1196
1197 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1198 let mut wb = self.tables.watermarks.batch();
1199 wb.delete_batch(
1200 &self.tables.watermarks,
1201 std::iter::once(CheckpointWatermark::HighestExecuted),
1202 )?;
1203 wb.write()?;
1204 Ok(())
1205 }
1206
1207 pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1208 self.delete_highest_executed_checkpoint_test_only()?;
1209 Ok(())
1210 }
1211
1212 pub fn record_checkpoint_fork_detected(
1213 &self,
1214 checkpoint_seq: CheckpointSequenceNumber,
1215 checkpoint_digest: CheckpointDigest,
1216 ) -> Result<(), TypedStoreError> {
1217 info!(
1218 checkpoint_seq = checkpoint_seq,
1219 checkpoint_digest = ?checkpoint_digest,
1220 "Recording checkpoint fork detection in database"
1221 );
1222 self.tables.watermarks.insert(
1223 &CheckpointWatermark::CheckpointForkDetected,
1224 &(checkpoint_seq, checkpoint_digest),
1225 )
1226 }
1227
1228 pub fn get_checkpoint_fork_detected(
1229 &self,
1230 ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1231 self.tables
1232 .watermarks
1233 .get(&CheckpointWatermark::CheckpointForkDetected)
1234 }
1235
1236 pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1237 self.tables
1238 .watermarks
1239 .remove(&CheckpointWatermark::CheckpointForkDetected)
1240 }
1241
1242 pub fn record_transaction_fork_detected(
1243 &self,
1244 tx_digest: TransactionDigest,
1245 expected_effects_digest: TransactionEffectsDigest,
1246 actual_effects_digest: TransactionEffectsDigest,
1247 ) -> Result<(), TypedStoreError> {
1248 info!(
1249 tx_digest = ?tx_digest,
1250 expected_effects_digest = ?expected_effects_digest,
1251 actual_effects_digest = ?actual_effects_digest,
1252 "Recording transaction fork detection in database"
1253 );
1254 self.tables.transaction_fork_detected.insert(
1255 &TRANSACTION_FORK_DETECTED_KEY,
1256 &(tx_digest, expected_effects_digest, actual_effects_digest),
1257 )
1258 }
1259
1260 pub fn get_transaction_fork_detected(
1261 &self,
1262 ) -> Result<
1263 Option<(
1264 TransactionDigest,
1265 TransactionEffectsDigest,
1266 TransactionEffectsDigest,
1267 )>,
1268 TypedStoreError,
1269 > {
1270 self.tables
1271 .transaction_fork_detected
1272 .get(&TRANSACTION_FORK_DETECTED_KEY)
1273 }
1274
1275 pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1276 self.tables
1277 .transaction_fork_detected
1278 .remove(&TRANSACTION_FORK_DETECTED_KEY)
1279 }
1280}
1281
1282#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
1283pub enum CheckpointWatermark {
1284 HighestVerified,
1285 HighestSynced,
1286 HighestExecuted,
1287 HighestPruned,
1288 CheckpointForkDetected,
1289}
1290
1291struct CheckpointStateHasher {
1292 epoch_store: Arc<AuthorityPerEpochStore>,
1293 hasher: Weak<GlobalStateHasher>,
1294 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1295}
1296
1297impl CheckpointStateHasher {
1298 fn new(
1299 epoch_store: Arc<AuthorityPerEpochStore>,
1300 hasher: Weak<GlobalStateHasher>,
1301 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1302 ) -> Self {
1303 Self {
1304 epoch_store,
1305 hasher,
1306 receive_from_builder,
1307 }
1308 }
1309
1310 async fn run(self) {
1311 let Self {
1312 epoch_store,
1313 hasher,
1314 mut receive_from_builder,
1315 } = self;
1316 while let Some((seq, effects)) = receive_from_builder.recv().await {
1317 let Some(hasher) = hasher.upgrade() else {
1318 info!("Object state hasher was dropped, stopping checkpoint accumulation");
1319 break;
1320 };
1321 hasher
1322 .accumulate_checkpoint(&effects, seq, &epoch_store)
1323 .expect("epoch ended while accumulating checkpoint");
1324 }
1325 }
1326}
1327
1328#[derive(Debug)]
1329pub enum CheckpointBuilderError {
1330 ChangeEpochTxAlreadyExecuted,
1331 SystemPackagesMissing,
1332 Retry(anyhow::Error),
1333}
1334
1335impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1336 for CheckpointBuilderError
1337{
1338 fn from(e: SuiError) -> Self {
1339 Self::Retry(e.into())
1340 }
1341}
1342
1343pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1344
1345pub struct CheckpointBuilder {
1346 state: Arc<AuthorityState>,
1347 store: Arc<CheckpointStore>,
1348 epoch_store: Arc<AuthorityPerEpochStore>,
1349 notify: Arc<Notify>,
1350 notify_aggregator: Arc<Notify>,
1351 last_built: watch::Sender<CheckpointSequenceNumber>,
1352 effects_store: Arc<dyn TransactionCacheRead>,
1353 global_state_hasher: Weak<GlobalStateHasher>,
1354 send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1355 output: Box<dyn CheckpointOutput>,
1356 metrics: Arc<CheckpointMetrics>,
1357 max_transactions_per_checkpoint: usize,
1358 max_checkpoint_size_bytes: usize,
1359}
1360
1361pub struct CheckpointAggregator {
1362 store: Arc<CheckpointStore>,
1363 epoch_store: Arc<AuthorityPerEpochStore>,
1364 notify: Arc<Notify>,
1365 receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
1366 pending: BTreeMap<CheckpointSequenceNumber, Vec<CheckpointSignatureMessage>>,
1367 current: Option<CheckpointSignatureAggregator>,
1368 output: Box<dyn CertifiedCheckpointOutput>,
1369 state: Arc<AuthorityState>,
1370 metrics: Arc<CheckpointMetrics>,
1371}
1372
1373pub struct CheckpointSignatureAggregator {
1375 summary: CheckpointSummary,
1376 digest: CheckpointDigest,
1377 signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
1379 store: Arc<CheckpointStore>,
1380 state: Arc<AuthorityState>,
1381 metrics: Arc<CheckpointMetrics>,
1382}
1383
1384impl CheckpointBuilder {
1385 fn new(
1386 state: Arc<AuthorityState>,
1387 store: Arc<CheckpointStore>,
1388 epoch_store: Arc<AuthorityPerEpochStore>,
1389 notify: Arc<Notify>,
1390 effects_store: Arc<dyn TransactionCacheRead>,
1391 global_state_hasher: Weak<GlobalStateHasher>,
1393 send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1395 output: Box<dyn CheckpointOutput>,
1396 notify_aggregator: Arc<Notify>,
1397 last_built: watch::Sender<CheckpointSequenceNumber>,
1398 metrics: Arc<CheckpointMetrics>,
1399 max_transactions_per_checkpoint: usize,
1400 max_checkpoint_size_bytes: usize,
1401 ) -> Self {
1402 Self {
1403 state,
1404 store,
1405 epoch_store,
1406 notify,
1407 effects_store,
1408 global_state_hasher,
1409 send_to_hasher,
1410 output,
1411 notify_aggregator,
1412 last_built,
1413 metrics,
1414 max_transactions_per_checkpoint,
1415 max_checkpoint_size_bytes,
1416 }
1417 }
1418
1419 async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
1427 if let Some(replay_waiter) = consensus_replay_waiter {
1428 info!("Waiting for consensus commits to replay ...");
1429 replay_waiter.wait_for_replay().await;
1430 info!("Consensus commits finished replaying");
1431 }
1432 info!("Starting CheckpointBuilder");
1433 loop {
1434 match self.maybe_build_checkpoints().await {
1435 Ok(()) => {}
1436 err @ Err(
1437 CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
1438 | CheckpointBuilderError::SystemPackagesMissing,
1439 ) => {
1440 info!("CheckpointBuilder stopping: {:?}", err);
1441 return;
1442 }
1443 Err(CheckpointBuilderError::Retry(inner)) => {
1444 let msg = format!("{:?}", inner);
1445 debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
1446 tokio::time::sleep(Duration::from_secs(1)).await;
1447 self.metrics.checkpoint_errors.inc();
1448 continue;
1449 }
1450 }
1451
1452 self.notify.notified().await;
1453 }
1454 }
1455
1456 async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1457 if self
1458 .epoch_store
1459 .protocol_config()
1460 .split_checkpoints_in_consensus_handler()
1461 {
1462 self.maybe_build_checkpoints_v2().await
1463 } else {
1464 self.maybe_build_checkpoints_v1().await
1465 }
1466 }
1467
1468 async fn maybe_build_checkpoints_v1(&mut self) -> CheckpointBuilderResult {
1469 let _scope = monitored_scope("BuildCheckpoints");
1470
1471 let summary = self
1473 .epoch_store
1474 .last_built_checkpoint_builder_summary()
1475 .expect("epoch should not have ended");
1476 let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
1477 let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
1478
1479 let min_checkpoint_interval_ms = self
1480 .epoch_store
1481 .protocol_config()
1482 .min_checkpoint_interval_ms_as_option()
1483 .unwrap_or_default();
1484 let mut grouped_pending_checkpoints = Vec::new();
1485 let mut checkpoints_iter = self
1486 .epoch_store
1487 .get_pending_checkpoints(last_height)
1488 .expect("unexpected epoch store error")
1489 .into_iter()
1490 .peekable();
1491 while let Some((height, pending)) = checkpoints_iter.next() {
1492 let current_timestamp = pending.details().timestamp_ms;
1495 let can_build = match last_timestamp {
1496 Some(last_timestamp) => {
1497 current_timestamp >= last_timestamp + min_checkpoint_interval_ms
1498 }
1499 None => true,
1500 } || checkpoints_iter
1503 .peek()
1504 .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
1505 || pending.details().last_of_epoch;
1507 grouped_pending_checkpoints.push(pending);
1508 if !can_build {
1509 debug!(
1510 checkpoint_commit_height = height,
1511 ?last_timestamp,
1512 ?current_timestamp,
1513 "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
1514 );
1515 continue;
1516 }
1517
1518 last_height = Some(height);
1520 last_timestamp = Some(current_timestamp);
1521 debug!(
1522 checkpoint_commit_height_from = grouped_pending_checkpoints
1523 .first()
1524 .unwrap()
1525 .details()
1526 .checkpoint_height,
1527 checkpoint_commit_height_to = last_height,
1528 "Making checkpoint with commit height range"
1529 );
1530
1531 let seq = self
1532 .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
1533 .await?;
1534
1535 self.last_built.send_if_modified(|cur| {
1536 if seq > *cur {
1538 *cur = seq;
1539 true
1540 } else {
1541 false
1542 }
1543 });
1544
1545 tokio::task::yield_now().await;
1548 }
1549 debug!(
1550 "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1551 grouped_pending_checkpoints.len(),
1552 );
1553
1554 Ok(())
1555 }
1556
1557 async fn maybe_build_checkpoints_v2(&mut self) -> CheckpointBuilderResult {
1558 let _scope = monitored_scope("BuildCheckpoints");
1559
1560 let last_height = self
1562 .epoch_store
1563 .last_built_checkpoint_builder_summary()
1564 .expect("epoch should not have ended")
1565 .and_then(|s| s.checkpoint_height);
1566
1567 for (height, pending) in self
1568 .epoch_store
1569 .get_pending_checkpoints_v2(last_height)
1570 .expect("unexpected epoch store error")
1571 {
1572 debug!(checkpoint_commit_height = height, "Making checkpoint");
1573
1574 let seq = self.make_checkpoint_v2(pending).await?;
1575
1576 self.last_built.send_if_modified(|cur| {
1577 if seq > *cur {
1579 *cur = seq;
1580 true
1581 } else {
1582 false
1583 }
1584 });
1585
1586 tokio::task::yield_now().await;
1589 }
1590
1591 Ok(())
1592 }
1593
1594 #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1595 async fn make_checkpoint(
1596 &mut self,
1597 pendings: Vec<PendingCheckpoint>,
1598 ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1599 let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1600
1601 let pending_ckpt_str = pendings
1602 .iter()
1603 .map(|p| {
1604 format!(
1605 "height={}, commit={}",
1606 p.details().checkpoint_height,
1607 p.details().consensus_commit_ref
1608 )
1609 })
1610 .join("; ");
1611
1612 let last_details = pendings.last().unwrap().details().clone();
1613
1614 let highest_executed_sequence = self
1617 .store
1618 .get_highest_executed_checkpoint_seq_number()
1619 .expect("db error")
1620 .unwrap_or(0);
1621
1622 let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
1623 let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1624
1625 let new_checkpoints = self
1626 .create_checkpoints(
1627 sorted_tx_effects_included_in_checkpoint,
1628 &last_details,
1629 &all_roots,
1630 )
1631 .await?;
1632 let highest_sequence = *new_checkpoints.last().0.sequence_number();
1633 if highest_sequence <= highest_executed_sequence && poll_count > 1 {
1634 debug_fatal!(
1635 "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1636 );
1637 }
1638
1639 let new_ckpt_str = new_checkpoints
1640 .iter()
1641 .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
1642 .join("; ");
1643
1644 self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1645 .await?;
1646 info!(
1647 "Made new checkpoint {} from pending checkpoint {}",
1648 new_ckpt_str, pending_ckpt_str
1649 );
1650
1651 Ok(highest_sequence)
1652 }
1653
1654 #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
1655 async fn make_checkpoint_v2(
1656 &mut self,
1657 pending: PendingCheckpointV2,
1658 ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1659 let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1660
1661 let details = pending.details.clone();
1662
1663 let highest_executed_sequence = self
1664 .store
1665 .get_highest_executed_checkpoint_seq_number()
1666 .expect("db error")
1667 .unwrap_or(0);
1668
1669 let (poll_count, result) =
1670 poll_count(self.resolve_checkpoint_transactions_v2(pending)).await;
1671 let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1672
1673 let new_checkpoints = self
1674 .create_checkpoints(
1675 sorted_tx_effects_included_in_checkpoint,
1676 &details,
1677 &all_roots,
1678 )
1679 .await?;
1680 assert_eq!(new_checkpoints.len(), 1, "Expected exactly one checkpoint");
1681 let sequence = *new_checkpoints.first().0.sequence_number();
1682 let digest = new_checkpoints.first().0.digest();
1683 if sequence <= highest_executed_sequence && poll_count > 1 {
1684 debug_fatal!(
1685 "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1686 );
1687 }
1688
1689 self.write_checkpoints(details.checkpoint_height, new_checkpoints)
1690 .await?;
1691 info!(
1692 seq = sequence,
1693 %digest,
1694 height = details.checkpoint_height,
1695 commit = %details.consensus_commit_ref,
1696 "Made new checkpoint"
1697 );
1698
1699 Ok(sequence)
1700 }
1701
1702 async fn construct_and_execute_settlement_transactions(
1703 &self,
1704 sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
1705 checkpoint_height: CheckpointHeight,
1706 checkpoint_seq: CheckpointSequenceNumber,
1707 tx_index_offset: u64,
1708 ) -> (TransactionKey, Vec<TransactionEffects>) {
1709 let _scope =
1710 monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");
1711
1712 let tx_key =
1713 TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);
1714
1715 let epoch = self.epoch_store.epoch();
1716 let accumulator_root_obj_initial_shared_version = self
1717 .epoch_store
1718 .epoch_start_config()
1719 .accumulator_root_obj_initial_shared_version()
1720 .expect("accumulator root object must exist");
1721
1722 let builder = AccumulatorSettlementTxBuilder::new(
1723 Some(self.effects_store.as_ref()),
1724 sorted_tx_effects_included_in_checkpoint,
1725 checkpoint_seq,
1726 tx_index_offset,
1727 );
1728
1729 let funds_changes = builder.collect_funds_changes();
1730 let num_updates = builder.num_updates();
1731 let settlement_txns = builder.build_tx(
1732 self.epoch_store.protocol_config(),
1733 epoch,
1734 accumulator_root_obj_initial_shared_version,
1735 checkpoint_height,
1736 checkpoint_seq,
1737 );
1738
1739 let settlement_txns: Vec<_> = settlement_txns
1740 .into_iter()
1741 .map(|tx| {
1742 VerifiedExecutableTransaction::new_system(
1743 VerifiedTransaction::new_system_transaction(tx),
1744 self.epoch_store.epoch(),
1745 )
1746 })
1747 .collect();
1748
1749 let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();
1750
1751 debug!(
1752 ?settlement_digests,
1753 ?tx_key,
1754 "created settlement transactions with {num_updates} updates"
1755 );
1756
1757 self.epoch_store
1758 .notify_settlement_transactions_ready(tx_key, settlement_txns);
1759
1760 let settlement_effects = wait_for_effects_with_retry(
1761 self.effects_store.as_ref(),
1762 "CheckpointBuilder::notify_read_settlement_effects",
1763 &settlement_digests,
1764 tx_key,
1765 )
1766 .await;
1767 let (accounts_created, accounts_deleted) =
1768 accumulators::count_accumulator_object_changes(&settlement_effects);
1769 self.metrics
1770 .report_accumulator_account_changes(accounts_created, accounts_deleted);
1771
1772 let barrier_tx = accumulators::build_accumulator_barrier_tx(
1773 epoch,
1774 accumulator_root_obj_initial_shared_version,
1775 checkpoint_height,
1776 &settlement_effects,
1777 );
1778
1779 let barrier_tx = VerifiedExecutableTransaction::new_system(
1780 VerifiedTransaction::new_system_transaction(barrier_tx),
1781 self.epoch_store.epoch(),
1782 );
1783 let barrier_digest = *barrier_tx.digest();
1784
1785 self.epoch_store
1786 .notify_barrier_transaction_ready(tx_key, barrier_tx);
1787
1788 let barrier_effects = wait_for_effects_with_retry(
1789 self.effects_store.as_ref(),
1790 "CheckpointBuilder::notify_read_barrier_effects",
1791 &[barrier_digest],
1792 tx_key,
1793 )
1794 .await;
1795
1796 let settlement_and_barrier_effects: Vec<_> = settlement_effects
1797 .into_iter()
1798 .chain(barrier_effects)
1799 .collect();
1800
1801 let mut next_accumulator_version = None;
1802 for fx in settlement_and_barrier_effects.iter() {
1803 assert!(
1804 fx.status().is_ok(),
1805 "settlement transaction cannot fail (digest: {:?}) {:#?}",
1806 fx.transaction_digest(),
1807 fx
1808 );
1809 if let Some(version) = fx
1810 .mutated()
1811 .iter()
1812 .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
1813 {
1814 assert!(
1815 next_accumulator_version.is_none(),
1816 "Only one settlement transaction should mutate the accumulator root object"
1817 );
1818 next_accumulator_version = Some(version);
1819 }
1820 }
1821 let settlements = FundsSettlement {
1822 next_accumulator_version: next_accumulator_version
1823 .expect("Accumulator root object should be mutated in the settlement transactions"),
1824 funds_changes,
1825 };
1826
1827 self.state
1828 .execution_scheduler()
1829 .settle_address_funds(settlements);
1830
1831 (tx_key, settlement_and_barrier_effects)
1832 }
1833
1834 #[instrument(level = "debug", skip_all)]
1839 async fn resolve_checkpoint_transactions(
1840 &self,
1841 pending_checkpoints: Vec<PendingCheckpoint>,
1842 ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
1843 let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1844
1845 let mut effects_in_current_checkpoint = BTreeSet::new();
1850
1851 let mut tx_effects = Vec::new();
1852 let mut tx_roots = HashSet::new();
1853
1854 for pending_checkpoint in pending_checkpoints.into_iter() {
1855 let mut pending = pending_checkpoint;
1856 debug!(
1857 checkpoint_commit_height = pending.details.checkpoint_height,
1858 "Resolving checkpoint transactions for pending checkpoint.",
1859 );
1860
1861 trace!(
1862 "roots for pending checkpoint {:?}: {:?}",
1863 pending.details.checkpoint_height, pending.roots,
1864 );
1865
1866 let settlement_root = if self.epoch_store.accumulators_enabled() {
1867 let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
1868 pending.roots.pop()
1869 else {
1870 fatal!("No settlement root found");
1871 };
1872 Some(settlement_root)
1873 } else {
1874 None
1875 };
1876
1877 let roots = &pending.roots;
1878
1879 self.metrics
1880 .checkpoint_roots_count
1881 .inc_by(roots.len() as u64);
1882
1883 let root_digests = self
1884 .epoch_store
1885 .notify_read_tx_key_to_digest(roots)
1886 .in_monitored_scope("CheckpointNotifyDigests")
1887 .await?;
1888 let root_effects = self
1889 .effects_store
1890 .notify_read_executed_effects(
1891 CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
1892 &root_digests,
1893 )
1894 .in_monitored_scope("CheckpointNotifyRead")
1895 .await;
1896
1897 assert!(
1898 self.epoch_store
1899 .protocol_config()
1900 .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
1901 );
1902
1903 let consensus_commit_prologue =
1906 self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
1907
1908 if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1911 let unsorted_ccp = self.complete_checkpoint_effects(
1912 vec![ccp_effects.clone()],
1913 &mut effects_in_current_checkpoint,
1914 )?;
1915
1916 if unsorted_ccp.len() != 1 {
1919 fatal!(
1920 "Expected 1 consensus commit prologue, got {:?}",
1921 unsorted_ccp
1922 .iter()
1923 .map(|e| e.transaction_digest())
1924 .collect::<Vec<_>>()
1925 );
1926 }
1927 assert_eq!(unsorted_ccp.len(), 1);
1928 assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1929 }
1930
1931 let unsorted =
1932 self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;
1933
1934 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1935 let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1936 if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
1937 if cfg!(debug_assertions) {
1938 for tx in unsorted.iter() {
1940 assert!(tx.transaction_digest() != &ccp_digest);
1941 }
1942 }
1943 sorted.push(ccp_effects);
1944 }
1945 sorted.extend(CausalOrder::causal_sort(unsorted));
1946
1947 if let Some(settlement_root) = settlement_root {
1948 let last_checkpoint =
1951 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1952 let next_checkpoint_seq = last_checkpoint
1953 .as_ref()
1954 .map(|(seq, _)| *seq)
1955 .unwrap_or_default()
1956 + 1;
1957 let tx_index_offset = tx_effects.len() as u64;
1958
1959 let (tx_key, settlement_effects) = self
1960 .construct_and_execute_settlement_transactions(
1961 &sorted,
1962 pending.details.checkpoint_height,
1963 next_checkpoint_seq,
1964 tx_index_offset,
1965 )
1966 .await;
1967 debug!(?tx_key, "executed settlement transactions");
1968
1969 assert_eq!(settlement_root, tx_key);
1970
1971 effects_in_current_checkpoint
1982 .extend(settlement_effects.iter().map(|e| *e.transaction_digest()));
1983 sorted.extend(settlement_effects);
1984 }
1985
1986 #[cfg(msim)]
1987 {
1988 self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1990 }
1991
1992 tx_effects.extend(sorted);
1993 tx_roots.extend(root_digests);
1994 }
1995
1996 Ok((tx_effects, tx_roots))
1997 }
1998
1999 #[instrument(level = "debug", skip_all)]
2002 async fn resolve_checkpoint_transactions_v2(
2003 &self,
2004 pending: PendingCheckpointV2,
2005 ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
2006 let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
2007
2008 debug!(
2009 checkpoint_commit_height = pending.details.checkpoint_height,
2010 "Resolving checkpoint transactions for pending checkpoint.",
2011 );
2012
2013 trace!(
2014 "roots for pending checkpoint {:?}: {:?}",
2015 pending.details.checkpoint_height, pending.roots,
2016 );
2017
2018 assert!(
2019 self.epoch_store
2020 .protocol_config()
2021 .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
2022 );
2023
2024 let mut all_effects: Vec<TransactionEffects> = Vec::new();
2025 let mut all_root_digests: Vec<TransactionDigest> = Vec::new();
2026
2027 for checkpoint_roots in &pending.roots {
2028 let tx_roots = &checkpoint_roots.tx_roots;
2029
2030 self.metrics
2031 .checkpoint_roots_count
2032 .inc_by(tx_roots.len() as u64);
2033
2034 let root_digests = self
2035 .epoch_store
2036 .notify_read_tx_key_to_digest(tx_roots)
2037 .in_monitored_scope("CheckpointNotifyDigests")
2038 .await?;
2039
2040 all_root_digests.extend(root_digests.iter().cloned());
2041
2042 let root_effects = self
2043 .effects_store
2044 .notify_read_executed_effects(
2045 CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
2046 &root_digests,
2047 )
2048 .in_monitored_scope("CheckpointNotifyRead")
2049 .await;
2050 let consensus_commit_prologue =
2051 self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
2052
2053 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
2054 let ccp_digest = consensus_commit_prologue.map(|(d, _)| d);
2055 let mut sorted = CausalOrder::causal_sort_with_ccp(root_effects, ccp_digest);
2056
2057 if let Some(settlement_key) = &checkpoint_roots.settlement_root {
2058 let checkpoint_seq = pending
2059 .details
2060 .checkpoint_seq
2061 .expect("checkpoint_seq must be set");
2062 let tx_index_offset = all_effects.len() as u64;
2063 let effects = self
2064 .resolve_settlement_effects(
2065 *settlement_key,
2066 &sorted,
2067 checkpoint_roots.height,
2068 checkpoint_seq,
2069 tx_index_offset,
2070 )
2071 .await;
2072 sorted.extend(effects);
2073 }
2074
2075 #[cfg(msim)]
2076 {
2077 self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
2078 }
2079
2080 all_effects.extend(sorted);
2081 }
2082 Ok((all_effects, all_root_digests.into_iter().collect()))
2083 }
2084
2085 async fn resolve_settlement_effects(
2090 &self,
2091 settlement_key: TransactionKey,
2092 sorted_root_effects: &[TransactionEffects],
2093 checkpoint_height: CheckpointHeight,
2094 checkpoint_seq: CheckpointSequenceNumber,
2095 tx_index_offset: u64,
2096 ) -> Vec<TransactionEffects> {
2097 let epoch = self.epoch_store.epoch();
2098 let accumulator_root_obj_initial_shared_version = self
2099 .epoch_store
2100 .epoch_start_config()
2101 .accumulator_root_obj_initial_shared_version()
2102 .expect("accumulator root object must exist");
2103
2104 let builder = AccumulatorSettlementTxBuilder::new(
2105 None,
2106 sorted_root_effects,
2107 checkpoint_seq,
2108 tx_index_offset,
2109 );
2110
2111 let settlement_digests: Vec<_> = builder
2112 .build_tx(
2113 self.epoch_store.protocol_config(),
2114 epoch,
2115 accumulator_root_obj_initial_shared_version,
2116 checkpoint_height,
2117 checkpoint_seq,
2118 )
2119 .into_iter()
2120 .map(|tx| *VerifiedTransaction::new_system_transaction(tx).digest())
2121 .collect();
2122
2123 debug!(
2124 ?settlement_digests,
2125 ?settlement_key,
2126 "reading settlement effects from cache"
2127 );
2128
2129 let settlement_effects = wait_for_effects_with_retry(
2130 self.effects_store.as_ref(),
2131 "CheckpointBuilder::settlement_effects",
2132 &settlement_digests,
2133 settlement_key,
2134 )
2135 .await;
2136
2137 let barrier_digest = *VerifiedTransaction::new_system_transaction(
2138 accumulators::build_accumulator_barrier_tx(
2139 epoch,
2140 accumulator_root_obj_initial_shared_version,
2141 checkpoint_height,
2142 &settlement_effects,
2143 ),
2144 )
2145 .digest();
2146
2147 let barrier_effects = wait_for_effects_with_retry(
2148 self.effects_store.as_ref(),
2149 "CheckpointBuilder::barrier_effects",
2150 &[barrier_digest],
2151 settlement_key,
2152 )
2153 .await;
2154
2155 settlement_effects
2156 .into_iter()
2157 .chain(barrier_effects)
2158 .collect()
2159 }
2160
2161 fn extract_consensus_commit_prologue(
2164 &self,
2165 root_digests: &[TransactionDigest],
2166 root_effects: &[TransactionEffects],
2167 ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
2168 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
2169 if root_digests.is_empty() {
2170 return Ok(None);
2171 }
2172
2173 let first_tx = self
2177 .state
2178 .get_transaction_cache_reader()
2179 .get_transaction_block(&root_digests[0])
2180 .expect("Transaction block must exist");
2181
2182 Ok(first_tx
2183 .transaction_data()
2184 .is_consensus_commit_prologue()
2185 .then(|| {
2186 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
2187 (*first_tx.digest(), root_effects[0].clone())
2188 }))
2189 }
2190
2191 #[instrument(level = "debug", skip_all)]
2192 async fn write_checkpoints(
2193 &mut self,
2194 height: CheckpointHeight,
2195 new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
2196 ) -> SuiResult {
2197 let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
2198 let mut batch = self.store.tables.checkpoint_content.batch();
2199 let mut all_tx_digests =
2200 Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
2201
2202 for (summary, contents) in &new_checkpoints {
2203 debug!(
2204 checkpoint_commit_height = height,
2205 checkpoint_seq = summary.sequence_number,
2206 contents_digest = ?contents.digest(),
2207 "writing checkpoint",
2208 );
2209
2210 if let Some(previously_computed_summary) = self
2211 .store
2212 .tables
2213 .locally_computed_checkpoints
2214 .get(&summary.sequence_number)?
2215 && previously_computed_summary.digest() != summary.digest()
2216 {
2217 fatal!(
2218 "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
2219 summary.sequence_number,
2220 previously_computed_summary.digest(),
2221 summary.digest()
2222 );
2223 }
2224
2225 all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
2226
2227 self.metrics
2228 .transactions_included_in_checkpoint
2229 .inc_by(contents.size() as u64);
2230 let sequence_number = summary.sequence_number;
2231 self.metrics
2232 .last_constructed_checkpoint
2233 .set(sequence_number as i64);
2234
2235 batch.insert_batch(
2236 &self.store.tables.checkpoint_content,
2237 [(contents.digest(), contents)],
2238 )?;
2239
2240 batch.insert_batch(
2241 &self.store.tables.locally_computed_checkpoints,
2242 [(sequence_number, summary)],
2243 )?;
2244 }
2245
2246 batch.write()?;
2247
2248 for (summary, contents) in &new_checkpoints {
2250 self.output
2251 .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
2252 .await?;
2253 }
2254
2255 for (local_checkpoint, _) in &new_checkpoints {
2256 if let Some(certified_checkpoint) = self
2257 .store
2258 .tables
2259 .certified_checkpoints
2260 .get(local_checkpoint.sequence_number())?
2261 {
2262 self.store
2263 .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
2264 }
2265 }
2266
2267 self.notify_aggregator.notify_one();
2268 self.epoch_store
2269 .process_constructed_checkpoint(height, new_checkpoints);
2270 Ok(())
2271 }
2272
2273 #[allow(clippy::type_complexity)]
2274 fn split_checkpoint_chunks(
2275 &self,
2276 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
2277 signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2278 ) -> CheckpointBuilderResult<
2279 Vec<
2280 Vec<(
2281 TransactionEffects,
2282 Vec<(GenericSignature, Option<SequenceNumber>)>,
2283 )>,
2284 >,
2285 > {
2286 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
2287
2288 if self
2290 .epoch_store
2291 .protocol_config()
2292 .split_checkpoints_in_consensus_handler()
2293 {
2294 let chunk: Vec<_> = effects_and_transaction_sizes
2295 .into_iter()
2296 .zip_debug_eq(signatures)
2297 .map(|((effects, _size), sigs)| (effects, sigs))
2298 .collect();
2299 return Ok(vec![chunk]);
2300 }
2301 let mut chunks = Vec::new();
2302 let mut chunk = Vec::new();
2303 let mut chunk_size: usize = 0;
2304 for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
2305 .into_iter()
2306 .zip_debug_eq(signatures.into_iter())
2307 {
2308 let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
2313 bcs::serialized_size(&signatures)?
2314 } else {
2315 let signatures: Vec<&GenericSignature> =
2316 signatures.iter().map(|(s, _)| s).collect();
2317 bcs::serialized_size(&signatures)?
2318 };
2319 let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
2320 if chunk.len() == self.max_transactions_per_checkpoint
2321 || (chunk_size + size) > self.max_checkpoint_size_bytes
2322 {
2323 if chunk.is_empty() {
2324 warn!(
2326 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
2327 self.max_checkpoint_size_bytes
2328 );
2329 } else {
2330 chunks.push(chunk);
2331 chunk = Vec::new();
2332 chunk_size = 0;
2333 }
2334 }
2335
2336 chunk.push((effects, signatures));
2337 chunk_size += size;
2338 }
2339
2340 if !chunk.is_empty() || chunks.is_empty() {
2341 chunks.push(chunk);
2346 }
2351 Ok(chunks)
2352 }
2353
2354 fn load_last_built_checkpoint_summary(
2355 epoch_store: &AuthorityPerEpochStore,
2356 store: &CheckpointStore,
2357 ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
2358 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
2359 if last_checkpoint.is_none() {
2360 let epoch = epoch_store.epoch();
2361 if epoch > 0 {
2362 let previous_epoch = epoch - 1;
2363 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
2364 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
2365 if let Some((ref seq, _)) = last_checkpoint {
2366 debug!(
2367 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
2368 );
2369 } else {
2370 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
2372 }
2373 }
2374 }
2375 Ok(last_checkpoint)
2376 }
2377
2378 #[instrument(level = "debug", skip_all)]
2379 async fn create_checkpoints(
2380 &self,
2381 all_effects: Vec<TransactionEffects>,
2382 details: &PendingCheckpointInfo,
2383 all_roots: &HashSet<TransactionDigest>,
2384 ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
2385 let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
2386
2387 let total = all_effects.len();
2388 let mut last_checkpoint =
2389 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
2390 let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
2391 debug!(
2392 checkpoint_commit_height = details.checkpoint_height,
2393 next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
2394 checkpoint_timestamp = details.timestamp_ms,
2395 "Creating checkpoint(s) for {} transactions",
2396 all_effects.len(),
2397 );
2398
2399 let all_digests: Vec<_> = all_effects
2400 .iter()
2401 .map(|effect| *effect.transaction_digest())
2402 .collect();
2403 let transactions_and_sizes = self
2404 .state
2405 .get_transaction_cache_reader()
2406 .get_transactions_and_serialized_sizes(&all_digests)?;
2407 let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
2408 let mut transactions = Vec::with_capacity(all_effects.len());
2409 let mut transaction_keys = Vec::with_capacity(all_effects.len());
2410 let mut randomness_rounds = BTreeMap::new();
2411 {
2412 let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
2413 debug!(
2414 ?last_checkpoint_seq,
2415 "Waiting for {:?} certificates to appear in consensus",
2416 all_effects.len()
2417 );
2418
2419 for (effects, transaction_and_size) in all_effects
2420 .into_iter()
2421 .zip_debug_eq(transactions_and_sizes.into_iter())
2422 {
2423 let (transaction, size) = transaction_and_size
2424 .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
2425 match transaction.inner().transaction_data().kind() {
2426 TransactionKind::ConsensusCommitPrologue(_)
2427 | TransactionKind::ConsensusCommitPrologueV2(_)
2428 | TransactionKind::ConsensusCommitPrologueV3(_)
2429 | TransactionKind::ConsensusCommitPrologueV4(_)
2430 | TransactionKind::AuthenticatorStateUpdate(_) => {
2431 }
2434 TransactionKind::ProgrammableSystemTransaction(_) => {
2435 }
2437 TransactionKind::ChangeEpoch(_)
2438 | TransactionKind::Genesis(_)
2439 | TransactionKind::EndOfEpochTransaction(_) => {
2440 fatal!(
2441 "unexpected transaction in checkpoint effects: {:?}",
2442 transaction
2443 );
2444 }
2445 TransactionKind::RandomnessStateUpdate(rsu) => {
2446 randomness_rounds
2447 .insert(*effects.transaction_digest(), rsu.randomness_round);
2448 }
2449 TransactionKind::ProgrammableTransaction(_) => {
2450 let digest = *effects.transaction_digest();
2454 if !all_roots.contains(&digest) {
2455 transaction_keys.push(SequencedConsensusTransactionKey::External(
2456 ConsensusTransactionKey::Certificate(digest),
2457 ));
2458 }
2459 }
2460 }
2461 transactions.push(transaction);
2462 all_effects_and_transaction_sizes.push((effects, size));
2463 }
2464
2465 self.epoch_store
2466 .consensus_messages_processed_notify(transaction_keys)
2467 .await?;
2468 }
2469
2470 let signatures = self
2471 .epoch_store
2472 .user_signatures_for_checkpoint(&transactions, &all_digests);
2473 debug!(
2474 ?last_checkpoint_seq,
2475 "Received {} checkpoint user signatures from consensus",
2476 signatures.len()
2477 );
2478
2479 let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
2480 Some(
2481 transactions
2482 .iter()
2483 .flat_map(|tx| {
2484 if let TransactionKind::ProgrammableTransaction(ptb) =
2485 tx.transaction_data().kind()
2486 {
2487 itertools::Either::Left(
2488 ptb.commands
2489 .iter()
2490 .map(ExecutionTimeObservationKey::from_command),
2491 )
2492 } else {
2493 itertools::Either::Right(std::iter::empty())
2494 }
2495 })
2496 .collect(),
2497 )
2498 } else {
2499 None
2500 };
2501
2502 let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2503 let chunks_count = chunks.len();
2504
2505 let mut checkpoints = Vec::with_capacity(chunks_count);
2506 debug!(
2507 ?last_checkpoint_seq,
2508 "Creating {} checkpoints with {} transactions", chunks_count, total,
2509 );
2510
2511 let epoch = self.epoch_store.epoch();
2512 for (index, transactions) in chunks.into_iter().enumerate() {
2513 let first_checkpoint_of_epoch = index == 0
2514 && last_checkpoint
2515 .as_ref()
2516 .map(|(_, c)| c.epoch != epoch)
2517 .unwrap_or(true);
2518 if first_checkpoint_of_epoch {
2519 self.epoch_store
2520 .record_epoch_first_checkpoint_creation_time_metric();
2521 }
2522 let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2523
2524 let sequence_number = if let Some(preassigned_seq) = details.checkpoint_seq {
2525 preassigned_seq
2526 } else {
2527 last_checkpoint
2528 .as_ref()
2529 .map(|(_, c)| c.sequence_number + 1)
2530 .unwrap_or_default()
2531 };
2532 let mut timestamp_ms = details.timestamp_ms;
2533 if let Some((_, last_checkpoint)) = &last_checkpoint
2534 && last_checkpoint.timestamp_ms > timestamp_ms
2535 {
2536 debug!(
2538 "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2539 sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2540 );
2541 if self
2542 .epoch_store
2543 .protocol_config()
2544 .enforce_checkpoint_timestamp_monotonicity()
2545 {
2546 timestamp_ms = last_checkpoint.timestamp_ms;
2547 }
2548 }
2549
2550 let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
2551 let epoch_rolling_gas_cost_summary =
2552 self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2553
2554 let end_of_epoch_data = if last_checkpoint_of_epoch {
2555 let system_state_obj = self
2556 .augment_epoch_last_checkpoint(
2557 &epoch_rolling_gas_cost_summary,
2558 timestamp_ms,
2559 &mut effects,
2560 &mut signatures,
2561 sequence_number,
2562 std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2563 last_checkpoint_seq.unwrap_or_default(),
2564 )
2565 .await?;
2566
2567 let committee = system_state_obj
2568 .get_current_epoch_committee()
2569 .committee()
2570 .clone();
2571
2572 let root_state_digest = {
2575 let state_acc = self
2576 .global_state_hasher
2577 .upgrade()
2578 .expect("No checkpoints should be getting built after local configuration");
2579 let acc = state_acc.accumulate_checkpoint(
2580 &effects,
2581 sequence_number,
2582 &self.epoch_store,
2583 )?;
2584
2585 state_acc
2586 .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2587 .await?;
2588
2589 state_acc.accumulate_running_root(
2590 &self.epoch_store,
2591 sequence_number,
2592 Some(acc),
2593 )?;
2594 state_acc
2595 .digest_epoch(self.epoch_store.clone(), sequence_number)
2596 .await?
2597 };
2598 self.metrics.highest_accumulated_epoch.set(epoch as i64);
2599 info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2600
2601 let epoch_commitments = if self
2602 .epoch_store
2603 .protocol_config()
2604 .check_commit_root_state_digest_supported()
2605 {
2606 vec![root_state_digest.into()]
2607 } else {
2608 vec![]
2609 };
2610
2611 Some(EndOfEpochData {
2612 next_epoch_committee: committee.voting_rights,
2613 next_epoch_protocol_version: ProtocolVersion::new(
2614 system_state_obj.protocol_version(),
2615 ),
2616 epoch_commitments,
2617 })
2618 } else {
2619 self.send_to_hasher
2620 .send((sequence_number, effects.clone()))
2621 .await?;
2622
2623 None
2624 };
2625 let contents = if self.epoch_store.protocol_config().address_aliases() {
2626 CheckpointContents::new_v2(&effects, signatures)
2627 } else {
2628 CheckpointContents::new_with_digests_and_signatures(
2629 effects.iter().map(TransactionEffects::execution_digests),
2630 signatures
2631 .into_iter()
2632 .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
2633 .collect(),
2634 )
2635 };
2636
2637 let num_txns = contents.size() as u64;
2638
2639 let network_total_transactions = last_checkpoint
2640 .as_ref()
2641 .map(|(_, c)| c.network_total_transactions + num_txns)
2642 .unwrap_or(num_txns);
2643
2644 let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2645
2646 let matching_randomness_rounds: Vec<_> = effects
2647 .iter()
2648 .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2649 .copied()
2650 .collect();
2651
2652 let checkpoint_commitments = if self
2653 .epoch_store
2654 .protocol_config()
2655 .include_checkpoint_artifacts_digest_in_summary()
2656 {
2657 let artifacts = CheckpointArtifacts::from(&effects[..]);
2658 let artifacts_digest = artifacts.digest()?;
2659 vec![artifacts_digest.into()]
2660 } else {
2661 Default::default()
2662 };
2663
2664 let summary = CheckpointSummary::new(
2665 self.epoch_store.protocol_config(),
2666 epoch,
2667 sequence_number,
2668 network_total_transactions,
2669 &contents,
2670 previous_digest,
2671 epoch_rolling_gas_cost_summary,
2672 end_of_epoch_data,
2673 timestamp_ms,
2674 matching_randomness_rounds,
2675 checkpoint_commitments,
2676 );
2677 summary.report_checkpoint_age(
2678 &self.metrics.last_created_checkpoint_age,
2679 &self.metrics.last_created_checkpoint_age_ms,
2680 );
2681 if last_checkpoint_of_epoch {
2682 info!(
2683 checkpoint_seq = sequence_number,
2684 "creating last checkpoint of epoch {}", epoch
2685 );
2686 if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2687 self.epoch_store
2688 .report_epoch_metrics_at_last_checkpoint(stats);
2689 }
2690 }
2691 last_checkpoint = Some((sequence_number, summary.clone()));
2692 checkpoints.push((summary, contents));
2693 }
2694
2695 Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2696 }
2697
2698 fn get_epoch_total_gas_cost(
2699 &self,
2700 last_checkpoint: Option<&CheckpointSummary>,
2701 cur_checkpoint_effects: &[TransactionEffects],
2702 ) -> GasCostSummary {
2703 let (previous_epoch, previous_gas_costs) = last_checkpoint
2704 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2705 .unwrap_or_default();
2706 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2707 if previous_epoch == self.epoch_store.epoch() {
2708 GasCostSummary::new(
2710 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2711 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2712 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2713 previous_gas_costs.non_refundable_storage_fee
2714 + current_gas_costs.non_refundable_storage_fee,
2715 )
2716 } else {
2717 current_gas_costs
2718 }
2719 }
2720
2721 #[instrument(level = "error", skip_all)]
2722 async fn augment_epoch_last_checkpoint(
2723 &self,
2724 epoch_total_gas_cost: &GasCostSummary,
2725 epoch_start_timestamp_ms: CheckpointTimestamp,
2726 checkpoint_effects: &mut Vec<TransactionEffects>,
2727 signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2728 checkpoint: CheckpointSequenceNumber,
2729 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2730 last_checkpoint: CheckpointSequenceNumber,
2733 ) -> CheckpointBuilderResult<SuiSystemState> {
2734 let (system_state, effects) = self
2735 .state
2736 .create_and_execute_advance_epoch_tx(
2737 &self.epoch_store,
2738 epoch_total_gas_cost,
2739 checkpoint,
2740 epoch_start_timestamp_ms,
2741 end_of_epoch_observation_keys,
2742 last_checkpoint,
2743 )
2744 .await?;
2745 checkpoint_effects.push(effects);
2746 signatures.push(vec![]);
2747 Ok(system_state)
2748 }
2749
2750 #[instrument(level = "debug", skip_all)]
2757 fn complete_checkpoint_effects(
2758 &self,
2759 mut roots: Vec<TransactionEffects>,
2760 existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
2761 ) -> SuiResult<Vec<TransactionEffects>> {
2762 let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
2763 let mut results = vec![];
2764 let mut seen = HashSet::new();
2765 loop {
2766 let mut pending = HashSet::new();
2767
2768 let transactions_included = self
2769 .epoch_store
2770 .builder_included_transactions_in_checkpoint(
2771 roots.iter().map(|e| e.transaction_digest()),
2772 )?;
2773
2774 for (effect, tx_included) in roots
2775 .into_iter()
2776 .zip_debug_eq(transactions_included.into_iter())
2777 {
2778 let digest = effect.transaction_digest();
2779 seen.insert(*digest);
2781
2782 if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
2784 continue;
2785 }
2786
2787 if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
2789 continue;
2790 }
2791
2792 let existing_effects = self
2793 .epoch_store
2794 .transactions_executed_in_cur_epoch(effect.dependencies())?;
2795
2796 for (dependency, effects_signature_exists) in effect
2797 .dependencies()
2798 .iter()
2799 .zip_debug_eq(existing_effects.iter())
2800 {
2801 if !effects_signature_exists {
2806 continue;
2807 }
2808 if seen.insert(*dependency) {
2809 pending.insert(*dependency);
2810 }
2811 }
2812 results.push(effect);
2813 }
2814 if pending.is_empty() {
2815 break;
2816 }
2817 let pending = pending.into_iter().collect::<Vec<_>>();
2818 let effects = self.effects_store.multi_get_executed_effects(&pending);
2819 let effects = effects
2820 .into_iter()
2821 .zip_debug_eq(pending)
2822 .map(|(opt, digest)| match opt {
2823 Some(x) => x,
2824 None => panic!(
2825 "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
2826 digest
2827 ),
2828 })
2829 .collect::<Vec<_>>();
2830 roots = effects;
2831 }
2832
2833 existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
2834 Ok(results)
2835 }
2836
2837 #[cfg(msim)]
2840 fn expensive_consensus_commit_prologue_invariants_check(
2841 &self,
2842 root_digests: &[TransactionDigest],
2843 sorted: &[TransactionEffects],
2844 ) {
2845 let root_txs = self
2847 .state
2848 .get_transaction_cache_reader()
2849 .multi_get_transaction_blocks(root_digests);
2850 let ccps = root_txs
2851 .iter()
2852 .filter_map(|tx| {
2853 if let Some(tx) = tx {
2854 if tx.transaction_data().is_consensus_commit_prologue() {
2855 Some(tx)
2856 } else {
2857 None
2858 }
2859 } else {
2860 None
2861 }
2862 })
2863 .collect::<Vec<_>>();
2864
2865 assert!(ccps.len() <= 1);
2867
2868 let txs = self
2870 .state
2871 .get_transaction_cache_reader()
2872 .multi_get_transaction_blocks(
2873 &sorted
2874 .iter()
2875 .map(|tx| tx.transaction_digest().clone())
2876 .collect::<Vec<_>>(),
2877 );
2878
2879 if ccps.len() == 0 {
2880 for tx in txs.iter() {
2883 if let Some(tx) = tx {
2884 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2885 }
2886 }
2887 } else {
2888 assert!(
2890 txs[0]
2891 .as_ref()
2892 .unwrap()
2893 .transaction_data()
2894 .is_consensus_commit_prologue()
2895 );
2896
2897 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2898
2899 for tx in txs.iter().skip(1) {
2900 if let Some(tx) = tx {
2901 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2902 }
2903 }
2904 }
2905 }
2906}
2907
2908async fn wait_for_effects_with_retry(
2909 effects_store: &dyn TransactionCacheRead,
2910 task_name: &'static str,
2911 digests: &[TransactionDigest],
2912 tx_key: TransactionKey,
2913) -> Vec<TransactionEffects> {
2914 let delay = if in_antithesis() {
2915 15
2917 } else {
2918 5
2919 };
2920 loop {
2921 match tokio::time::timeout(Duration::from_secs(delay), async {
2922 effects_store
2923 .notify_read_executed_effects(task_name, digests)
2924 .await
2925 })
2926 .await
2927 {
2928 Ok(effects) => break effects,
2929 Err(_) => {
2930 debug_fatal!(
2931 "Timeout waiting for transactions to be executed {:?}, retrying...",
2932 tx_key
2933 );
2934 }
2935 }
2936 }
2937}
2938
2939impl CheckpointAggregator {
2940 fn new(
2941 tables: Arc<CheckpointStore>,
2942 epoch_store: Arc<AuthorityPerEpochStore>,
2943 notify: Arc<Notify>,
2944 receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
2945 output: Box<dyn CertifiedCheckpointOutput>,
2946 state: Arc<AuthorityState>,
2947 metrics: Arc<CheckpointMetrics>,
2948 ) -> Self {
2949 Self {
2950 store: tables,
2951 epoch_store,
2952 notify,
2953 receiver,
2954 pending: BTreeMap::new(),
2955 current: None,
2956 output,
2957 state,
2958 metrics,
2959 }
2960 }
2961
2962 async fn run(mut self) {
2963 info!("Starting CheckpointAggregator");
2964 loop {
2965 while let Ok(sig) = self.receiver.try_recv() {
2967 self.pending
2968 .entry(sig.summary.sequence_number)
2969 .or_default()
2970 .push(sig);
2971 }
2972
2973 if let Err(e) = self.run_and_notify().await {
2974 error!(
2975 "Error while aggregating checkpoint, will retry in 1s: {:?}",
2976 e
2977 );
2978 self.metrics.checkpoint_errors.inc();
2979 tokio::time::sleep(Duration::from_secs(1)).await;
2980 continue;
2981 }
2982
2983 tokio::select! {
2984 Some(sig) = self.receiver.recv() => {
2985 self.pending
2986 .entry(sig.summary.sequence_number)
2987 .or_default()
2988 .push(sig);
2989 }
2990 _ = self.notify.notified() => {}
2991 _ = tokio::time::sleep(Duration::from_secs(1)) => {}
2992 }
2993 }
2994 }
2995
2996 async fn run_and_notify(&mut self) -> SuiResult {
2997 let summaries = self.run_inner()?;
2998 for summary in summaries {
2999 self.output.certified_checkpoint_created(&summary).await?;
3000 }
3001 Ok(())
3002 }
3003
3004 fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
3005 let _scope = monitored_scope("CheckpointAggregator");
3006 let mut result = vec![];
3007 'outer: loop {
3008 let next_to_certify = self.next_checkpoint_to_certify()?;
3009 self.pending.retain(|&seq, _| seq >= next_to_certify);
3012 let current = if let Some(current) = &mut self.current {
3013 if current.summary.sequence_number < next_to_certify {
3019 assert_reachable!("skip checkpoint certification");
3020 self.current = None;
3021 continue;
3022 }
3023 current
3024 } else {
3025 let Some(summary) = self
3026 .epoch_store
3027 .get_built_checkpoint_summary(next_to_certify)?
3028 else {
3029 return Ok(result);
3030 };
3031 self.current = Some(CheckpointSignatureAggregator {
3032 digest: summary.digest(),
3033 summary,
3034 signatures_by_digest: MultiStakeAggregator::new(
3035 self.epoch_store.committee().clone(),
3036 ),
3037 store: self.store.clone(),
3038 state: self.state.clone(),
3039 metrics: self.metrics.clone(),
3040 });
3041 self.current.as_mut().unwrap()
3042 };
3043
3044 let seq = current.summary.sequence_number;
3045 let sigs = self.pending.remove(&seq).unwrap_or_default();
3046 if sigs.is_empty() {
3047 trace!(
3048 checkpoint_seq =? seq,
3049 "Not enough checkpoint signatures",
3050 );
3051 return Ok(result);
3052 }
3053 for data in sigs {
3054 trace!(
3055 checkpoint_seq = seq,
3056 "Processing signature for checkpoint (digest: {:?}) from {:?}",
3057 current.summary.digest(),
3058 data.summary.auth_sig().authority.concise()
3059 );
3060 self.metrics
3061 .checkpoint_participation
3062 .with_label_values(&[&format!(
3063 "{:?}",
3064 data.summary.auth_sig().authority.concise()
3065 )])
3066 .inc();
3067 if let Ok(auth_signature) = current.try_aggregate(data) {
3068 debug!(
3069 checkpoint_seq = seq,
3070 "Successfully aggregated signatures for checkpoint (digest: {:?})",
3071 current.summary.digest(),
3072 );
3073 let summary = VerifiedCheckpoint::new_unchecked(
3074 CertifiedCheckpointSummary::new_from_data_and_sig(
3075 current.summary.clone(),
3076 auth_signature,
3077 ),
3078 );
3079
3080 self.store.insert_certified_checkpoint(&summary)?;
3081 self.metrics.last_certified_checkpoint.set(seq as i64);
3082 current.summary.report_checkpoint_age(
3083 &self.metrics.last_certified_checkpoint_age,
3084 &self.metrics.last_certified_checkpoint_age_ms,
3085 );
3086 result.push(summary.into_inner());
3087 self.current = None;
3088 continue 'outer;
3089 }
3090 }
3091 break;
3092 }
3093 Ok(result)
3094 }
3095
3096 fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
3097 Ok(self
3098 .store
3099 .tables
3100 .certified_checkpoints
3101 .reversed_safe_iter_with_bounds(None, None)?
3102 .next()
3103 .transpose()?
3104 .map(|(seq, _)| seq + 1)
3105 .unwrap_or_default())
3106 }
3107}
3108
3109impl CheckpointSignatureAggregator {
3110 #[allow(clippy::result_unit_err)]
3111 pub fn try_aggregate(
3112 &mut self,
3113 data: CheckpointSignatureMessage,
3114 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
3115 let their_digest = *data.summary.digest();
3116 let (_, signature) = data.summary.into_data_and_sig();
3117 let author = signature.authority;
3118 let envelope =
3119 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
3120 match self.signatures_by_digest.insert(their_digest, envelope) {
3121 InsertResult::Failed { error }
3123 if matches!(
3124 error.as_inner(),
3125 SuiErrorKind::StakeAggregatorRepeatedSigner {
3126 conflicting_sig: false,
3127 ..
3128 },
3129 ) =>
3130 {
3131 Err(())
3132 }
3133 InsertResult::Failed { error } => {
3134 warn!(
3135 checkpoint_seq = self.summary.sequence_number,
3136 "Failed to aggregate new signature from validator {:?}: {:?}",
3137 author.concise(),
3138 error
3139 );
3140 self.check_for_split_brain();
3141 Err(())
3142 }
3143 InsertResult::QuorumReached(cert) => {
3144 if their_digest != self.digest {
3147 self.metrics.remote_checkpoint_forks.inc();
3148 warn!(
3149 checkpoint_seq = self.summary.sequence_number,
3150 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
3151 author.concise(),
3152 their_digest,
3153 self.digest
3154 );
3155 return Err(());
3156 }
3157 Ok(cert)
3158 }
3159 InsertResult::NotEnoughVotes {
3160 bad_votes: _,
3161 bad_authorities: _,
3162 } => {
3163 self.check_for_split_brain();
3164 Err(())
3165 }
3166 }
3167 }
3168
3169 fn check_for_split_brain(&self) {
3173 debug!(
3174 checkpoint_seq = self.summary.sequence_number,
3175 "Checking for split brain condition"
3176 );
3177 if self.signatures_by_digest.quorum_unreachable() {
3178 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
3184 let digests_by_stake_messages = all_unique_values
3185 .iter()
3186 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
3187 .map(|(digest, (_authorities, total_stake))| {
3188 format!("{:?} (total stake: {})", digest, total_stake)
3189 })
3190 .collect::<Vec<String>>();
3191 fail_point_arg!("kill_split_brain_node", |(
3192 checkpoint_overrides,
3193 forked_authorities,
3194 ): (
3195 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
3196 std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
3197 )| {
3198 #[cfg(msim)]
3199 {
3200 if let (Ok(mut overrides), Ok(forked_authorities_set)) =
3201 (checkpoint_overrides.lock(), forked_authorities.lock())
3202 {
3203 let correct_digest = all_unique_values
3205 .iter()
3206 .find(|(_, (authorities, _))| {
3207 authorities
3209 .iter()
3210 .any(|auth| !forked_authorities_set.contains(auth))
3211 })
3212 .map(|(digest, _)| digest.to_string())
3213 .unwrap_or_else(|| {
3214 all_unique_values
3216 .iter()
3217 .max_by_key(|(_, (_, stake))| *stake)
3218 .map(|(digest, _)| digest.to_string())
3219 .unwrap_or_else(|| self.digest.to_string())
3220 });
3221
3222 overrides.insert(self.summary.sequence_number, correct_digest.clone());
3223
3224 tracing::error!(
3225 fatal = true,
3226 "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
3227 self.summary.sequence_number,
3228 correct_digest
3229 );
3230 }
3231 }
3232 });
3233
3234 debug_fatal!(
3235 "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
3236 self.summary.sequence_number,
3237 self.signatures_by_digest.uncommitted_stake(),
3238 digests_by_stake_messages
3239 );
3240 self.metrics.split_brain_checkpoint_forks.inc();
3241
3242 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
3243 let local_summary = self.summary.clone();
3244 let state = self.state.clone();
3245 let tables = self.store.clone();
3246
3247 tokio::spawn(async move {
3248 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
3249 });
3250 }
3251 }
3252}
3253
3254async fn diagnose_split_brain(
3260 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
3261 local_summary: CheckpointSummary,
3262 state: Arc<AuthorityState>,
3263 tables: Arc<CheckpointStore>,
3264) {
3265 debug!(
3266 checkpoint_seq = local_summary.sequence_number,
3267 "Running split brain diagnostics..."
3268 );
3269 let time = SystemTime::now();
3270 let digest_to_validator = all_unique_values
3272 .iter()
3273 .filter_map(|(digest, (validators, _))| {
3274 if *digest != local_summary.digest() {
3275 let random_validator = validators.choose(&mut get_rng()).unwrap();
3276 Some((*digest, *random_validator))
3277 } else {
3278 None
3279 }
3280 })
3281 .collect::<HashMap<_, _>>();
3282 if digest_to_validator.is_empty() {
3283 panic!(
3284 "Given split brain condition, there should be at \
3285 least one validator that disagrees with local signature"
3286 );
3287 }
3288
3289 let epoch_store = state.load_epoch_store_one_call_per_task();
3290 let committee = epoch_store
3291 .epoch_start_state()
3292 .get_sui_committee_with_network_metadata();
3293 let network_config = default_mysten_network_config();
3294 let network_clients =
3295 make_network_authority_clients_with_network_config(&committee, &network_config);
3296
3297 let response_futures = digest_to_validator
3299 .values()
3300 .cloned()
3301 .map(|validator| {
3302 let client = network_clients
3303 .get(&validator)
3304 .expect("Failed to get network client");
3305 let request = CheckpointRequestV2 {
3306 sequence_number: Some(local_summary.sequence_number),
3307 request_content: true,
3308 certified: false,
3309 };
3310 client.handle_checkpoint_v2(request)
3311 })
3312 .collect::<Vec<_>>();
3313
3314 let digest_name_pair = digest_to_validator.iter();
3315 let response_data = futures::future::join_all(response_futures)
3316 .await
3317 .into_iter()
3318 .zip_debug_eq(digest_name_pair)
3319 .filter_map(|(response, (digest, name))| match response {
3320 Ok(response) => match response {
3321 CheckpointResponseV2 {
3322 checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
3323 contents: Some(contents),
3324 } => Some((*name, *digest, summary, contents)),
3325 CheckpointResponseV2 {
3326 checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
3327 contents: _,
3328 } => {
3329 panic!("Expected pending checkpoint, but got certified checkpoint");
3330 }
3331 CheckpointResponseV2 {
3332 checkpoint: None,
3333 contents: _,
3334 } => {
3335 error!(
3336 "Summary for checkpoint {:?} not found on validator {:?}",
3337 local_summary.sequence_number, name
3338 );
3339 None
3340 }
3341 CheckpointResponseV2 {
3342 checkpoint: _,
3343 contents: None,
3344 } => {
3345 error!(
3346 "Contents for checkpoint {:?} not found on validator {:?}",
3347 local_summary.sequence_number, name
3348 );
3349 None
3350 }
3351 },
3352 Err(e) => {
3353 error!(
3354 "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
3355 e
3356 );
3357 None
3358 }
3359 })
3360 .collect::<Vec<_>>();
3361
3362 let local_checkpoint_contents = tables
3363 .get_checkpoint_contents(&local_summary.content_digest)
3364 .unwrap_or_else(|_| {
3365 panic!(
3366 "Could not find checkpoint contents for digest {:?}",
3367 local_summary.digest()
3368 )
3369 })
3370 .unwrap_or_else(|| {
3371 panic!(
3372 "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
3373 local_summary.sequence_number,
3374 local_summary.digest()
3375 )
3376 });
3377 let local_contents_text = format!("{local_checkpoint_contents:?}");
3378
3379 let local_summary_text = format!("{local_summary:?}");
3380 let local_validator = state.name.concise();
3381 let diff_patches = response_data
3382 .iter()
3383 .map(|(name, other_digest, other_summary, contents)| {
3384 let other_contents_text = format!("{contents:?}");
3385 let other_summary_text = format!("{other_summary:?}");
3386 let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
3387 .enumerate_transactions(&local_summary)
3388 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
3389 .unzip();
3390 let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
3391 .enumerate_transactions(other_summary)
3392 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
3393 .unzip();
3394 let summary_patch = create_patch(&local_summary_text, &other_summary_text);
3395 let contents_patch = create_patch(&local_contents_text, &other_contents_text);
3396 let local_transactions_text = format!("{local_transactions:#?}");
3397 let other_transactions_text = format!("{other_transactions:#?}");
3398 let transactions_patch =
3399 create_patch(&local_transactions_text, &other_transactions_text);
3400 let local_effects_text = format!("{local_effects:#?}");
3401 let other_effects_text = format!("{other_effects:#?}");
3402 let effects_patch = create_patch(&local_effects_text, &other_effects_text);
3403 let seq_number = local_summary.sequence_number;
3404 let local_digest = local_summary.digest();
3405 let other_validator = name.concise();
3406 format!(
3407 "Checkpoint: {seq_number:?}\n\
3408 Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
3409 Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
3410 Summary Diff: \n{summary_patch}\n\n\
3411 Contents Diff: \n{contents_patch}\n\n\
3412 Transactions Diff: \n{transactions_patch}\n\n\
3413 Effects Diff: \n{effects_patch}",
3414 )
3415 })
3416 .collect::<Vec<_>>()
3417 .join("\n\n\n");
3418
3419 let header = format!(
3420 "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
3421 Datetime: {:?}",
3422 time
3423 );
3424 let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
3425 let path = tempfile::tempdir()
3426 .expect("Failed to create tempdir")
3427 .keep()
3428 .join(Path::new("checkpoint_fork_dump.txt"));
3429 let mut file = File::create(path).unwrap();
3430 write!(file, "{}", fork_logs_text).unwrap();
3431 debug!("{}", fork_logs_text);
3432}
3433
3434pub trait CheckpointServiceNotify {
3435 fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult;
3436
3437 fn notify_checkpoint(&self) -> SuiResult;
3438}
3439
3440#[allow(clippy::large_enum_variant)]
3441enum CheckpointServiceState {
3442 Unstarted(
3443 (
3444 CheckpointBuilder,
3445 CheckpointAggregator,
3446 CheckpointStateHasher,
3447 ),
3448 ),
3449 Started,
3450}
3451
3452impl CheckpointServiceState {
3453 fn take_unstarted(
3454 &mut self,
3455 ) -> (
3456 CheckpointBuilder,
3457 CheckpointAggregator,
3458 CheckpointStateHasher,
3459 ) {
3460 let mut state = CheckpointServiceState::Started;
3461 std::mem::swap(self, &mut state);
3462
3463 match state {
3464 CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3465 (builder, aggregator, hasher)
3466 }
3467 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3468 }
3469 }
3470}
3471
3472pub struct CheckpointService {
3473 tables: Arc<CheckpointStore>,
3474 notify_builder: Arc<Notify>,
3475 signature_sender: mpsc::UnboundedSender<CheckpointSignatureMessage>,
3476 highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
3478 highest_previously_built_seq: CheckpointSequenceNumber,
3481 metrics: Arc<CheckpointMetrics>,
3482 state: Mutex<CheckpointServiceState>,
3483}
3484
3485impl CheckpointService {
3486 #[allow(clippy::disallowed_methods)]
3492 pub fn build(
3493 state: Arc<AuthorityState>,
3494 checkpoint_store: Arc<CheckpointStore>,
3495 epoch_store: Arc<AuthorityPerEpochStore>,
3496 effects_store: Arc<dyn TransactionCacheRead>,
3497 global_state_hasher: Weak<GlobalStateHasher>,
3498 checkpoint_output: Box<dyn CheckpointOutput>,
3499 certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
3500 metrics: Arc<CheckpointMetrics>,
3501 max_transactions_per_checkpoint: usize,
3502 max_checkpoint_size_bytes: usize,
3503 ) -> Arc<Self> {
3504 info!(
3505 "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
3506 );
3507 Self::initialize_accumulator_account_metrics(&state, &epoch_store, &metrics);
3508 let notify_builder = Arc::new(Notify::new());
3509 let notify_aggregator = Arc::new(Notify::new());
3510
3511 let highest_previously_built_seq = checkpoint_store
3513 .get_latest_locally_computed_checkpoint()
3514 .expect("failed to get latest locally computed checkpoint")
3515 .map(|s| s.sequence_number)
3516 .unwrap_or(0);
3517
3518 let highest_currently_built_seq =
3519 CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
3520 .expect("epoch should not have ended")
3521 .map(|(seq, _)| seq)
3522 .unwrap_or(0);
3523
3524 let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
3525
3526 let (signature_sender, signature_receiver) = mpsc::unbounded_channel();
3527
3528 let aggregator = CheckpointAggregator::new(
3529 checkpoint_store.clone(),
3530 epoch_store.clone(),
3531 notify_aggregator.clone(),
3532 signature_receiver,
3533 certified_checkpoint_output,
3534 state.clone(),
3535 metrics.clone(),
3536 );
3537
3538 let (send_to_hasher, receive_from_builder) = mpsc::channel(16);
3539
3540 let ckpt_state_hasher = CheckpointStateHasher::new(
3541 epoch_store.clone(),
3542 global_state_hasher.clone(),
3543 receive_from_builder,
3544 );
3545
3546 let builder = CheckpointBuilder::new(
3547 state.clone(),
3548 checkpoint_store.clone(),
3549 epoch_store.clone(),
3550 notify_builder.clone(),
3551 effects_store,
3552 global_state_hasher,
3553 send_to_hasher,
3554 checkpoint_output,
3555 notify_aggregator.clone(),
3556 highest_currently_built_seq_tx.clone(),
3557 metrics.clone(),
3558 max_transactions_per_checkpoint,
3559 max_checkpoint_size_bytes,
3560 );
3561
3562 Arc::new(Self {
3563 tables: checkpoint_store,
3564 notify_builder,
3565 signature_sender,
3566 highest_currently_built_seq_tx,
3567 highest_previously_built_seq,
3568 metrics,
3569 state: Mutex::new(CheckpointServiceState::Unstarted((
3570 builder,
3571 aggregator,
3572 ckpt_state_hasher,
3573 ))),
3574 })
3575 }
3576
3577 fn initialize_accumulator_account_metrics(
3578 state: &AuthorityState,
3579 epoch_store: &AuthorityPerEpochStore,
3580 metrics: &CheckpointMetrics,
3581 ) {
3582 if !epoch_store.protocol_config().enable_accumulators() {
3583 return;
3584 }
3585
3586 let object_store = state.get_object_store();
3587 match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
3588 Ok(Some(count)) => metrics.initialize_accumulator_accounts_live(count),
3589 Ok(None) => {}
3590 Err(e) => fatal!("failed to initialize accumulator account metrics: {e}"),
3591 }
3592 }
3593
3594 pub async fn spawn(
3602 &self,
3603 epoch_store: Arc<AuthorityPerEpochStore>,
3604 consensus_replay_waiter: Option<ReplayWaiter>,
3605 ) {
3606 let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();
3607
3608 let last_persisted_builder_seq = epoch_store
3618 .last_persisted_checkpoint_builder_summary()
3619 .expect("epoch should not have ended")
3620 .map(|s| s.summary.sequence_number);
3621
3622 let last_executed_seq = self
3623 .tables
3624 .get_highest_executed_checkpoint()
3625 .expect("Failed to get highest executed checkpoint")
3626 .map(|checkpoint| *checkpoint.sequence_number());
3627
3628 if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
3629 if let Err(e) = builder
3630 .epoch_store
3631 .clear_state_hashes_after_checkpoint(last_committed_seq)
3632 {
3633 error!(
3634 "Failed to clear state hashes after checkpoint {}: {:?}",
3635 last_committed_seq, e
3636 );
3637 } else {
3638 info!(
3639 "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
3640 last_committed_seq
3641 );
3642 }
3643 }
3644
3645 let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();
3646
3647 let state_hasher_task = spawn_monitored_task!(state_hasher.run());
3648 let aggregator_task = spawn_monitored_task!(aggregator.run());
3649
3650 spawn_monitored_task!(async move {
3651 epoch_store
3652 .within_alive_epoch(async move {
3653 builder.run(consensus_replay_waiter).await;
3654 builder_finished_tx.send(()).ok();
3655 })
3656 .await
3657 .ok();
3658
3659 state_hasher_task
3661 .await
3662 .expect("state hasher should exit normally");
3663
3664 aggregator_task.abort();
3667 aggregator_task.await.ok();
3668 });
3669
3670 if tokio::time::timeout(Duration::from_secs(120), async move {
3676 tokio::select! {
3677 _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
3678 _ = self.wait_for_rebuilt_checkpoints() => (),
3679 }
3680 })
3681 .await
3682 .is_err()
3683 {
3684 debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
3685 }
3686 }
3687}
3688
3689impl CheckpointService {
3690 pub async fn wait_for_rebuilt_checkpoints(&self) {
3696 let highest_previously_built_seq = self.highest_previously_built_seq;
3697 let mut rx = self.highest_currently_built_seq_tx.subscribe();
3698 let mut highest_currently_built_seq = *rx.borrow_and_update();
3699 info!(
3700 "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3701 );
3702 loop {
3703 if highest_currently_built_seq >= highest_previously_built_seq {
3704 info!("Checkpoint rebuild complete");
3705 break;
3706 }
3707 rx.changed().await.unwrap();
3708 highest_currently_built_seq = *rx.borrow_and_update();
3709 }
3710 }
3711
3712 #[cfg(test)]
3713 fn write_and_notify_checkpoint_for_testing(
3714 &self,
3715 epoch_store: &AuthorityPerEpochStore,
3716 checkpoint: PendingCheckpoint,
3717 ) -> SuiResult {
3718 use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3719
3720 let mut output = ConsensusCommitOutput::new(0);
3721 epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3722 output.set_default_commit_stats_for_testing();
3723 epoch_store.push_consensus_output_for_tests(output);
3724 self.notify_checkpoint()?;
3725 Ok(())
3726 }
3727}
3728
3729impl CheckpointServiceNotify for CheckpointService {
3730 fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult {
3731 let sequence = info.summary.sequence_number;
3732 let signer = info.summary.auth_sig().authority.concise();
3733
3734 if let Some(highest_verified_checkpoint) = self
3735 .tables
3736 .get_highest_verified_checkpoint()?
3737 .map(|x| *x.sequence_number())
3738 && sequence <= highest_verified_checkpoint
3739 {
3740 trace!(
3741 checkpoint_seq = sequence,
3742 "Ignore checkpoint signature from {} - already certified", signer,
3743 );
3744 self.metrics
3745 .last_ignored_checkpoint_signature_received
3746 .set(sequence as i64);
3747 return Ok(());
3748 }
3749 trace!(
3750 checkpoint_seq = sequence,
3751 "Received checkpoint signature, digest {} from {}",
3752 info.summary.digest(),
3753 signer,
3754 );
3755 self.metrics
3756 .last_received_checkpoint_signatures
3757 .with_label_values(&[&signer.to_string()])
3758 .set(sequence as i64);
3759 self.signature_sender.send(info.clone()).ok();
3760 Ok(())
3761 }
3762
3763 fn notify_checkpoint(&self) -> SuiResult {
3764 self.notify_builder.notify_one();
3765 Ok(())
3766 }
3767}
3768
3769pub struct CheckpointServiceNoop {}
3771impl CheckpointServiceNotify for CheckpointServiceNoop {
3772 fn notify_checkpoint_signature(&self, _: &CheckpointSignatureMessage) -> SuiResult {
3773 Ok(())
3774 }
3775
3776 fn notify_checkpoint(&self) -> SuiResult {
3777 Ok(())
3778 }
3779}
3780
3781impl PendingCheckpoint {
3782 pub fn height(&self) -> CheckpointHeight {
3783 self.details.checkpoint_height
3784 }
3785
3786 pub fn roots(&self) -> &Vec<TransactionKey> {
3787 &self.roots
3788 }
3789
3790 pub fn details(&self) -> &PendingCheckpointInfo {
3791 &self.details
3792 }
3793}
3794
3795impl PendingCheckpointV2 {
3796 pub fn height(&self) -> CheckpointHeight {
3797 self.details.checkpoint_height
3798 }
3799
3800 pub(crate) fn num_roots(&self) -> usize {
3801 self.roots.iter().map(|r| r.tx_roots.len()).sum()
3802 }
3803}
3804
3805pin_project! {
3806 pub struct PollCounter<Fut> {
3807 #[pin]
3808 future: Fut,
3809 count: usize,
3810 }
3811}
3812
3813impl<Fut> PollCounter<Fut> {
3814 pub fn new(future: Fut) -> Self {
3815 Self { future, count: 0 }
3816 }
3817
3818 pub fn count(&self) -> usize {
3819 self.count
3820 }
3821}
3822
3823impl<Fut: Future> Future for PollCounter<Fut> {
3824 type Output = (usize, Fut::Output);
3825
3826 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3827 let this = self.project();
3828 *this.count += 1;
3829 match this.future.poll(cx) {
3830 Poll::Ready(output) => Poll::Ready((*this.count, output)),
3831 Poll::Pending => Poll::Pending,
3832 }
3833 }
3834}
3835
3836fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
3837 PollCounter::new(future)
3838}
3839
3840#[cfg(test)]
3841mod tests {
3842 use super::*;
3843 use crate::authority::test_authority_builder::TestAuthorityBuilder;
3844 use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3845 use futures::FutureExt as _;
3846 use futures::future::BoxFuture;
3847 use std::collections::HashMap;
3848 use std::ops::Deref;
3849 use sui_macros::sim_test;
3850 use sui_protocol_config::{Chain, ProtocolConfig};
3851 use sui_types::accumulator_event::AccumulatorEvent;
3852 use sui_types::authenticator_state::ActiveJwk;
3853 use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3854 use sui_types::crypto::Signature;
3855 use sui_types::effects::{TransactionEffects, TransactionEvents};
3856 use sui_types::messages_checkpoint::SignedCheckpointSummary;
3857 use sui_types::transaction::VerifiedTransaction;
3858 use tokio::sync::mpsc;
3859
3860 #[tokio::test]
3861 async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
3862 let store = CheckpointStore::new_for_tests();
3863 let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
3864 for seq in 70u64..=80u64 {
3865 let contents =
3866 sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
3867 [sui_types::base_types::ExecutionDigests::new(
3868 sui_types::digests::TransactionDigest::random(),
3869 sui_types::digests::TransactionEffectsDigest::ZERO,
3870 )],
3871 );
3872 let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
3873 &protocol,
3874 0,
3875 seq,
3876 0,
3877 &contents,
3878 None,
3879 sui_types::gas::GasCostSummary::default(),
3880 None,
3881 0,
3882 Vec::new(),
3883 Vec::new(),
3884 );
3885 store
3886 .tables
3887 .locally_computed_checkpoints
3888 .insert(&seq, &summary)
3889 .unwrap();
3890 }
3891
3892 store
3893 .clear_locally_computed_checkpoints_from(76)
3894 .expect("clear should succeed");
3895
3896 assert!(
3898 store
3899 .tables
3900 .locally_computed_checkpoints
3901 .get(&75)
3902 .unwrap()
3903 .is_some()
3904 );
3905 assert!(
3906 store
3907 .tables
3908 .locally_computed_checkpoints
3909 .get(&76)
3910 .unwrap()
3911 .is_none()
3912 );
3913
3914 for seq in 70u64..76u64 {
3915 assert!(
3916 store
3917 .tables
3918 .locally_computed_checkpoints
3919 .get(&seq)
3920 .unwrap()
3921 .is_some()
3922 );
3923 }
3924 for seq in 76u64..=80u64 {
3925 assert!(
3926 store
3927 .tables
3928 .locally_computed_checkpoints
3929 .get(&seq)
3930 .unwrap()
3931 .is_none()
3932 );
3933 }
3934 }
3935
3936 #[tokio::test]
3937 async fn test_fork_detection_storage() {
3938 let store = CheckpointStore::new_for_tests();
3939 let seq_num = 42;
3941 let digest = CheckpointDigest::random();
3942
3943 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3944
3945 store
3946 .record_checkpoint_fork_detected(seq_num, digest)
3947 .unwrap();
3948
3949 let retrieved = store.get_checkpoint_fork_detected().unwrap();
3950 assert!(retrieved.is_some());
3951 let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3952 assert_eq!(retrieved_seq, seq_num);
3953 assert_eq!(retrieved_digest, digest);
3954
3955 store.clear_checkpoint_fork_detected().unwrap();
3956 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3957
3958 let tx_digest = TransactionDigest::random();
3960 let expected_effects = TransactionEffectsDigest::random();
3961 let actual_effects = TransactionEffectsDigest::random();
3962
3963 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3964
3965 store
3966 .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3967 .unwrap();
3968
3969 let retrieved = store.get_transaction_fork_detected().unwrap();
3970 assert!(retrieved.is_some());
3971 let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3972 assert_eq!(retrieved_tx, tx_digest);
3973 assert_eq!(retrieved_expected, expected_effects);
3974 assert_eq!(retrieved_actual, actual_effects);
3975
3976 store.clear_transaction_fork_detected().unwrap();
3977 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3978 }
3979
3980 #[sim_test]
3981 pub async fn checkpoint_builder_test() {
3982 telemetry_subscribers::init_for_testing();
3983
3984 let mut protocol_config =
3985 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
3986 protocol_config.disable_accumulators_for_testing();
3987 protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(false);
3988 protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
3989 let state = TestAuthorityBuilder::new()
3990 .with_protocol_config(protocol_config)
3991 .build()
3992 .await;
3993
3994 let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
3995 0,
3996 0,
3997 vec![],
3998 SequenceNumber::new(),
3999 );
4000
4001 let jwks = {
4002 let mut jwks = Vec::new();
4003 while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
4004 jwks.push(ActiveJwk {
4005 jwk_id: JwkId::new(
4006 "https://accounts.google.com".to_string(),
4007 "1234567890".to_string(),
4008 ),
4009 jwk: JWK {
4010 kty: "RSA".to_string(),
4011 e: "AQAB".to_string(),
4012 n: "1234567890".to_string(),
4013 alg: "RS256".to_string(),
4014 },
4015 epoch: 0,
4016 });
4017 }
4018 jwks
4019 };
4020
4021 let dummy_tx_with_data =
4022 VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());
4023
4024 for i in 0..15 {
4025 state
4026 .database_for_testing()
4027 .perpetual_tables
4028 .transactions
4029 .insert(&d(i), dummy_tx.serializable_ref())
4030 .unwrap();
4031 }
4032 for i in 15..20 {
4033 state
4034 .database_for_testing()
4035 .perpetual_tables
4036 .transactions
4037 .insert(&d(i), dummy_tx_with_data.serializable_ref())
4038 .unwrap();
4039 }
4040
4041 let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
4042 commit_cert_for_test(
4043 &mut store,
4044 state.clone(),
4045 d(1),
4046 vec![d(2), d(3)],
4047 GasCostSummary::new(11, 12, 11, 1),
4048 );
4049 commit_cert_for_test(
4050 &mut store,
4051 state.clone(),
4052 d(2),
4053 vec![d(3), d(4)],
4054 GasCostSummary::new(21, 22, 21, 1),
4055 );
4056 commit_cert_for_test(
4057 &mut store,
4058 state.clone(),
4059 d(3),
4060 vec![],
4061 GasCostSummary::new(31, 32, 31, 1),
4062 );
4063 commit_cert_for_test(
4064 &mut store,
4065 state.clone(),
4066 d(4),
4067 vec![],
4068 GasCostSummary::new(41, 42, 41, 1),
4069 );
4070 for i in [5, 6, 7, 10, 11, 12, 13] {
4071 commit_cert_for_test(
4072 &mut store,
4073 state.clone(),
4074 d(i),
4075 vec![],
4076 GasCostSummary::new(41, 42, 41, 1),
4077 );
4078 }
4079 for i in [15, 16, 17] {
4080 commit_cert_for_test(
4081 &mut store,
4082 state.clone(),
4083 d(i),
4084 vec![],
4085 GasCostSummary::new(51, 52, 51, 1),
4086 );
4087 }
4088 let all_digests: Vec<_> = store.keys().copied().collect();
4089 for digest in all_digests {
4090 let signature = Signature::Ed25519SuiSignature(Default::default()).into();
4091 state
4092 .epoch_store_for_testing()
4093 .test_insert_user_signature(digest, vec![(signature, None)]);
4094 }
4095
4096 let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
4097 let (certified_output, mut certified_result) =
4098 mpsc::channel::<CertifiedCheckpointSummary>(10);
4099 let store = Arc::new(store);
4100
4101 let ckpt_dir = tempfile::tempdir().unwrap();
4102 let checkpoint_store =
4103 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
4104 let epoch_store = state.epoch_store_for_testing();
4105
4106 let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
4107 state.get_global_state_hash_store().clone(),
4108 ));
4109
4110 let checkpoint_service = CheckpointService::build(
4111 state.clone(),
4112 checkpoint_store,
4113 epoch_store.clone(),
4114 store,
4115 Arc::downgrade(&global_state_hasher),
4116 Box::new(output),
4117 Box::new(certified_output),
4118 CheckpointMetrics::new_for_tests(),
4119 3,
4120 100_000,
4121 );
4122 checkpoint_service.spawn(epoch_store.clone(), None).await;
4123
4124 checkpoint_service
4125 .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
4126 .unwrap();
4127 checkpoint_service
4128 .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
4129 .unwrap();
4130 checkpoint_service
4131 .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
4132 .unwrap();
4133 checkpoint_service
4134 .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
4135 .unwrap();
4136 checkpoint_service
4137 .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
4138 .unwrap();
4139 checkpoint_service
4140 .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
4141 .unwrap();
4142
4143 let (c1c, c1s) = result.recv().await.unwrap();
4144 let (c2c, c2s) = result.recv().await.unwrap();
4145
4146 let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4147 let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4148 assert_eq!(c1t, vec![d(4)]);
4149 assert_eq!(c1s.previous_digest, None);
4150 assert_eq!(c1s.sequence_number, 0);
4151 assert_eq!(
4152 c1s.epoch_rolling_gas_cost_summary,
4153 GasCostSummary::new(41, 42, 41, 1)
4154 );
4155
4156 assert_eq!(c2t, vec![d(3), d(2), d(1)]);
4157 assert_eq!(c2s.previous_digest, Some(c1s.digest()));
4158 assert_eq!(c2s.sequence_number, 1);
4159 assert_eq!(
4160 c2s.epoch_rolling_gas_cost_summary,
4161 GasCostSummary::new(104, 108, 104, 4)
4162 );
4163
4164 let (c3c, c3s) = result.recv().await.unwrap();
4167 let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4168 let (c4c, c4s) = result.recv().await.unwrap();
4169 let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4170 assert_eq!(c3s.sequence_number, 2);
4171 assert_eq!(c3s.previous_digest, Some(c2s.digest()));
4172 assert_eq!(c4s.sequence_number, 3);
4173 assert_eq!(c4s.previous_digest, Some(c3s.digest()));
4174 assert_eq!(c3t, vec![d(10), d(11), d(12)]);
4175 assert_eq!(c4t, vec![d(13)]);
4176
4177 let (c5c, c5s) = result.recv().await.unwrap();
4180 let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4181 let (c6c, c6s) = result.recv().await.unwrap();
4182 let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4183 assert_eq!(c5s.sequence_number, 4);
4184 assert_eq!(c5s.previous_digest, Some(c4s.digest()));
4185 assert_eq!(c6s.sequence_number, 5);
4186 assert_eq!(c6s.previous_digest, Some(c5s.digest()));
4187 assert_eq!(c5t, vec![d(15), d(16)]);
4188 assert_eq!(c6t, vec![d(17)]);
4189
4190 let (c7c, c7s) = result.recv().await.unwrap();
4193 let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4194 assert_eq!(c7t, vec![d(5), d(6)]);
4195 assert_eq!(c7s.previous_digest, Some(c6s.digest()));
4196 assert_eq!(c7s.sequence_number, 6);
4197
4198 let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
4199 let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);
4200
4201 checkpoint_service
4202 .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c2ss })
4203 .unwrap();
4204 checkpoint_service
4205 .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c1ss })
4206 .unwrap();
4207
4208 let c1sc = certified_result.recv().await.unwrap();
4209 let c2sc = certified_result.recv().await.unwrap();
4210 assert_eq!(c1sc.sequence_number, 0);
4211 assert_eq!(c2sc.sequence_number, 1);
4212 }
4213
4214 impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
4215 fn notify_read_executed_effects_may_fail(
4216 &self,
4217 _: &str,
4218 digests: &[TransactionDigest],
4219 ) -> BoxFuture<'_, SuiResult<Vec<TransactionEffects>>> {
4220 std::future::ready(Ok(digests
4221 .iter()
4222 .map(|d| self.get(d).expect("effects not found").clone())
4223 .collect()))
4224 .boxed()
4225 }
4226
4227 fn notify_read_executed_effects_digests(
4228 &self,
4229 _: &str,
4230 digests: &[TransactionDigest],
4231 ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
4232 std::future::ready(
4233 digests
4234 .iter()
4235 .map(|d| {
4236 self.get(d)
4237 .map(|fx| fx.digest())
4238 .expect("effects not found")
4239 })
4240 .collect(),
4241 )
4242 .boxed()
4243 }
4244
4245 fn multi_get_executed_effects(
4246 &self,
4247 digests: &[TransactionDigest],
4248 ) -> Vec<Option<TransactionEffects>> {
4249 digests.iter().map(|d| self.get(d).cloned()).collect()
4250 }
4251
4252 fn multi_get_transaction_blocks(
4258 &self,
4259 _: &[TransactionDigest],
4260 ) -> Vec<Option<Arc<VerifiedTransaction>>> {
4261 unimplemented!()
4262 }
4263
4264 fn multi_get_executed_effects_digests(
4265 &self,
4266 _: &[TransactionDigest],
4267 ) -> Vec<Option<TransactionEffectsDigest>> {
4268 unimplemented!()
4269 }
4270
4271 fn multi_get_effects(
4272 &self,
4273 _: &[TransactionEffectsDigest],
4274 ) -> Vec<Option<TransactionEffects>> {
4275 unimplemented!()
4276 }
4277
4278 fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
4279 unimplemented!()
4280 }
4281
4282 fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
4283 unimplemented!()
4284 }
4285
4286 fn get_unchanged_loaded_runtime_objects(
4287 &self,
4288 _digest: &TransactionDigest,
4289 ) -> Option<Vec<sui_types::storage::ObjectKey>> {
4290 unimplemented!()
4291 }
4292
4293 fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
4294 unimplemented!()
4295 }
4296 }
4297
4298 #[async_trait::async_trait]
4299 impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
4300 async fn checkpoint_created(
4301 &self,
4302 summary: &CheckpointSummary,
4303 contents: &CheckpointContents,
4304 _epoch_store: &Arc<AuthorityPerEpochStore>,
4305 _checkpoint_store: &Arc<CheckpointStore>,
4306 ) -> SuiResult {
4307 self.try_send((contents.clone(), summary.clone())).unwrap();
4308 Ok(())
4309 }
4310 }
4311
4312 #[async_trait::async_trait]
4313 impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
4314 async fn certified_checkpoint_created(
4315 &self,
4316 summary: &CertifiedCheckpointSummary,
4317 ) -> SuiResult {
4318 self.try_send(summary.clone()).unwrap();
4319 Ok(())
4320 }
4321 }
4322
4323 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
4324 PendingCheckpoint {
4325 roots: t
4326 .into_iter()
4327 .map(|t| TransactionKey::Digest(d(t)))
4328 .collect(),
4329 details: PendingCheckpointInfo {
4330 timestamp_ms,
4331 last_of_epoch: false,
4332 checkpoint_height: i,
4333 consensus_commit_ref: CommitRef::default(),
4334 rejected_transactions_digest: Digest::default(),
4335 checkpoint_seq: None,
4336 },
4337 }
4338 }
4339
4340 fn d(i: u8) -> TransactionDigest {
4341 let mut bytes: [u8; 32] = Default::default();
4342 bytes[0] = i;
4343 TransactionDigest::new(bytes)
4344 }
4345
4346 fn e(
4347 transaction_digest: TransactionDigest,
4348 dependencies: Vec<TransactionDigest>,
4349 gas_used: GasCostSummary,
4350 ) -> TransactionEffects {
4351 let mut effects = TransactionEffects::default();
4352 *effects.transaction_digest_mut_for_testing() = transaction_digest;
4353 *effects.dependencies_mut_for_testing() = dependencies;
4354 *effects.gas_cost_summary_mut_for_testing() = gas_used;
4355 effects
4356 }
4357
4358 fn commit_cert_for_test(
4359 store: &mut HashMap<TransactionDigest, TransactionEffects>,
4360 state: Arc<AuthorityState>,
4361 digest: TransactionDigest,
4362 dependencies: Vec<TransactionDigest>,
4363 gas_used: GasCostSummary,
4364 ) {
4365 let epoch_store = state.epoch_store_for_testing();
4366 let effects = e(digest, dependencies, gas_used);
4367 store.insert(digest, effects.clone());
4368 epoch_store.insert_executed_in_epoch(&digest);
4369 }
4370}