sui_core/checkpoints/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4pub(crate) mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16    LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::random::get_rng;
29use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
30use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
31use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
32use nonempty::NonEmpty;
33use parking_lot::Mutex;
34use pin_project_lite::pin_project;
35use serde::{Deserialize, Serialize};
36use sui_macros::fail_point_arg;
37use sui_network::default_mysten_network_config;
38use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43    CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use tokio::sync::{mpsc, watch};
47use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
48
49use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
50use crate::authority::authority_store_pruner::PrunerWatermarks;
51use crate::consensus_handler::SequencedConsensusTransactionKey;
52use rand::seq::SliceRandom;
53use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
54use std::fs::File;
55use std::future::Future;
56use std::io::Write;
57use std::path::Path;
58use std::pin::Pin;
59use std::sync::Arc;
60use std::sync::Weak;
61use std::task::{Context, Poll};
62use std::time::{Duration, SystemTime};
63use sui_protocol_config::ProtocolVersion;
64use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
65use sui_types::committee::StakeUnit;
66use sui_types::crypto::AuthorityStrongQuorumSignInfo;
67use sui_types::digests::{
68    CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
69};
70use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
71use sui_types::error::{SuiErrorKind, SuiResult};
72use sui_types::gas::GasCostSummary;
73use sui_types::message_envelope::Message;
74use sui_types::messages_checkpoint::{
75    CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
76    CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
77    EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
78    VerifiedCheckpointContents,
79};
80use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
81use sui_types::messages_consensus::ConsensusTransactionKey;
82use sui_types::signature::GenericSignature;
83use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
84use sui_types::transaction::{
85    TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
86};
87use tokio::sync::Notify;
88use tracing::{debug, error, info, instrument, trace, warn};
89use typed_store::DBMapUtils;
90use typed_store::Map;
91use typed_store::{
92    TypedStoreError,
93    rocks::{DBMap, MetricConf},
94};
95
/// Fixed key for the single row kept in the `transaction_fork_detected` table.
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

/// Ordering height for pending checkpoints, derived from consensus commit rounds
/// (see `PendingCheckpointInfo::checkpoint_height`).
pub type CheckpointHeight = u64;
99
/// Aggregate statistics accumulated over all checkpoints of an epoch.
pub struct EpochStats {
    // Number of checkpoints produced in the epoch.
    pub checkpoint_count: u64,
    // Number of transactions across the epoch's checkpoints.
    pub transaction_count: u64,
    // Total gas reward over the epoch; presumably accumulated from each
    // checkpoint's GasCostSummary — confirm against the caller.
    pub total_gas_reward: u64,
}
105
/// Metadata attached to a pending checkpoint, carried from consensus handling
/// through to checkpoint construction.
#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    // Timestamp to stamp on the resulting checkpoint summary.
    pub timestamp_ms: CheckpointTimestamp,
    // True if this will be the final checkpoint of the epoch.
    pub last_of_epoch: bool,
    // Computed in calculate_pending_checkpoint_height() from consensus round,
    // there is no guarantee that this is increasing per checkpoint, because of checkpoint splitting.
    pub checkpoint_height: CheckpointHeight,
    // Consensus commit ref and rejected transactions digest which corresponds to this checkpoint.
    pub consensus_commit_ref: CommitRef,
    pub rejected_transactions_digest: Digest,
    // Pre-assigned checkpoint sequence number from consensus handler.
    // Only set when split_checkpoints_in_consensus_handler is enabled.
    pub checkpoint_seq: Option<CheckpointSequenceNumber>,
}
120
/// A set of transaction roots plus commit metadata awaiting checkpoint
/// construction (the pre-`PendingCheckpointV2` representation).
#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    // Root transactions of this checkpoint; causal ordering / expansion is
    // presumably done downstream by the checkpoint builder — confirm there.
    pub roots: Vec<TransactionKey>,
    // Commit metadata shared with PendingCheckpointV2.
    pub details: PendingCheckpointInfo,
}
126
/// One settlement group inside a `PendingCheckpointV2`: the transaction roots
/// settled together, an optional settlement root, and the height the group was
/// assigned.
#[derive(Clone, Debug, Default)]
pub struct CheckpointRoots {
    pub tx_roots: Vec<TransactionKey>,
    pub settlement_root: Option<TransactionKey>,
    pub height: CheckpointHeight,
}
133
/// PendingCheckpointV2 is merged and split in ConsensusHandler
/// instead of CheckpointBuilder. It contains one or more
/// [`CheckpointRoots`], each of which represents a group of transactions
/// settled together.
///
/// Usage of PendingCheckpointV2 is gated by the
/// `split_checkpoints_in_consensus_handler` feature flag; both this and
/// [`PendingCheckpoint`] must be supported until that flag is removed.
#[derive(Clone, Debug)]
pub struct PendingCheckpointV2 {
    pub roots: Vec<CheckpointRoots>,
    pub details: PendingCheckpointInfo,
}
145
/// A checkpoint summary as produced by the local checkpoint builder, together
/// with the builder-side bookkeeping needed to resume/replay construction.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for genesis checkpoint
    pub checkpoint_height: Option<CheckpointHeight>,
    // Index of this summary within its consensus commit; presumably used to
    // order multiple checkpoints built from one commit — confirm at use sites.
    pub position_in_commit: usize,
}
153
/// The column families (tables) backing [`CheckpointStore`].
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence number, for
    /// efficient reads of full checkpoints. Entries from this table are deleted after state
    /// accumulation has completed.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    // TODO: Once the switch to `full_checkpoint_content_v2` is fully active on mainnet,
    // deprecate this table (and remove when possible).
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks and log useful
    /// information. Can be pruned as soon as we verify that we are in agreement with the latest
    /// certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    /// Stores transaction fork detection information
    /// (keyed by the single `TRANSACTION_FORK_DETECTED_KEY` row:
    /// transaction digest plus the two conflicting effects digests).
    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
    /// Versioned successor of `full_checkpoint_content` (see TODO above).
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
}
201
202fn full_checkpoint_content_table_default_config() -> DBOptions {
203    DBOptions {
204        options: default_db_options().options,
205        // We have seen potential data corruption issues in this table after forced shutdowns
206        // so we enable value hash logging to help with debugging.
207        // TODO: remove this once we have a better understanding of the root cause.
208        rw_options: ReadWriteOptions::default().set_log_value_hash(true),
209    }
210}
211
impl CheckpointStoreTables {
    /// Open all tables read-write (RocksDB build). Pruner watermarks are only
    /// needed by the tidehunter backend and are ignored here.
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    /// Open all tables read-write on the tidehunter backend, wiring per-table
    /// key-space configs and pruner-watermark-driven relocation filters.
    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 4;
        // u64 big-endian keys: treat the top 6 bytes as the key prefix.
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(64_000)
            .with_value_cache_size(default_value_cache_size());
        // Config for tables keyed by a u64 sequence number.
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        // Config for tables keyed by a 32-byte digest (with a fixed 8-byte
        // serialized-length prefix stripped).
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        // Watermarks are tiny and hot: cache them and never unload.
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
        // One (column family name, config) pair per field of CheckpointStoreTables.
        // Relocation filters drop or keep entries during compaction based on the
        // pruner's checkpoint/epoch watermarks.
        let configs = vec![
            (
                "checkpoint_content",
                digest_config.clone().with_config(
                    lru_config
                        .clone()
                        .with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
            ),
            (
                "transaction_fork_detected",
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "full_checkpoint_content_v2",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    /// Open every table in read-only mode (no pruner watermarks required).
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }
}
341
/// Persistent store for checkpoints, plus notification channels keyed by
/// checkpoint sequence number.
pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    // Notifies readers waiting on a checkpoint by sequence number; presumably
    // fired when the checkpoint becomes synced — confirm at the notify site.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    // Same, presumably fired when the checkpoint has been executed.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
347
348impl CheckpointStore {
349    pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
350        let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
351        Arc::new(Self {
352            tables,
353            synced_checkpoint_notify_read: NotifyRead::new(),
354            executed_checkpoint_notify_read: NotifyRead::new(),
355        })
356    }
357
358    pub fn new_for_tests() -> Arc<Self> {
359        let ckpt_dir = mysten_common::tempdir().unwrap();
360        CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
361    }
362
363    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
364        let tables = CheckpointStoreTables::new(
365            path,
366            "db_checkpoint",
367            Arc::new(PrunerWatermarks::default()),
368        );
369        Arc::new(Self {
370            tables,
371            synced_checkpoint_notify_read: NotifyRead::new(),
372            executed_checkpoint_notify_read: NotifyRead::new(),
373        })
374    }
375
    /// Open the underlying tables in read-only mode.
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }
379
    /// Seed the store with the genesis checkpoint (epoch 0, sequence 0).
    ///
    /// Idempotent: if checkpoint 0 is already stored it must match `checkpoint`'s
    /// digest (panics otherwise) and nothing is written. On first insert, the
    /// contents, the verified checkpoint, and the highest-synced watermark are
    /// all recorded; checkpoint-builder state is seeded only while the validator
    /// is still in the genesis epoch.
    ///
    /// Panics on any storage error — genesis insertion must not fail silently.
    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
        match self.get_checkpoint_by_sequence_number(0).unwrap() {
            Some(existing_checkpoint) => {
                // Already present: the stored genesis must agree with the provided one.
                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
            }
            None => {
                if epoch_store.epoch() == checkpoint.epoch {
                    // Validator is still in the genesis epoch: also seed the
                    // checkpoint builder so it can build on top of genesis.
                    epoch_store
                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                        .unwrap();
                } else {
                    debug!(
                        validator_epoch =% epoch_store.epoch(),
                        genesis_epoch =% checkpoint.epoch(),
                        "Not inserting checkpoint builder data for genesis checkpoint",
                    );
                }
                self.insert_checkpoint_contents(contents).unwrap();
                self.insert_verified_checkpoint(&checkpoint).unwrap();
                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
            }
        }
    }
421
422    pub fn get_checkpoint_by_digest(
423        &self,
424        digest: &CheckpointDigest,
425    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
426        self.tables
427            .checkpoint_by_digest
428            .get(digest)
429            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
430    }
431
432    pub fn get_checkpoint_by_sequence_number(
433        &self,
434        sequence_number: CheckpointSequenceNumber,
435    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
436        self.tables
437            .certified_checkpoints
438            .get(&sequence_number)
439            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
440    }
441
442    pub fn get_locally_computed_checkpoint(
443        &self,
444        sequence_number: CheckpointSequenceNumber,
445    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
446        self.tables
447            .locally_computed_checkpoints
448            .get(&sequence_number)
449    }
450
451    pub fn multi_get_locally_computed_checkpoints(
452        &self,
453        sequence_numbers: &[CheckpointSequenceNumber],
454    ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
455        let checkpoints = self
456            .tables
457            .locally_computed_checkpoints
458            .multi_get(sequence_numbers)?;
459
460        Ok(checkpoints)
461    }
462
463    pub fn get_sequence_number_by_contents_digest(
464        &self,
465        digest: &CheckpointContentsDigest,
466    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
467        self.tables
468            .checkpoint_sequence_by_contents_digest
469            .get(digest)
470    }
471
472    pub fn delete_contents_digest_sequence_number_mapping(
473        &self,
474        digest: &CheckpointContentsDigest,
475    ) -> Result<(), TypedStoreError> {
476        self.tables
477            .checkpoint_sequence_by_contents_digest
478            .remove(digest)
479    }
480
481    pub fn get_latest_certified_checkpoint(
482        &self,
483    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
484        Ok(self
485            .tables
486            .certified_checkpoints
487            .reversed_safe_iter_with_bounds(None, None)?
488            .next()
489            .transpose()?
490            .map(|(_, v)| v.into()))
491    }
492
493    pub fn get_latest_locally_computed_checkpoint(
494        &self,
495    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
496        Ok(self
497            .tables
498            .locally_computed_checkpoints
499            .reversed_safe_iter_with_bounds(None, None)?
500            .next()
501            .transpose()?
502            .map(|(_, v)| v))
503    }
504
505    pub fn multi_get_checkpoint_by_sequence_number(
506        &self,
507        sequence_numbers: &[CheckpointSequenceNumber],
508    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
509        let checkpoints = self
510            .tables
511            .certified_checkpoints
512            .multi_get(sequence_numbers)?
513            .into_iter()
514            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
515            .collect();
516
517        Ok(checkpoints)
518    }
519
520    pub fn multi_get_checkpoint_content(
521        &self,
522        contents_digest: &[CheckpointContentsDigest],
523    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
524        self.tables.checkpoint_content.multi_get(contents_digest)
525    }
526
527    pub fn get_highest_verified_checkpoint(
528        &self,
529    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
530        let highest_verified = if let Some(highest_verified) = self
531            .tables
532            .watermarks
533            .get(&CheckpointWatermark::HighestVerified)?
534        {
535            highest_verified
536        } else {
537            return Ok(None);
538        };
539        self.get_checkpoint_by_digest(&highest_verified.1)
540    }
541
542    pub fn get_highest_synced_checkpoint(
543        &self,
544    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
545        let highest_synced = if let Some(highest_synced) = self
546            .tables
547            .watermarks
548            .get(&CheckpointWatermark::HighestSynced)?
549        {
550            highest_synced
551        } else {
552            return Ok(None);
553        };
554        self.get_checkpoint_by_digest(&highest_synced.1)
555    }
556
557    pub fn get_highest_synced_checkpoint_seq_number(
558        &self,
559    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
560        if let Some(highest_synced) = self
561            .tables
562            .watermarks
563            .get(&CheckpointWatermark::HighestSynced)?
564        {
565            Ok(Some(highest_synced.0))
566        } else {
567            Ok(None)
568        }
569    }
570
571    pub fn get_highest_executed_checkpoint_seq_number(
572        &self,
573    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
574        if let Some(highest_executed) = self
575            .tables
576            .watermarks
577            .get(&CheckpointWatermark::HighestExecuted)?
578        {
579            Ok(Some(highest_executed.0))
580        } else {
581            Ok(None)
582        }
583    }
584
585    pub fn get_highest_executed_checkpoint(
586        &self,
587    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
588        let highest_executed = if let Some(highest_executed) = self
589            .tables
590            .watermarks
591            .get(&CheckpointWatermark::HighestExecuted)?
592        {
593            highest_executed
594        } else {
595            return Ok(None);
596        };
597        self.get_checkpoint_by_digest(&highest_executed.1)
598    }
599
600    pub fn get_highest_pruned_checkpoint_seq_number(
601        &self,
602    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
603        self.tables
604            .watermarks
605            .get(&CheckpointWatermark::HighestPruned)
606            .map(|watermark| watermark.map(|w| w.0))
607    }
608
609    pub fn get_checkpoint_contents(
610        &self,
611        digest: &CheckpointContentsDigest,
612    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
613        self.tables.checkpoint_content.get(digest)
614    }
615
616    pub fn get_full_checkpoint_contents_by_sequence_number(
617        &self,
618        seq: CheckpointSequenceNumber,
619    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
620        self.tables.full_checkpoint_content_v2.get(&seq)
621    }
622
    /// Prune locally computed checkpoint summaries up to the latest one.
    /// Per the table docs, these only need to be kept until we have verified
    /// agreement with the latest certified checkpoint.
    fn prune_local_summaries(&self) -> SuiResult {
        // Find the highest locally computed sequence number; nothing to prune
        // if the table is empty.
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            // Delete [0, last_local_summary); presumably the range delete is
            // exclusive of the upper bound so the latest summary itself
            // survives — confirm against typed_store's schedule_delete_range.
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }
642
    /// Delete every locally computed checkpoint summary with sequence number
    /// >= `from_seq`, so those summaries can be recomputed.
    ///
    /// NOTE(review): deletion failures panic via `expect` rather than
    /// propagating through the `SuiResult` — presumably intentional (failing
    /// to clear would leave inconsistent state), but worth confirming.
    pub fn clear_locally_computed_checkpoints_from(
        &self,
        from_seq: CheckpointSequenceNumber,
    ) -> SuiResult {
        // Materialize the keys first: we need them both for the batch delete
        // and to report the range that was cleared.
        let keys: Vec<_> = self
            .tables
            .locally_computed_checkpoints
            .safe_iter_with_bounds(Some(from_seq), None)
            .map(|r| r.map(|(k, _)| k))
            .collect::<Result<_, _>>()?;
        if let Some(&last_local_summary) = keys.last() {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch
                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
                .expect("Failed to delete locally computed checkpoints");
            batch
                .write()
                .expect("Failed to delete locally computed checkpoints");
            warn!(
                from_seq,
                last_local_summary,
                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
                from_seq,
                last_local_summary
            );
        }
        Ok(())
    }
