// sui_core/checkpoints/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4pub(crate) mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16    LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::random::get_rng;
29use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
30use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
31use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
32use nonempty::NonEmpty;
33use parking_lot::Mutex;
34use pin_project_lite::pin_project;
35use serde::{Deserialize, Serialize};
36use sui_macros::fail_point_arg;
37use sui_network::default_mysten_network_config;
38use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43    CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use tokio::sync::{mpsc, watch};
47use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
48
49use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
50use crate::authority::authority_store_pruner::PrunerWatermarks;
51use crate::consensus_handler::SequencedConsensusTransactionKey;
52use rand::seq::SliceRandom;
53use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
54use std::fs::File;
55use std::future::Future;
56use std::io::Write;
57use std::path::Path;
58use std::pin::Pin;
59use std::sync::Arc;
60use std::sync::Weak;
61use std::task::{Context, Poll};
62use std::time::{Duration, SystemTime};
63use sui_protocol_config::ProtocolVersion;
64use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
65use sui_types::committee::StakeUnit;
66use sui_types::crypto::AuthorityStrongQuorumSignInfo;
67use sui_types::digests::{
68    CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
69};
70use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
71use sui_types::error::{SuiErrorKind, SuiResult};
72use sui_types::gas::GasCostSummary;
73use sui_types::message_envelope::Message;
74use sui_types::messages_checkpoint::{
75    CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
76    CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
77    EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
78    VerifiedCheckpointContents,
79};
80use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
81use sui_types::messages_consensus::ConsensusTransactionKey;
82use sui_types::signature::GenericSignature;
83use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
84use sui_types::transaction::{
85    TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
86};
87use tokio::sync::Notify;
88use tracing::{debug, error, info, instrument, trace, warn};
89use typed_store::DBMapUtils;
90use typed_store::Map;
91use typed_store::{
92    TypedStoreError,
93    rocks::{DBMap, MetricConf},
94};
95
/// The single key used in the `transaction_fork_detected` table, which holds at most one entry.
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

/// Height of a pending checkpoint, derived from the consensus round
/// (see `PendingCheckpointInfo::checkpoint_height` for caveats about monotonicity).
pub type CheckpointHeight = u64;
99
/// Aggregate statistics accumulated over the checkpoints of a single epoch.
pub struct EpochStats {
    /// Number of checkpoints created during the epoch.
    pub checkpoint_count: u64,
    /// Number of transactions included in the epoch's checkpoints.
    pub transaction_count: u64,
    /// Total gas reward accumulated over the epoch.
    pub total_gas_reward: u64,
}
105
/// Metadata describing a checkpoint that has been scheduled but not yet built.
#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    /// Timestamp to assign to the checkpoint.
    pub timestamp_ms: CheckpointTimestamp,
    /// True if this will be the final checkpoint of the epoch.
    pub last_of_epoch: bool,
    /// Computed in calculate_pending_checkpoint_height() from consensus round;
    /// there is no guarantee that this is increasing per checkpoint, because of checkpoint splitting.
    pub checkpoint_height: CheckpointHeight,
    /// Consensus commit ref and rejected transactions digest which correspond to this checkpoint.
    pub consensus_commit_ref: CommitRef,
    pub rejected_transactions_digest: Digest,
    /// Pre-assigned checkpoint sequence number from consensus handler.
    /// Only set when split_checkpoints_in_consensus_handler is enabled.
    pub checkpoint_seq: Option<CheckpointSequenceNumber>,
}
120
/// A checkpoint awaiting construction: the root transactions to include plus
/// scheduling metadata. See also [`PendingCheckpointV2`].
#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    /// Root transaction keys to be included (and causally ordered) in this checkpoint.
    pub roots: Vec<TransactionKey>,
    /// Scheduling metadata for this checkpoint.
    pub details: PendingCheckpointInfo,
}
126
/// One group of root transactions settled together, used by [`PendingCheckpointV2`].
#[derive(Clone, Debug, Default)]
pub struct CheckpointRoots {
    /// Root transaction keys in this group.
    pub tx_roots: Vec<TransactionKey>,
    /// Optional settlement transaction key associated with this group.
    pub settlement_root: Option<TransactionKey>,
    /// Checkpoint height at which this group was formed.
    pub height: CheckpointHeight,
}
133
/// PendingCheckpointV2 is merged and split in ConsensusHandler
/// instead of CheckpointBuilder. It contains 1 or more
/// [`CheckpointRoots`], each of which represents a group of transactions
/// settled together.
/// Usage of PendingCheckpointV2 is gated by split_checkpoints_in_consensus_handler.
/// We need to support dual implementations until this feature flag is removed.
#[derive(Clone, Debug)]
pub struct PendingCheckpointV2 {
    /// Groups of root transactions settled together.
    pub roots: Vec<CheckpointRoots>,
    /// Scheduling metadata for this checkpoint.
    pub details: PendingCheckpointInfo,
}
145
/// A checkpoint summary as produced by the checkpoint builder, together with
/// bookkeeping about where in the build process it was created.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    /// The built checkpoint summary.
    pub summary: CheckpointSummary,
    /// Height at which this checkpoint summary was built. None for genesis checkpoint
    pub checkpoint_height: Option<CheckpointHeight>,
    /// Position of this checkpoint within its consensus commit (relevant when a
    /// commit is split into multiple checkpoints).
    pub position_in_commit: usize,
}
153
/// Column families backing the [`CheckpointStore`].
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence number, for
    /// efficient reads of full checkpoints. Entries from this table are deleted after state
    /// accumulation has completed.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    // TODO: Once the switch to `full_checkpoint_content_v2` is fully active on mainnet,
    // deprecate this table (and remove when possible).
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks and log useful
    /// information. Can be pruned as soon as we verify that we are in agreement with the latest
    /// certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    /// Stores transaction fork detection information: the transaction digest plus the
    /// two conflicting effects digests. Presumably keyed by
    /// `TRANSACTION_FORK_DETECTED_KEY` (single-entry table) — confirm at write sites.
    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
    /// Versioned replacement for `full_checkpoint_content`, also keyed by sequence number.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
}
201
202fn full_checkpoint_content_table_default_config() -> DBOptions {
203    DBOptions {
204        options: default_db_options().options,
205        // We have seen potential data corruption issues in this table after forced shutdowns
206        // so we enable value hash logging to help with debugging.
207        // TODO: remove this once we have a better understanding of the root cause.
208        rw_options: ReadWriteOptions::default().set_log_value_hash(true),
209    }
210}
211
impl CheckpointStoreTables {
    /// Opens the tables read-write at `path` (RocksDB build). The pruner
    /// watermarks are unused here; they only matter for the tidehunter build.
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    /// Opens the tables read-write at `path` (tidehunter build), building a
    /// per-key-space configuration including relocation filters driven by the
    /// pruner watermarks.
    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count();
        // Keys are u64 sequence numbers; use the top 6 bytes as the prefix.
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(16_000)
            .with_value_cache_size(default_value_cache_size());
        // Shared config for the u64-keyed (sequence-number) tables.
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        // Shared config for 32-byte digest-keyed tables.
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        // Watermark-style tables are tiny and hot: cache values, never unload.
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(100);
        // One entry per column family; names must match the struct field names.
        let configs = vec![
            (
                "checkpoint_content",
                digest_config.clone().with_config(
                    KeySpaceConfig::new().with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
            ),
            (
                "transaction_fork_detected",
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "full_checkpoint_content_v2",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    /// Opens a read-only handle to the tables (RocksDB build).
    #[cfg(not(tidehunter))]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }

    /// "Read-only" open for the tidehunter build. NOTE(review): this delegates
    /// to `new`, which opens the tables read-write — confirm this is intended.
    #[cfg(tidehunter)]
    pub fn open_readonly(path: &Path) -> Self {
        Self::new(path, "checkpoint", Arc::new(PrunerWatermarks::default()))
    }
}
345
/// Persistent store for checkpoints, plus notification channels that allow
/// callers to wait for a checkpoint to become synced or executed.
pub struct CheckpointStore {
    /// Underlying column families.
    pub(crate) tables: CheckpointStoreTables,
    /// Notifies waiters when a checkpoint reaches the synced watermark.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    /// Notifies waiters when a checkpoint reaches the executed watermark.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
351
352impl CheckpointStore {
353    pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
354        let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
355        Arc::new(Self {
356            tables,
357            synced_checkpoint_notify_read: NotifyRead::new(),
358            executed_checkpoint_notify_read: NotifyRead::new(),
359        })
360    }
361
362    pub fn new_for_tests() -> Arc<Self> {
363        let ckpt_dir = mysten_common::tempdir().unwrap();
364        CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
365    }
366
367    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
368        let tables = CheckpointStoreTables::new(
369            path,
370            "db_checkpoint",
371            Arc::new(PrunerWatermarks::default()),
372        );
373        Arc::new(Self {
374            tables,
375            synced_checkpoint_notify_read: NotifyRead::new(),
376            executed_checkpoint_notify_read: NotifyRead::new(),
377        })
378    }
379
    /// Opens a read-only handle to the checkpoint tables (RocksDB build).
    #[cfg(not(tidehunter))]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }

    /// Opens the checkpoint tables (tidehunter build); the return type differs
    /// from the RocksDB build because tidehunter has no dedicated read-only handle.
    #[cfg(tidehunter)]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTables {
        CheckpointStoreTables::open_readonly(path)
    }
389
390    #[instrument(level = "info", skip_all)]
391    pub fn insert_genesis_checkpoint(
392        &self,
393        checkpoint: VerifiedCheckpoint,
394        contents: CheckpointContents,
395        epoch_store: &AuthorityPerEpochStore,
396    ) {
397        assert_eq!(
398            checkpoint.epoch(),
399            0,
400            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
401        );
402        assert_eq!(
403            *checkpoint.sequence_number(),
404            0,
405            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
406        );
407
408        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
409        match self.get_checkpoint_by_sequence_number(0).unwrap() {
410            Some(existing_checkpoint) => {
411                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
412            }
413            None => {
414                if epoch_store.epoch() == checkpoint.epoch {
415                    epoch_store
416                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
417                        .unwrap();
418                } else {
419                    debug!(
420                        validator_epoch =% epoch_store.epoch(),
421                        genesis_epoch =% checkpoint.epoch(),
422                        "Not inserting checkpoint builder data for genesis checkpoint",
423                    );
424                }
425                self.insert_checkpoint_contents(contents).unwrap();
426                self.insert_verified_checkpoint(&checkpoint).unwrap();
427                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
428            }
429        }
430    }
431
432    pub fn get_checkpoint_by_digest(
433        &self,
434        digest: &CheckpointDigest,
435    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
436        self.tables
437            .checkpoint_by_digest
438            .get(digest)
439            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
440    }
441
442    pub fn get_checkpoint_by_sequence_number(
443        &self,
444        sequence_number: CheckpointSequenceNumber,
445    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
446        self.tables
447            .certified_checkpoints
448            .get(&sequence_number)
449            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
450    }
451
452    pub fn get_locally_computed_checkpoint(
453        &self,
454        sequence_number: CheckpointSequenceNumber,
455    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
456        self.tables
457            .locally_computed_checkpoints
458            .get(&sequence_number)
459    }
460
461    pub fn multi_get_locally_computed_checkpoints(
462        &self,
463        sequence_numbers: &[CheckpointSequenceNumber],
464    ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
465        let checkpoints = self
466            .tables
467            .locally_computed_checkpoints
468            .multi_get(sequence_numbers)?;
469
470        Ok(checkpoints)
471    }
472
473    pub fn get_sequence_number_by_contents_digest(
474        &self,
475        digest: &CheckpointContentsDigest,
476    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
477        self.tables
478            .checkpoint_sequence_by_contents_digest
479            .get(digest)
480    }
481
482    pub fn delete_contents_digest_sequence_number_mapping(
483        &self,
484        digest: &CheckpointContentsDigest,
485    ) -> Result<(), TypedStoreError> {
486        self.tables
487            .checkpoint_sequence_by_contents_digest
488            .remove(digest)
489    }
490
491    pub fn get_latest_certified_checkpoint(
492        &self,
493    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
494        Ok(self
495            .tables
496            .certified_checkpoints
497            .reversed_safe_iter_with_bounds(None, None)?
498            .next()
499            .transpose()?
500            .map(|(_, v)| v.into()))
501    }
502
503    pub fn get_latest_locally_computed_checkpoint(
504        &self,
505    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
506        Ok(self
507            .tables
508            .locally_computed_checkpoints
509            .reversed_safe_iter_with_bounds(None, None)?
510            .next()
511            .transpose()?
512            .map(|(_, v)| v))
513    }
514
515    pub fn multi_get_checkpoint_by_sequence_number(
516        &self,
517        sequence_numbers: &[CheckpointSequenceNumber],
518    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
519        let checkpoints = self
520            .tables
521            .certified_checkpoints
522            .multi_get(sequence_numbers)?
523            .into_iter()
524            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
525            .collect();
526
527        Ok(checkpoints)
528    }
529
530    pub fn multi_get_checkpoint_content(
531        &self,
532        contents_digest: &[CheckpointContentsDigest],
533    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
534        self.tables.checkpoint_content.multi_get(contents_digest)
535    }
536
537    pub fn get_highest_verified_checkpoint(
538        &self,
539    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
540        let highest_verified = if let Some(highest_verified) = self
541            .tables
542            .watermarks
543            .get(&CheckpointWatermark::HighestVerified)?
544        {
545            highest_verified
546        } else {
547            return Ok(None);
548        };
549        self.get_checkpoint_by_digest(&highest_verified.1)
550    }
551
552    pub fn get_highest_synced_checkpoint(
553        &self,
554    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
555        let highest_synced = if let Some(highest_synced) = self
556            .tables
557            .watermarks
558            .get(&CheckpointWatermark::HighestSynced)?
559        {
560            highest_synced
561        } else {
562            return Ok(None);
563        };
564        self.get_checkpoint_by_digest(&highest_synced.1)
565    }
566
567    pub fn get_highest_synced_checkpoint_seq_number(
568        &self,
569    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
570        if let Some(highest_synced) = self
571            .tables
572            .watermarks
573            .get(&CheckpointWatermark::HighestSynced)?
574        {
575            Ok(Some(highest_synced.0))
576        } else {
577            Ok(None)
578        }
579    }
580
581    pub fn get_highest_executed_checkpoint_seq_number(
582        &self,
583    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
584        if let Some(highest_executed) = self
585            .tables
586            .watermarks
587            .get(&CheckpointWatermark::HighestExecuted)?
588        {
589            Ok(Some(highest_executed.0))
590        } else {
591            Ok(None)
592        }
593    }
594
595    pub fn get_highest_executed_checkpoint(
596        &self,
597    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
598        let highest_executed = if let Some(highest_executed) = self
599            .tables
600            .watermarks
601            .get(&CheckpointWatermark::HighestExecuted)?
602        {
603            highest_executed
604        } else {
605            return Ok(None);
606        };
607        self.get_checkpoint_by_digest(&highest_executed.1)
608    }
609
610    pub fn get_highest_pruned_checkpoint_seq_number(
611        &self,
612    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
613        self.tables
614            .watermarks
615            .get(&CheckpointWatermark::HighestPruned)
616            .map(|watermark| watermark.map(|w| w.0))
617    }
618
619    pub fn get_checkpoint_contents(
620        &self,
621        digest: &CheckpointContentsDigest,
622    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
623        self.tables.checkpoint_content.get(digest)
624    }
625
626    pub fn get_full_checkpoint_contents_by_sequence_number(
627        &self,
628        seq: CheckpointSequenceNumber,
629    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
630        self.tables.full_checkpoint_content_v2.get(&seq)
631    }
632
633    fn prune_local_summaries(&self) -> SuiResult {
634        if let Some((last_local_summary, _)) = self
635            .tables
636            .locally_computed_checkpoints
637            .reversed_safe_iter_with_bounds(None, None)?
638            .next()
639            .transpose()?
640        {
641            let mut batch = self.tables.locally_computed_checkpoints.batch();
642            batch.schedule_delete_range(
643                &self.tables.locally_computed_checkpoints,
644                &0,
645                &last_local_summary,
646            )?;
647            batch.write()?;
648            info!("Pruned local summaries up to {:?}", last_local_summary);
649        }
650        Ok(())
651    }
652
653    pub fn clear_locally_computed_checkpoints_from(
654        &self,
655        from_seq: CheckpointSequenceNumber,
656    ) -> SuiResult {
657        let keys: Vec<_> = self
658            .tables
659            .locally_computed_checkpoints
660            .safe_iter_with_bounds(Some(from_seq), None)
661            .map(|r| r.map(|(k, _)| k))
662            .collect::<Result<_, _>>()?;
663        if let Some(&last_local_summary) = keys.last() {
664            let mut batch = self.tables.locally_computed_checkpoints.batch();
665            batch
666                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
667                .expect("Failed to delete locally computed checkpoints");
668            batch
669                .write()
670                .expect("Failed to delete locally computed checkpoints");
671            warn!(
672                from_seq,
673                last_local_summary,
674                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
675                from_seq,
676                last_local_summary
677            );
678        }
679        Ok(())
680    }
681
    /// Compares a locally computed checkpoint summary against the certified
    /// (verified) checkpoint for the same sequence number. On mismatch, logs
    /// both summaries and their contents, records the fork in the database,
    /// and crashes the process via `fatal!`. Under msim, a fail point can
    /// instead record a digest override and shut down the simulated node for
    /// fork-recovery testing.
    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            // Best-effort fetch of the verified contents for logging; any
            // lookup failure is folded into the log string rather than
            // short-circuiting the fork handling.
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // Same best-effort fetch for the locally computed contents.
            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // checkpoint contents may be too large for panic message.
            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );

            // Record the fork in the database before crashing
            if let Err(e) = self.record_checkpoint_fork_detected(
                *local_checkpoint.sequence_number(),
                local_checkpoint.digest(),
            ) {
                error!("Failed to record checkpoint fork in database: {:?}", e);
            }

            // Simulation-only hook: record the verified digest as an override
            // for this sequence number and shut the node down instead of
            // panicking, so fork recovery can be exercised in tests.
            fail_point_arg!(
                "kill_checkpoint_fork_node",
                |checkpoint_overrides: std::sync::Arc<
                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
                >| {
                    #[cfg(msim)]
                    {
                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
                            overrides.insert(
                                local_checkpoint.sequence_number,
                                verified_checkpoint.digest().to_string(),
                            );
                        }
                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
                            local_checkpoint.sequence_number(),
                            verified_checkpoint.digest()
                        );
                        sui_simulator::task::shutdown_current_node();
                    }
                }
            );

            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }
775
    /// Called by consensus (ConsensusAggregator).
    /// Different from `insert_verified_checkpoint`, it does not touch
    /// the highest_verified_checkpoint watermark such that state sync
    /// will have a chance to process this checkpoint and perform some
    /// state-sync only things.
    ///
    /// Writes the checkpoint into both the sequence-number and digest indexes
    /// in one batch (plus the epoch-last-checkpoint map when this is an
    /// end-of-epoch checkpoint), then — after the write — compares against any
    /// locally computed summary for the same sequence number to detect forks.
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        // A next-epoch committee marks the last checkpoint of the epoch.
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        // Fork check runs after the batch is committed; it crashes the node on
        // mismatch (see check_for_checkpoint_fork).
        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }
817
818    // Called by state sync, apart from inserting the checkpoint and updating
819    // related tables, it also bumps the highest_verified_checkpoint watermark.
820    #[instrument(level = "debug", skip_all)]
821    pub fn insert_verified_checkpoint(
822        &self,
823        checkpoint: &VerifiedCheckpoint,
824    ) -> Result<(), TypedStoreError> {
825        self.insert_certified_checkpoint(checkpoint)?;
826        self.update_highest_verified_checkpoint(checkpoint)
827    }
828
829    pub fn update_highest_verified_checkpoint(
830        &self,
831        checkpoint: &VerifiedCheckpoint,
832    ) -> Result<(), TypedStoreError> {
833        if Some(*checkpoint.sequence_number())
834            > self
835                .get_highest_verified_checkpoint()?
836                .map(|x| *x.sequence_number())
837        {
838            debug!(
839                checkpoint_seq = checkpoint.sequence_number(),
840                "Updating highest verified checkpoint",
841            );
842            self.tables.watermarks.insert(
843                &CheckpointWatermark::HighestVerified,
844                &(*checkpoint.sequence_number(), *checkpoint.digest()),
845            )?;
846        }
847
848        Ok(())
849    }
850
851    pub fn update_highest_synced_checkpoint(
852        &self,
853        checkpoint: &VerifiedCheckpoint,
854    ) -> Result<(), TypedStoreError> {
855        let seq = *checkpoint.sequence_number();
856        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
857        self.tables.watermarks.insert(
858            &CheckpointWatermark::HighestSynced,
859            &(seq, *checkpoint.digest()),
860        )?;
861        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
862        Ok(())
863    }
864
    /// Waits until `get_watermark()` reports a value >= `seq`, then returns the
    /// checkpoint stored at `seq`.
    ///
    /// `notify_read` must be the registry that is notified whenever the
    /// corresponding watermark advances (see `update_highest_synced_checkpoint`
    /// and `update_highest_executed_checkpoint`), otherwise this can wait forever.
    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        notify_read
            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
                let seq = seqs[0];
                // No watermark recorded yet: nothing available, keep waiting.
                let Some(highest) = get_watermark() else {
                    return vec![None];
                };
                // Watermark has not yet reached the requested sequence.
                if highest < seq {
                    return vec![None];
                }
                // Watermark covers `seq`, so the checkpoint must be in the db;
                // absence at this point is an invariant violation.
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                vec![Some(checkpoint)]
            })
            .await
            .into_iter()
            .next()
            .unwrap()
    }
894
895    pub async fn notify_read_synced_checkpoint(
896        &self,
897        seq: CheckpointSequenceNumber,
898    ) -> VerifiedCheckpoint {
899        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
900            self.get_highest_synced_checkpoint_seq_number()
901                .expect("db error")
902        })
903        .await
904    }
905
906    pub async fn notify_read_executed_checkpoint(
907        &self,
908        seq: CheckpointSequenceNumber,
909    ) -> VerifiedCheckpoint {
910        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
911            self.get_highest_executed_checkpoint_seq_number()
912                .expect("db error")
913        })
914        .await
915    }
916
    /// Advances the HighestExecuted watermark to `checkpoint` and notifies waiters.
    ///
    /// Unlike the verified/synced watermarks, execution must advance strictly in
    /// order: the new checkpoint must be exactly one past the current watermark,
    /// otherwise this panics.
    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            // Already at or past this checkpoint: idempotent no-op.
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            // Enforce gapless execution order.
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(seq, *checkpoint.digest()),
        )?;
        // Wake tasks blocked in notify_read_executed_checkpoint for this sequence.
        self.executed_checkpoint_notify_read
            .notify(&seq, checkpoint);
        Ok(())
    }
943
944    pub fn update_highest_pruned_checkpoint(
945        &self,
946        checkpoint: &VerifiedCheckpoint,
947    ) -> Result<(), TypedStoreError> {
948        self.tables.watermarks.insert(
949            &CheckpointWatermark::HighestPruned,
950            &(*checkpoint.sequence_number(), *checkpoint.digest()),
951        )
952    }
953
954    /// Sets highest executed checkpoint to any value.
955    ///
956    /// WARNING: This method is very subtle and can corrupt the database if used incorrectly.
957    /// It should only be used in one-off cases or tests after fully understanding the risk.
958    pub fn set_highest_executed_checkpoint_subtle(
959        &self,
960        checkpoint: &VerifiedCheckpoint,
961    ) -> Result<(), TypedStoreError> {
962        self.tables.watermarks.insert(
963            &CheckpointWatermark::HighestExecuted,
964            &(*checkpoint.sequence_number(), *checkpoint.digest()),
965        )
966    }
967
968    pub fn insert_checkpoint_contents(
969        &self,
970        contents: CheckpointContents,
971    ) -> Result<(), TypedStoreError> {
972        debug!(
973            checkpoint_seq = ?contents.digest(),
974            "Inserting checkpoint contents",
975        );
976        self.tables
977            .checkpoint_content
978            .insert(contents.digest(), &contents)
979    }
980
    /// Atomically persists a checkpoint's full contents plus derived indexes:
    /// contents-digest -> sequence number, the full contents (v2 table), and the
    /// plain CheckpointContents keyed by digest.
    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content_v2.batch();
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content_v2,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        // Reduce to plain contents and sanity-check they hash to the digest
        // recorded in the checkpoint summary before writing them.
        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        // All three writes land in a single atomic batch.
        batch.write()
    }