671
    /// Compare a locally computed checkpoint summary against the certified one
    /// at the same sequence number; on any mismatch (a local fork), log full
    /// diagnostics, persist a fork record, and crash the process via `fatal!`.
    /// A no-op when the two summaries are equal.
    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            // Render both sides' contents for the log; any lookup failure is
            // folded into the string rather than aborting the diagnostics.
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // checkpoint contents may be too large for panic message.
            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );

            // Record the fork in the database before crashing
            if let Err(e) = self.record_checkpoint_fork_detected(
                *local_checkpoint.sequence_number(),
                local_checkpoint.digest(),
            ) {
                error!("Failed to record checkpoint fork in database: {:?}", e);
            }

            // Simulator-only fail point: record the correct digest for fork
            // recovery tests, then shut down just this simulated node.
            fail_point_arg!(
                "kill_checkpoint_fork_node",
                |checkpoint_overrides: std::sync::Arc<
                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
                >| {
                    #[cfg(msim)]
                    {
                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
                            overrides.insert(
                                local_checkpoint.sequence_number,
                                verified_checkpoint.digest().to_string(),
                            );
                        }
                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
                            local_checkpoint.sequence_number(),
                            verified_checkpoint.digest()
                        );
                        sui_simulator::task::shutdown_current_node();
                    }
                }
            );

            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }
765
    // Called by consensus (ConsensusAggregator).
    // Different from `insert_verified_checkpoint`, it does not touch
    // the highest_verified_checkpoint watermark such that state sync
    // will have a chance to process this checkpoint and perform some
    // state-sync only things.
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        // Atomically write the checkpoint under both its sequence number and
        // its digest.
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        // A next-epoch committee marks the last checkpoint of the epoch;
        // record it in the epoch -> last-checkpoint map.
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        // After committing, compare against our locally computed summary (if
        // any) — crashes the process if they diverge.
        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }
807
808    // Called by state sync, apart from inserting the checkpoint and updating
809    // related tables, it also bumps the highest_verified_checkpoint watermark.
810    #[instrument(level = "debug", skip_all)]
811    pub fn insert_verified_checkpoint(
812        &self,
813        checkpoint: &VerifiedCheckpoint,
814    ) -> Result<(), TypedStoreError> {
815        self.insert_certified_checkpoint(checkpoint)?;
816        self.update_highest_verified_checkpoint(checkpoint)
817    }
818
819    pub fn update_highest_verified_checkpoint(
820        &self,
821        checkpoint: &VerifiedCheckpoint,
822    ) -> Result<(), TypedStoreError> {
823        if Some(*checkpoint.sequence_number())
824            > self
825                .get_highest_verified_checkpoint()?
826                .map(|x| *x.sequence_number())
827        {
828            debug!(
829                checkpoint_seq = checkpoint.sequence_number(),
830                "Updating highest verified checkpoint",
831            );
832            self.tables.watermarks.insert(
833                &CheckpointWatermark::HighestVerified,
834                &(*checkpoint.sequence_number(), *checkpoint.digest()),
835            )?;
836        }
837
838        Ok(())
839    }
840
841    pub fn update_highest_synced_checkpoint(
842        &self,
843        checkpoint: &VerifiedCheckpoint,
844    ) -> Result<(), TypedStoreError> {
845        let seq = *checkpoint.sequence_number();
846        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
847        self.tables.watermarks.insert(
848            &CheckpointWatermark::HighestSynced,
849            &(seq, *checkpoint.digest()),
850        )?;
851        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
852        Ok(())
853    }
854
855    async fn notify_read_checkpoint_watermark<F>(
856        &self,
857        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
858        seq: CheckpointSequenceNumber,
859        get_watermark: F,
860    ) -> VerifiedCheckpoint
861    where
862        F: Fn() -> Option<CheckpointSequenceNumber>,
863    {
864        notify_read
865            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
866                let seq = seqs[0];
867                let Some(highest) = get_watermark() else {
868                    return vec![None];
869                };
870                if highest < seq {
871                    return vec![None];
872                }
873                let checkpoint = self
874                    .get_checkpoint_by_sequence_number(seq)
875                    .expect("db error")
876                    .expect("checkpoint not found");
877                vec![Some(checkpoint)]
878            })
879            .await
880            .into_iter()
881            .next()
882            .unwrap()
883    }
884
885    pub async fn notify_read_synced_checkpoint(
886        &self,
887        seq: CheckpointSequenceNumber,
888    ) -> VerifiedCheckpoint {
889        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
890            self.get_highest_synced_checkpoint_seq_number()
891                .expect("db error")
892        })
893        .await
894    }
895
896    pub async fn notify_read_executed_checkpoint(
897        &self,
898        seq: CheckpointSequenceNumber,
899    ) -> VerifiedCheckpoint {
900        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
901            self.get_highest_executed_checkpoint_seq_number()
902                .expect("db error")
903        })
904        .await
905    }
906
907    pub fn update_highest_executed_checkpoint(
908        &self,
909        checkpoint: &VerifiedCheckpoint,
910    ) -> Result<(), TypedStoreError> {
911        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
912            if seq_number >= *checkpoint.sequence_number() {
913                return Ok(());
914            }
915            assert_eq!(
916                seq_number + 1,
917                *checkpoint.sequence_number(),
918                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
919                checkpoint.sequence_number(),
920                seq_number
921            );
922        }
923        let seq = *checkpoint.sequence_number();
924        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
925        self.tables.watermarks.insert(
926            &CheckpointWatermark::HighestExecuted,
927            &(seq, *checkpoint.digest()),
928        )?;
929        self.executed_checkpoint_notify_read
930            .notify(&seq, checkpoint);
931        Ok(())
932    }
933
934    pub fn update_highest_pruned_checkpoint(
935        &self,
936        checkpoint: &VerifiedCheckpoint,
937    ) -> Result<(), TypedStoreError> {
938        self.tables.watermarks.insert(
939            &CheckpointWatermark::HighestPruned,
940            &(*checkpoint.sequence_number(), *checkpoint.digest()),
941        )
942    }
943
944    /// Sets highest executed checkpoint to any value.
945    ///
946    /// WARNING: This method is very subtle and can corrupt the database if used incorrectly.
947    /// It should only be used in one-off cases or tests after fully understanding the risk.
948    pub fn set_highest_executed_checkpoint_subtle(
949        &self,
950        checkpoint: &VerifiedCheckpoint,
951    ) -> Result<(), TypedStoreError> {
952        self.tables.watermarks.insert(
953            &CheckpointWatermark::HighestExecuted,
954            &(*checkpoint.sequence_number(), *checkpoint.digest()),
955        )
956    }
957
958    pub fn insert_checkpoint_contents(
959        &self,
960        contents: CheckpointContents,
961    ) -> Result<(), TypedStoreError> {
962        debug!(
963            checkpoint_seq = ?contents.digest(),
964            "Inserting checkpoint contents",
965        );
966        self.tables
967            .checkpoint_content
968            .insert(contents.digest(), &contents)
969    }
970
971    pub fn insert_verified_checkpoint_contents(
972        &self,
973        checkpoint: &VerifiedCheckpoint,
974        full_contents: VerifiedCheckpointContents,
975    ) -> Result<(), TypedStoreError> {
976        let mut batch = self.tables.full_checkpoint_content_v2.batch();
977        batch.insert_batch(
978            &self.tables.checkpoint_sequence_by_contents_digest,
979            [(&checkpoint.content_digest, checkpoint.sequence_number())],
980        )?;
981        let full_contents = full_contents.into_inner();
982        batch.insert_batch(
983            &self.tables.full_checkpoint_content_v2,
984            [(checkpoint.sequence_number(), &full_contents)],
985        )?;
986
987        let contents = full_contents.into_checkpoint_contents();
988        assert_eq!(&checkpoint.content_digest, contents.digest());
989
990        batch.insert_batch(
991            &self.tables.checkpoint_content,
992            [(contents.digest(), &contents)],
993        )?;
994
995        batch.write()
996    }
997
998    pub fn delete_full_checkpoint_contents(
999        &self,
1000        seq: CheckpointSequenceNumber,
1001    ) -> Result<(), TypedStoreError> {
1002        self.tables.full_checkpoint_content.remove(&seq)?;
1003        self.tables.full_checkpoint_content_v2.remove(&seq)
1004    }
1005
1006    pub fn get_epoch_last_checkpoint(
1007        &self,
1008        epoch_id: EpochId,
1009    ) -> SuiResult<Option<VerifiedCheckpoint>> {
1010        let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1011        let checkpoint = match seq {
1012            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1013            None => None,
1014        };
1015        Ok(checkpoint)
1016    }
1017
1018    pub fn get_epoch_last_checkpoint_seq_number(
1019        &self,
1020        epoch_id: EpochId,
1021    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1022        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1023        Ok(seq)
1024    }
1025
1026    pub fn insert_epoch_last_checkpoint(
1027        &self,
1028        epoch_id: EpochId,
1029        checkpoint: &VerifiedCheckpoint,
1030    ) -> SuiResult {
1031        self.tables
1032            .epoch_last_checkpoint_map
1033            .insert(&epoch_id, checkpoint.sequence_number())?;
1034        Ok(())
1035    }
1036
1037    pub fn get_epoch_state_commitments(
1038        &self,
1039        epoch: EpochId,
1040    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1041        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1042            checkpoint
1043                .end_of_epoch_data
1044                .as_ref()
1045                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1046                .epoch_commitments
1047                .clone()
1048        });
1049        Ok(commitments)
1050    }
1051
1052    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few statistics of the epoch.
1053    pub fn get_epoch_stats(
1054        &self,
1055        epoch: EpochId,
1056        last_checkpoint: &CheckpointSummary,
1057    ) -> Option<EpochStats> {
1058        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1059            (0, 0)
1060        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1061            (
1062                checkpoint.sequence_number + 1,
1063                checkpoint.network_total_transactions,
1064            )
1065        } else {
1066            return None;
1067        };
1068        Some(EpochStats {
1069            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1070            transaction_count: last_checkpoint.network_total_transactions
1071                - prev_epoch_network_transactions,
1072            total_gas_reward: last_checkpoint
1073                .epoch_rolling_gas_cost_summary
1074                .computation_cost,
1075        })
1076    }
1077
1078    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1079        // This checkpoints the entire db and not one column family
1080        self.tables
1081            .checkpoint_content
1082            .checkpoint_db(path)
1083            .map_err(Into::into)
1084    }
1085
1086    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1087        let mut wb = self.tables.watermarks.batch();
1088        wb.delete_batch(
1089            &self.tables.watermarks,
1090            std::iter::once(CheckpointWatermark::HighestExecuted),
1091        )?;
1092        wb.write()?;
1093        Ok(())
1094    }
1095
1096    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1097        self.delete_highest_executed_checkpoint_test_only()?;
1098        Ok(())
1099    }
1100
1101    pub fn record_checkpoint_fork_detected(
1102        &self,
1103        checkpoint_seq: CheckpointSequenceNumber,
1104        checkpoint_digest: CheckpointDigest,
1105    ) -> Result<(), TypedStoreError> {
1106        info!(
1107            checkpoint_seq = checkpoint_seq,
1108            checkpoint_digest = ?checkpoint_digest,
1109            "Recording checkpoint fork detection in database"
1110        );
1111        self.tables.watermarks.insert(
1112            &CheckpointWatermark::CheckpointForkDetected,
1113            &(checkpoint_seq, checkpoint_digest),
1114        )
1115    }
1116
1117    pub fn get_checkpoint_fork_detected(
1118        &self,
1119    ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1120        self.tables
1121            .watermarks
1122            .get(&CheckpointWatermark::CheckpointForkDetected)
1123    }
1124
1125    pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1126        self.tables
1127            .watermarks
1128            .remove(&CheckpointWatermark::CheckpointForkDetected)
1129    }
1130
1131    pub fn record_transaction_fork_detected(
1132        &self,
1133        tx_digest: TransactionDigest,
1134        expected_effects_digest: TransactionEffectsDigest,
1135        actual_effects_digest: TransactionEffectsDigest,
1136    ) -> Result<(), TypedStoreError> {
1137        info!(
1138            tx_digest = ?tx_digest,
1139            expected_effects_digest = ?expected_effects_digest,
1140            actual_effects_digest = ?actual_effects_digest,
1141            "Recording transaction fork detection in database"
1142        );
1143        self.tables.transaction_fork_detected.insert(
1144            &TRANSACTION_FORK_DETECTED_KEY,
1145            &(tx_digest, expected_effects_digest, actual_effects_digest),
1146        )
1147    }
1148
1149    pub fn get_transaction_fork_detected(
1150        &self,
1151    ) -> Result<
1152        Option<(
1153            TransactionDigest,
1154            TransactionEffectsDigest,
1155            TransactionEffectsDigest,
1156        )>,
1157        TypedStoreError,
1158    > {
1159        self.tables
1160            .transaction_fork_detected
1161            .get(&TRANSACTION_FORK_DETECTED_KEY)
1162    }
1163
1164    pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1165        self.tables
1166            .transaction_fork_detected
1167            .remove(&TRANSACTION_FORK_DETECTED_KEY)
1168    }
1169}
1170
/// Keys of the `watermarks` table; each entry is stored together with a
/// (sequence number, digest) value (see the `update_highest_*` methods).
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
    // Not a watermark proper: records a detected checkpoint fork
    // (see `record_checkpoint_fork_detected`).
    CheckpointForkDetected,
    // NOTE(review): these variants are serde-serialized as DB keys —
    // renaming or reordering them may change the stored encoding; confirm
    // before modifying.
}
1179
/// Receives (sequence number, effects) pairs from the checkpoint builder and
/// feeds them into the `GlobalStateHasher` (see `run`).
struct CheckpointStateHasher {
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Weak handle: once the hasher is dropped elsewhere, `run` stops.
    hasher: Weak<GlobalStateHasher>,
    // Channel carrying each built checkpoint's effects from the builder.
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
1185
1186impl CheckpointStateHasher {
1187    fn new(
1188        epoch_store: Arc<AuthorityPerEpochStore>,
1189        hasher: Weak<GlobalStateHasher>,
1190        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1191    ) -> Self {
1192        Self {
1193            epoch_store,
1194            hasher,
1195            receive_from_builder,
1196        }
1197    }
1198
1199    async fn run(self) {
1200        let Self {
1201            epoch_store,
1202            hasher,
1203            mut receive_from_builder,
1204        } = self;
1205        while let Some((seq, effects)) = receive_from_builder.recv().await {
1206            let Some(hasher) = hasher.upgrade() else {
1207                info!("Object state hasher was dropped, stopping checkpoint accumulation");
1208                break;
1209            };
1210            hasher
1211                .accumulate_checkpoint(&effects, seq, &epoch_store)
1212                .expect("epoch ended while accumulating checkpoint");
1213        }
1214    }
1215}
1216
/// Errors surfaced while building checkpoints (see `CheckpointBuilder::run`
/// for how each variant is handled).
#[derive(Debug)]
pub enum CheckpointBuilderError {
    // Treated as terminal by `run`: the builder task exits.
    ChangeEpochTxAlreadyExecuted,
    // Also terminal: the builder task exits.
    SystemPackagesMissing,
    // Any other error; `run` logs it and retries after a 1s delay.
    Retry(anyhow::Error),
}
1223
1224impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1225    for CheckpointBuilderError
1226{
1227    fn from(e: SuiError) -> Self {
1228        Self::Retry(e.into())
1229    }
1230}
1231
/// Shorthand result for checkpoint-building operations; the success type
/// defaults to `()`.
pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1233
/// Builds checkpoints out of pending checkpoint artifacts produced by
/// consensus (see `run` / `maybe_build_checkpoints`).
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Signaled when new pending checkpoints may be available; awaited in `run`.
    notify: Arc<Notify>,
    // Presumably wakes the aggregator after new checkpoints are written —
    // TODO confirm against `write_checkpoints`.
    notify_aggregator: Arc<Notify>,
    // Broadcasts the sequence number of the most recently built checkpoint.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    // For synchronous accumulation of the end-of-epoch checkpoint.
    global_state_hasher: Weak<GlobalStateHasher>,
    // For asynchronous/concurrent accumulation of regular checkpoints
    // (received by `CheckpointStateHasher::run`).
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    // Size limits for a single checkpoint; presumably enforced when splitting
    // transactions in `create_checkpoints` — TODO confirm.
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
1249
/// Aggregates per-checkpoint signature messages from validators into
/// certified checkpoints.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Signaled when the aggregator may have new work.
    notify: Arc<Notify>,
    // Incoming stream of checkpoint signature messages.
    receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
    // Signature messages keyed by sequence number; presumably buffered until
    // their checkpoint becomes `current` — TODO confirm.
    pending: BTreeMap<CheckpointSequenceNumber, Vec<CheckpointSignatureMessage>>,
    // Aggregation state for the checkpoint currently being certified, if any.
    current: Option<CheckpointSignatureAggregator>,
    // Sink for checkpoints once certified.
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1261
// This holds information to aggregate signatures for one checkpoint
pub struct CheckpointSignatureAggregator {
    // The locally built summary being certified.
    summary: CheckpointSummary,
    // Digest of `summary`.
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1272
1273impl CheckpointBuilder {
1274    fn new(
1275        state: Arc<AuthorityState>,
1276        store: Arc<CheckpointStore>,
1277        epoch_store: Arc<AuthorityPerEpochStore>,
1278        notify: Arc<Notify>,
1279        effects_store: Arc<dyn TransactionCacheRead>,
1280        // for synchronous accumulation of end-of-epoch checkpoint
1281        global_state_hasher: Weak<GlobalStateHasher>,
1282        // for asynchronous/concurrent accumulation of regular checkpoints
1283        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1284        output: Box<dyn CheckpointOutput>,
1285        notify_aggregator: Arc<Notify>,
1286        last_built: watch::Sender<CheckpointSequenceNumber>,
1287        metrics: Arc<CheckpointMetrics>,
1288        max_transactions_per_checkpoint: usize,
1289        max_checkpoint_size_bytes: usize,
1290    ) -> Self {
1291        Self {
1292            state,
1293            store,
1294            epoch_store,
1295            notify,
1296            effects_store,
1297            global_state_hasher,
1298            send_to_hasher,
1299            output,
1300            notify_aggregator,
1301            last_built,
1302            metrics,
1303            max_transactions_per_checkpoint,
1304            max_checkpoint_size_bytes,
1305        }
1306    }
1307
    /// This function first waits for ConsensusCommitHandler to finish reprocessing
    /// commits that have been processed before the last restart, if consensus_replay_waiter
    /// is supplied. Then it starts building checkpoints in a loop.
    ///
    /// It is optional to pass in consensus_replay_waiter, to make it easier to attribute
    /// if slow recovery of previously built checkpoints is due to consensus replay or
    /// checkpoint building.
    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
        if let Some(replay_waiter) = consensus_replay_waiter {
            info!("Waiting for consensus commits to replay ...");
            replay_waiter.wait_for_replay().await;
            info!("Consensus commits finished replaying");
        }
        info!("Starting CheckpointBuilder");
        loop {
            match self.maybe_build_checkpoints().await {
                Ok(()) => {}
                // These errors are terminal: log and exit the builder task.
                err @ Err(
                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
                    | CheckpointBuilderError::SystemPackagesMissing,
                ) => {
                    info!("CheckpointBuilder stopping: {:?}", err);
                    return;
                }
                Err(CheckpointBuilderError::Retry(inner)) => {
                    let msg = format!("{:?}", inner);
                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    // NOTE: `continue` skips the `notified().await` below, so a
                    // failed attempt is retried immediately after the sleep
                    // instead of waiting for the next notification.
                    continue;
                }
            }

            // Park until consensus signals that new pending checkpoints exist.
            self.notify.notified().await;
        }
    }
1344
1345    async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1346        if self
1347            .epoch_store
1348            .protocol_config()
1349            .split_checkpoints_in_consensus_handler()
1350        {
1351            self.maybe_build_checkpoints_v2().await
1352        } else {
1353            self.maybe_build_checkpoints_v1().await
1354        }
1355    }
1356
1357    async fn maybe_build_checkpoints_v1(&mut self) -> CheckpointBuilderResult {
1358        let _scope = monitored_scope("BuildCheckpoints");
1359
1360        // Collect info about the most recently built checkpoint.
1361        let summary = self
1362            .epoch_store
1363            .last_built_checkpoint_builder_summary()
1364            .expect("epoch should not have ended");
1365        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
1366        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
1367
1368        let min_checkpoint_interval_ms = self
1369            .epoch_store
1370            .protocol_config()
1371            .min_checkpoint_interval_ms_as_option()
1372            .unwrap_or_default();
1373        let mut grouped_pending_checkpoints = Vec::new();
1374        let mut checkpoints_iter = self
1375            .epoch_store
1376            .get_pending_checkpoints(last_height)
1377            .expect("unexpected epoch store error")
1378            .into_iter()
1379            .peekable();
1380        while let Some((height, pending)) = checkpoints_iter.next() {
1381            // Group PendingCheckpoints until:
1382            // - minimum interval has elapsed ...
1383            let current_timestamp = pending.details().timestamp_ms;
1384            let can_build = match last_timestamp {
1385                    Some(last_timestamp) => {
1386                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
1387                    }
1388                    None => true,
1389                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
1390                //   should be written separately) ...
1391                } || checkpoints_iter
1392                    .peek()
1393                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
1394                // - or, we have reached end of epoch.
1395                    || pending.details().last_of_epoch;
1396            grouped_pending_checkpoints.push(pending);
1397            if !can_build {
1398                debug!(
1399                    checkpoint_commit_height = height,
1400                    ?last_timestamp,
1401                    ?current_timestamp,
1402                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
1403                );
1404                continue;
1405            }
1406
1407            // Min interval has elapsed, we can now coalesce and build a checkpoint.
1408            last_height = Some(height);
1409            last_timestamp = Some(current_timestamp);
1410            debug!(
1411                checkpoint_commit_height_from = grouped_pending_checkpoints
1412                    .first()
1413                    .unwrap()
1414                    .details()
1415                    .checkpoint_height,
1416                checkpoint_commit_height_to = last_height,
1417                "Making checkpoint with commit height range"
1418            );
1419
1420            let seq = self
1421                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
1422                .await?;
1423
1424            self.last_built.send_if_modified(|cur| {
1425                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1426                if seq > *cur {
1427                    *cur = seq;
1428                    true
1429                } else {
1430                    false
1431                }
1432            });
1433
1434            // ensure that the task can be cancelled at end of epoch, even if no other await yields
1435            // execution.
1436            tokio::task::yield_now().await;
1437        }
1438        debug!(
1439            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1440            grouped_pending_checkpoints.len(),
1441        );
1442
1443        Ok(())
1444    }
1445
1446    async fn maybe_build_checkpoints_v2(&mut self) -> CheckpointBuilderResult {
1447        let _scope = monitored_scope("BuildCheckpoints");
1448
1449        // Collect info about the most recently built checkpoint.
1450        let last_height = self
1451            .epoch_store
1452            .last_built_checkpoint_builder_summary()
1453            .expect("epoch should not have ended")
1454            .and_then(|s| s.checkpoint_height);
1455
1456        for (height, pending) in self
1457            .epoch_store
1458            .get_pending_checkpoints_v2(last_height)
1459            .expect("unexpected epoch store error")
1460        {
1461            debug!(checkpoint_commit_height = height, "Making checkpoint");
1462
1463            let seq = self.make_checkpoint_v2(pending).await?;
1464
1465            self.last_built.send_if_modified(|cur| {
1466                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1467                if seq > *cur {
1468                    *cur = seq;
1469                    true
1470                } else {
1471                    false
1472                }
1473            });
1474
1475            // ensure that the task can be cancelled at end of epoch, even if no other await yields
1476            // execution.
1477            tokio::task::yield_now().await;
1478        }
1479
1480        Ok(())
1481    }
1482
    /// Coalesces a group of pending checkpoints into one or more checkpoint
    /// summaries, writes them, and returns the highest new sequence number.
    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &mut self,
        pendings: Vec<PendingCheckpoint>,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        // Human-readable description of the inputs, used only for the final log line.
        let pending_ckpt_str = pendings
            .iter()
            .map(|p| {
                format!(
                    "height={}, commit={}",
                    p.details().checkpoint_height,
                    p.details().consensus_commit_ref
                )
            })
            .join("; ");