1007
1008    pub fn delete_full_checkpoint_contents(
1009        &self,
1010        seq: CheckpointSequenceNumber,
1011    ) -> Result<(), TypedStoreError> {
1012        self.tables.full_checkpoint_content.remove(&seq)?;
1013        self.tables.full_checkpoint_content_v2.remove(&seq)
1014    }
1015
1016    pub fn get_epoch_last_checkpoint(
1017        &self,
1018        epoch_id: EpochId,
1019    ) -> SuiResult<Option<VerifiedCheckpoint>> {
1020        let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1021        let checkpoint = match seq {
1022            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1023            None => None,
1024        };
1025        Ok(checkpoint)
1026    }
1027
1028    pub fn get_epoch_last_checkpoint_seq_number(
1029        &self,
1030        epoch_id: EpochId,
1031    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1032        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1033        Ok(seq)
1034    }
1035
1036    pub fn insert_epoch_last_checkpoint(
1037        &self,
1038        epoch_id: EpochId,
1039        checkpoint: &VerifiedCheckpoint,
1040    ) -> SuiResult {
1041        self.tables
1042            .epoch_last_checkpoint_map
1043            .insert(&epoch_id, checkpoint.sequence_number())?;
1044        Ok(())
1045    }
1046
1047    pub fn get_epoch_state_commitments(
1048        &self,
1049        epoch: EpochId,
1050    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1051        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1052            checkpoint
1053                .end_of_epoch_data
1054                .as_ref()
1055                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1056                .epoch_commitments
1057                .clone()
1058        });
1059        Ok(commitments)
1060    }
1061
1062    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few statistics of the epoch.
1063    pub fn get_epoch_stats(
1064        &self,
1065        epoch: EpochId,
1066        last_checkpoint: &CheckpointSummary,
1067    ) -> Option<EpochStats> {
1068        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1069            (0, 0)
1070        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1071            (
1072                checkpoint.sequence_number + 1,
1073                checkpoint.network_total_transactions,
1074            )
1075        } else {
1076            return None;
1077        };
1078        Some(EpochStats {
1079            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1080            transaction_count: last_checkpoint.network_total_transactions
1081                - prev_epoch_network_transactions,
1082            total_gas_reward: last_checkpoint
1083                .epoch_rolling_gas_cost_summary
1084                .computation_cost,
1085        })
1086    }
1087
1088    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1089        // This checkpoints the entire db and not one column family
1090        self.tables
1091            .checkpoint_content
1092            .checkpoint_db(path)
1093            .map_err(Into::into)
1094    }
1095
1096    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1097        let mut wb = self.tables.watermarks.batch();
1098        wb.delete_batch(
1099            &self.tables.watermarks,
1100            std::iter::once(CheckpointWatermark::HighestExecuted),
1101        )?;
1102        wb.write()?;
1103        Ok(())
1104    }
1105
1106    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1107        self.delete_highest_executed_checkpoint_test_only()?;
1108        Ok(())
1109    }
1110
1111    pub fn record_checkpoint_fork_detected(
1112        &self,
1113        checkpoint_seq: CheckpointSequenceNumber,
1114        checkpoint_digest: CheckpointDigest,
1115    ) -> Result<(), TypedStoreError> {
1116        info!(
1117            checkpoint_seq = checkpoint_seq,
1118            checkpoint_digest = ?checkpoint_digest,
1119            "Recording checkpoint fork detection in database"
1120        );
1121        self.tables.watermarks.insert(
1122            &CheckpointWatermark::CheckpointForkDetected,
1123            &(checkpoint_seq, checkpoint_digest),
1124        )
1125    }
1126
1127    pub fn get_checkpoint_fork_detected(
1128        &self,
1129    ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1130        self.tables
1131            .watermarks
1132            .get(&CheckpointWatermark::CheckpointForkDetected)
1133    }
1134
1135    pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1136        self.tables
1137            .watermarks
1138            .remove(&CheckpointWatermark::CheckpointForkDetected)
1139    }
1140
1141    pub fn record_transaction_fork_detected(
1142        &self,
1143        tx_digest: TransactionDigest,
1144        expected_effects_digest: TransactionEffectsDigest,
1145        actual_effects_digest: TransactionEffectsDigest,
1146    ) -> Result<(), TypedStoreError> {
1147        info!(
1148            tx_digest = ?tx_digest,
1149            expected_effects_digest = ?expected_effects_digest,
1150            actual_effects_digest = ?actual_effects_digest,
1151            "Recording transaction fork detection in database"
1152        );
1153        self.tables.transaction_fork_detected.insert(
1154            &TRANSACTION_FORK_DETECTED_KEY,
1155            &(tx_digest, expected_effects_digest, actual_effects_digest),
1156        )
1157    }
1158
1159    pub fn get_transaction_fork_detected(
1160        &self,
1161    ) -> Result<
1162        Option<(
1163            TransactionDigest,
1164            TransactionEffectsDigest,
1165            TransactionEffectsDigest,
1166        )>,
1167        TypedStoreError,
1168    > {
1169        self.tables
1170            .transaction_fork_detected
1171            .get(&TRANSACTION_FORK_DETECTED_KEY)
1172    }
1173
1174    pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1175        self.tables
1176            .transaction_fork_detected
1177            .remove(&TRANSACTION_FORK_DETECTED_KEY)
1178    }
1179}
1180
/// Keys of the `watermarks` table. Each key maps to a
/// `(CheckpointSequenceNumber, CheckpointDigest)` value.
///
/// NOTE: serialized via serde — do not rename or reorder variants.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    /// Highest checkpoint whose certificate has been verified.
    HighestVerified,
    /// Highest checkpoint recorded as synced by state sync.
    HighestSynced,
    /// Highest checkpoint whose transactions have been executed.
    HighestExecuted,
    /// Highest checkpoint that has been pruned.
    HighestPruned,
    /// Marker recording a detected checkpoint fork (sequence number, digest).
    CheckpointForkDetected,
}
1189
/// Receives (sequence number, effects) pairs from the CheckpointBuilder and
/// folds them into the GlobalStateHasher (see `CheckpointStateHasher::run`).
struct CheckpointStateHasher {
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Weak so the hashing loop stops once the hasher is dropped elsewhere.
    hasher: Weak<GlobalStateHasher>,
    // Channel of (checkpoint sequence number, effects in that checkpoint).
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
1195
1196impl CheckpointStateHasher {
1197    fn new(
1198        epoch_store: Arc<AuthorityPerEpochStore>,
1199        hasher: Weak<GlobalStateHasher>,
1200        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1201    ) -> Self {
1202        Self {
1203            epoch_store,
1204            hasher,
1205            receive_from_builder,
1206        }
1207    }
1208
1209    async fn run(self) {
1210        let Self {
1211            epoch_store,
1212            hasher,
1213            mut receive_from_builder,
1214        } = self;
1215        while let Some((seq, effects)) = receive_from_builder.recv().await {
1216            let Some(hasher) = hasher.upgrade() else {
1217                info!("Object state hasher was dropped, stopping checkpoint accumulation");
1218                break;
1219            };
1220            hasher
1221                .accumulate_checkpoint(&effects, seq, &epoch_store)
1222                .expect("epoch ended while accumulating checkpoint");
1223        }
1224    }
1225}
1226
/// Errors surfaced by the checkpoint builder loop (see `CheckpointBuilder::run`:
/// the first two variants stop the builder; `Retry` is retried after 1s).
#[derive(Debug)]
pub enum CheckpointBuilderError {
    /// Terminal for this epoch: the change-epoch transaction has already executed.
    ChangeEpochTxAlreadyExecuted,
    /// Terminal: required system packages are missing.
    SystemPackagesMissing,
    /// Transient failure; carries the underlying error for logging.
    Retry(anyhow::Error),
}
1233
// Blanket conversion: any std error becomes a retryable builder error, which
// lets `?` be used freely inside the builder.
// NOTE(review): `SuiError` here is just the *name* of a generic type parameter
// covering any `std::error::Error`, not the concrete sui_types error type —
// confirm the naming is intentional.
impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
    for CheckpointBuilderError
{
    fn from(e: SuiError) -> Self {
        Self::Retry(e.into())
    }
}
1241
/// Result alias for checkpoint-building operations; success type defaults to `()`.
pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1243
/// Builds checkpoints from pending checkpoints produced by consensus and
/// writes them out through `output`.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Awaited in `run` between build passes; woken when new pending
    // checkpoints are available.
    notify: Arc<Notify>,
    // Presumably wakes the CheckpointAggregator after new checkpoints are
    // written — confirm against write_checkpoints (not shown here).
    notify_aggregator: Arc<Notify>,
    // Publishes the sequence number of the most recently built checkpoint.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    // For synchronous accumulation of the end-of-epoch checkpoint.
    global_state_hasher: Weak<GlobalStateHasher>,
    // For asynchronous/concurrent accumulation of regular checkpoints.
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    // Size limits applied when splitting transactions into checkpoints.
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
1259
/// Aggregates validator signatures over checkpoint summaries into certified
/// checkpoints, emitting them through `output`.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Woken when there may be new work for the aggregator.
    notify: Arc<Notify>,
    // Incoming stream of checkpoint signature messages.
    receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
    // Signature messages buffered by sequence number. NOTE(review): presumably
    // these arrived ahead of the checkpoint currently being aggregated — confirm.
    pending: BTreeMap<CheckpointSequenceNumber, Vec<CheckpointSignatureMessage>>,
    // Aggregation state for the checkpoint currently being certified, if any.
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1271
// This holds information to aggregate signatures for one checkpoint
pub struct CheckpointSignatureAggregator {
    // The summary being certified.
    summary: CheckpointSummary,
    // Digest of `summary`.
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1282
1283impl CheckpointBuilder {
    /// Assembles a CheckpointBuilder; all collaborators are injected by the
    /// caller, so this does no I/O itself.
    fn new(
        state: Arc<AuthorityState>,
        store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        // for synchronous accumulation of end-of-epoch checkpoint
        global_state_hasher: Weak<GlobalStateHasher>,
        // for asynchronous/concurrent accumulation of regular checkpoints
        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            store,
            epoch_store,
            notify,
            effects_store,
            global_state_hasher,
            send_to_hasher,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }
1317
1318    /// This function first waits for ConsensusCommitHandler to finish reprocessing
1319    /// commits that have been processed before the last restart, if consensus_replay_waiter
1320    /// is supplied. Then it starts building checkpoints in a loop.
1321    ///
1322    /// It is optional to pass in consensus_replay_waiter, to make it easier to attribute
1323    /// if slow recovery of previously built checkpoints is due to consensus replay or
1324    /// checkpoint building.
1325    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
1326        if let Some(replay_waiter) = consensus_replay_waiter {
1327            info!("Waiting for consensus commits to replay ...");
1328            replay_waiter.wait_for_replay().await;
1329            info!("Consensus commits finished replaying");
1330        }
1331        info!("Starting CheckpointBuilder");
1332        loop {
1333            match self.maybe_build_checkpoints().await {
1334                Ok(()) => {}
1335                err @ Err(
1336                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
1337                    | CheckpointBuilderError::SystemPackagesMissing,
1338                ) => {
1339                    info!("CheckpointBuilder stopping: {:?}", err);
1340                    return;
1341                }
1342                Err(CheckpointBuilderError::Retry(inner)) => {
1343                    let msg = format!("{:?}", inner);
1344                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
1345                    tokio::time::sleep(Duration::from_secs(1)).await;
1346                    self.metrics.checkpoint_errors.inc();
1347                    continue;
1348                }
1349            }
1350
1351            self.notify.notified().await;
1352        }
1353    }
1354
1355    async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1356        if self
1357            .epoch_store
1358            .protocol_config()
1359            .split_checkpoints_in_consensus_handler()
1360        {
1361            self.maybe_build_checkpoints_v2().await
1362        } else {
1363            self.maybe_build_checkpoints_v1().await
1364        }
1365    }
1366
    /// Legacy build path: coalesces consecutive PendingCheckpoints from the
    /// epoch store into groups (respecting the minimum checkpoint interval and
    /// end-of-epoch boundaries) and builds one checkpoint batch per group.
    async fn maybe_build_checkpoints_v1(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        // Collect info about the most recently built checkpoint.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        // Resume from the height of the last built checkpoint, if any.
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            // Group PendingCheckpoints until:
            // - minimum interval has elapsed ...
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                    Some(last_timestamp) => {
                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                    }
                    None => true,
                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
                //   should be written separately) ...
                } || checkpoints_iter
                    .peek()
                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                // - or, we have reached end of epoch.
                    || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            // Min interval has elapsed, we can now coalesce and build a checkpoint.
            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height_from = grouped_pending_checkpoints
                    .first()
                    .unwrap()
                    .details()
                    .checkpoint_height,
                checkpoint_commit_height_to = last_height,
                "Making checkpoint with commit height range"
            );

            let seq = self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await?;

            self.last_built.send_if_modified(|cur| {
                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            // ensure that the task can be cancelled at end of epoch, even if no other await yields
            // execution.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );

        Ok(())
    }
1455
1456    async fn maybe_build_checkpoints_v2(&mut self) -> CheckpointBuilderResult {
1457        let _scope = monitored_scope("BuildCheckpoints");
1458
1459        // Collect info about the most recently built checkpoint.
1460        let last_height = self
1461            .epoch_store
1462            .last_built_checkpoint_builder_summary()
1463            .expect("epoch should not have ended")
1464            .and_then(|s| s.checkpoint_height);
1465
1466        for (height, pending) in self
1467            .epoch_store
1468            .get_pending_checkpoints_v2(last_height)
1469            .expect("unexpected epoch store error")
1470        {
1471            debug!(checkpoint_commit_height = height, "Making checkpoint");
1472
1473            let seq = self.make_checkpoint_v2(pending).await?;
1474
1475            self.last_built.send_if_modified(|cur| {
1476                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1477                if seq > *cur {
1478                    *cur = seq;
1479                    true
1480                } else {
1481                    false
1482                }
1483            });
1484
1485            // ensure that the task can be cancelled at end of epoch, even if no other await yields
1486            // execution.
1487            tokio::task::yield_now().await;
1488        }
1489
1490        Ok(())
1491    }
1492
    /// Builds one or more checkpoints from a coalesced group of
    /// PendingCheckpoints, writes them out, and returns the highest new
    /// checkpoint sequence number.
    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &mut self,
        pendings: Vec<PendingCheckpoint>,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        // Human-readable description of the input group, for the final log line.
        let pending_ckpt_str = pendings
            .iter()
            .map(|p| {
                format!(
                    "height={}, commit={}",
                    p.details().checkpoint_height,
                    p.details().consensus_commit_ref
                )
            })
            .join("; ");

        let last_details = pendings.last().unwrap().details().clone();

        // Stores the transactions that should be included in the checkpoint. Transactions will be recorded in the checkpoint
        // in this order.
        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        // poll_count records how many polls the resolution future needed; used
        // below to flag unexpected waiting when execution is already ahead.
        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &last_details,
                &all_roots,
            )
            .await?;
        // `new_checkpoints` is non-empty by construction (`.last().0` below).
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        let new_ckpt_str = new_checkpoints
            .iter()
            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
            .join("; ");

        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            "Made new checkpoint {} from pending checkpoint {}",
            new_ckpt_str, pending_ckpt_str
        );

        Ok(highest_sequence)
    }
1552
    /// v2 counterpart of `make_checkpoint`: builds exactly one checkpoint from
    /// a single PendingCheckpointV2, writes it out, and returns its sequence
    /// number.
    #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
    async fn make_checkpoint_v2(
        &mut self,
        pending: PendingCheckpointV2,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        let details = pending.details.clone();

        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        // poll_count records how many polls the resolution future needed; used
        // below to flag unexpected waiting when execution is already ahead.
        let (poll_count, result) =
            poll_count(self.resolve_checkpoint_transactions_v2(pending)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &details,
                &all_roots,
            )
            .await?;
        // Unlike v1, the v2 path must never split into multiple checkpoints.
        assert_eq!(new_checkpoints.len(), 1, "Expected exactly one checkpoint");
        let sequence = *new_checkpoints.first().0.sequence_number();
        let digest = new_checkpoints.first().0.digest();
        if sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        self.write_checkpoints(details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            seq = sequence,
            %digest,
            height = details.checkpoint_height,
            commit = %details.consensus_commit_ref,
            "Made new checkpoint"
        );

        Ok(sequence)
    }