        let last_details = pendings.last().unwrap().details().clone();

        // Snapshot the executed-checkpoint watermark before resolving
        // transactions; compared against the newly built sequence below.
        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &last_details,
                &all_roots,
            )
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        // poll_count > 1 is used as a proxy for "the resolution future had to
        // wait"; that should not happen when the executor is already ahead of
        // the builder for this sequence.
        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        // Describe the built checkpoints for the final log line.
        let new_ckpt_str = new_checkpoints
            .iter()
            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
            .join("; ");

        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            "Made new checkpoint {} from pending checkpoint {}",
            new_ckpt_str, pending_ckpt_str
        );

        Ok(highest_sequence)
    }
1542
    /// Builds exactly one checkpoint from a single pending checkpoint artifact,
    /// writes it, and returns its sequence number.
    #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
    async fn make_checkpoint_v2(
        &mut self,
        pending: PendingCheckpointV2,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        let details = pending.details.clone();

        // Snapshot the executed-checkpoint watermark before resolving
        // transactions; compared against the newly built sequence below.
        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        let (poll_count, result) =
            poll_count(self.resolve_checkpoint_transactions_v2(pending)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &details,
                &all_roots,
            )
            .await?;
        // In the v2 path a pending checkpoint never splits into multiple checkpoints.
        assert_eq!(new_checkpoints.len(), 1, "Expected exactly one checkpoint");
        let sequence = *new_checkpoints.first().0.sequence_number();
        let digest = new_checkpoints.first().0.digest();
        // poll_count > 1 is used as a proxy for "the resolution future had to
        // wait"; that should not happen when the executor is already ahead.
        if sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        self.write_checkpoints(details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            seq = sequence,
            %digest,
            height = details.checkpoint_height,
            commit = %details.consensus_commit_ref,
            "Made new checkpoint"
        );

        Ok(sequence)
    }
1590
    /// Builds the accumulator settlement transactions for the given checkpoint
    /// contents, drives them (plus a trailing barrier transaction) through
    /// execution, and returns their effects.
    ///
    /// Returns the `AccumulatorSettlement` transaction key for this
    /// (epoch, checkpoint_height) pair together with the settlement effects
    /// followed by the barrier effects, in the order they should be appended to
    /// the checkpoint.
    ///
    /// Panics if any settlement/barrier transaction fails, or if the
    /// accumulator root object is not mutated by exactly one of them.
    async fn construct_and_execute_settlement_transactions(
        &self,
        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> (TransactionKey, Vec<TransactionEffects>) {
        let _scope =
            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");

        // Key under which the execution scheduler is waiting for these
        // settlement transactions.
        let tx_key =
            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);

        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.effects_store.as_ref()),
            sorted_tx_effects_included_in_checkpoint,
            checkpoint_seq,
            tx_index_offset,
        );

        // Collect the per-address funds deltas before the builder is consumed;
        // they are handed to the execution scheduler at the end.
        let funds_changes = builder.collect_funds_changes();
        let num_updates = builder.num_updates();
        let settlement_txns = builder.build_tx(
            self.epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            checkpoint_seq,
        );

        // Wrap the raw system transactions so they can be executed this epoch.
        let settlement_txns: Vec<_> = settlement_txns
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    self.epoch_store.epoch(),
                )
            })
            .collect();

        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();

        debug!(
            ?settlement_digests,
            ?tx_key,
            "created settlement transactions with {num_updates} updates"
        );

        // Hand the settlement transactions to execution, then wait for their
        // effects to appear in the effects store.
        self.epoch_store
            .notify_settlement_transactions_ready(tx_key, settlement_txns);

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_settlement_effects",
            &settlement_digests,
            tx_key,
        )
        .await;

        // The barrier transaction is built from the settlement effects, so it
        // can only be constructed after they are available.
        let barrier_tx = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            &settlement_effects,
        );

        let barrier_tx = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier_tx),
            self.epoch_store.epoch(),
        );
        let barrier_digest = *barrier_tx.digest();

        // Same notify-then-wait dance for the barrier transaction.
        self.epoch_store
            .notify_barrier_transaction_ready(tx_key, barrier_tx);

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_barrier_effects",
            &[barrier_digest],
            tx_key,
        )
        .await;

        // Checkpoint order: settlement effects first, barrier effects last.
        let settlement_effects: Vec<_> = settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect();

        // Sanity-check every effect succeeded and find the (unique) new version
        // of the accumulator root object among the mutated object refs.
        let mut next_accumulator_version = None;
        for fx in settlement_effects.iter() {
            assert!(
                fx.status().is_ok(),
                "settlement transaction cannot fail (digest: {:?}) {:#?}",
                fx.transaction_digest(),
                fx
            );
            if let Some(version) = fx
                .mutated()
                .iter()
                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
            {
                assert!(
                    next_accumulator_version.is_none(),
                    "Only one settlement transaction should mutate the accumulator root object"
                );
                next_accumulator_version = Some(version);
            }
        }
        let settlements = FundsSettlement {
            next_accumulator_version: next_accumulator_version
                .expect("Accumulator root object should be mutated in the settlement transactions"),
            funds_changes,
        };

        // Let the funds-withdraw scheduler observe the settled balances.
        self.state
            .execution_scheduler()
            .settle_address_funds(settlements);

        (tx_key, settlement_effects)
    }
1718
    /// Given the root transactions of the pending checkpoints, resolves the full set
    /// of transactions that should be included in the checkpoint, and returns their
    /// effects in the order they should appear, together with the set of root digests.
    ///
    /// `effects_in_current_checkpoint` (local state below) tracks transactions that
    /// are already part of the checkpoint being built, so that when multiple pending
    /// checkpoints are merged into one, dependencies are not pulled in twice.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        pending_checkpoints: Vec<PendingCheckpoint>,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        // Keeps track of the effects that are already included in the current checkpoint.
        // This is used when there are multiple pending checkpoints to create a single checkpoint
        // because in such scenarios, dependencies of a transaction may be in earlier created
        // checkpoints, or in earlier pending checkpoints.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        // Accumulated across all pending checkpoints in this batch.
        let mut tx_effects = Vec::new();
        let mut tx_roots = HashSet::new();

        for pending_checkpoint in pending_checkpoints.into_iter() {
            let mut pending = pending_checkpoint;
            debug!(
                checkpoint_commit_height = pending.details.checkpoint_height,
                "Resolving checkpoint transactions for pending checkpoint.",
            );

            trace!(
                "roots for pending checkpoint {:?}: {:?}",
                pending.details.checkpoint_height, pending.roots,
            );

            // When accumulators are enabled, consensus handler appends the settlement
            // root as the last root; pop it off so it is not treated as a regular root.
            let settlement_root = if self.epoch_store.accumulators_enabled() {
                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
                    pending.roots.pop()
                else {
                    fatal!("No settlement root found");
                };
                Some(settlement_root)
            } else {
                None
            };

            let roots = &pending.roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(roots.len() as u64);

            // Resolve transaction keys to digests, then wait for all root
            // transactions to have executed effects available.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;
            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;

            assert!(
                self.epoch_store
                    .protocol_config()
                    .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
            );

            // If the roots contains consensus commit prologue transaction, we want to extract it,
            // and put it to the front of the checkpoint.
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            // Get the unincluded dependencies of the consensus commit prologue. We should expect no
            // other dependencies that haven't been included in any previous checkpoints.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    &mut effects_in_current_checkpoint,
                )?;

                // No other dependencies of this consensus commit prologue that haven't been included
                // in any previous checkpoint.
                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }

            // Expand the roots to include all of their not-yet-checkpointed dependencies.
            let unsorted =
                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
                if cfg!(debug_assertions) {
                    // When consensus_commit_prologue is extracted, it should not be included in the `unsorted`.
                    for tx in unsorted.iter() {
                        assert!(tx.transaction_digest() != &ccp_digest);
                    }
                }
                sorted.push(ccp_effects);
            }
            sorted.extend(CausalOrder::causal_sort(unsorted));

            // Execute the settlement transactions over the sorted contents and
            // append their effects at the end of this pending checkpoint's span.
            if let Some(settlement_root) = settlement_root {
                //TODO: this is an incorrect heuristic for checkpoint seq number
                //      due to checkpoint splitting, to be fixed separately
                let last_checkpoint =
                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
                let next_checkpoint_seq = last_checkpoint
                    .as_ref()
                    .map(|(seq, _)| *seq)
                    .unwrap_or_default()
                    + 1;
                let tx_index_offset = tx_effects.len() as u64;

                let (tx_key, settlement_effects) = self
                    .construct_and_execute_settlement_transactions(
                        &sorted,
                        pending.details.checkpoint_height,
                        next_checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                debug!(?tx_key, "executed settlement transactions");

                assert_eq!(settlement_root, tx_key);

                // Note: we do not need to add the settlement digests to `tx_roots` - `tx_roots`
                // should only include the digests of transactions that were originally roots in
                // the pending checkpoint. It is later used to identify transactions which were
                // added as dependencies, so that those transactions can be waited on using
                // `consensus_messages_processed_notify()`. System transactions (such as
                // settlements) are exempt from this already.
                //
                // However, we DO need to add them to `effects_in_current_checkpoint` so that
                // `complete_checkpoint_effects` won't pull them in again as dependencies when
                // processing later pending checkpoints in the same batch.
                effects_in_current_checkpoint
                    .extend(settlement_effects.iter().map(|e| *e.transaction_digest()));
                sorted.extend(settlement_effects);
            }

            #[cfg(msim)]
            {
                // Check consensus commit prologue invariants in sim test.
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            tx_effects.extend(sorted);
            tx_roots.extend(root_digests);
        }

        Ok((tx_effects, tx_roots))
    }
1883
    /// Given the root transactions of a pending checkpoint (v2 format), resolves the
    /// transactions that should be included in the checkpoint, and returns their
    /// effects in checkpoint order together with the set of root digests.
    ///
    /// Unlike the v1 path, the per-checkpoint roots already carry their settlement
    /// root and height, and dependency completion happens inside
    /// `CausalOrder::causal_sort_with_ccp`.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions_v2(
        &self,
        pending: PendingCheckpointV2,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        debug!(
            checkpoint_commit_height = pending.details.checkpoint_height,
            "Resolving checkpoint transactions for pending checkpoint.",
        );

        trace!(
            "roots for pending checkpoint {:?}: {:?}",
            pending.details.checkpoint_height, pending.roots,
        );

        assert!(
            self.epoch_store
                .protocol_config()
                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
        );

        let mut all_effects: Vec<TransactionEffects> = Vec::new();
        let mut all_root_digests: Vec<TransactionDigest> = Vec::new();

        // Each entry in `pending.roots` corresponds to one logical checkpoint's
        // worth of roots; process them in order.
        for checkpoint_roots in &pending.roots {
            let tx_roots = &checkpoint_roots.tx_roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(tx_roots.len() as u64);

            // Resolve transaction keys to digests, then wait for the executed
            // effects of every root to become available.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(tx_roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;

            all_root_digests.extend(root_digests.iter().cloned());

            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;
            // If present, the consensus commit prologue is hoisted to the front
            // of the sorted output by causal_sort_with_ccp.
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let ccp_digest = consensus_commit_prologue.map(|(d, _)| d);
            let mut sorted = CausalOrder::causal_sort_with_ccp(root_effects, ccp_digest);

            // Append settlement (and barrier) effects at the end of this
            // checkpoint's span when a settlement root is present.
            if let Some(settlement_key) = &checkpoint_roots.settlement_root {
                let checkpoint_seq = pending
                    .details
                    .checkpoint_seq
                    .expect("checkpoint_seq must be set");
                let tx_index_offset = all_effects.len() as u64;
                let effects = self
                    .resolve_settlement_effects(
                        *settlement_key,
                        &sorted,
                        checkpoint_roots.height,
                        checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                sorted.extend(effects);
            }

            #[cfg(msim)]
            {
                // Check consensus commit prologue invariants in sim test.
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            all_effects.extend(sorted);
        }
        Ok((all_effects, all_root_digests.into_iter().collect()))
    }
1969
    /// Constructs settlement transactions to compute their digests, then reads effects
    /// directly from the cache. If execution is ahead of the checkpoint builder, the
    /// effects are already cached and this returns instantly. Otherwise it waits for
    /// the execution scheduler's queue worker to execute them.
    ///
    /// Returns the settlement effects followed by the barrier effects, in
    /// checkpoint order. The transactions built here must be byte-identical to
    /// the ones the execution side builds, so that the digests match.
    async fn resolve_settlement_effects(
        &self,
        settlement_key: TransactionKey,
        sorted_root_effects: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> Vec<TransactionEffects> {
        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        // Note: no effects store is passed here (unlike the construct-and-execute
        // path) — this builder is only used to recompute transaction digests.
        let builder = AccumulatorSettlementTxBuilder::new(
            None,
            sorted_root_effects,
            checkpoint_seq,
            tx_index_offset,
        );

        // Rebuild the settlement transactions solely to learn their digests.
        let settlement_digests: Vec<_> = builder
            .build_tx(
                self.epoch_store.protocol_config(),
                epoch,
                accumulator_root_obj_initial_shared_version,
                checkpoint_height,
                checkpoint_seq,
            )
            .into_iter()
            .map(|tx| *VerifiedTransaction::new_system_transaction(tx).digest())
            .collect();

        debug!(
            ?settlement_digests,
            ?settlement_key,
            "fallback: reading settlement effects from cache"
        );

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::fallback_settlement_effects",
            &settlement_digests,
            settlement_key,
        )
        .await;

        // The barrier transaction depends on the settlement effects, so its
        // digest can only be recomputed after they are available.
        let barrier_digest = *VerifiedTransaction::new_system_transaction(
            accumulators::build_accumulator_barrier_tx(
                epoch,
                accumulator_root_obj_initial_shared_version,
                checkpoint_height,
                &settlement_effects,
            ),
        )
        .digest();

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::fallback_barrier_effects",
            &[barrier_digest],
            settlement_key,
        )
        .await;

        // Checkpoint order: settlement effects first, barrier effects last.
        settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect()
    }
2045
2046    // Extracts the consensus commit prologue digest and effects from the root transactions.
2047    // The consensus commit prologue is expected to be the first transaction in the roots.
2048    fn extract_consensus_commit_prologue(
2049        &self,
2050        root_digests: &[TransactionDigest],
2051        root_effects: &[TransactionEffects],
2052    ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
2053        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
2054        if root_digests.is_empty() {
2055            return Ok(None);
2056        }
2057
2058        // Reads the first transaction in the roots, and checks whether it is a consensus commit
2059        // prologue transaction. The consensus commit prologue transaction should be the first
2060        // transaction in the roots written by the consensus handler.
2061        let first_tx = self
2062            .state
2063            .get_transaction_cache_reader()
2064            .get_transaction_block(&root_digests[0])
2065            .expect("Transaction block must exist");
2066
2067        Ok(first_tx
2068            .transaction_data()
2069            .is_consensus_commit_prologue()
2070            .then(|| {
2071                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
2072                (*first_tx.digest(), root_effects[0].clone())
2073            }))
2074    }
2075
    /// Persists the newly built checkpoints, publishes them to the configured
    /// output (e.g. sends signatures to consensus), checks for forks against
    /// any already-certified checkpoints, and notifies the aggregator.
    ///
    /// `height` is the commit height of the pending checkpoint these were built
    /// from. Fatals if a checkpoint at the same sequence number was previously
    /// built with a different digest (local fork).
    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &mut self,
        height: CheckpointHeight,
        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
    ) -> SuiResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.store.tables.checkpoint_content.batch();
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );

            // Re-building the same sequence number must be deterministic; a
            // digest mismatch with a previously built summary is a local fork.
            if let Some(previously_computed_summary) = self
                .store
                .tables
                .locally_computed_checkpoints
                .get(&summary.sequence_number)?
                && previously_computed_summary.digest() != summary.digest()
            {
                fatal!(
                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
                    summary.sequence_number,
                    previously_computed_summary.digest(),
                    summary.digest()
                );
            }

            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            // Stage both the contents (keyed by digest) and the locally
            // computed summary (keyed by sequence) into one atomic batch.
            batch.insert_batch(
                &self.store.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.store.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        batch.write()?;

        // Send all checkpoint sigs to consensus.
        for (summary, contents) in &new_checkpoints {
            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
                .await?;
        }

        // If a certified checkpoint already exists for any of these sequence
        // numbers, compare against it to detect a fork with the network.
        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .store
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.store
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        // Wake the signature aggregator and record the constructed checkpoints
        // in the epoch store.
        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_constructed_checkpoint(height, new_checkpoints);
        Ok(())
    }