1600
    /// Builds and executes the accumulator settlement transactions (plus a trailing
    /// barrier transaction) for a checkpoint under construction.
    ///
    /// Flow, as implemented below:
    /// 1. Build the settlement transactions from the effects already selected for
    ///    the checkpoint, via `AccumulatorSettlementTxBuilder`.
    /// 2. Hand them to the epoch store (`notify_settlement_transactions_ready`) so
    ///    the execution side runs them, and wait for their effects.
    /// 3. Build the barrier transaction from the settlement effects, hand it off
    ///    the same way, and wait for its effects.
    /// 4. Assert that every effect succeeded, extract the new accumulator-root
    ///    version, and push a `FundsSettlement` to the execution scheduler.
    ///
    /// Returns the settlement `TransactionKey` together with the effects of all
    /// transactions executed here (settlements first, barrier last).
    async fn construct_and_execute_settlement_transactions(
        &self,
        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> (TransactionKey, Vec<TransactionEffects>) {
        let _scope =
            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");

        // Settlements are identified by (epoch, checkpoint height); the caller
        // asserts this key matches the settlement root popped from pending roots.
        let tx_key =
            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);

        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.effects_store.as_ref()),
            sorted_tx_effects_included_in_checkpoint,
            checkpoint_seq,
            tx_index_offset,
        );

        // Capture funds changes and the update count before `build_tx` consumes
        // the builder; `funds_changes` is forwarded to the scheduler at the end.
        let funds_changes = builder.collect_funds_changes();
        let num_updates = builder.num_updates();
        let settlement_txns = builder.build_tx(
            self.epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            checkpoint_seq,
        );

        // Settlements are system transactions; wrap them as verified executables.
        let settlement_txns: Vec<_> = settlement_txns
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    self.epoch_store.epoch(),
                )
            })
            .collect();

        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();

        debug!(
            ?settlement_digests,
            ?tx_key,
            "created settlement transactions with {num_updates} updates"
        );

        // Hand the settlement transactions to the execution side, then wait for
        // their effects to appear in the effects store.
        self.epoch_store
            .notify_settlement_transactions_ready(tx_key, settlement_txns);

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_settlement_effects",
            &settlement_digests,
            tx_key,
        )
        .await;

        // The barrier transaction is derived from the settlement effects, so it
        // can only be constructed after the settlements have executed.
        let barrier_tx = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            &settlement_effects,
        );

        let barrier_tx = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier_tx),
            self.epoch_store.epoch(),
        );
        let barrier_digest = *barrier_tx.digest();

        self.epoch_store
            .notify_barrier_transaction_ready(tx_key, barrier_tx);

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_barrier_effects",
            &[barrier_digest],
            tx_key,
        )
        .await;

        // Combined effects in checkpoint order: settlements first, barrier last.
        let settlement_effects: Vec<_> = settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect();

        // Settlement transactions are system-generated and must never fail. While
        // checking, also locate the unique effect that mutated the accumulator
        // root object to learn its next version.
        let mut next_accumulator_version = None;
        for fx in settlement_effects.iter() {
            assert!(
                fx.status().is_ok(),
                "settlement transaction cannot fail (digest: {:?}) {:#?}",
                fx.transaction_digest(),
                fx
            );
            if let Some(version) = fx
                .mutated()
                .iter()
                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
            {
                assert!(
                    next_accumulator_version.is_none(),
                    "Only one settlement transaction should mutate the accumulator root object"
                );
                next_accumulator_version = Some(version);
            }
        }
        let settlements = FundsSettlement {
            next_accumulator_version: next_accumulator_version
                .expect("Accumulator root object should be mutated in the settlement transactions"),
            funds_changes,
        };

        self.state
            .execution_scheduler()
            .settle_address_funds(settlements);

        (tx_key, settlement_effects)
    }
1728
    // Given the root transactions of the pending checkpoints, resolve the transactions that should
    // be included in the checkpoint, and return them in the order they should be included in the
    // checkpoint, together with the set of root transaction digests.
    // `effects_in_current_checkpoint` tracks the transactions that already exist in the current
    // checkpoint.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        pending_checkpoints: Vec<PendingCheckpoint>,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        // Keeps track of the effects that are already included in the current checkpoint.
        // This is used when there are multiple pending checkpoints to create a single checkpoint
        // because in such scenarios, dependencies of a transaction may be in earlier created
        // checkpoints, or in earlier pending checkpoints.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        // Accumulated across all pending checkpoints, in final checkpoint order.
        let mut tx_effects = Vec::new();
        let mut tx_roots = HashSet::new();

        for pending_checkpoint in pending_checkpoints.into_iter() {
            let mut pending = pending_checkpoint;
            debug!(
                checkpoint_commit_height = pending.details.checkpoint_height,
                "Resolving checkpoint transactions for pending checkpoint.",
            );

            trace!(
                "roots for pending checkpoint {:?}: {:?}",
                pending.details.checkpoint_height, pending.roots,
            );

            // When accumulators are enabled, the last root must be the settlement key
            // for this pending checkpoint; pop it off so the remaining roots are all
            // ordinary transaction roots.
            let settlement_root = if self.epoch_store.accumulators_enabled() {
                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
                    pending.roots.pop()
                else {
                    fatal!("No settlement root found");
                };
                Some(settlement_root)
            } else {
                None
            };

            let roots = &pending.roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(roots.len() as u64);

            // Resolve root keys to digests, then wait until all roots have executed
            // and read their effects.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;
            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;

            assert!(
                self.epoch_store
                    .protocol_config()
                    .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
            );

            // If the roots contains consensus commit prologue transaction, we want to extract it,
            // and put it to the front of the checkpoint.
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            // Get the unincluded dependencies of the consensus commit prologue. We should expect no
            // other dependencies that haven't been included in any previous checkpoints.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    &mut effects_in_current_checkpoint,
                )?;

                // No other dependencies of this consensus commit prologue that haven't been included
                // in any previous checkpoint.
                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }

            // Pull in all not-yet-included dependencies of the roots.
            let unsorted =
                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
                if cfg!(debug_assertions) {
                    // When consensus_commit_prologue is extracted, it should not be included in the `unsorted`.
                    for tx in unsorted.iter() {
                        assert!(tx.transaction_digest() != &ccp_digest);
                    }
                }
                // The prologue always goes first in the checkpoint.
                sorted.push(ccp_effects);
            }
            sorted.extend(CausalOrder::causal_sort(unsorted));

            if let Some(settlement_root) = settlement_root {
                //TODO: this is an incorrect heuristic for checkpoint seq number
                //      due to checkpoint splitting, to be fixed separately
                let last_checkpoint =
                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
                let next_checkpoint_seq = last_checkpoint
                    .as_ref()
                    .map(|(seq, _)| *seq)
                    .unwrap_or_default()
                    + 1;
                let tx_index_offset = tx_effects.len() as u64;

                // Build and execute the settlement + barrier transactions for this
                // pending checkpoint, appending their effects after the sorted txns.
                let (tx_key, settlement_effects) = self
                    .construct_and_execute_settlement_transactions(
                        &sorted,
                        pending.details.checkpoint_height,
                        next_checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                debug!(?tx_key, "executed settlement transactions");

                assert_eq!(settlement_root, tx_key);

                // Note: we do not need to add the settlement digests to `tx_roots` - `tx_roots`
                // should only include the digests of transactions that were originally roots in
                // the pending checkpoint. It is later used to identify transactions which were
                // added as dependencies, so that those transactions can be waited on using
                // `consensus_messages_processed_notify()`. System transactions (such as
                // settlements) are exempt from this already.
                //
                // However, we DO need to add them to `effects_in_current_checkpoint` so that
                // `complete_checkpoint_effects` won't pull them in again as dependencies when
                // processing later pending checkpoints in the same batch.
                effects_in_current_checkpoint
                    .extend(settlement_effects.iter().map(|e| *e.transaction_digest()));
                sorted.extend(settlement_effects);
            }

            #[cfg(msim)]
            {
                // Check consensus commit prologue invariants in sim test.
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            tx_effects.extend(sorted);
            tx_roots.extend(root_digests);
        }

        Ok((tx_effects, tx_roots))
    }
1893
    // Given the root transactions of a pending checkpoint, resolve the transactions that should be
    // included in the checkpoint, and return them in the order they should be included in the
    // checkpoint, together with the set of root transaction digests.
    //
    // V2 differs from `resolve_checkpoint_transactions` in that a single `PendingCheckpointV2`
    // carries multiple per-checkpoint root sets (`pending.roots`), each with an optional
    // settlement root and a pre-assigned checkpoint sequence number, and that causal sorting is
    // done by `causal_sort_with_ccp` rather than by manually prepending the prologue.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions_v2(
        &self,
        pending: PendingCheckpointV2,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        debug!(
            checkpoint_commit_height = pending.details.checkpoint_height,
            "Resolving checkpoint transactions for pending checkpoint.",
        );

        trace!(
            "roots for pending checkpoint {:?}: {:?}",
            pending.details.checkpoint_height, pending.roots,
        );

        assert!(
            self.epoch_store
                .protocol_config()
                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
        );

        // Accumulated across all root sets, in final checkpoint order.
        let mut all_effects: Vec<TransactionEffects> = Vec::new();
        let mut all_root_digests: Vec<TransactionDigest> = Vec::new();

        for checkpoint_roots in &pending.roots {
            let tx_roots = &checkpoint_roots.tx_roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(tx_roots.len() as u64);

            // Resolve root keys to digests, then wait for the roots to execute and
            // read their effects.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(tx_roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;

            all_root_digests.extend(root_digests.iter().cloned());

            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;
            // If present, the consensus commit prologue must be placed at the front
            // of the checkpoint; the causal sort is told its digest.
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let ccp_digest = consensus_commit_prologue.map(|(d, _)| d);
            let mut sorted = CausalOrder::causal_sort_with_ccp(root_effects, ccp_digest);

            // Append the settlement (and barrier) effects for this root set, if a
            // settlement root was assigned by the consensus handler.
            if let Some(settlement_key) = &checkpoint_roots.settlement_root {
                let checkpoint_seq = pending
                    .details
                    .checkpoint_seq
                    .expect("checkpoint_seq must be set");
                let tx_index_offset = all_effects.len() as u64;
                let effects = self
                    .resolve_settlement_effects(
                        *settlement_key,
                        &sorted,
                        checkpoint_roots.height,
                        checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                sorted.extend(effects);
            }

            #[cfg(msim)]
            {
                // Check consensus commit prologue invariants in sim test.
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            all_effects.extend(sorted);
        }
        Ok((all_effects, all_root_digests.into_iter().collect()))
    }
1979
1980    /// Constructs settlement transactions to compute their digests, then reads effects
1981    /// directly from the cache. If execution is ahead of the checkpoint builder, the
1982    /// effects are already cached and this returns instantly. Otherwise it waits for
1983    /// the execution scheduler's queue worker to execute them.
1984    async fn resolve_settlement_effects(
1985        &self,
1986        settlement_key: TransactionKey,
1987        sorted_root_effects: &[TransactionEffects],
1988        checkpoint_height: CheckpointHeight,
1989        checkpoint_seq: CheckpointSequenceNumber,
1990        tx_index_offset: u64,
1991    ) -> Vec<TransactionEffects> {
1992        let epoch = self.epoch_store.epoch();
1993        let accumulator_root_obj_initial_shared_version = self
1994            .epoch_store
1995            .epoch_start_config()
1996            .accumulator_root_obj_initial_shared_version()
1997            .expect("accumulator root object must exist");
1998
1999        let builder = AccumulatorSettlementTxBuilder::new(
2000            None,
2001            sorted_root_effects,
2002            checkpoint_seq,
2003            tx_index_offset,
2004        );
2005
2006        let settlement_digests: Vec<_> = builder
2007            .build_tx(
2008                self.epoch_store.protocol_config(),
2009                epoch,
2010                accumulator_root_obj_initial_shared_version,
2011                checkpoint_height,
2012                checkpoint_seq,
2013            )
2014            .into_iter()
2015            .map(|tx| *VerifiedTransaction::new_system_transaction(tx).digest())
2016            .collect();
2017
2018        debug!(
2019            ?settlement_digests,
2020            ?settlement_key,
2021            "fallback: reading settlement effects from cache"
2022        );
2023
2024        let settlement_effects = wait_for_effects_with_retry(
2025            self.effects_store.as_ref(),
2026            "CheckpointBuilder::fallback_settlement_effects",
2027            &settlement_digests,
2028            settlement_key,
2029        )
2030        .await;
2031
2032        let barrier_digest = *VerifiedTransaction::new_system_transaction(
2033            accumulators::build_accumulator_barrier_tx(
2034                epoch,
2035                accumulator_root_obj_initial_shared_version,
2036                checkpoint_height,
2037                &settlement_effects,
2038            ),
2039        )
2040        .digest();
2041
2042        let barrier_effects = wait_for_effects_with_retry(
2043            self.effects_store.as_ref(),
2044            "CheckpointBuilder::fallback_barrier_effects",
2045            &[barrier_digest],
2046            settlement_key,
2047        )
2048        .await;
2049
2050        settlement_effects
2051            .into_iter()
2052            .chain(barrier_effects)
2053            .collect()
2054    }
2055
2056    // Extracts the consensus commit prologue digest and effects from the root transactions.
2057    // The consensus commit prologue is expected to be the first transaction in the roots.
2058    fn extract_consensus_commit_prologue(
2059        &self,
2060        root_digests: &[TransactionDigest],
2061        root_effects: &[TransactionEffects],
2062    ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
2063        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
2064        if root_digests.is_empty() {
2065            return Ok(None);
2066        }
2067
2068        // Reads the first transaction in the roots, and checks whether it is a consensus commit
2069        // prologue transaction. The consensus commit prologue transaction should be the first
2070        // transaction in the roots written by the consensus handler.
2071        let first_tx = self
2072            .state
2073            .get_transaction_cache_reader()
2074            .get_transaction_block(&root_digests[0])
2075            .expect("Transaction block must exist");
2076
2077        Ok(first_tx
2078            .transaction_data()
2079            .is_consensus_commit_prologue()
2080            .then(|| {
2081                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
2082                (*first_tx.digest(), root_effects[0].clone())
2083            }))
2084    }
2085
    /// Persists the newly built checkpoint(s) atomically, then performs the
    /// post-write steps:
    /// 1. Fatal if any checkpoint sequence number was previously built locally
    ///    with a different digest (non-deterministic rebuild).
    /// 2. Writes contents and locally-computed summaries in a single batch.
    /// 3. Sends each checkpoint to `self.output` (e.g. submits signatures to
    ///    consensus / state sync).
    /// 4. Fork-checks each local summary against any already-certified checkpoint
    ///    at the same sequence number.
    /// 5. Wakes the signature aggregator and records the constructed checkpoints
    ///    in the epoch store at the given height.
    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &mut self,
        height: CheckpointHeight,
        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
    ) -> SuiResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.store.tables.checkpoint_content.batch();
        // Pre-size using the total number of transactions across all checkpoints.
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );

            // Building the same sequence number twice must yield the same digest;
            // a mismatch means checkpoint construction was non-deterministic.
            if let Some(previously_computed_summary) = self
                .store
                .tables
                .locally_computed_checkpoints
                .get(&summary.sequence_number)?
                && previously_computed_summary.digest() != summary.digest()
            {
                fatal!(
                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
                    summary.sequence_number,
                    previously_computed_summary.digest(),
                    summary.digest()
                );
            }

            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            batch.insert_batch(
                &self.store.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.store.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        // Single atomic write for all contents and summaries.
        batch.write()?;

        // Send all checkpoint sigs to consensus.
        for (summary, contents) in &new_checkpoints {
            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
                .await?;
        }

        // If the network has already certified a checkpoint at this sequence
        // number, compare against it to detect a local fork.
        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .store
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.store
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_constructed_checkpoint(height, new_checkpoints);
        Ok(())
    }
2167
2168    #[allow(clippy::type_complexity)]
2169    fn split_checkpoint_chunks(
2170        &self,
2171        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
2172        signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2173    ) -> CheckpointBuilderResult<
2174        Vec<
2175            Vec<(
2176                TransactionEffects,
2177                Vec<(GenericSignature, Option<SequenceNumber>)>,
2178            )>,
2179        >,
2180    > {
2181        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
2182
2183        // If splitting is done in consensus_handler, return everything as one chunk.
2184        if self
2185            .epoch_store
2186            .protocol_config()
2187            .split_checkpoints_in_consensus_handler()
2188        {
2189            let chunk: Vec<_> = effects_and_transaction_sizes
2190                .into_iter()
2191                .zip(signatures)
2192                .map(|((effects, _size), sigs)| (effects, sigs))
2193                .collect();
2194            return Ok(vec![chunk]);
2195        }
2196        let mut chunks = Vec::new();
2197        let mut chunk = Vec::new();
2198        let mut chunk_size: usize = 0;
2199        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
2200            .into_iter()
2201            .zip(signatures.into_iter())
2202        {
2203            // Roll over to a new chunk after either max count or max size is reached.
2204            // The size calculation here is intended to estimate the size of the
2205            // FullCheckpointContents struct. If this code is modified, that struct
2206            // should also be updated accordingly.
2207            let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
2208                bcs::serialized_size(&signatures)?
2209            } else {
2210                let signatures: Vec<&GenericSignature> =
2211                    signatures.iter().map(|(s, _)| s).collect();
2212                bcs::serialized_size(&signatures)?
2213            };
2214            let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
2215            if chunk.len() == self.max_transactions_per_checkpoint
2216                || (chunk_size + size) > self.max_checkpoint_size_bytes
2217            {
2218                if chunk.is_empty() {
2219                    // Always allow at least one tx in a checkpoint.
2220                    warn!(
2221                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
2222                        self.max_checkpoint_size_bytes
2223                    );
2224                } else {
2225                    chunks.push(chunk);
2226                    chunk = Vec::new();
2227                    chunk_size = 0;
2228                }
2229            }
2230
2231            chunk.push((effects, signatures));
2232            chunk_size += size;
2233        }
2234
2235        if !chunk.is_empty() || chunks.is_empty() {
2236            // We intentionally create an empty checkpoint if there is no content provided
2237            // to make a 'heartbeat' checkpoint.
2238            // Important: if some conditions are added here later, we need to make sure we always
2239            // have at least one chunk if last_pending_of_epoch is set
2240            chunks.push(chunk);
2241            // Note: empty checkpoints are ok - they shouldn't happen at all on a network with even
2242            // modest load. Even if they do happen, it is still useful as it allows fullnodes to
2243            // distinguish between "no transactions have happened" and "i am not receiving new
2244            // checkpoints".
2245        }
2246        Ok(chunks)
2247    }
2248
2249    fn load_last_built_checkpoint_summary(
2250        epoch_store: &AuthorityPerEpochStore,
2251        store: &CheckpointStore,
2252    ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
2253        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
2254        if last_checkpoint.is_none() {
2255            let epoch = epoch_store.epoch();
2256            if epoch > 0 {
2257                let previous_epoch = epoch - 1;
2258                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
2259                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
2260                if let Some((ref seq, _)) = last_checkpoint {
2261                    debug!(
2262                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
2263                    );
2264                } else {
2265                    // This is some serious bug with when CheckpointBuilder started so surfacing it via panic
2266                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
2267                }
2268            }
2269        }
2270        Ok(last_checkpoint)
2271    }
2272
2273    #[instrument(level = "debug", skip_all)]
2274    async fn create_checkpoints(
2275        &self,
2276        all_effects: Vec<TransactionEffects>,
2277        details: &PendingCheckpointInfo,
2278        all_roots: &HashSet<TransactionDigest>,
2279    ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
2280        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
2281
2282        let total = all_effects.len();
2283        let mut last_checkpoint =
2284            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
2285        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
2286        debug!(
2287            checkpoint_commit_height = details.checkpoint_height,
2288            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
2289            checkpoint_timestamp = details.timestamp_ms,
2290            "Creating checkpoint(s) for {} transactions",
2291            all_effects.len(),
2292        );
2293
2294        let all_digests: Vec<_> = all_effects
2295            .iter()
2296            .map(|effect| *effect.transaction_digest())
2297            .collect();
2298        let transactions_and_sizes = self
2299            .state
2300            .get_transaction_cache_reader()
2301            .get_transactions_and_serialized_sizes(&all_digests)?;
2302        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
2303        let mut transactions = Vec::with_capacity(all_effects.len());
2304        let mut transaction_keys = Vec::with_capacity(all_effects.len());
2305        let mut randomness_rounds = BTreeMap::new();
2306        {
2307            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
2308            debug!(
2309                ?last_checkpoint_seq,
2310                "Waiting for {:?} certificates to appear in consensus",
2311                all_effects.len()
2312            );
2313
2314            for (effects, transaction_and_size) in all_effects
2315                .into_iter()
2316                .zip(transactions_and_sizes.into_iter())
2317            {
2318                let (transaction, size) = transaction_and_size
2319                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
2320                match transaction.inner().transaction_data().kind() {
2321                    TransactionKind::ConsensusCommitPrologue(_)
2322                    | TransactionKind::ConsensusCommitPrologueV2(_)
2323                    | TransactionKind::ConsensusCommitPrologueV3(_)
2324                    | TransactionKind::ConsensusCommitPrologueV4(_)
2325                    | TransactionKind::AuthenticatorStateUpdate(_) => {
2326                        // ConsensusCommitPrologue and AuthenticatorStateUpdate are guaranteed to be
2327                        // processed before we reach here.
2328                    }
2329                    TransactionKind::ProgrammableSystemTransaction(_) => {
2330                        // settlement transactions are added by checkpoint builder
2331                    }
2332                    TransactionKind::ChangeEpoch(_)
2333                    | TransactionKind::Genesis(_)
2334                    | TransactionKind::EndOfEpochTransaction(_) => {
2335                        fatal!(
2336                            "unexpected transaction in checkpoint effects: {:?}",
2337                            transaction
2338                        );
2339                    }
2340                    TransactionKind::RandomnessStateUpdate(rsu) => {
2341                        randomness_rounds
2342                            .insert(*effects.transaction_digest(), rsu.randomness_round);
2343                    }
2344                    TransactionKind::ProgrammableTransaction(_) => {
2345                        // Only transactions that are not roots should be included in the call to
2346                        // `consensus_messages_processed_notify`. roots come directly from the consensus
2347                        // commit and so are known to be processed already.
2348                        let digest = *effects.transaction_digest();
2349                        if !all_roots.contains(&digest) {
2350                            transaction_keys.push(SequencedConsensusTransactionKey::External(
2351                                ConsensusTransactionKey::Certificate(digest),
2352                            ));
2353                        }
2354                    }
2355                }
2356                transactions.push(transaction);
2357                all_effects_and_transaction_sizes.push((effects, size));
2358            }
2359
2360            self.epoch_store
2361                .consensus_messages_processed_notify(transaction_keys)
2362                .await?;
2363        }
2364
2365        let signatures = self
2366            .epoch_store
2367            .user_signatures_for_checkpoint(&transactions, &all_digests);
2368        debug!(
2369            ?last_checkpoint_seq,
2370            "Received {} checkpoint user signatures from consensus",
2371            signatures.len()
2372        );
2373
2374        let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
2375            Some(
2376                transactions
2377                    .iter()
2378                    .flat_map(|tx| {
2379                        if let TransactionKind::ProgrammableTransaction(ptb) =
2380                            tx.transaction_data().kind()
2381                        {
2382                            itertools::Either::Left(
2383                                ptb.commands
2384                                    .iter()
2385                                    .map(ExecutionTimeObservationKey::from_command),
2386                            )
2387                        } else {
2388                            itertools::Either::Right(std::iter::empty())
2389                        }
2390                    })
2391                    .collect(),
2392            )
2393        } else {
2394            None
2395        };
2396
2397        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2398        let chunks_count = chunks.len();
2399
2400        let mut checkpoints = Vec::with_capacity(chunks_count);
2401        debug!(
2402            ?last_checkpoint_seq,
2403            "Creating {} checkpoints with {} transactions", chunks_count, total,
2404        );
2405
2406        let epoch = self.epoch_store.epoch();
2407        for (index, transactions) in chunks.into_iter().enumerate() {
2408            let first_checkpoint_of_epoch = index == 0
2409                && last_checkpoint
2410                    .as_ref()
2411                    .map(|(_, c)| c.epoch != epoch)
2412                    .unwrap_or(true);
2413            if first_checkpoint_of_epoch {
2414                self.epoch_store
2415                    .record_epoch_first_checkpoint_creation_time_metric();
2416            }
2417            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2418
2419            let sequence_number = if let Some(preassigned_seq) = details.checkpoint_seq {
2420                preassigned_seq
2421            } else {
2422                last_checkpoint
2423                    .as_ref()
2424                    .map(|(_, c)| c.sequence_number + 1)
2425                    .unwrap_or_default()
2426            };
2427            let mut timestamp_ms = details.timestamp_ms;
2428            if let Some((_, last_checkpoint)) = &last_checkpoint
2429                && last_checkpoint.timestamp_ms > timestamp_ms
2430            {
2431                // First consensus commit of an epoch can have zero timestamp.
2432                debug!(
2433                    "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2434                    sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2435                );
2436                if self
2437                    .epoch_store
2438                    .protocol_config()
2439                    .enforce_checkpoint_timestamp_monotonicity()
2440                {
2441                    timestamp_ms = last_checkpoint.timestamp_ms;
2442                }
2443            }
2444
2445            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
2446            let epoch_rolling_gas_cost_summary =
2447                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2448
2449            let end_of_epoch_data = if last_checkpoint_of_epoch {
2450                let system_state_obj = self
2451                    .augment_epoch_last_checkpoint(
2452                        &epoch_rolling_gas_cost_summary,
2453                        timestamp_ms,
2454                        &mut effects,
2455                        &mut signatures,
2456                        sequence_number,
2457                        std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2458                        last_checkpoint_seq.unwrap_or_default(),
2459                    )
2460                    .await?;
2461
2462                let committee = system_state_obj
2463                    .get_current_epoch_committee()
2464                    .committee()
2465                    .clone();
2466
2467                // This must happen after the call to augment_epoch_last_checkpoint,
2468                // otherwise we will not capture the change_epoch tx.
2469                let root_state_digest = {
2470                    let state_acc = self
2471                        .global_state_hasher
2472                        .upgrade()
2473                        .expect("No checkpoints should be getting built after local configuration");
2474                    let acc = state_acc.accumulate_checkpoint(
2475                        &effects,
2476                        sequence_number,
2477                        &self.epoch_store,
2478                    )?;
2479
2480                    state_acc
2481                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2482                        .await?;
2483
2484                    state_acc.accumulate_running_root(
2485                        &self.epoch_store,
2486                        sequence_number,
2487                        Some(acc),
2488                    )?;
2489                    state_acc
2490                        .digest_epoch(self.epoch_store.clone(), sequence_number)
2491                        .await?
2492                };
2493                self.metrics.highest_accumulated_epoch.set(epoch as i64);
2494                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2495
2496                let epoch_commitments = if self
2497                    .epoch_store
2498                    .protocol_config()
2499                    .check_commit_root_state_digest_supported()
2500                {
2501                    vec![root_state_digest.into()]
2502                } else {
2503                    vec![]
2504                };
2505
2506                Some(EndOfEpochData {
2507                    next_epoch_committee: committee.voting_rights,
2508                    next_epoch_protocol_version: ProtocolVersion::new(
2509                        system_state_obj.protocol_version(),
2510                    ),
2511                    epoch_commitments,
2512                })
2513            } else {
2514                self.send_to_hasher
2515                    .send((sequence_number, effects.clone()))
2516                    .await?;
2517
2518                None
2519            };
2520            let contents = if self.epoch_store.protocol_config().address_aliases() {
2521                CheckpointContents::new_v2(&effects, signatures)
2522            } else {
2523                CheckpointContents::new_with_digests_and_signatures(
2524                    effects.iter().map(TransactionEffects::execution_digests),
2525                    signatures
2526                        .into_iter()
2527                        .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
2528                        .collect(),
2529                )
2530            };
2531
2532            let num_txns = contents.size() as u64;
2533
2534            let network_total_transactions = last_checkpoint
2535                .as_ref()
2536                .map(|(_, c)| c.network_total_transactions + num_txns)
2537                .unwrap_or(num_txns);
2538
2539            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2540
2541            let matching_randomness_rounds: Vec<_> = effects
2542                .iter()
2543                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2544                .copied()
2545                .collect();
2546
2547            let checkpoint_commitments = if self
2548                .epoch_store
2549                .protocol_config()
2550                .include_checkpoint_artifacts_digest_in_summary()
2551            {
2552                let artifacts = CheckpointArtifacts::from(&effects[..]);
2553                let artifacts_digest = artifacts.digest()?;
2554                vec![artifacts_digest.into()]
2555            } else {
2556                Default::default()
2557            };
2558
2559            let summary = CheckpointSummary::new(
2560                self.epoch_store.protocol_config(),
2561                epoch,
2562                sequence_number,
2563                network_total_transactions,
2564                &contents,
2565                previous_digest,
2566                epoch_rolling_gas_cost_summary,
2567                end_of_epoch_data,
2568                timestamp_ms,
2569                matching_randomness_rounds,
2570                checkpoint_commitments,
2571            );
2572            summary.report_checkpoint_age(
2573                &self.metrics.last_created_checkpoint_age,
2574                &self.metrics.last_created_checkpoint_age_ms,
2575            );
2576            if last_checkpoint_of_epoch {
2577                info!(
2578                    checkpoint_seq = sequence_number,
2579                    "creating last checkpoint of epoch {}", epoch
2580                );
2581                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2582                    self.epoch_store
2583                        .report_epoch_metrics_at_last_checkpoint(stats);
2584                }
2585            }
2586            last_checkpoint = Some((sequence_number, summary.clone()));
2587            checkpoints.push((summary, contents));
2588        }
2589
2590        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2591    }
2592
2593    fn get_epoch_total_gas_cost(
2594        &self,
2595        last_checkpoint: Option<&CheckpointSummary>,
2596        cur_checkpoint_effects: &[TransactionEffects],
2597    ) -> GasCostSummary {
2598        let (previous_epoch, previous_gas_costs) = last_checkpoint
2599            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2600            .unwrap_or_default();
2601        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2602        if previous_epoch == self.epoch_store.epoch() {
2603            // sum only when we are within the same epoch
2604            GasCostSummary::new(
2605                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2606                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2607                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2608                previous_gas_costs.non_refundable_storage_fee
2609                    + current_gas_costs.non_refundable_storage_fee,
2610            )
2611        } else {
2612            current_gas_costs
2613        }
2614    }
2615
2616    #[instrument(level = "error", skip_all)]
2617    async fn augment_epoch_last_checkpoint(
2618        &self,
2619        epoch_total_gas_cost: &GasCostSummary,
2620        epoch_start_timestamp_ms: CheckpointTimestamp,
2621        checkpoint_effects: &mut Vec<TransactionEffects>,
2622        signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2623        checkpoint: CheckpointSequenceNumber,
2624        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2625        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
2626        // >1 checkpoint.
2627        last_checkpoint: CheckpointSequenceNumber,
2628    ) -> CheckpointBuilderResult<SuiSystemState> {
2629        let (system_state, effects) = self
2630            .state
2631            .create_and_execute_advance_epoch_tx(
2632                &self.epoch_store,
2633                epoch_total_gas_cost,
2634                checkpoint,
2635                epoch_start_timestamp_ms,
2636                end_of_epoch_observation_keys,
2637                last_checkpoint,
2638            )
2639            .await?;
2640        checkpoint_effects.push(effects);
2641        signatures.push(vec![]);
2642        Ok(system_state)
2643    }
2644
    /// For the given roots return complete list of effects to include in checkpoint
    /// This list includes the roots and all their dependencies, which are not part of checkpoint already.
    /// Note that this function may be called multiple times to construct the checkpoint.
    /// `existing_tx_digests_in_checkpoint` is used to track the transactions that are already included in the checkpoint.
    /// Txs in `roots` that need to be included in the checkpoint will be added to `existing_tx_digests_in_checkpoint`
    /// after the call of this function.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> SuiResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        // Effects accumulated for inclusion (roots plus transitive dependencies).
        let mut results = vec![];
        // Digests visited in this call; prevents re-expanding a dependency shared by
        // multiple transactions.
        let mut seen = HashSet::new();
        // Frontier expansion: each iteration processes the current frontier (`roots`)
        // and gathers the next one (`pending`) from unvisited dependencies.
        loop {
            let mut pending = HashSet::new();

            // Batched lookup: which frontier transactions were already included in a
            // previously built checkpoint.
            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                // Unnecessary to read effects of a dependency if the effect is already processed.
                // Note: marked seen even when skipped below, so later dependency edges
                // never re-enqueue this digest.
                seen.insert(*digest);

                // Skip roots that are already included in the checkpoint.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Skip roots already included in checkpoints or roots from previous epochs
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    // Skip here if dependency not executed in the current epoch.
                    // Note that the existence of an effects signature in the
                    // epoch store for the given digest indicates that the transaction
                    // was locally executed in the current epoch
                    if !effects_signature_exists {
                        continue;
                    }
                    // Enqueue only on first sight.
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            if pending.is_empty() {
                break;
            }
            // Load effects for the next frontier. They must exist locally because a
            // transaction depending on them has already executed here.
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self.effects_store.multi_get_executed_effects(&pending);
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
                        digest
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        // Record everything we are returning, so subsequent calls for this checkpoint
        // do not include the same transactions again.
        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
2726
2727    // Checks the invariants of the consensus commit prologue transactions in the checkpoint
2728    // in simtest.
2729    #[cfg(msim)]
2730    fn expensive_consensus_commit_prologue_invariants_check(
2731        &self,
2732        root_digests: &[TransactionDigest],
2733        sorted: &[TransactionEffects],
2734    ) {
2735        // Gets all the consensus commit prologue transactions from the roots.
2736        let root_txs = self
2737            .state
2738            .get_transaction_cache_reader()
2739            .multi_get_transaction_blocks(root_digests);
2740        let ccps = root_txs
2741            .iter()
2742            .filter_map(|tx| {
2743                if let Some(tx) = tx {
2744                    if tx.transaction_data().is_consensus_commit_prologue() {
2745                        Some(tx)
2746                    } else {
2747                        None
2748                    }
2749                } else {
2750                    None
2751                }
2752            })
2753            .collect::<Vec<_>>();
2754
2755        // There should be at most one consensus commit prologue transaction in the roots.
2756        assert!(ccps.len() <= 1);
2757
2758        // Get all the transactions in the checkpoint.
2759        let txs = self
2760            .state
2761            .get_transaction_cache_reader()
2762            .multi_get_transaction_blocks(
2763                &sorted
2764                    .iter()
2765                    .map(|tx| tx.transaction_digest().clone())
2766                    .collect::<Vec<_>>(),
2767            );
2768
2769        if ccps.len() == 0 {
2770            // If there is no consensus commit prologue transaction in the roots, then there should be no
2771            // consensus commit prologue transaction in the checkpoint.
2772            for tx in txs.iter() {
2773                if let Some(tx) = tx {
2774                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2775                }
2776            }
2777        } else {
2778            // If there is one consensus commit prologue, it must be the first one in the checkpoint.
2779            assert!(
2780                txs[0]
2781                    .as_ref()
2782                    .unwrap()
2783                    .transaction_data()
2784                    .is_consensus_commit_prologue()
2785            );
2786
2787            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2788
2789            for tx in txs.iter().skip(1) {
2790                if let Some(tx) = tx {
2791                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2792                }
2793            }
2794        }
2795    }
2796}
2797
2798async fn wait_for_effects_with_retry(
2799    effects_store: &dyn TransactionCacheRead,
2800    task_name: &'static str,
2801    digests: &[TransactionDigest],
2802    tx_key: TransactionKey,
2803) -> Vec<TransactionEffects> {
2804    let delay = if in_antithesis() {
2805        // antithesis has aggressive thread pausing, 5 seconds causes false positives
2806        15
2807    } else {
2808        5
2809    };
2810    loop {
2811        match tokio::time::timeout(Duration::from_secs(delay), async {
2812            effects_store
2813                .notify_read_executed_effects(task_name, digests)
2814                .await
2815        })
2816        .await
2817        {
2818            Ok(effects) => break effects,
2819            Err(_) => {
2820                debug_fatal!(
2821                    "Timeout waiting for transactions to be executed {:?}, retrying...",
2822                    tx_key
2823                );
2824            }
2825        }
2826    }
2827}
2828
impl CheckpointAggregator {
    /// Constructs the aggregator with an empty pending-signature buffer and no
    /// in-progress aggregation (`current: None`).
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        Self {
            store: tables,
            epoch_store,
            notify,
            receiver,
            pending: BTreeMap::new(),
            current: None,
            output,
            state,
            metrics,
        }
    }

    /// Main loop: buffer incoming signature messages by sequence number, attempt
    /// aggregation, and sleep until new input (a signature, a notify, or a 1s tick).
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            // Drain all signatures that arrived since the last iteration into the pending buffer
            while let Ok(sig) = self.receiver.try_recv() {
                self.pending
                    .entry(sig.summary.sequence_number)
                    .or_default()
                    .push(sig);
            }

            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            // Wait for the next signature, an explicit wakeup, or at most 1s
            // before re-checking (e.g. for checkpoints certified via StateSync).
            tokio::select! {
                Some(sig) = self.receiver.recv() => {
                    self.pending
                        .entry(sig.summary.sequence_number)
                        .or_default()
                        .push(sig);
                }
                _ = self.notify.notified() => {}
                _ = tokio::time::sleep(Duration::from_secs(1)) => {}
            }
        }
    }

    /// Runs one aggregation pass and forwards every newly certified checkpoint
    /// to the configured output.
    async fn run_and_notify(&mut self) -> SuiResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Aggregates buffered signatures into certified checkpoints, in sequence order.
    ///
    /// Returns the checkpoints certified during this pass. Returns early (without
    /// error) when the next checkpoint summary has not been built yet or when the
    /// buffered signatures are insufficient to reach quorum.
    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            // Discard buffered signatures for checkpoints already certified
            // (e.g. certified via StateSync before local aggregation completed).
            self.pending.retain(|&seq, _| seq >= next_to_certify);
            let current = if let Some(current) = &mut self.current {
                // It's possible that the checkpoint was already certified by
                // the rest of the network and we've already received the
                // certified checkpoint via StateSync. In this case, we reset
                // the current signature aggregator to the next checkpoint to
                // be certified
                if current.summary.sequence_number < next_to_certify {
                    assert_reachable!("skip checkpoint certification");
                    self.current = None;
                    continue;
                }
                current
            } else {
                // Start a fresh aggregator for the next locally built summary;
                // if it has not been built yet, there is nothing to do.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let seq = current.summary.sequence_number;
            let sigs = self.pending.remove(&seq).unwrap_or_default();
            if sigs.is_empty() {
                trace!(
                    checkpoint_seq =? seq,
                    "Not enough checkpoint signatures",
                );
                // No buffered signatures for this sequence; wait for more input.
                return Ok(result);
            }
            for data in sigs {
                trace!(
                    checkpoint_seq = seq,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = seq,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics.last_certified_checkpoint.set(seq as i64);
                    current.summary.report_checkpoint_age(
                        &self.metrics.last_certified_checkpoint_age,
                        &self.metrics.last_certified_checkpoint_age_ms,
                    );
                    result.push(summary.into_inner());
                    // Quorum reached: move on to the next sequence number.
                    self.current = None;
                    continue 'outer;
                }
            }
            // All buffered signatures consumed without reaching quorum yet.
            break;
        }
        Ok(result)
    }

    /// Returns the sequence number following the highest certified checkpoint in
    /// the store, or 0 if none has been certified yet.
    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