2157
2158    #[allow(clippy::type_complexity)]
2159    fn split_checkpoint_chunks(
2160        &self,
2161        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
2162        signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2163    ) -> CheckpointBuilderResult<
2164        Vec<
2165            Vec<(
2166                TransactionEffects,
2167                Vec<(GenericSignature, Option<SequenceNumber>)>,
2168            )>,
2169        >,
2170    > {
2171        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
2172
2173        // If splitting is done in consensus_handler, return everything as one chunk.
2174        if self
2175            .epoch_store
2176            .protocol_config()
2177            .split_checkpoints_in_consensus_handler()
2178        {
2179            let chunk: Vec<_> = effects_and_transaction_sizes
2180                .into_iter()
2181                .zip(signatures)
2182                .map(|((effects, _size), sigs)| (effects, sigs))
2183                .collect();
2184            return Ok(vec![chunk]);
2185        }
2186        let mut chunks = Vec::new();
2187        let mut chunk = Vec::new();
2188        let mut chunk_size: usize = 0;
2189        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
2190            .into_iter()
2191            .zip(signatures.into_iter())
2192        {
2193            // Roll over to a new chunk after either max count or max size is reached.
2194            // The size calculation here is intended to estimate the size of the
2195            // FullCheckpointContents struct. If this code is modified, that struct
2196            // should also be updated accordingly.
2197            let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
2198                bcs::serialized_size(&signatures)?
2199            } else {
2200                let signatures: Vec<&GenericSignature> =
2201                    signatures.iter().map(|(s, _)| s).collect();
2202                bcs::serialized_size(&signatures)?
2203            };
2204            let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
2205            if chunk.len() == self.max_transactions_per_checkpoint
2206                || (chunk_size + size) > self.max_checkpoint_size_bytes
2207            {
2208                if chunk.is_empty() {
2209                    // Always allow at least one tx in a checkpoint.
2210                    warn!(
2211                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
2212                        self.max_checkpoint_size_bytes
2213                    );
2214                } else {
2215                    chunks.push(chunk);
2216                    chunk = Vec::new();
2217                    chunk_size = 0;
2218                }
2219            }
2220
2221            chunk.push((effects, signatures));
2222            chunk_size += size;
2223        }
2224
2225        if !chunk.is_empty() || chunks.is_empty() {
2226            // We intentionally create an empty checkpoint if there is no content provided
2227            // to make a 'heartbeat' checkpoint.
2228            // Important: if some conditions are added here later, we need to make sure we always
2229            // have at least one chunk if last_pending_of_epoch is set
2230            chunks.push(chunk);
2231            // Note: empty checkpoints are ok - they shouldn't happen at all on a network with even
2232            // modest load. Even if they do happen, it is still useful as it allows fullnodes to
2233            // distinguish between "no transactions have happened" and "i am not receiving new
2234            // checkpoints".
2235        }
2236        Ok(chunks)
2237    }
2238
2239    fn load_last_built_checkpoint_summary(
2240        epoch_store: &AuthorityPerEpochStore,
2241        store: &CheckpointStore,
2242    ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
2243        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
2244        if last_checkpoint.is_none() {
2245            let epoch = epoch_store.epoch();
2246            if epoch > 0 {
2247                let previous_epoch = epoch - 1;
2248                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
2249                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
2250                if let Some((ref seq, _)) = last_checkpoint {
2251                    debug!(
2252                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
2253                    );
2254                } else {
2255                    // This is some serious bug with when CheckpointBuilder started so surfacing it via panic
2256                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
2257                }
2258            }
2259        }
2260        Ok(last_checkpoint)
2261    }
2262
2263    #[instrument(level = "debug", skip_all)]
2264    async fn create_checkpoints(
2265        &self,
2266        all_effects: Vec<TransactionEffects>,
2267        details: &PendingCheckpointInfo,
2268        all_roots: &HashSet<TransactionDigest>,
2269    ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
2270        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
2271
2272        let total = all_effects.len();
2273        let mut last_checkpoint =
2274            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
2275        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
2276        debug!(
2277            checkpoint_commit_height = details.checkpoint_height,
2278            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
2279            checkpoint_timestamp = details.timestamp_ms,
2280            "Creating checkpoint(s) for {} transactions",
2281            all_effects.len(),
2282        );
2283
2284        let all_digests: Vec<_> = all_effects
2285            .iter()
2286            .map(|effect| *effect.transaction_digest())
2287            .collect();
2288        let transactions_and_sizes = self
2289            .state
2290            .get_transaction_cache_reader()
2291            .get_transactions_and_serialized_sizes(&all_digests)?;
2292        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
2293        let mut transactions = Vec::with_capacity(all_effects.len());
2294        let mut transaction_keys = Vec::with_capacity(all_effects.len());
2295        let mut randomness_rounds = BTreeMap::new();
2296        {
2297            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
2298            debug!(
2299                ?last_checkpoint_seq,
2300                "Waiting for {:?} certificates to appear in consensus",
2301                all_effects.len()
2302            );
2303
2304            for (effects, transaction_and_size) in all_effects
2305                .into_iter()
2306                .zip(transactions_and_sizes.into_iter())
2307            {
2308                let (transaction, size) = transaction_and_size
2309                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
2310                match transaction.inner().transaction_data().kind() {
2311                    TransactionKind::ConsensusCommitPrologue(_)
2312                    | TransactionKind::ConsensusCommitPrologueV2(_)
2313                    | TransactionKind::ConsensusCommitPrologueV3(_)
2314                    | TransactionKind::ConsensusCommitPrologueV4(_)
2315                    | TransactionKind::AuthenticatorStateUpdate(_) => {
2316                        // ConsensusCommitPrologue and AuthenticatorStateUpdate are guaranteed to be
2317                        // processed before we reach here.
2318                    }
2319                    TransactionKind::ProgrammableSystemTransaction(_) => {
2320                        // settlement transactions are added by checkpoint builder
2321                    }
2322                    TransactionKind::ChangeEpoch(_)
2323                    | TransactionKind::Genesis(_)
2324                    | TransactionKind::EndOfEpochTransaction(_) => {
2325                        fatal!(
2326                            "unexpected transaction in checkpoint effects: {:?}",
2327                            transaction
2328                        );
2329                    }
2330                    TransactionKind::RandomnessStateUpdate(rsu) => {
2331                        randomness_rounds
2332                            .insert(*effects.transaction_digest(), rsu.randomness_round);
2333                    }
2334                    TransactionKind::ProgrammableTransaction(_) => {
2335                        // Only transactions that are not roots should be included in the call to
2336                        // `consensus_messages_processed_notify`. roots come directly from the consensus
2337                        // commit and so are known to be processed already.
2338                        let digest = *effects.transaction_digest();
2339                        if !all_roots.contains(&digest) {
2340                            transaction_keys.push(SequencedConsensusTransactionKey::External(
2341                                ConsensusTransactionKey::Certificate(digest),
2342                            ));
2343                        }
2344                    }
2345                }
2346                transactions.push(transaction);
2347                all_effects_and_transaction_sizes.push((effects, size));
2348            }
2349
2350            self.epoch_store
2351                .consensus_messages_processed_notify(transaction_keys)
2352                .await?;
2353        }
2354
2355        let signatures = self
2356            .epoch_store
2357            .user_signatures_for_checkpoint(&transactions, &all_digests);
2358        debug!(
2359            ?last_checkpoint_seq,
2360            "Received {} checkpoint user signatures from consensus",
2361            signatures.len()
2362        );
2363
2364        let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
2365            Some(
2366                transactions
2367                    .iter()
2368                    .flat_map(|tx| {
2369                        if let TransactionKind::ProgrammableTransaction(ptb) =
2370                            tx.transaction_data().kind()
2371                        {
2372                            itertools::Either::Left(
2373                                ptb.commands
2374                                    .iter()
2375                                    .map(ExecutionTimeObservationKey::from_command),
2376                            )
2377                        } else {
2378                            itertools::Either::Right(std::iter::empty())
2379                        }
2380                    })
2381                    .collect(),
2382            )
2383        } else {
2384            None
2385        };
2386
2387        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2388        let chunks_count = chunks.len();
2389
2390        let mut checkpoints = Vec::with_capacity(chunks_count);
2391        debug!(
2392            ?last_checkpoint_seq,
2393            "Creating {} checkpoints with {} transactions", chunks_count, total,
2394        );
2395
2396        let epoch = self.epoch_store.epoch();
2397        for (index, transactions) in chunks.into_iter().enumerate() {
2398            let first_checkpoint_of_epoch = index == 0
2399                && last_checkpoint
2400                    .as_ref()
2401                    .map(|(_, c)| c.epoch != epoch)
2402                    .unwrap_or(true);
2403            if first_checkpoint_of_epoch {
2404                self.epoch_store
2405                    .record_epoch_first_checkpoint_creation_time_metric();
2406            }
2407            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2408
2409            let sequence_number = if let Some(preassigned_seq) = details.checkpoint_seq {
2410                preassigned_seq
2411            } else {
2412                last_checkpoint
2413                    .as_ref()
2414                    .map(|(_, c)| c.sequence_number + 1)
2415                    .unwrap_or_default()
2416            };
2417            let mut timestamp_ms = details.timestamp_ms;
2418            if let Some((_, last_checkpoint)) = &last_checkpoint
2419                && last_checkpoint.timestamp_ms > timestamp_ms
2420            {
2421                // First consensus commit of an epoch can have zero timestamp.
2422                debug!(
2423                    "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2424                    sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2425                );
2426                if self
2427                    .epoch_store
2428                    .protocol_config()
2429                    .enforce_checkpoint_timestamp_monotonicity()
2430                {
2431                    timestamp_ms = last_checkpoint.timestamp_ms;
2432                }
2433            }
2434
2435            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
2436            let epoch_rolling_gas_cost_summary =
2437                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2438
2439            let end_of_epoch_data = if last_checkpoint_of_epoch {
2440                let system_state_obj = self
2441                    .augment_epoch_last_checkpoint(
2442                        &epoch_rolling_gas_cost_summary,
2443                        timestamp_ms,
2444                        &mut effects,
2445                        &mut signatures,
2446                        sequence_number,
2447                        std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2448                        last_checkpoint_seq.unwrap_or_default(),
2449                    )
2450                    .await?;
2451
2452                let committee = system_state_obj
2453                    .get_current_epoch_committee()
2454                    .committee()
2455                    .clone();
2456
2457                // This must happen after the call to augment_epoch_last_checkpoint,
2458                // otherwise we will not capture the change_epoch tx.
2459                let root_state_digest = {
2460                    let state_acc = self
2461                        .global_state_hasher
2462                        .upgrade()
2463                        .expect("No checkpoints should be getting built after local configuration");
2464                    let acc = state_acc.accumulate_checkpoint(
2465                        &effects,
2466                        sequence_number,
2467                        &self.epoch_store,
2468                    )?;
2469
2470                    state_acc
2471                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2472                        .await?;
2473
2474                    state_acc.accumulate_running_root(
2475                        &self.epoch_store,
2476                        sequence_number,
2477                        Some(acc),
2478                    )?;
2479                    state_acc
2480                        .digest_epoch(self.epoch_store.clone(), sequence_number)
2481                        .await?
2482                };
2483                self.metrics.highest_accumulated_epoch.set(epoch as i64);
2484                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2485
2486                let epoch_commitments = if self
2487                    .epoch_store
2488                    .protocol_config()
2489                    .check_commit_root_state_digest_supported()
2490                {
2491                    vec![root_state_digest.into()]
2492                } else {
2493                    vec![]
2494                };
2495
2496                Some(EndOfEpochData {
2497                    next_epoch_committee: committee.voting_rights,
2498                    next_epoch_protocol_version: ProtocolVersion::new(
2499                        system_state_obj.protocol_version(),
2500                    ),
2501                    epoch_commitments,
2502                })
2503            } else {
2504                self.send_to_hasher
2505                    .send((sequence_number, effects.clone()))
2506                    .await?;
2507
2508                None
2509            };
2510            let contents = if self.epoch_store.protocol_config().address_aliases() {
2511                CheckpointContents::new_v2(&effects, signatures)
2512            } else {
2513                CheckpointContents::new_with_digests_and_signatures(
2514                    effects.iter().map(TransactionEffects::execution_digests),
2515                    signatures
2516                        .into_iter()
2517                        .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
2518                        .collect(),
2519                )
2520            };
2521
2522            let num_txns = contents.size() as u64;
2523
2524            let network_total_transactions = last_checkpoint
2525                .as_ref()
2526                .map(|(_, c)| c.network_total_transactions + num_txns)
2527                .unwrap_or(num_txns);
2528
2529            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2530
2531            let matching_randomness_rounds: Vec<_> = effects
2532                .iter()
2533                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2534                .copied()
2535                .collect();
2536
2537            let checkpoint_commitments = if self
2538                .epoch_store
2539                .protocol_config()
2540                .include_checkpoint_artifacts_digest_in_summary()
2541            {
2542                let artifacts = CheckpointArtifacts::from(&effects[..]);
2543                let artifacts_digest = artifacts.digest()?;
2544                vec![artifacts_digest.into()]
2545            } else {
2546                Default::default()
2547            };
2548
2549            let summary = CheckpointSummary::new(
2550                self.epoch_store.protocol_config(),
2551                epoch,
2552                sequence_number,
2553                network_total_transactions,
2554                &contents,
2555                previous_digest,
2556                epoch_rolling_gas_cost_summary,
2557                end_of_epoch_data,
2558                timestamp_ms,
2559                matching_randomness_rounds,
2560                checkpoint_commitments,
2561            );
2562            summary.report_checkpoint_age(
2563                &self.metrics.last_created_checkpoint_age,
2564                &self.metrics.last_created_checkpoint_age_ms,
2565            );
2566            if last_checkpoint_of_epoch {
2567                info!(
2568                    checkpoint_seq = sequence_number,
2569                    "creating last checkpoint of epoch {}", epoch
2570                );
2571                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2572                    self.epoch_store
2573                        .report_epoch_metrics_at_last_checkpoint(stats);
2574                }
2575            }
2576            last_checkpoint = Some((sequence_number, summary.clone()));
2577            checkpoints.push((summary, contents));
2578        }
2579
2580        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2581    }
2582
2583    fn get_epoch_total_gas_cost(
2584        &self,
2585        last_checkpoint: Option<&CheckpointSummary>,
2586        cur_checkpoint_effects: &[TransactionEffects],
2587    ) -> GasCostSummary {
2588        let (previous_epoch, previous_gas_costs) = last_checkpoint
2589            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2590            .unwrap_or_default();
2591        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2592        if previous_epoch == self.epoch_store.epoch() {
2593            // sum only when we are within the same epoch
2594            GasCostSummary::new(
2595                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2596                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2597                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2598                previous_gas_costs.non_refundable_storage_fee
2599                    + current_gas_costs.non_refundable_storage_fee,
2600            )
2601        } else {
2602            current_gas_costs
2603        }
2604    }
2605
2606    #[instrument(level = "error", skip_all)]
2607    async fn augment_epoch_last_checkpoint(
2608        &self,
2609        epoch_total_gas_cost: &GasCostSummary,
2610        epoch_start_timestamp_ms: CheckpointTimestamp,
2611        checkpoint_effects: &mut Vec<TransactionEffects>,
2612        signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2613        checkpoint: CheckpointSequenceNumber,
2614        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2615        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
2616        // >1 checkpoint.
2617        last_checkpoint: CheckpointSequenceNumber,
2618    ) -> CheckpointBuilderResult<SuiSystemState> {
2619        let (system_state, effects) = self
2620            .state
2621            .create_and_execute_advance_epoch_tx(
2622                &self.epoch_store,
2623                epoch_total_gas_cost,
2624                checkpoint,
2625                epoch_start_timestamp_ms,
2626                end_of_epoch_observation_keys,
2627                last_checkpoint,
2628            )
2629            .await?;
2630        checkpoint_effects.push(effects);
2631        signatures.push(vec![]);
2632        Ok(system_state)
2633    }
2634
    /// For the given roots, returns the complete list of effects to include in the checkpoint.
    /// This list includes the roots and all their dependencies that are not part of a checkpoint already.
    /// Note that this function may be called multiple times to construct the checkpoint.
    /// `existing_tx_digests_in_checkpoint` is used to track the transactions that are already included in the checkpoint.
    /// Txs in `roots` that need to be included in the checkpoint will be added to `existing_tx_digests_in_checkpoint`
    /// after the call of this function.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> SuiResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        let mut results = vec![];
        // Digests already visited in this traversal; prevents re-expanding a
        // transaction reachable through multiple dependency paths.
        let mut seen = HashSet::new();
        // Breadth-first expansion over the dependency graph: each iteration
        // filters the current frontier (`roots`), collects newly discovered
        // dependencies into `pending`, and makes them the next frontier.
        loop {
            let mut pending = HashSet::new();

            // Batched lookup: for each root, whether the builder has already
            // placed it in some checkpoint this epoch.
            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                // Unnecessary to read effects of a dependency if the effect is already processed.
                seen.insert(*digest);

                // Skip roots that are already included in the checkpoint.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Skip roots already included in checkpoints or roots from previous epochs
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                // Batched lookup: which of this effect's dependencies were
                // executed locally in the current epoch.
                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    // Skip here if dependency not executed in the current epoch.
                    // Note that the existence of an effects signature in the
                    // epoch store for the given digest indicates that the transaction
                    // was locally executed in the current epoch
                    if !effects_signature_exists {
                        continue;
                    }
                    // Only enqueue a dependency the first time we see it.
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            // No new dependencies discovered: traversal is complete.
            if pending.is_empty() {
                break;
            }
            // Load effects for the newly discovered dependencies; they become
            // the next frontier. A missing effect here is an invariant
            // violation (a dependent transaction already executed), so panic.
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self.effects_store.multi_get_executed_effects(&pending);
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
                        digest
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        // Record everything gathered here so later calls for the same
        // checkpoint skip these transactions.
        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
2716
2717    // Checks the invariants of the consensus commit prologue transactions in the checkpoint
2718    // in simtest.
2719    #[cfg(msim)]
2720    fn expensive_consensus_commit_prologue_invariants_check(
2721        &self,
2722        root_digests: &[TransactionDigest],
2723        sorted: &[TransactionEffects],
2724    ) {
2725        // Gets all the consensus commit prologue transactions from the roots.
2726        let root_txs = self
2727            .state
2728            .get_transaction_cache_reader()
2729            .multi_get_transaction_blocks(root_digests);
2730        let ccps = root_txs
2731            .iter()
2732            .filter_map(|tx| {
2733                if let Some(tx) = tx {
2734                    if tx.transaction_data().is_consensus_commit_prologue() {
2735                        Some(tx)
2736                    } else {
2737                        None
2738                    }
2739                } else {
2740                    None
2741                }
2742            })
2743            .collect::<Vec<_>>();
2744
2745        // There should be at most one consensus commit prologue transaction in the roots.
2746        assert!(ccps.len() <= 1);
2747
2748        // Get all the transactions in the checkpoint.
2749        let txs = self
2750            .state
2751            .get_transaction_cache_reader()
2752            .multi_get_transaction_blocks(
2753                &sorted
2754                    .iter()
2755                    .map(|tx| tx.transaction_digest().clone())
2756                    .collect::<Vec<_>>(),
2757            );
2758
2759        if ccps.len() == 0 {
2760            // If there is no consensus commit prologue transaction in the roots, then there should be no
2761            // consensus commit prologue transaction in the checkpoint.
2762            for tx in txs.iter() {
2763                if let Some(tx) = tx {
2764                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2765                }
2766            }
2767        } else {
2768            // If there is one consensus commit prologue, it must be the first one in the checkpoint.
2769            assert!(
2770                txs[0]
2771                    .as_ref()
2772                    .unwrap()
2773                    .transaction_data()
2774                    .is_consensus_commit_prologue()
2775            );
2776
2777            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2778
2779            for tx in txs.iter().skip(1) {
2780                if let Some(tx) = tx {
2781                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2782                }
2783            }
2784        }
2785    }
2786}
2787
2788async fn wait_for_effects_with_retry(
2789    effects_store: &dyn TransactionCacheRead,
2790    task_name: &'static str,
2791    digests: &[TransactionDigest],
2792    tx_key: TransactionKey,
2793) -> Vec<TransactionEffects> {
2794    let delay = if in_antithesis() {
2795        // antithesis has aggressive thread pausing, 5 seconds causes false positives
2796        15
2797    } else {
2798        5
2799    };
2800    loop {
2801        match tokio::time::timeout(Duration::from_secs(delay), async {
2802            effects_store
2803                .notify_read_executed_effects(task_name, digests)
2804                .await
2805        })
2806        .await
2807        {
2808            Ok(effects) => break effects,
2809            Err(_) => {
2810                debug_fatal!(
2811                    "Timeout waiting for transactions to be executed {:?}, retrying...",
2812                    tx_key
2813                );
2814            }
2815        }
2816    }
2817}
2818
impl CheckpointAggregator {
    /// Creates an aggregator that collects `CheckpointSignatureMessage`s from
    /// `receiver` and emits certified checkpoints through `output`.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        Self {
            store: tables,
            epoch_store,
            notify,
            receiver,
            // Signatures buffered per checkpoint sequence number until a
            // quorum can be attempted.
            pending: BTreeMap::new(),
            current: None,
            output,
            state,
            metrics,
        }
    }

    /// Main loop: drains incoming signatures into the `pending` buffer, runs
    /// one aggregation pass, then sleeps until more work arrives (a new
    /// signature, an explicit wakeup via `notify`, or a 1s poll tick).
    /// Aggregation errors are logged and retried after a 1s backoff.
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            // Drain all signatures that arrived since the last iteration into the pending buffer
            while let Ok(sig) = self.receiver.try_recv() {
                self.pending
                    .entry(sig.summary.sequence_number)
                    .or_default()
                    .push(sig);
            }

            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            // Wait for more work: whichever of a newly received signature, an
            // external notification, or the periodic tick fires first.
            tokio::select! {
                Some(sig) = self.receiver.recv() => {
                    self.pending
                        .entry(sig.summary.sequence_number)
                        .or_default()
                        .push(sig);
                }
                _ = self.notify.notified() => {}
                _ = tokio::time::sleep(Duration::from_secs(1)) => {}
            }
        }
    }

    /// Runs one aggregation pass and forwards every checkpoint certified
    /// during it to the configured output.
    async fn run_and_notify(&mut self) -> SuiResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Certifies checkpoints in sequence order from the buffered signatures.
    ///
    /// Returns every checkpoint certified during this pass. Stops (returning
    /// what was certified so far) when the next checkpoint to certify has not
    /// been built locally yet, or when its buffered signatures do not reach a
    /// quorum.
    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            // Discard buffered signatures for checkpoints already certified
            // (e.g. certified via StateSync before local aggregation completed).
            self.pending.retain(|&seq, _| seq >= next_to_certify);
            let current = if let Some(current) = &mut self.current {
                // It's possible that the checkpoint was already certified by
                // the rest of the network and we've already received the
                // certified checkpoint via StateSync. In this case, we reset
                // the current signature aggregator to the next checkpoint to
                // be certified
                if current.summary.sequence_number < next_to_certify {
                    assert_reachable!("skip checkpoint certification");
                    self.current = None;
                    continue;
                }
                current
            } else {
                // No aggregation in progress: start one for the next
                // checkpoint, if it has been built locally. If not, there is
                // nothing to aggregate yet.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let seq = current.summary.sequence_number;
            let sigs = self.pending.remove(&seq).unwrap_or_default();
            if sigs.is_empty() {
                trace!(
                    checkpoint_seq =? seq,
                    "Not enough checkpoint signatures",
                );
                return Ok(result);
            }
            for data in sigs {
                trace!(
                    checkpoint_seq = seq,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = seq,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    // Persist the certified checkpoint, record metrics, and
                    // move on to the next sequence number.
                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics.last_certified_checkpoint.set(seq as i64);
                    current.summary.report_checkpoint_age(
                        &self.metrics.last_certified_checkpoint_age,
                        &self.metrics.last_certified_checkpoint_age_ms,
                    );
                    result.push(summary.into_inner());
                    self.current = None;
                    continue 'outer;
                }
            }
            // Signatures were consumed without reaching a quorum.
            break;
        }
        Ok(result)
    }

    /// Returns the sequence number after the highest locally certified
    /// checkpoint, or 0 if none has been certified yet.
    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