2998
2999impl CheckpointSignatureAggregator {
    /// Inserts one validator's signature into the stake aggregator for this checkpoint.
    ///
    /// Returns `Ok(quorum_signature)` exactly when this signature pushes the digest we
    /// built (`self.digest`) over the quorum threshold. All other outcomes — repeated
    /// signer, invalid signature, insufficient votes, or quorum reached on a digest
    /// that differs from ours (a remote fork) — return `Err(())`, meaning "no
    /// certificate yet"; the caller keeps feeding signatures.
    #[allow(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        // Re-wrap the signature around OUR summary; the aggregator verifies it
        // against the digest the validator actually signed (`their_digest`).
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            // ignore repeated signatures
            // (this arm must precede the generic `Failed` arm below, which logs
            // and runs the split-brain check)
            InsertResult::Failed { error }
                if matches!(
                    error.as_inner(),
                    SuiErrorKind::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
                ) =>
            {
                Err(())
            }
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                // It is not guaranteed that signature.authority == narwhal_cert.author, but we do verify
                // the signature so we know that the author signed the message at some point.
                if their_digest != self.digest {
                    // Quorum formed on a digest different from ours: remote fork.
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }
3058
    /// Check if there is a split brain condition in checkpoint signature aggregation, defined
    /// as any state wherein it is no longer possible to achieve quorum on a checkpoint proposal,
    /// irrespective of the outcome of any outstanding votes.
    ///
    /// When split brain is detected this logs the competing digests (ordered by stake),
    /// bumps the `split_brain_checkpoint_forks` metric, and kicks off asynchronous
    /// diagnostics that query disagreeing validators for their checkpoint contents.
    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            // TODO: at this point we should immediately halt processing
            // of new transaction certificates to avoid building on top of
            // forked output
            // self.halt_all_execution();

            // Summarize each competing digest with its total backing stake,
            // highest stake first, for the debug_fatal message below.
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let digests_by_stake_messages = all_unique_values
                .iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{:?} (total stake: {})", digest, total_stake)
                })
                .collect::<Vec<String>>();
            // Simulation-only fail point: fork-recovery tests inject shared maps here so
            // the test harness learns which digest to treat as canonical for this height.
            fail_point_arg!("kill_split_brain_node", |(
                checkpoint_overrides,
                forked_authorities,
            ): (
                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
            )| {
                #[cfg(msim)]
                {
                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
                        (checkpoint_overrides.lock(), forked_authorities.lock())
                    {
                        // Find the digest produced by non-forked authorities
                        let correct_digest = all_unique_values
                            .iter()
                            .find(|(_, (authorities, _))| {
                                // Check if any authority that produced this digest is NOT in the forked set
                                authorities
                                    .iter()
                                    .any(|auth| !forked_authorities_set.contains(auth))
                            })
                            .map(|(digest, _)| digest.to_string())
                            .unwrap_or_else(|| {
                                // Fallback: use the digest with the highest stake
                                all_unique_values
                                    .iter()
                                    .max_by_key(|(_, (_, stake))| *stake)
                                    .map(|(digest, _)| digest.to_string())
                                    .unwrap_or_else(|| self.digest.to_string())
                            });

                        overrides.insert(self.summary.sequence_number, correct_digest.clone());

                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
                            self.summary.sequence_number,
                            correct_digest
                        );
                    }
                }
            });

            debug_fatal!(
                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
                self.summary.sequence_number,
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            // Clone everything the diagnostics need, then run the network-bound
            // investigation on its own task so aggregation is not blocked.
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.store.clone();

            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
3142}
3143
/// Create data dump containing relevant data for diagnosing cause of the
/// split brain by querying one disagreeing validator for full checkpoint contents.
/// To minimize peer chatter, we only query one validator at random from each
/// disagreeing faction, as all honest validators that participated in this round may
/// inevitably run the same process.
///
/// The resulting diff report (summary, contents, transactions and effects diffs
/// against the local checkpoint) is written to a tempdir file and logged at debug level.
async fn diagnose_split_brain(
    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
    local_summary: CheckpointSummary,
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
) {
    debug!(
        checkpoint_seq = local_summary.sequence_number,
        "Running split brain diagnostics..."
    );
    let time = SystemTime::now();
    // collect one random disagreeing validator per differing digest
    let digest_to_validator = all_unique_values
        .iter()
        .filter_map(|(digest, (validators, _))| {
            if *digest != local_summary.digest() {
                // `validators` is non-empty for every digest entry, so choose() yields Some.
                let random_validator = validators.choose(&mut get_rng()).unwrap();
                Some((*digest, *random_validator))
            } else {
                None
            }
        })
        .collect::<HashMap<_, _>>();
    if digest_to_validator.is_empty() {
        panic!(
            "Given split brain condition, there should be at \
                least one validator that disagrees with local signature"
        );
    }

    let epoch_store = state.load_epoch_store_one_call_per_task();
    let committee = epoch_store
        .epoch_start_state()
        .get_sui_committee_with_network_metadata();
    let network_config = default_mysten_network_config();
    let network_clients =
        make_network_authority_clients_with_network_config(&committee, &network_config);

    // Query all disagreeing validators
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequestV2 {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint_v2(request)
        })
        .collect::<Vec<_>>();

    // NOTE(review): this zip assumes that iterating `digest_to_validator` here yields
    // the same order as the `.values()` iteration above. That holds for an unmutated
    // std HashMap instance, but is fragile — consider collecting (digest, name) pairs
    // once and deriving both iterations from that. TODO confirm/clean up.
    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponseV2 {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponseV2 {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    // NOTE(review): the lookup key is `content_digest`, but the first panic message
    // prints the summary digest — slightly misleading if it ever fires.
    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    // Build one textual diff section per disagreeing validator: summary, contents,
    // transaction digests, and effects digests, each diffed against the local view.
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {:?}",
        time
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    // Persist the dump in a kept tempdir so it survives process exit for inspection.
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .keep()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{}", fork_logs_text).unwrap();
    debug!("{}", fork_logs_text);
}
3323
/// Notification interface through which consensus-side code informs the
/// checkpoint subsystem of new events without blocking.
pub trait CheckpointServiceNotify {
    /// Handles a checkpoint signature received from another validator.
    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult;

    /// Wakes the checkpoint builder to check for newly available work.
    fn notify_checkpoint(&self) -> SuiResult;
}
3329
/// Lifecycle state of `CheckpointService`: parks the worker components built by
/// `CheckpointService::build` until they are taken out and spawned.
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    // Components constructed but not yet running as tasks.
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    // Components have been taken via `take_unstarted` and spawned.
    Started,
}
3341
3342impl CheckpointServiceState {
3343    fn take_unstarted(
3344        &mut self,
3345    ) -> (
3346        CheckpointBuilder,
3347        CheckpointAggregator,
3348        CheckpointStateHasher,
3349    ) {
3350        let mut state = CheckpointServiceState::Started;
3351        std::mem::swap(self, &mut state);
3352
3353        match state {
3354            CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3355                (builder, aggregator, hasher)
3356            }
3357            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3358        }
3359    }
3360}
3361
/// Handle to the checkpoint subsystem. Owns the channels into the builder and
/// aggregator tasks and tracks checkpoint build progress across restarts.
pub struct CheckpointService {
    // Checkpoint store shared with the builder/aggregator tasks.
    tables: Arc<CheckpointStore>,
    // Wakes the CheckpointBuilder task (see `notify_checkpoint`).
    notify_builder: Arc<Notify>,
    // Feeds received checkpoint signatures to the aggregator task.
    signature_sender: mpsc::UnboundedSender<CheckpointSignatureMessage>,
    // A notification for the current highest built sequence number.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // The highest sequence number that had already been built at the time CheckpointService
    // was constructed
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    // Parks the worker components until `spawn` is called.
    state: Mutex<CheckpointServiceState>,
}
3374
impl CheckpointService {
    /// Constructs a new CheckpointService in an un-started state.
    ///
    /// Wires up the builder -> state hasher channel, the signature channel into the
    /// aggregator, and the watch channel used to track build progress, then parks all
    /// components in `CheckpointServiceState::Unstarted` until `spawn` is called.
    // The signature channel is unbounded because notify_checkpoint_signature is called from a
    // sync context (consensus_validator.rs implements a sync external trait) and cannot block.
    // The channel is consumed by a single async aggregator task that drains it continuously, so
    // unbounded growth is not a concern in practice.
    #[allow(clippy::disallowed_methods)]
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        // notify_builder wakes the builder; notify_aggregator is signalled by the
        // builder when a new checkpoint is ready for signature aggregation.
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // We may have built higher checkpoint numbers before restarting.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        // Watch channel observed by wait_for_rebuilt_checkpoints to track progress.
        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let (signature_sender, signature_receiver) = mpsc::unbounded_channel();

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            signature_receiver,
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        // Bounded channel from the builder to the state hasher.
        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);

        let ckpt_state_hasher = CheckpointStateHasher::new(
            epoch_store.clone(),
            global_state_hasher.clone(),
            receive_from_builder,
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            global_state_hasher,
            send_to_hasher,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            signature_sender,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            // Components remain parked here until spawn() takes them out.
            state: Mutex::new(CheckpointServiceState::Unstarted((
                builder,
                aggregator,
                ckpt_state_hasher,
            ))),
        })
    }

    /// Starts the CheckpointService.
    ///
    /// This function blocks until the CheckpointBuilder re-builds all checkpoints that had
    /// been built before the most recent restart. You can think of this as a WAL replay
    /// operation. Upon startup, we may have a number of consensus commits and resulting
    /// checkpoints that were built but not committed to disk. We want to reprocess the
    /// commits and rebuild the checkpoints before starting normal operation.
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        // Panics if spawn is called twice (see take_unstarted).
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        // Clean up state hashes computed after the last built checkpoint
        // This prevents ECMH divergence after fork recovery restarts

        // Note: there is a rare crash recovery edge case where we write the builder
        // summary, but crash before we can bump the highest executed checkpoint.
        // If we committed the builder summary, it was certified and unforked, so there
        // is no need to clear that state hash. If we do clear it, then checkpoint executor
        // will wait forever for checkpoint builder to produce the state hash, which will
        // never happen.
        let last_persisted_builder_seq = epoch_store
            .last_persisted_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number);

        let last_executed_seq = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number());

        // Clear from the max of the two watermarks (Option::max treats None as lowest).
        if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        // Supervisor task: runs the builder within the epoch lifetime, then tears
        // down its downstream consumers in dependency order.
        spawn_monitored_task!(async move {
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            // state hasher will terminate as soon as it has finished processing all messages from builder
            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            // builder must shut down before aggregator and state_hasher, since it sends
            // messages to them
            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        // If this times out, the validator may still start up. The worst that can
        // happen is that we will crash later on instead of immediately. The eventual
        // crash would occur because we may be missing transactions that are below the
        // highest_synced_checkpoint watermark, which can cause a crash in
        // `CheckpointExecutor::extract_randomness_rounds`.
        if tokio::time::timeout(Duration::from_secs(120), async move {
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
}
3560
3561impl CheckpointService {
3562    /// Waits until all checkpoints had been built before the node restarted
3563    /// are rebuilt. This is required to preserve the invariant that all checkpoints
3564    /// (and their transactions) below the highest_synced_checkpoint watermark are
3565    /// available. Once the checkpoints are constructed, we can be sure that the
3566    /// transactions have also been executed.
3567    pub async fn wait_for_rebuilt_checkpoints(&self) {
3568        let highest_previously_built_seq = self.highest_previously_built_seq;
3569        let mut rx = self.highest_currently_built_seq_tx.subscribe();
3570        let mut highest_currently_built_seq = *rx.borrow_and_update();
3571        info!(
3572            "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3573        );
3574        loop {
3575            if highest_currently_built_seq >= highest_previously_built_seq {
3576                info!("Checkpoint rebuild complete");
3577                break;
3578            }
3579            rx.changed().await.unwrap();
3580            highest_currently_built_seq = *rx.borrow_and_update();
3581        }
3582    }
3583
3584    #[cfg(test)]
3585    fn write_and_notify_checkpoint_for_testing(
3586        &self,
3587        epoch_store: &AuthorityPerEpochStore,
3588        checkpoint: PendingCheckpoint,
3589    ) -> SuiResult {
3590        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3591
3592        let mut output = ConsensusCommitOutput::new(0);
3593        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3594        output.set_default_commit_stats_for_testing();
3595        epoch_store.push_consensus_output_for_tests(output);
3596        self.notify_checkpoint()?;
3597        Ok(())
3598    }
3599}
3600
3601impl CheckpointServiceNotify for CheckpointService {
3602    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult {
3603        let sequence = info.summary.sequence_number;
3604        let signer = info.summary.auth_sig().authority.concise();
3605
3606        if let Some(highest_verified_checkpoint) = self
3607            .tables
3608            .get_highest_verified_checkpoint()?
3609            .map(|x| *x.sequence_number())
3610            && sequence <= highest_verified_checkpoint
3611        {
3612            trace!(
3613                checkpoint_seq = sequence,
3614                "Ignore checkpoint signature from {} - already certified", signer,
3615            );
3616            self.metrics
3617                .last_ignored_checkpoint_signature_received
3618                .set(sequence as i64);
3619            return Ok(());
3620        }
3621        trace!(
3622            checkpoint_seq = sequence,
3623            "Received checkpoint signature, digest {} from {}",
3624            info.summary.digest(),
3625            signer,
3626        );
3627        self.metrics
3628            .last_received_checkpoint_signatures
3629            .with_label_values(&[&signer.to_string()])
3630            .set(sequence as i64);
3631        self.signature_sender.send(info.clone()).ok();
3632        Ok(())
3633    }
3634
3635    fn notify_checkpoint(&self) -> SuiResult {
3636        self.notify_builder.notify_one();
3637        Ok(())
3638    }
3639}
3640
3641// test helper
3642pub struct CheckpointServiceNoop {}
3643impl CheckpointServiceNotify for CheckpointServiceNoop {
3644    fn notify_checkpoint_signature(&self, _: &CheckpointSignatureMessage) -> SuiResult {
3645        Ok(())
3646    }
3647
3648    fn notify_checkpoint(&self) -> SuiResult {
3649        Ok(())
3650    }
3651}
3652
impl PendingCheckpoint {
    /// The checkpoint height recorded in this pending checkpoint's details.
    pub fn height(&self) -> CheckpointHeight {
        self.details.checkpoint_height
    }

    /// The root transaction keys of this pending checkpoint.
    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.roots
    }

    /// Read access to this pending checkpoint's metadata.
    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.details
    }
}
3666
3667impl PendingCheckpointV2 {
3668    pub fn height(&self) -> CheckpointHeight {
3669        self.details.checkpoint_height
3670    }
3671
3672    pub(crate) fn num_roots(&self) -> usize {
3673        self.roots.iter().map(|r| r.tx_roots.len()).sum()
3674    }
3675}
3676
pin_project! {
    /// Future wrapper that counts how many times it is polled and yields the
    /// count alongside the inner future's output.
    pub struct PollCounter<Fut> {
        #[pin]
        future: Fut,
        // Number of times `poll` has been invoked so far.
        count: usize,
    }
}
3684
3685impl<Fut> PollCounter<Fut> {
3686    pub fn new(future: Fut) -> Self {
3687        Self { future, count: 0 }
3688    }
3689
3690    pub fn count(&self) -> usize {
3691        self.count
3692    }
3693}
3694
3695impl<Fut: Future> Future for PollCounter<Fut> {
3696    type Output = (usize, Fut::Output);
3697
3698    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3699        let this = self.project();
3700        *this.count += 1;
3701        match this.future.poll(cx) {
3702            Poll::Ready(output) => Poll::Ready((*this.count, output)),
3703            Poll::Pending => Poll::Pending,
3704        }
3705    }
3706}
3707
/// Convenience constructor: instruments `future` so that its output is returned
/// together with the number of times it was polled.
fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
    PollCounter::new(future)
}
3711
3712#[cfg(test)]
3713mod tests {
3714    use super::*;
3715    use crate::authority::test_authority_builder::TestAuthorityBuilder;
3716    use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3717    use futures::FutureExt as _;
3718    use futures::future::BoxFuture;
3719    use std::collections::HashMap;
3720    use std::ops::Deref;
3721    use sui_macros::sim_test;
3722    use sui_protocol_config::{Chain, ProtocolConfig};
3723    use sui_types::accumulator_event::AccumulatorEvent;
3724    use sui_types::authenticator_state::ActiveJwk;
3725    use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3726    use sui_types::crypto::Signature;
3727    use sui_types::effects::{TransactionEffects, TransactionEvents};
3728    use sui_types::messages_checkpoint::SignedCheckpointSummary;
3729    use sui_types::transaction::VerifiedTransaction;
3730    use tokio::sync::mpsc;
3731
    #[tokio::test]
    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
        let store = CheckpointStore::new_for_tests();
        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
        // Populate sequence numbers 70..=80 with minimal single-transaction summaries.
        for seq in 70u64..=80u64 {
            let contents =
                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
                    [sui_types::base_types::ExecutionDigests::new(
                        sui_types::digests::TransactionDigest::random(),
                        sui_types::digests::TransactionEffectsDigest::ZERO,
                    )],
                );
            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
                &protocol,
                0,
                seq,
                0,
                &contents,
                None,
                sui_types::gas::GasCostSummary::default(),
                None,
                0,
                Vec::new(),
                Vec::new(),
            );
            store
                .tables
                .locally_computed_checkpoints
                .insert(&seq, &summary)
                .unwrap();
        }

        // Clearing from 76 should delete 76..=80 (inclusive start) and keep 70..=75.
        store
            .clear_locally_computed_checkpoints_from(76)
            .expect("clear should succeed");

        // Explicit boundary checks: 75 must remain, 76 must be deleted
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&75)
                .unwrap()
                .is_some()
        );
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&76)
                .unwrap()
                .is_none()
        );

        // Everything below the cut point survives...
        for seq in 70u64..76u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_some()
            );
        }
        // ...and everything at or above it is gone.
        for seq in 76u64..=80u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_none()
            );
        }
    }