2988
2989impl CheckpointSignatureAggregator {
    /// Folds one signature message into the running stake aggregation for
    /// this checkpoint.
    ///
    /// Returns `Ok` with the aggregated quorum signature once enough stake has
    /// signed the locally built digest. Returns `Err(())` in every other case:
    /// a repeated (non-conflicting) signer, a signature that failed to insert,
    /// a quorum reached on a digest that does not match ours, or simply not
    /// enough votes yet.
    #[allow(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        // Pair the sender's signature with our locally built summary data;
        // the pair is inserted keyed by the digest the sender signed.
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            // ignore repeated signatures
            InsertResult::Failed { error }
                if matches!(
                    error.as_inner(),
                    SuiErrorKind::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
                ) =>
            {
                Err(())
            }
            // Any other insertion failure is logged and also triggers a
            // split-brain check.
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                // It is not guaranteed that signature.authority == narwhal_cert.author, but we do verify
                // the signature so we know that the author signed the message at some point.
                if their_digest != self.digest {
                    // Quorum reached on a digest different from ours: the
                    // network certified a checkpoint we did not build.
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }
3048
    /// Check if there is a split brain condition in checkpoint signature aggregation, defined
    /// as any state wherein it is no longer possible to achieve quorum on a checkpoint proposal,
    /// irrespective of the outcome of any outstanding votes.
    ///
    /// When split brain is detected, this logs a `debug_fatal`, bumps the
    /// `split_brain_checkpoint_forks` metric, and spawns a background task that dumps
    /// diagnostic diffs against disagreeing validators.
    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            // TODO: at this point we should immediately halt processing
            // of new transaction certificates to avoid building on top of
            // forked output
            // self.halt_all_execution();

            // Summarize each distinct digest with its supporting stake, highest stake first,
            // for the debug_fatal message below.
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let digests_by_stake_messages = all_unique_values
                .iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{:?} (total stake: {})", digest, total_stake)
                })
                .collect::<Vec<String>>();
            // Simulation-only fail point used by fork-recovery tests: records the digest that
            // non-forked authorities agree on so the test harness can act on the divergence.
            fail_point_arg!("kill_split_brain_node", |(
                checkpoint_overrides,
                forked_authorities,
            ): (
                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
            )| {
                #[cfg(msim)]
                {
                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
                        (checkpoint_overrides.lock(), forked_authorities.lock())
                    {
                        // Find the digest produced by non-forked authorities
                        let correct_digest = all_unique_values
                            .iter()
                            .find(|(_, (authorities, _))| {
                                // Check if any authority that produced this digest is NOT in the forked set
                                authorities
                                    .iter()
                                    .any(|auth| !forked_authorities_set.contains(auth))
                            })
                            .map(|(digest, _)| digest.to_string())
                            .unwrap_or_else(|| {
                                // Fallback: use the digest with the highest stake
                                all_unique_values
                                    .iter()
                                    .max_by_key(|(_, (_, stake))| *stake)
                                    .map(|(digest, _)| digest.to_string())
                                    .unwrap_or_else(|| self.digest.to_string())
                            });

                        overrides.insert(self.summary.sequence_number, correct_digest.clone());

                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
                            self.summary.sequence_number,
                            correct_digest
                        );
                    }
                }
            });

            debug_fatal!(
                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
                self.summary.sequence_number,
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            // Snapshot everything the diagnostic task needs, then run it off the hot path.
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.store.clone();

            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
3132}
3133
/// Create data dump containing relevant data for diagnosing cause of the
/// split brain by querying one disagreeing validator for full checkpoint contents.
/// To minimize peer chatter, we only query one validator at random from each
/// disagreeing faction, as all honest validators that participated in this round may
/// inevitably run the same process.
///
/// The resulting report (summary/contents/transaction/effects diffs against each
/// disagreeing faction) is written to `checkpoint_fork_dump.txt` in a fresh temp
/// directory and also emitted at debug level.
async fn diagnose_split_brain(
    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
    local_summary: CheckpointSummary,
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
) {
    debug!(
        checkpoint_seq = local_summary.sequence_number,
        "Running split brain diagnostics..."
    );
    let time = SystemTime::now();
    // collect one random disagreeing validator per differing digest
    let digest_to_validator = all_unique_values
        .iter()
        .filter_map(|(digest, (validators, _))| {
            if *digest != local_summary.digest() {
                let random_validator = validators.choose(&mut get_rng()).unwrap();
                Some((*digest, *random_validator))
            } else {
                None
            }
        })
        .collect::<HashMap<_, _>>();
    if digest_to_validator.is_empty() {
        panic!(
            "Given split brain condition, there should be at \
                least one validator that disagrees with local signature"
        );
    }

    let epoch_store = state.load_epoch_store_one_call_per_task();
    let committee = epoch_store
        .epoch_start_state()
        .get_sui_committee_with_network_metadata();
    let network_config = default_mysten_network_config();
    let network_clients =
        make_network_authority_clients_with_network_config(&committee, &network_config);

    // Query all disagreeing validators
    // NOTE(review): the futures below are built from `digest_to_validator.values()` and later
    // zipped against `digest_to_validator.iter()`. This relies on the map yielding the same
    // order across both iterations; std HashMap does so while unmodified, but collecting
    // (digest, name, future) tuples in a single pass would be more obviously correct.
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequestV2 {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint_v2(request)
        })
        .collect::<Vec<_>>();

    let digest_name_pair = digest_to_validator.iter();
    // Keep only responses that carry a pending summary plus contents; log and drop the rest.
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponseV2 {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponseV2 {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    // Our own contents must exist locally; a miss here is an invariant violation.
    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    // Produce one diff section per disagreeing faction: summary, contents,
    // transaction digests, and effects digests, each as a textual patch.
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {:?}",
        time
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    // `.keep()` detaches the tempdir so the dump survives after this function returns.
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .keep()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{}", fork_logs_text).unwrap();
    debug!("{}", fork_logs_text);
}
3313
/// Notification interface for feeding the checkpoint service: signature messages from
/// other validators, and wake-ups when new pending checkpoints may be available.
pub trait CheckpointServiceNotify {
    /// Handles a checkpoint signature received from another validator.
    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult;

    /// Wakes the checkpoint builder to look for newly available pending checkpoints.
    fn notify_checkpoint(&self) -> SuiResult;
}
3319
// Allowed because `Unstarted` carries three sizeable worker structs while `Started` is
// zero-sized; only a single value of this enum exists per CheckpointService.
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    // Holds the worker components until they are taken and spawned as tasks.
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    // The workers have been taken; their tasks are running.
    Started,
}
3331
3332impl CheckpointServiceState {
3333    fn take_unstarted(
3334        &mut self,
3335    ) -> (
3336        CheckpointBuilder,
3337        CheckpointAggregator,
3338        CheckpointStateHasher,
3339    ) {
3340        let mut state = CheckpointServiceState::Started;
3341        std::mem::swap(self, &mut state);
3342
3343        match state {
3344            CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3345                (builder, aggregator, hasher)
3346            }
3347            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3348        }
3349    }
3350}
3351
/// Coordinates checkpoint building, signature aggregation, and state hashing for the
/// current epoch. Constructed un-started via `build`; worker tasks are launched by `spawn`.
pub struct CheckpointService {
    tables: Arc<CheckpointStore>,
    // Wakes the CheckpointBuilder task to look for new pending checkpoints.
    notify_builder: Arc<Notify>,
    // Forwards received checkpoint signatures to the aggregator task.
    signature_sender: mpsc::UnboundedSender<CheckpointSignatureMessage>,
    // A notification for the current highest built sequence number.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // The highest sequence number that had already been built at the time CheckpointService
    // was constructed
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    // Holds the builder/aggregator/hasher components until `spawn` takes them.
    state: Mutex<CheckpointServiceState>,
}
3364
impl CheckpointService {
    /// Constructs a new CheckpointService in an un-started state.
    // The signature channel is unbounded because notify_checkpoint_signature is called from a
    // sync context (consensus_validator.rs implements a sync external trait) and cannot block.
    // The channel is consumed by a single async aggregator task that drains it continuously, so
    // unbounded growth is not a concern in practice.
    #[allow(clippy::disallowed_methods)]
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        // Wake-up signals for the builder and aggregator tasks respectively.
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // We may have built higher checkpoint numbers before restarting.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        // Watch channel publishing builder progress; used by wait_for_rebuilt_checkpoints.
        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let (signature_sender, signature_receiver) = mpsc::unbounded_channel();

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            signature_receiver,
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        // Small bounded channel over which the builder streams work to the state hasher.
        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);

        let ckpt_state_hasher = CheckpointStateHasher::new(
            epoch_store.clone(),
            global_state_hasher.clone(),
            receive_from_builder,
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            global_state_hasher,
            send_to_hasher,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            signature_sender,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted((
                builder,
                aggregator,
                ckpt_state_hasher,
            ))),
        })
    }

    /// Starts the CheckpointService.
    ///
    /// This function blocks until the CheckpointBuilder re-builds all checkpoints that had
    /// been built before the most recent restart. You can think of this as a WAL replay
    /// operation. Upon startup, we may have a number of consensus commits and resulting
    /// checkpoints that were built but not committed to disk. We want to reprocess the
    /// commits and rebuild the checkpoints before starting normal operation.
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        // Panics if called twice; the Unstarted components can only be taken once.
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        // Clean up state hashes computed after the last built checkpoint
        // This prevents ECMH divergence after fork recovery restarts

        // Note: there is a rare crash recovery edge case where we write the builder
        // summary, but crash before we can bump the highest executed checkpoint.
        // If we committed the builder summary, it was certified and unforked, so there
        // is no need to clear that state hash. If we do clear it, then checkpoint executor
        // will wait forever for checkpoint builder to produce the state hash, which will
        // never happen.
        let last_persisted_builder_seq = epoch_store
            .last_persisted_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number);

        let last_executed_seq = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number());

        if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        // Signals when the builder task exits (e.g. at epoch end) so the wait below
        // does not hang forever on a builder that will never catch up.
        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        spawn_monitored_task!(async move {
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            // state hasher will terminate as soon as it has finished processing all messages from builder
            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            // builder must shut down before aggregator and state_hasher, since it sends
            // messages to them
            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        // If this times out, the validator may still start up. The worst that can
        // happen is that we will crash later on instead of immediately. The eventual
        // crash would occur because we may be missing transactions that are below the
        // highest_synced_checkpoint watermark, which can cause a crash in
        // `CheckpointExecutor::extract_randomness_rounds`.
        if tokio::time::timeout(Duration::from_secs(120), async move {
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
}
3550
3551impl CheckpointService {
3552    /// Waits until all checkpoints had been built before the node restarted
3553    /// are rebuilt. This is required to preserve the invariant that all checkpoints
3554    /// (and their transactions) below the highest_synced_checkpoint watermark are
3555    /// available. Once the checkpoints are constructed, we can be sure that the
3556    /// transactions have also been executed.
3557    pub async fn wait_for_rebuilt_checkpoints(&self) {
3558        let highest_previously_built_seq = self.highest_previously_built_seq;
3559        let mut rx = self.highest_currently_built_seq_tx.subscribe();
3560        let mut highest_currently_built_seq = *rx.borrow_and_update();
3561        info!(
3562            "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3563        );
3564        loop {
3565            if highest_currently_built_seq >= highest_previously_built_seq {
3566                info!("Checkpoint rebuild complete");
3567                break;
3568            }
3569            rx.changed().await.unwrap();
3570            highest_currently_built_seq = *rx.borrow_and_update();
3571        }
3572    }
3573
3574    #[cfg(test)]
3575    fn write_and_notify_checkpoint_for_testing(
3576        &self,
3577        epoch_store: &AuthorityPerEpochStore,
3578        checkpoint: PendingCheckpoint,
3579    ) -> SuiResult {
3580        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3581
3582        let mut output = ConsensusCommitOutput::new(0);
3583        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3584        output.set_default_commit_stats_for_testing();
3585        epoch_store.push_consensus_output_for_tests(output);
3586        self.notify_checkpoint()?;
3587        Ok(())
3588    }
3589}
3590
impl CheckpointServiceNotify for CheckpointService {
    // Routes an incoming signature to the aggregator task, unless the checkpoint is
    // already certified locally (in which case the signature is ignored and a metric
    // is recorded instead).
    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult {
        let sequence = info.summary.sequence_number;
        let signer = info.summary.auth_sig().authority.concise();

        // Signatures at or below the highest verified checkpoint are stale; drop them.
        if let Some(highest_verified_checkpoint) = self
            .tables
            .get_highest_verified_checkpoint()?
            .map(|x| *x.sequence_number())
            && sequence <= highest_verified_checkpoint
        {
            trace!(
                checkpoint_seq = sequence,
                "Ignore checkpoint signature from {} - already certified", signer,
            );
            self.metrics
                .last_ignored_checkpoint_signature_received
                .set(sequence as i64);
            return Ok(());
        }
        trace!(
            checkpoint_seq = sequence,
            "Received checkpoint signature, digest {} from {}",
            info.summary.digest(),
            signer,
        );
        self.metrics
            .last_received_checkpoint_signatures
            .with_label_values(&[&signer.to_string()])
            .set(sequence as i64);
        // A send error means the aggregator task has exited; nothing useful to do then.
        self.signature_sender.send(info.clone()).ok();
        Ok(())
    }

    // Wakes the builder task to check for new pending checkpoints.
    fn notify_checkpoint(&self) -> SuiResult {
        self.notify_builder.notify_one();
        Ok(())
    }
}
3630
// Test helper: a CheckpointServiceNotify implementation that ignores all notifications.
pub struct CheckpointServiceNoop {}
impl CheckpointServiceNotify for CheckpointServiceNoop {
    fn notify_checkpoint_signature(&self, _: &CheckpointSignatureMessage) -> SuiResult {
        Ok(())
    }

    fn notify_checkpoint(&self) -> SuiResult {
        Ok(())
    }
}
3642
impl PendingCheckpoint {
    /// The checkpoint height recorded in this pending checkpoint's details.
    pub fn height(&self) -> CheckpointHeight {
        self.details.checkpoint_height
    }

    /// The root transaction keys included in this pending checkpoint.
    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.roots
    }

    /// Metadata describing this pending checkpoint.
    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.details
    }
}
3656
3657impl PendingCheckpointV2 {
3658    pub fn height(&self) -> CheckpointHeight {
3659        self.details.checkpoint_height
3660    }
3661
3662    pub(crate) fn num_roots(&self) -> usize {
3663        self.roots.iter().map(|r| r.tx_roots.len()).sum()
3664    }
3665}
3666
pin_project! {
    /// Future adapter that counts how many times the inner future has been polled.
    /// Resolves to `(poll_count, inner_output)` when the inner future completes.
    pub struct PollCounter<Fut> {
        #[pin]
        future: Fut,
        // Number of polls observed so far.
        count: usize,
    }
}
3674
impl<Fut> PollCounter<Fut> {
    /// Wraps `future`, starting the poll count at zero.
    pub fn new(future: Fut) -> Self {
        Self { future, count: 0 }
    }

    /// Number of times the wrapped future has been polled so far.
    pub fn count(&self) -> usize {
        self.count
    }
}
3684
3685impl<Fut: Future> Future for PollCounter<Fut> {
3686    type Output = (usize, Fut::Output);
3687
3688    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3689        let this = self.project();
3690        *this.count += 1;
3691        match this.future.poll(cx) {
3692            Poll::Ready(output) => Poll::Ready((*this.count, output)),
3693            Poll::Pending => Poll::Pending,
3694        }
3695    }
3696}
3697
/// Wraps `future` in a `PollCounter` so the caller can observe how many polls it took.
fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
    PollCounter::new(future)
}
3701
3702#[cfg(test)]
3703mod tests {
3704    use super::*;
3705    use crate::authority::test_authority_builder::TestAuthorityBuilder;
3706    use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3707    use futures::FutureExt as _;
3708    use futures::future::BoxFuture;
3709    use std::collections::HashMap;
3710    use std::ops::Deref;
3711    use sui_macros::sim_test;
3712    use sui_protocol_config::{Chain, ProtocolConfig};
3713    use sui_types::accumulator_event::AccumulatorEvent;
3714    use sui_types::authenticator_state::ActiveJwk;
3715    use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3716    use sui_types::crypto::Signature;
3717    use sui_types::effects::{TransactionEffects, TransactionEvents};
3718    use sui_types::messages_checkpoint::SignedCheckpointSummary;
3719    use sui_types::transaction::VerifiedTransaction;
3720    use tokio::sync::mpsc;
3721
    // Verifies that clear_locally_computed_checkpoints_from(seq) deletes seq and
    // everything above it, while leaving all lower sequence numbers intact.
    #[tokio::test]
    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
        let store = CheckpointStore::new_for_tests();
        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
        // Populate locally computed checkpoints for sequence numbers 70..=80.
        for seq in 70u64..=80u64 {
            let contents =
                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
                    [sui_types::base_types::ExecutionDigests::new(
                        sui_types::digests::TransactionDigest::random(),
                        sui_types::digests::TransactionEffectsDigest::ZERO,
                    )],
                );
            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
                &protocol,
                0,
                seq,
                0,
                &contents,
                None,
                sui_types::gas::GasCostSummary::default(),
                None,
                0,
                Vec::new(),
                Vec::new(),
            );
            store
                .tables
                .locally_computed_checkpoints
                .insert(&seq, &summary)
                .unwrap();
        }

        store
            .clear_locally_computed_checkpoints_from(76)
            .expect("clear should succeed");

        // Explicit boundary checks: 75 must remain, 76 must be deleted
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&75)
                .unwrap()
                .is_some()
        );
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&76)
                .unwrap()
                .is_none()
        );

        // Everything below the cutoff survives...
        for seq in 70u64..76u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_some()
            );
        }
        // ...and everything at or above the cutoff is gone.
        for seq in 76u64..=80u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_none()
            );
        }
    }