3807
3808    #[tokio::test]
3809    async fn test_fork_detection_storage() {
3810        let store = CheckpointStore::new_for_tests();
3811        // checkpoint fork
3812        let seq_num = 42;
3813        let digest = CheckpointDigest::random();
3814
3815        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3816
3817        store
3818            .record_checkpoint_fork_detected(seq_num, digest)
3819            .unwrap();
3820
3821        let retrieved = store.get_checkpoint_fork_detected().unwrap();
3822        assert!(retrieved.is_some());
3823        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3824        assert_eq!(retrieved_seq, seq_num);
3825        assert_eq!(retrieved_digest, digest);
3826
3827        store.clear_checkpoint_fork_detected().unwrap();
3828        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3829
3830        // txn fork
3831        let tx_digest = TransactionDigest::random();
3832        let expected_effects = TransactionEffectsDigest::random();
3833        let actual_effects = TransactionEffectsDigest::random();
3834
3835        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3836
3837        store
3838            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3839            .unwrap();
3840
3841        let retrieved = store.get_transaction_fork_detected().unwrap();
3842        assert!(retrieved.is_some());
3843        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3844        assert_eq!(retrieved_tx, tx_digest);
3845        assert_eq!(retrieved_expected, expected_effects);
3846        assert_eq!(retrieved_actual, actual_effects);
3847
3848        store.clear_transaction_fork_detected().unwrap();
3849        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3850    }
3851
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        // Configure an authority so the checkpoint builder (not the consensus
        // handler) decides checkpoint boundaries, with a 100ms minimum interval
        // between checkpoints — this drives the coalescing assertion at the end.
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.disable_accumulators_for_testing();
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(false);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // Small placeholder transaction used for digests d(0)..d(14).
        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
            0,
            0,
            vec![],
            SequenceNumber::new(),
        );

        // Build a JWK list whose BCS serialization is at least 40KB, so a
        // transaction carrying it exercises the checkpoint size-based split
        // (three such txns at ~40K each against the 100K limit below).
        let jwks = {
            let mut jwks = Vec::new();
            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
                jwks.push(ActiveJwk {
                    jwk_id: JwkId::new(
                        "https://accounts.google.com".to_string(),
                        "1234567890".to_string(),
                    ),
                    jwk: JWK {
                        kty: "RSA".to_string(),
                        e: "AQAB".to_string(),
                        n: "1234567890".to_string(),
                        alg: "RS256".to_string(),
                    },
                    epoch: 0,
                });
            }
            jwks
        };

        // Large placeholder transaction used for digests d(15)..d(19).
        let dummy_tx_with_data =
            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());

        // Register the transaction bodies in perpetual storage so the builder
        // can look them up by digest: d(0..15) small, d(15..20) ~40K each.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // `store` doubles as the TransactionCacheRead impl (see impl below):
        // digest -> effects, with dependency edges forming d(1)->d(2),d(3);
        // d(2)->d(3),d(4). The builder must causally order them: 3, 2, 1.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 52, 51, 1),
            );
        }
        // Every committed transaction needs a user signature for the builder
        // to include it; a default Ed25519 signature suffices here.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![(signature, None)]);
        }

        // Channels capture built (contents, summary) pairs and certified
        // summaries emitted by the service.
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

        // Limits: max 3 transactions and 100_000 bytes per checkpoint — both
        // split behaviors are asserted below.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

        // Feed pending checkpoints: (height, root digests, timestamp_ms).
        // Heights 4 (ts 4001) and 5 (ts 5000) are within the 100ms minimum
        // interval trick checked at the end (4 coalesces into 5).
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        // Checkpoint 0: root d(4) only. Checkpoint 1: roots d(1), d(3) plus
        // dependency d(2), causally ordered 3, 2, 1; gas summaries accumulate
        // across the epoch (rolling totals).
        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 108, 104, 4)
        );

        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
        // Verify that we split into 2 checkpoints.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K max.
        // Verify that we split into 2 checkpoints.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending at index 4 was too soon after the prior one and should be coalesced into
        // the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        // Sign summaries 0 and 1 and deliver the signatures out of order;
        // the service must still certify them in sequence order.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c2ss })
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c1ss })
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
4085
4086    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
4087        fn notify_read_executed_effects_may_fail(
4088            &self,
4089            _: &str,
4090            digests: &[TransactionDigest],
4091        ) -> BoxFuture<'_, SuiResult<Vec<TransactionEffects>>> {
4092            std::future::ready(Ok(digests
4093                .iter()
4094                .map(|d| self.get(d).expect("effects not found").clone())
4095                .collect()))
4096            .boxed()
4097        }
4098
4099        fn notify_read_executed_effects_digests(
4100            &self,
4101            _: &str,
4102            digests: &[TransactionDigest],
4103        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
4104            std::future::ready(
4105                digests
4106                    .iter()
4107                    .map(|d| {
4108                        self.get(d)
4109                            .map(|fx| fx.digest())
4110                            .expect("effects not found")
4111                    })
4112                    .collect(),
4113            )
4114            .boxed()
4115        }
4116
4117        fn multi_get_executed_effects(
4118            &self,
4119            digests: &[TransactionDigest],
4120        ) -> Vec<Option<TransactionEffects>> {
4121            digests.iter().map(|d| self.get(d).cloned()).collect()
4122        }
4123
4124        // Unimplemented methods - its unfortunate to have this big blob of useless code, but it wasn't
4125        // worth it to keep EffectsNotifyRead around just for these tests, as it caused a ton of
4126        // complication in non-test code. (e.g. had to implement EFfectsNotifyRead for all
4127        // ExecutionCacheRead implementors).
4128
4129        fn multi_get_transaction_blocks(
4130            &self,
4131            _: &[TransactionDigest],
4132        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
4133            unimplemented!()
4134        }
4135
4136        fn multi_get_executed_effects_digests(
4137            &self,
4138            _: &[TransactionDigest],
4139        ) -> Vec<Option<TransactionEffectsDigest>> {
4140            unimplemented!()
4141        }
4142
4143        fn multi_get_effects(
4144            &self,
4145            _: &[TransactionEffectsDigest],
4146        ) -> Vec<Option<TransactionEffects>> {
4147            unimplemented!()
4148        }
4149
4150        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
4151            unimplemented!()
4152        }
4153
4154        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
4155            unimplemented!()
4156        }
4157
4158        fn get_unchanged_loaded_runtime_objects(
4159            &self,
4160            _digest: &TransactionDigest,
4161        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
4162            unimplemented!()
4163        }
4164
4165        fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
4166            unimplemented!()
4167        }
4168    }
4169
4170    #[async_trait::async_trait]
4171    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
4172        async fn checkpoint_created(
4173            &self,
4174            summary: &CheckpointSummary,
4175            contents: &CheckpointContents,
4176            _epoch_store: &Arc<AuthorityPerEpochStore>,
4177            _checkpoint_store: &Arc<CheckpointStore>,
4178        ) -> SuiResult {
4179            self.try_send((contents.clone(), summary.clone())).unwrap();
4180            Ok(())
4181        }
4182    }
4183
4184    #[async_trait::async_trait]
4185    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
4186        async fn certified_checkpoint_created(
4187            &self,
4188            summary: &CertifiedCheckpointSummary,
4189        ) -> SuiResult {
4190            self.try_send(summary.clone()).unwrap();
4191            Ok(())
4192        }
4193    }
4194
4195    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
4196        PendingCheckpoint {
4197            roots: t
4198                .into_iter()
4199                .map(|t| TransactionKey::Digest(d(t)))
4200                .collect(),
4201            details: PendingCheckpointInfo {
4202                timestamp_ms,
4203                last_of_epoch: false,
4204                checkpoint_height: i,
4205                consensus_commit_ref: CommitRef::default(),
4206                rejected_transactions_digest: Digest::default(),
4207                checkpoint_seq: None,
4208            },
4209        }
4210    }
4211
4212    fn d(i: u8) -> TransactionDigest {
4213        let mut bytes: [u8; 32] = Default::default();
4214        bytes[0] = i;
4215        TransactionDigest::new(bytes)
4216    }
4217
4218    fn e(
4219        transaction_digest: TransactionDigest,
4220        dependencies: Vec<TransactionDigest>,
4221        gas_used: GasCostSummary,
4222    ) -> TransactionEffects {
4223        let mut effects = TransactionEffects::default();
4224        *effects.transaction_digest_mut_for_testing() = transaction_digest;
4225        *effects.dependencies_mut_for_testing() = dependencies;
4226        *effects.gas_cost_summary_mut_for_testing() = gas_used;
4227        effects
4228    }
4229
4230    fn commit_cert_for_test(
4231        store: &mut HashMap<TransactionDigest, TransactionEffects>,
4232        state: Arc<AuthorityState>,
4233        digest: TransactionDigest,
4234        dependencies: Vec<TransactionDigest>,
4235        gas_used: GasCostSummary,
4236    ) {
4237        let epoch_store = state.epoch_store_for_testing();
4238        let effects = e(digest, dependencies, gas_used);
4239        store.insert(digest, effects.clone());
4240        epoch_store.insert_executed_in_epoch(&digest);
4241    }
4242}