3797
    // Exercises the record/get/clear round trip for both checkpoint-level and
    // transaction-level fork detection markers in the CheckpointStore.
    #[tokio::test]
    async fn test_fork_detection_storage() {
        let store = CheckpointStore::new_for_tests();
        // checkpoint fork
        let seq_num = 42;
        let digest = CheckpointDigest::random();

        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        // Recorded marker must round-trip exactly.
        let retrieved = store.get_checkpoint_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
        assert_eq!(retrieved_seq, seq_num);
        assert_eq!(retrieved_digest, digest);

        store.clear_checkpoint_fork_detected().unwrap();
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        // txn fork
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();

        assert!(store.get_transaction_fork_detected().unwrap().is_none());

        store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        // Recorded marker must round-trip exactly.
        let retrieved = store.get_transaction_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
        assert_eq!(retrieved_tx, tx_digest);
        assert_eq!(retrieved_expected, expected_effects);
        assert_eq!(retrieved_actual, actual_effects);

        store.clear_transaction_fork_detected().unwrap();
        assert!(store.get_transaction_fork_detected().unwrap().is_none());
    }
3841
    // End-to-end test of the checkpoint builder pipeline: seeds fake executed
    // transactions, feeds six pending checkpoints into a `CheckpointService`,
    // and asserts on the resulting checkpoints' contents ordering, sequence
    // numbers, previous-digest chaining, rolling gas totals, count- and
    // size-based splitting, and min-interval coalescing; finally exercises
    // signature aggregation into certified checkpoints.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        // Accumulators and consensus-handler splitting are turned off so the
        // builder itself does the splitting tested below; the 100ms minimum
        // interval drives the coalescing case at the end.
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.disable_accumulators_for_testing();
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(false);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // Small placeholder transaction used for digests d(0)..d(14).
        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
            0,
            0,
            vec![],
            SequenceNumber::new(),
        );

        // Build a JWK list whose BCS encoding is at least 40KB, so that each
        // transaction carrying it (d(15)..d(19)) is large enough to trigger
        // the size-based checkpoint split asserted further down.
        let jwks = {
            let mut jwks = Vec::new();
            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
                jwks.push(ActiveJwk {
                    jwk_id: JwkId::new(
                        "https://accounts.google.com".to_string(),
                        "1234567890".to_string(),
                    ),
                    jwk: JWK {
                        kty: "RSA".to_string(),
                        e: "AQAB".to_string(),
                        n: "1234567890".to_string(),
                        alg: "RS256".to_string(),
                    },
                    epoch: 0,
                });
            }
            jwks
        };

        let dummy_tx_with_data =
            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());

        // Register transaction bodies: d(0)..d(14) small, d(15)..d(19) ~40KB.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Record effects (with dependencies and gas) for the digests used as
        // checkpoint roots. d(1) depends on d(2),d(3); d(2) on d(3),d(4) —
        // this dependency chain drives the causal ordering checked below.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 52, 51, 1),
            );
        }
        // Each executed transaction gets a (placeholder) user signature, which
        // checkpoint contents construction requires.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![(signature, None)]);
        }

        // Channels through which the test receives built and certified
        // checkpoints, via the mpsc::Sender CheckpointOutput impls in this
        // module.
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

        // Limits: at most 3 transactions and 100_000 bytes per checkpoint —
        // both splitting cases below are sized against these numbers.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

        // Feed six pending checkpoints at increasing heights and timestamps.
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        // Checkpoint 0 contains only d(4); checkpoint 1 contains d(1)'s
        // dependency closure in causal order (d(3), d(2) before d(1)).
        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        // Rolling gas accumulates across the epoch: 41+31+21+11 = 104, etc.
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 108, 104, 4)
        );

        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
        // Verify that we split into 2 checkpoints.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K max.
        // Verify that we split into 2 checkpoints.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending at index 4 was too soon after the prior one and should be coalesced into
        // the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        // Sign summaries 0 and 1, delivering the signatures out of order;
        // certified checkpoints must still arrive in sequence order.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c2ss })
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c1ss })
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
4075
4076    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
4077        fn notify_read_executed_effects_may_fail(
4078            &self,
4079            _: &str,
4080            digests: &[TransactionDigest],
4081        ) -> BoxFuture<'_, SuiResult<Vec<TransactionEffects>>> {
4082            std::future::ready(Ok(digests
4083                .iter()
4084                .map(|d| self.get(d).expect("effects not found").clone())
4085                .collect()))
4086            .boxed()
4087        }
4088
4089        fn notify_read_executed_effects_digests(
4090            &self,
4091            _: &str,
4092            digests: &[TransactionDigest],
4093        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
4094            std::future::ready(
4095                digests
4096                    .iter()
4097                    .map(|d| {
4098                        self.get(d)
4099                            .map(|fx| fx.digest())
4100                            .expect("effects not found")
4101                    })
4102                    .collect(),
4103            )
4104            .boxed()
4105        }
4106
4107        fn multi_get_executed_effects(
4108            &self,
4109            digests: &[TransactionDigest],
4110        ) -> Vec<Option<TransactionEffects>> {
4111            digests.iter().map(|d| self.get(d).cloned()).collect()
4112        }
4113
4114        // Unimplemented methods - its unfortunate to have this big blob of useless code, but it wasn't
4115        // worth it to keep EffectsNotifyRead around just for these tests, as it caused a ton of
4116        // complication in non-test code. (e.g. had to implement EFfectsNotifyRead for all
4117        // ExecutionCacheRead implementors).
4118
4119        fn multi_get_transaction_blocks(
4120            &self,
4121            _: &[TransactionDigest],
4122        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
4123            unimplemented!()
4124        }
4125
4126        fn multi_get_executed_effects_digests(
4127            &self,
4128            _: &[TransactionDigest],
4129        ) -> Vec<Option<TransactionEffectsDigest>> {
4130            unimplemented!()
4131        }
4132
4133        fn multi_get_effects(
4134            &self,
4135            _: &[TransactionEffectsDigest],
4136        ) -> Vec<Option<TransactionEffects>> {
4137            unimplemented!()
4138        }
4139
4140        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
4141            unimplemented!()
4142        }
4143
4144        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
4145            unimplemented!()
4146        }
4147
4148        fn get_unchanged_loaded_runtime_objects(
4149            &self,
4150            _digest: &TransactionDigest,
4151        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
4152            unimplemented!()
4153        }
4154
4155        fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
4156            unimplemented!()
4157        }
4158    }
4159
4160    #[async_trait::async_trait]
4161    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
4162        async fn checkpoint_created(
4163            &self,
4164            summary: &CheckpointSummary,
4165            contents: &CheckpointContents,
4166            _epoch_store: &Arc<AuthorityPerEpochStore>,
4167            _checkpoint_store: &Arc<CheckpointStore>,
4168        ) -> SuiResult {
4169            self.try_send((contents.clone(), summary.clone())).unwrap();
4170            Ok(())
4171        }
4172    }
4173
4174    #[async_trait::async_trait]
4175    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
4176        async fn certified_checkpoint_created(
4177            &self,
4178            summary: &CertifiedCheckpointSummary,
4179        ) -> SuiResult {
4180            self.try_send(summary.clone()).unwrap();
4181            Ok(())
4182        }
4183    }
4184
4185    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
4186        PendingCheckpoint {
4187            roots: t
4188                .into_iter()
4189                .map(|t| TransactionKey::Digest(d(t)))
4190                .collect(),
4191            details: PendingCheckpointInfo {
4192                timestamp_ms,
4193                last_of_epoch: false,
4194                checkpoint_height: i,
4195                consensus_commit_ref: CommitRef::default(),
4196                rejected_transactions_digest: Digest::default(),
4197                checkpoint_seq: None,
4198            },
4199        }
4200    }
4201
4202    fn d(i: u8) -> TransactionDigest {
4203        let mut bytes: [u8; 32] = Default::default();
4204        bytes[0] = i;
4205        TransactionDigest::new(bytes)
4206    }
4207
4208    fn e(
4209        transaction_digest: TransactionDigest,
4210        dependencies: Vec<TransactionDigest>,
4211        gas_used: GasCostSummary,
4212    ) -> TransactionEffects {
4213        let mut effects = TransactionEffects::default();
4214        *effects.transaction_digest_mut_for_testing() = transaction_digest;
4215        *effects.dependencies_mut_for_testing() = dependencies;
4216        *effects.gas_cost_summary_mut_for_testing() = gas_used;
4217        effects
4218    }
4219
4220    fn commit_cert_for_test(
4221        store: &mut HashMap<TransactionDigest, TransactionEffects>,
4222        state: Arc<AuthorityState>,
4223        digest: TransactionDigest,
4224        dependencies: Vec<TransactionDigest>,
4225        gas_used: GasCostSummary,
4226    ) {
4227        let epoch_store = state.epoch_store_for_testing();
4228        let effects = e(digest, dependencies, gas_used);
4229        store.insert(digest, effects.clone());
4230        epoch_store.insert_executed_in_epoch(&digest);
4231    }
4232}