// sui_core/checkpoints/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16    LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::random::get_rng;
29use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
30use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
31use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
32use nonempty::NonEmpty;
33use parking_lot::Mutex;
34use pin_project_lite::pin_project;
35use serde::{Deserialize, Serialize};
36use sui_macros::fail_point_arg;
37use sui_network::default_mysten_network_config;
38use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43    CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use tokio::sync::{mpsc, watch};
47use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
48
49use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
50use crate::authority::authority_store_pruner::PrunerWatermarks;
51use crate::consensus_handler::SequencedConsensusTransactionKey;
52use rand::seq::SliceRandom;
53use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
54use std::fs::File;
55use std::future::Future;
56use std::io::Write;
57use std::path::Path;
58use std::pin::Pin;
59use std::sync::Arc;
60use std::sync::Weak;
61use std::task::{Context, Poll};
62use std::time::{Duration, SystemTime};
63use sui_protocol_config::ProtocolVersion;
64use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
65use sui_types::committee::StakeUnit;
66use sui_types::crypto::AuthorityStrongQuorumSignInfo;
67use sui_types::digests::{
68    CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
69};
70use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
71use sui_types::error::{SuiErrorKind, SuiResult};
72use sui_types::gas::GasCostSummary;
73use sui_types::message_envelope::Message;
74use sui_types::messages_checkpoint::{
75    CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
76    CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
77    EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
78    VerifiedCheckpointContents,
79};
80use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
81use sui_types::messages_consensus::ConsensusTransactionKey;
82use sui_types::signature::GenericSignature;
83use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
84use sui_types::transaction::{
85    TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
86};
87use tokio::{sync::Notify, time::timeout};
88use tracing::{debug, error, info, instrument, trace, warn};
89use typed_store::DBMapUtils;
90use typed_store::Map;
91use typed_store::{
92    TypedStoreError,
93    rocks::{DBMap, MetricConf},
94};
95
/// Fixed key for the single-row `transaction_fork_detected` table; the table
/// only ever holds one entry, stored under this key.
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

/// Height (derived from consensus rounds) used to order pending checkpoints.
pub type CheckpointHeight = u64;
99
/// Aggregate statistics for a completed epoch.
pub struct EpochStats {
    // Number of checkpoints produced in the epoch.
    pub checkpoint_count: u64,
    // Number of transactions included in the epoch's checkpoints.
    pub transaction_count: u64,
    // Total gas reward accumulated over the epoch.
    pub total_gas_reward: u64,
}
105
/// Metadata attached to a pending (not yet built) checkpoint.
#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    // Timestamp to stamp on the resulting checkpoint summary.
    pub timestamp_ms: CheckpointTimestamp,
    // True if this will be the final checkpoint of the epoch.
    pub last_of_epoch: bool,
    // Computed in calculate_pending_checkpoint_height() from consensus round,
    // there is no guarantee that this is increasing per checkpoint, because of checkpoint splitting.
    pub checkpoint_height: CheckpointHeight,
    // Consensus commit ref and rejected transactions digest which corresponds to this checkpoint.
    pub consensus_commit_ref: CommitRef,
    pub rejected_transactions_digest: Digest,
}
117
/// A checkpoint awaiting construction: the root transactions to include plus
/// the metadata describing how to build it.
#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    // Root transaction keys from which the checkpoint contents are derived.
    pub roots: Vec<TransactionKey>,
    pub details: PendingCheckpointInfo,
}
123
/// A group of transaction roots settled together at a given checkpoint height.
/// Used as the unit of grouping inside [`PendingCheckpointV2`].
#[derive(Clone, Debug, Default)]
pub struct CheckpointRoots {
    // Transaction roots belonging to this group.
    pub tx_roots: Vec<TransactionKey>,
    // Optional settlement transaction root for the group.
    pub settlement_root: Option<TransactionKey>,
    // Height at which this group was formed.
    pub height: CheckpointHeight,
}
130
/// PendingCheckpointV2 is merged and split in ConsensusHandler
/// instead of CheckpointBuilder. It contains 1 or more
/// CheckpointRoots, each of which represents a group of transactions
/// settled together.
/// Usage of PendingCheckpointV2 is gated by split_checkpoints_in_consensus_handler.
/// We need to support dual implementations until this feature flag is removed.
#[derive(Clone, Debug)]
pub struct PendingCheckpointV2 {
    // One or more settled groups of transaction roots.
    pub roots: Vec<CheckpointRoots>,
    pub details: PendingCheckpointInfo,
}
142
/// A locally built checkpoint summary together with where in the consensus
/// commit it was produced. Persisted by the checkpoint builder.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for genesis checkpoint
    pub checkpoint_height: Option<CheckpointHeight>,
    // Index of this summary within the commit it was built from.
    pub position_in_commit: usize,
}
150
/// Column families backing [`CheckpointStore`].
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence number, for
    /// efficient reads of full checkpoints. Entries from this table are deleted after state
    /// accumulation has completed.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    // TODO: Once the switch to `full_checkpoint_content_v2` is fully active on mainnet,
    // deprecate this table (and remove when possible).
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks and log useful
    /// information. Can be pruned as soon as we verify that we are in agreement with the latest
    /// certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    /// Stores transaction fork detection information
    // Single-row table keyed by TRANSACTION_FORK_DETECTED_KEY; the value is
    // (transaction digest, one effects digest, the other effects digest).
    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
    /// Versioned replacement for `full_checkpoint_content`, also indexed by
    /// checkpoint sequence number (see TODO above).
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
}
198
199fn full_checkpoint_content_table_default_config() -> DBOptions {
200    DBOptions {
201        options: default_db_options().options,
202        // We have seen potential data corruption issues in this table after forced shutdowns
203        // so we enable value hash logging to help with debugging.
204        // TODO: remove this once we have a better understanding of the root cause.
205        rw_options: ReadWriteOptions::default().set_log_value_hash(true),
206    }
207}
208
impl CheckpointStoreTables {
    /// Opens the checkpoint tables read-write (RocksDB build). The pruner
    /// watermarks are unused in this configuration.
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    /// Opens the checkpoint tables read-write (tidehunter build), wiring each
    /// key space with its key layout, cache, and relocation-filter settings.
    /// Pruner watermarks drive the relocation filters so pruned-below entries
    /// can be dropped during relocation.
    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 4;
        // Keys are u64 sequence numbers; the first 6 bytes (48 bits) are used
        // as the prefix.
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(64_000)
            .with_value_cache_size(default_value_cache_size());
        // Shared config for sequence-number-keyed (8-byte) key spaces.
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        // Shared config for 32-byte digest-keyed key spaces.
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        // Watermarks are tiny and hot: cache them and never unload.
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
        // Per-column-family configs. Relocation filters either unconditionally
        // drop entries (Decision::Remove) or compare against a pruner
        // watermark via apply_relocation_filter.
        let configs = vec![
            (
                "checkpoint_content",
                digest_config.clone().with_config(
                    lru_config
                        .clone()
                        .with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                // Relocation keyed off the checkpoint's epoch rather than its
                // sequence number.
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
            ),
            (
                "transaction_fork_detected",
                // Single-row table; entries are dropped on relocation.
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "full_checkpoint_content_v2",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    /// Opens a read-only handle to the tables (e.g. for inspection tooling).
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }
}
338
/// Persistent store for checkpoints plus notify-read channels that let waiters
/// be woken when a given checkpoint sequence number is synced or executed.
pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    // Wakes readers waiting for a checkpoint to be fully synced.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    // Wakes readers waiting for a checkpoint to be fully executed.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
344
345impl CheckpointStore {
346    pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
347        let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
348        Arc::new(Self {
349            tables,
350            synced_checkpoint_notify_read: NotifyRead::new(),
351            executed_checkpoint_notify_read: NotifyRead::new(),
352        })
353    }
354
355    pub fn new_for_tests() -> Arc<Self> {
356        let ckpt_dir = mysten_common::tempdir().unwrap();
357        CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
358    }
359
360    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
361        let tables = CheckpointStoreTables::new(
362            path,
363            "db_checkpoint",
364            Arc::new(PrunerWatermarks::default()),
365        );
366        Arc::new(Self {
367            tables,
368            synced_checkpoint_notify_read: NotifyRead::new(),
369            executed_checkpoint_notify_read: NotifyRead::new(),
370        })
371    }
372
    /// Opens read-only handles to the underlying tables (e.g. for tooling).
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }
376
    /// Inserts the genesis checkpoint (epoch 0, sequence 0) and its contents,
    /// marking it verified and synced. Idempotent: if a checkpoint 0 already
    /// exists it only asserts that the digests match.
    ///
    /// Panics if `checkpoint` is not epoch 0 / sequence 0, or if a different
    /// genesis checkpoint is already stored.
    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
        match self.get_checkpoint_by_sequence_number(0).unwrap() {
            Some(existing_checkpoint) => {
                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
            }
            None => {
                // Seed the checkpoint builder only when the validator is still
                // in the genesis epoch; otherwise builder state for epoch 0 is
                // irrelevant.
                if epoch_store.epoch() == checkpoint.epoch {
                    epoch_store
                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                        .unwrap();
                } else {
                    debug!(
                        validator_epoch =% epoch_store.epoch(),
                        genesis_epoch =% checkpoint.epoch(),
                        "Not inserting checkpoint builder data for genesis checkpoint",
                    );
                }
                // Contents first, then the certified checkpoint, then bump the
                // synced watermark so state sync sees a consistent store.
                self.insert_checkpoint_contents(contents).unwrap();
                self.insert_verified_checkpoint(&checkpoint).unwrap();
                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
            }
        }
    }
418
419    pub fn get_checkpoint_by_digest(
420        &self,
421        digest: &CheckpointDigest,
422    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
423        self.tables
424            .checkpoint_by_digest
425            .get(digest)
426            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
427    }
428
429    pub fn get_checkpoint_by_sequence_number(
430        &self,
431        sequence_number: CheckpointSequenceNumber,
432    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
433        self.tables
434            .certified_checkpoints
435            .get(&sequence_number)
436            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
437    }
438
439    pub fn get_locally_computed_checkpoint(
440        &self,
441        sequence_number: CheckpointSequenceNumber,
442    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
443        self.tables
444            .locally_computed_checkpoints
445            .get(&sequence_number)
446    }
447
448    pub fn multi_get_locally_computed_checkpoints(
449        &self,
450        sequence_numbers: &[CheckpointSequenceNumber],
451    ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
452        let checkpoints = self
453            .tables
454            .locally_computed_checkpoints
455            .multi_get(sequence_numbers)?;
456
457        Ok(checkpoints)
458    }
459
460    pub fn get_sequence_number_by_contents_digest(
461        &self,
462        digest: &CheckpointContentsDigest,
463    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
464        self.tables
465            .checkpoint_sequence_by_contents_digest
466            .get(digest)
467    }
468
469    pub fn delete_contents_digest_sequence_number_mapping(
470        &self,
471        digest: &CheckpointContentsDigest,
472    ) -> Result<(), TypedStoreError> {
473        self.tables
474            .checkpoint_sequence_by_contents_digest
475            .remove(digest)
476    }
477
478    pub fn get_latest_certified_checkpoint(
479        &self,
480    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
481        Ok(self
482            .tables
483            .certified_checkpoints
484            .reversed_safe_iter_with_bounds(None, None)?
485            .next()
486            .transpose()?
487            .map(|(_, v)| v.into()))
488    }
489
490    pub fn get_latest_locally_computed_checkpoint(
491        &self,
492    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
493        Ok(self
494            .tables
495            .locally_computed_checkpoints
496            .reversed_safe_iter_with_bounds(None, None)?
497            .next()
498            .transpose()?
499            .map(|(_, v)| v))
500    }
501
502    pub fn multi_get_checkpoint_by_sequence_number(
503        &self,
504        sequence_numbers: &[CheckpointSequenceNumber],
505    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
506        let checkpoints = self
507            .tables
508            .certified_checkpoints
509            .multi_get(sequence_numbers)?
510            .into_iter()
511            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
512            .collect();
513
514        Ok(checkpoints)
515    }
516
517    pub fn multi_get_checkpoint_content(
518        &self,
519        contents_digest: &[CheckpointContentsDigest],
520    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
521        self.tables.checkpoint_content.multi_get(contents_digest)
522    }
523
524    pub fn get_highest_verified_checkpoint(
525        &self,
526    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
527        let highest_verified = if let Some(highest_verified) = self
528            .tables
529            .watermarks
530            .get(&CheckpointWatermark::HighestVerified)?
531        {
532            highest_verified
533        } else {
534            return Ok(None);
535        };
536        self.get_checkpoint_by_digest(&highest_verified.1)
537    }
538
539    pub fn get_highest_synced_checkpoint(
540        &self,
541    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
542        let highest_synced = if let Some(highest_synced) = self
543            .tables
544            .watermarks
545            .get(&CheckpointWatermark::HighestSynced)?
546        {
547            highest_synced
548        } else {
549            return Ok(None);
550        };
551        self.get_checkpoint_by_digest(&highest_synced.1)
552    }
553
554    pub fn get_highest_synced_checkpoint_seq_number(
555        &self,
556    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
557        if let Some(highest_synced) = self
558            .tables
559            .watermarks
560            .get(&CheckpointWatermark::HighestSynced)?
561        {
562            Ok(Some(highest_synced.0))
563        } else {
564            Ok(None)
565        }
566    }
567
568    pub fn get_highest_executed_checkpoint_seq_number(
569        &self,
570    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
571        if let Some(highest_executed) = self
572            .tables
573            .watermarks
574            .get(&CheckpointWatermark::HighestExecuted)?
575        {
576            Ok(Some(highest_executed.0))
577        } else {
578            Ok(None)
579        }
580    }
581
582    pub fn get_highest_executed_checkpoint(
583        &self,
584    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
585        let highest_executed = if let Some(highest_executed) = self
586            .tables
587            .watermarks
588            .get(&CheckpointWatermark::HighestExecuted)?
589        {
590            highest_executed
591        } else {
592            return Ok(None);
593        };
594        self.get_checkpoint_by_digest(&highest_executed.1)
595    }
596
597    pub fn get_highest_pruned_checkpoint_seq_number(
598        &self,
599    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
600        self.tables
601            .watermarks
602            .get(&CheckpointWatermark::HighestPruned)
603            .map(|watermark| watermark.map(|w| w.0))
604    }
605
606    pub fn get_checkpoint_contents(
607        &self,
608        digest: &CheckpointContentsDigest,
609    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
610        self.tables.checkpoint_content.get(digest)
611    }
612
613    pub fn get_full_checkpoint_contents_by_sequence_number(
614        &self,
615        seq: CheckpointSequenceNumber,
616    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
617        self.tables.full_checkpoint_content_v2.get(&seq)
618    }
619
    /// Prunes locally computed summaries up to the most recent one, keeping
    /// the latest entry as a marker of how far local computation has gone.
    // NOTE(review): this assumes schedule_delete_range's upper bound is
    // exclusive (so the latest summary survives) — confirm against the
    // typed_store implementation.
    fn prune_local_summaries(&self) -> SuiResult {
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }
639
    /// Deletes all locally computed checkpoint summaries with sequence number
    /// >= `from_seq` (inclusive). No-op if there are none at or above
    /// `from_seq`.
    pub fn clear_locally_computed_checkpoints_from(
        &self,
        from_seq: CheckpointSequenceNumber,
    ) -> SuiResult {
        // Collect the keys first so the delete batch covers exactly the
        // entries that existed at scan time.
        let keys: Vec<_> = self
            .tables
            .locally_computed_checkpoints
            .safe_iter_with_bounds(Some(from_seq), None)
            .map(|r| r.map(|(k, _)| k))
            .collect::<Result<_, _>>()?;
        if let Some(&last_local_summary) = keys.last() {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch
                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
                .expect("Failed to delete locally computed checkpoints");
            batch
                .write()
                .expect("Failed to delete locally computed checkpoints");
            warn!(
                from_seq,
                last_local_summary,
                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
                from_seq,
                last_local_summary
            );
        }
        Ok(())
    }
668
    /// Compares a locally computed checkpoint summary against the certified
    /// (verified) checkpoint for the same sequence number. On mismatch, logs
    /// both summaries and (best-effort) both contents, records the fork in the
    /// database, and terminates the node via `fatal!`.
    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            // Best-effort render of the verified contents: falls back to a
            // descriptive message if the contents are missing or the read fails.
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // Same best-effort rendering for the locally computed contents.
            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // checkpoint contents may be too large for panic message.
            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );

            // Record the fork in the database before crashing
            if let Err(e) = self.record_checkpoint_fork_detected(
                *local_checkpoint.sequence_number(),
                local_checkpoint.digest(),
            ) {
                error!("Failed to record checkpoint fork in database: {:?}", e);
            }

            // Simulation-only fail point: records a digest override for fork
            // recovery testing, then shuts down the simulated node.
            fail_point_arg!(
                "kill_checkpoint_fork_node",
                |checkpoint_overrides: std::sync::Arc<
                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
                >| {
                    #[cfg(msim)]
                    {
                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
                            overrides.insert(
                                local_checkpoint.sequence_number,
                                verified_checkpoint.digest().to_string(),
                            );
                        }
                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
                            local_checkpoint.sequence_number(),
                            verified_checkpoint.digest()
                        );
                        sui_simulator::task::shutdown_current_node();
                    }
                }
            );

            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }
762
    // Called by consensus (ConsensusAggregator).
    // Different from `insert_verified_checkpoint`, it does not touch
    // the highest_verified_checkpoint watermark such that state sync
    // will have a chance to process this checkpoint and perform some
    // state-sync only things.
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        // Atomically write the checkpoint under both its sequence number and
        // its digest.
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        // A next-epoch committee marks the final checkpoint of the epoch;
        // record it in the epoch -> last-checkpoint map.
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        // If we computed this checkpoint locally, cross-check it against the
        // certified one — a mismatch is a fork and is fatal.
        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }
804
805    // Called by state sync, apart from inserting the checkpoint and updating
806    // related tables, it also bumps the highest_verified_checkpoint watermark.
807    #[instrument(level = "debug", skip_all)]
808    pub fn insert_verified_checkpoint(
809        &self,
810        checkpoint: &VerifiedCheckpoint,
811    ) -> Result<(), TypedStoreError> {
812        self.insert_certified_checkpoint(checkpoint)?;
813        self.update_highest_verified_checkpoint(checkpoint)
814    }
815
816    pub fn update_highest_verified_checkpoint(
817        &self,
818        checkpoint: &VerifiedCheckpoint,
819    ) -> Result<(), TypedStoreError> {
820        if Some(*checkpoint.sequence_number())
821            > self
822                .get_highest_verified_checkpoint()?
823                .map(|x| *x.sequence_number())
824        {
825            debug!(
826                checkpoint_seq = checkpoint.sequence_number(),
827                "Updating highest verified checkpoint",
828            );
829            self.tables.watermarks.insert(
830                &CheckpointWatermark::HighestVerified,
831                &(*checkpoint.sequence_number(), *checkpoint.digest()),
832            )?;
833        }
834
835        Ok(())
836    }
837
838    pub fn update_highest_synced_checkpoint(
839        &self,
840        checkpoint: &VerifiedCheckpoint,
841    ) -> Result<(), TypedStoreError> {
842        let seq = *checkpoint.sequence_number();
843        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
844        self.tables.watermarks.insert(
845            &CheckpointWatermark::HighestSynced,
846            &(seq, *checkpoint.digest()),
847        )?;
848        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
849        Ok(())
850    }
851
852    async fn notify_read_checkpoint_watermark<F>(
853        &self,
854        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
855        seq: CheckpointSequenceNumber,
856        get_watermark: F,
857    ) -> VerifiedCheckpoint
858    where
859        F: Fn() -> Option<CheckpointSequenceNumber>,
860    {
861        notify_read
862            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
863                let seq = seqs[0];
864                let Some(highest) = get_watermark() else {
865                    return vec![None];
866                };
867                if highest < seq {
868                    return vec![None];
869                }
870                let checkpoint = self
871                    .get_checkpoint_by_sequence_number(seq)
872                    .expect("db error")
873                    .expect("checkpoint not found");
874                vec![Some(checkpoint)]
875            })
876            .await
877            .into_iter()
878            .next()
879            .unwrap()
880    }
881
882    pub async fn notify_read_synced_checkpoint(
883        &self,
884        seq: CheckpointSequenceNumber,
885    ) -> VerifiedCheckpoint {
886        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
887            self.get_highest_synced_checkpoint_seq_number()
888                .expect("db error")
889        })
890        .await
891    }
892
893    pub async fn notify_read_executed_checkpoint(
894        &self,
895        seq: CheckpointSequenceNumber,
896    ) -> VerifiedCheckpoint {
897        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
898            self.get_highest_executed_checkpoint_seq_number()
899                .expect("db error")
900        })
901        .await
902    }
903
904    pub fn update_highest_executed_checkpoint(
905        &self,
906        checkpoint: &VerifiedCheckpoint,
907    ) -> Result<(), TypedStoreError> {
908        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
909            if seq_number >= *checkpoint.sequence_number() {
910                return Ok(());
911            }
912            assert_eq!(
913                seq_number + 1,
914                *checkpoint.sequence_number(),
915                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
916                checkpoint.sequence_number(),
917                seq_number
918            );
919        }
920        let seq = *checkpoint.sequence_number();
921        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
922        self.tables.watermarks.insert(
923            &CheckpointWatermark::HighestExecuted,
924            &(seq, *checkpoint.digest()),
925        )?;
926        self.executed_checkpoint_notify_read
927            .notify(&seq, checkpoint);
928        Ok(())
929    }
930
931    pub fn update_highest_pruned_checkpoint(
932        &self,
933        checkpoint: &VerifiedCheckpoint,
934    ) -> Result<(), TypedStoreError> {
935        self.tables.watermarks.insert(
936            &CheckpointWatermark::HighestPruned,
937            &(*checkpoint.sequence_number(), *checkpoint.digest()),
938        )
939    }
940
941    /// Sets highest executed checkpoint to any value.
942    ///
943    /// WARNING: This method is very subtle and can corrupt the database if used incorrectly.
944    /// It should only be used in one-off cases or tests after fully understanding the risk.
945    pub fn set_highest_executed_checkpoint_subtle(
946        &self,
947        checkpoint: &VerifiedCheckpoint,
948    ) -> Result<(), TypedStoreError> {
949        self.tables.watermarks.insert(
950            &CheckpointWatermark::HighestExecuted,
951            &(*checkpoint.sequence_number(), *checkpoint.digest()),
952        )
953    }
954
955    pub fn insert_checkpoint_contents(
956        &self,
957        contents: CheckpointContents,
958    ) -> Result<(), TypedStoreError> {
959        debug!(
960            checkpoint_seq = ?contents.digest(),
961            "Inserting checkpoint contents",
962        );
963        self.tables
964            .checkpoint_content
965            .insert(contents.digest(), &contents)
966    }
967
    /// Atomically persists a checkpoint's full contents, the
    /// `CheckpointContents` derived from them, and the contents-digest ->
    /// sequence-number index, in a single write batch.
    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content_v2.batch();
        // Index: contents digest -> checkpoint sequence number.
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content_v2,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        // Consumes `full_contents`; the derived contents must hash to the digest
        // committed in the checkpoint summary.
        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        // Single atomic write for all three tables.
        batch.write()
    }
994
995    pub fn delete_full_checkpoint_contents(
996        &self,
997        seq: CheckpointSequenceNumber,
998    ) -> Result<(), TypedStoreError> {
999        self.tables.full_checkpoint_content.remove(&seq)?;
1000        self.tables.full_checkpoint_content_v2.remove(&seq)
1001    }
1002
1003    pub fn get_epoch_last_checkpoint(
1004        &self,
1005        epoch_id: EpochId,
1006    ) -> SuiResult<Option<VerifiedCheckpoint>> {
1007        let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1008        let checkpoint = match seq {
1009            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1010            None => None,
1011        };
1012        Ok(checkpoint)
1013    }
1014
1015    pub fn get_epoch_last_checkpoint_seq_number(
1016        &self,
1017        epoch_id: EpochId,
1018    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1019        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1020        Ok(seq)
1021    }
1022
1023    pub fn insert_epoch_last_checkpoint(
1024        &self,
1025        epoch_id: EpochId,
1026        checkpoint: &VerifiedCheckpoint,
1027    ) -> SuiResult {
1028        self.tables
1029            .epoch_last_checkpoint_map
1030            .insert(&epoch_id, checkpoint.sequence_number())?;
1031        Ok(())
1032    }
1033
1034    pub fn get_epoch_state_commitments(
1035        &self,
1036        epoch: EpochId,
1037    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1038        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1039            checkpoint
1040                .end_of_epoch_data
1041                .as_ref()
1042                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1043                .epoch_commitments
1044                .clone()
1045        });
1046        Ok(commitments)
1047    }
1048
1049    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few statistics of the epoch.
1050    pub fn get_epoch_stats(
1051        &self,
1052        epoch: EpochId,
1053        last_checkpoint: &CheckpointSummary,
1054    ) -> Option<EpochStats> {
1055        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1056            (0, 0)
1057        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1058            (
1059                checkpoint.sequence_number + 1,
1060                checkpoint.network_total_transactions,
1061            )
1062        } else {
1063            return None;
1064        };
1065        Some(EpochStats {
1066            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1067            transaction_count: last_checkpoint.network_total_transactions
1068                - prev_epoch_network_transactions,
1069            total_gas_reward: last_checkpoint
1070                .epoch_rolling_gas_cost_summary
1071                .computation_cost,
1072        })
1073    }
1074
1075    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1076        // This checkpoints the entire db and not one column family
1077        self.tables
1078            .checkpoint_content
1079            .checkpoint_db(path)
1080            .map_err(Into::into)
1081    }
1082
1083    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1084        let mut wb = self.tables.watermarks.batch();
1085        wb.delete_batch(
1086            &self.tables.watermarks,
1087            std::iter::once(CheckpointWatermark::HighestExecuted),
1088        )?;
1089        wb.write()?;
1090        Ok(())
1091    }
1092
1093    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1094        self.delete_highest_executed_checkpoint_test_only()?;
1095        Ok(())
1096    }
1097
1098    pub fn record_checkpoint_fork_detected(
1099        &self,
1100        checkpoint_seq: CheckpointSequenceNumber,
1101        checkpoint_digest: CheckpointDigest,
1102    ) -> Result<(), TypedStoreError> {
1103        info!(
1104            checkpoint_seq = checkpoint_seq,
1105            checkpoint_digest = ?checkpoint_digest,
1106            "Recording checkpoint fork detection in database"
1107        );
1108        self.tables.watermarks.insert(
1109            &CheckpointWatermark::CheckpointForkDetected,
1110            &(checkpoint_seq, checkpoint_digest),
1111        )
1112    }
1113
1114    pub fn get_checkpoint_fork_detected(
1115        &self,
1116    ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1117        self.tables
1118            .watermarks
1119            .get(&CheckpointWatermark::CheckpointForkDetected)
1120    }
1121
1122    pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1123        self.tables
1124            .watermarks
1125            .remove(&CheckpointWatermark::CheckpointForkDetected)
1126    }
1127
1128    pub fn record_transaction_fork_detected(
1129        &self,
1130        tx_digest: TransactionDigest,
1131        expected_effects_digest: TransactionEffectsDigest,
1132        actual_effects_digest: TransactionEffectsDigest,
1133    ) -> Result<(), TypedStoreError> {
1134        info!(
1135            tx_digest = ?tx_digest,
1136            expected_effects_digest = ?expected_effects_digest,
1137            actual_effects_digest = ?actual_effects_digest,
1138            "Recording transaction fork detection in database"
1139        );
1140        self.tables.transaction_fork_detected.insert(
1141            &TRANSACTION_FORK_DETECTED_KEY,
1142            &(tx_digest, expected_effects_digest, actual_effects_digest),
1143        )
1144    }
1145
1146    pub fn get_transaction_fork_detected(
1147        &self,
1148    ) -> Result<
1149        Option<(
1150            TransactionDigest,
1151            TransactionEffectsDigest,
1152            TransactionEffectsDigest,
1153        )>,
1154        TypedStoreError,
1155    > {
1156        self.tables
1157            .transaction_fork_detected
1158            .get(&TRANSACTION_FORK_DETECTED_KEY)
1159    }
1160
1161    pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1162        self.tables
1163            .transaction_fork_detected
1164            .remove(&TRANSACTION_FORK_DETECTED_KEY)
1165    }
1166}
1167
/// Keys of the `watermarks` table; each maps to a
/// `(CheckpointSequenceNumber, CheckpointDigest)` value.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    /// Highest verified checkpoint (bumped by `update_highest_verified_checkpoint`).
    HighestVerified,
    /// Highest synced checkpoint (bumped by `update_highest_synced_checkpoint`).
    HighestSynced,
    /// Highest executed checkpoint (bumped contiguously by
    /// `update_highest_executed_checkpoint`).
    HighestExecuted,
    /// Highest pruned checkpoint (`update_highest_pruned_checkpoint`).
    HighestPruned,
    /// Marker recording a detected checkpoint fork (seq, digest at the fork).
    CheckpointForkDetected,
}
1176
/// Consumes (sequence, effects) pairs from a channel and feeds them into the
/// `GlobalStateHasher`; see `run`.
struct CheckpointStateHasher {
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Weak handle: accumulation stops once the hasher is dropped elsewhere.
    hasher: Weak<GlobalStateHasher>,
    // Presumably the receiving end of `CheckpointBuilder::send_to_hasher`
    // (payload types match) — confirm at the construction site.
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
1182
1183impl CheckpointStateHasher {
1184    fn new(
1185        epoch_store: Arc<AuthorityPerEpochStore>,
1186        hasher: Weak<GlobalStateHasher>,
1187        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1188    ) -> Self {
1189        Self {
1190            epoch_store,
1191            hasher,
1192            receive_from_builder,
1193        }
1194    }
1195
1196    async fn run(self) {
1197        let Self {
1198            epoch_store,
1199            hasher,
1200            mut receive_from_builder,
1201        } = self;
1202        while let Some((seq, effects)) = receive_from_builder.recv().await {
1203            let Some(hasher) = hasher.upgrade() else {
1204                info!("Object state hasher was dropped, stopping checkpoint accumulation");
1205                break;
1206            };
1207            hasher
1208                .accumulate_checkpoint(&effects, seq, &epoch_store)
1209                .expect("epoch ended while accumulating checkpoint");
1210        }
1211    }
1212}
1213
/// Errors surfaced while building checkpoints; see `CheckpointBuilder::run`
/// for how each variant is handled.
#[derive(Debug)]
pub enum CheckpointBuilderError {
    /// Terminal: the builder stops when it sees this.
    ChangeEpochTxAlreadyExecuted,
    /// Terminal: the builder stops when it sees this.
    SystemPackagesMissing,
    /// Transient failure: the builder logs it, sleeps 1s, and retries.
    Retry(anyhow::Error),
}
1220
1221impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1222    for CheckpointBuilderError
1223{
1224    fn from(e: SuiError) -> Self {
1225        Self::Retry(e.into())
1226    }
1227}
1228
/// Result type used throughout checkpoint building; success defaults to `()`.
pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1230
/// Builds checkpoints from pending checkpoints produced by consensus; driven
/// by `run`.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    /// Checkpoint store; read for watermarks and written when checkpoints are made.
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Awaited in `run` between build rounds; presumably signaled when new
    /// pending checkpoints arrive — the senders are not in view, confirm.
    notify: Arc<Notify>,
    /// NOTE(review): usage not visible in this chunk; presumably wakes the
    /// `CheckpointAggregator` after new checkpoints are written — confirm.
    notify_aggregator: Arc<Notify>,
    /// Watch channel advanced (monotonically) with each newly built sequence number.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    // for synchronous accumulation of end-of-epoch checkpoint
    global_state_hasher: Weak<GlobalStateHasher>,
    // for asynchronous/concurrent accumulation of regular checkpoints
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
1246
/// Aggregates checkpoint signatures into certified checkpoints.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// NOTE(review): presumably signaled when new signatures/checkpoints are
    /// available — the signaling side is not in view, confirm.
    notify: Arc<Notify>,
    /// Signature-aggregation state for the checkpoint currently being certified,
    /// if any.
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1256
// This holds information to aggregate signatures for one checkpoint
pub struct CheckpointSignatureAggregator {
    /// NOTE(review): exact semantics not visible in this chunk; presumably the
    /// next consensus signature-message index to process — confirm.
    next_index: u64,
    /// The locally built summary this aggregator is certifying.
    summary: CheckpointSummary,
    /// Digest of `summary`.
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1268
1269impl CheckpointBuilder {
1270    fn new(
1271        state: Arc<AuthorityState>,
1272        store: Arc<CheckpointStore>,
1273        epoch_store: Arc<AuthorityPerEpochStore>,
1274        notify: Arc<Notify>,
1275        effects_store: Arc<dyn TransactionCacheRead>,
1276        // for synchronous accumulation of end-of-epoch checkpoint
1277        global_state_hasher: Weak<GlobalStateHasher>,
1278        // for asynchronous/concurrent accumulation of regular checkpoints
1279        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1280        output: Box<dyn CheckpointOutput>,
1281        notify_aggregator: Arc<Notify>,
1282        last_built: watch::Sender<CheckpointSequenceNumber>,
1283        metrics: Arc<CheckpointMetrics>,
1284        max_transactions_per_checkpoint: usize,
1285        max_checkpoint_size_bytes: usize,
1286    ) -> Self {
1287        Self {
1288            state,
1289            store,
1290            epoch_store,
1291            notify,
1292            effects_store,
1293            global_state_hasher,
1294            send_to_hasher,
1295            output,
1296            notify_aggregator,
1297            last_built,
1298            metrics,
1299            max_transactions_per_checkpoint,
1300            max_checkpoint_size_bytes,
1301        }
1302    }
1303
1304    /// This function first waits for ConsensusCommitHandler to finish reprocessing
1305    /// commits that have been processed before the last restart, if consensus_replay_waiter
1306    /// is supplied. Then it starts building checkpoints in a loop.
1307    ///
1308    /// It is optional to pass in consensus_replay_waiter, to make it easier to attribute
1309    /// if slow recovery of previously built checkpoints is due to consensus replay or
1310    /// checkpoint building.
1311    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
1312        if let Some(replay_waiter) = consensus_replay_waiter {
1313            info!("Waiting for consensus commits to replay ...");
1314            replay_waiter.wait_for_replay().await;
1315            info!("Consensus commits finished replaying");
1316        }
1317        info!("Starting CheckpointBuilder");
1318        loop {
1319            match self.maybe_build_checkpoints().await {
1320                Ok(()) => {}
1321                err @ Err(
1322                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
1323                    | CheckpointBuilderError::SystemPackagesMissing,
1324                ) => {
1325                    info!("CheckpointBuilder stopping: {:?}", err);
1326                    return;
1327                }
1328                Err(CheckpointBuilderError::Retry(inner)) => {
1329                    let msg = format!("{:?}", inner);
1330                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
1331                    tokio::time::sleep(Duration::from_secs(1)).await;
1332                    self.metrics.checkpoint_errors.inc();
1333                    continue;
1334                }
1335            }
1336
1337            self.notify.notified().await;
1338        }
1339    }
1340
1341    async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1342        if self
1343            .epoch_store
1344            .protocol_config()
1345            .split_checkpoints_in_consensus_handler()
1346        {
1347            self.maybe_build_checkpoints_v2().await
1348        } else {
1349            self.maybe_build_checkpoints_v1().await
1350        }
1351    }
1352
    /// Legacy building path: groups consecutive `PendingCheckpoint`s until the
    /// minimum checkpoint interval has elapsed (or end of epoch is reached),
    /// then builds a checkpoint from each group. Trailing pendings that don't
    /// yet satisfy the interval are left for the next invocation.
    async fn maybe_build_checkpoints_v1(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        // Collect info about the most recently built checkpoint.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            // Group PendingCheckpoints until:
            // - minimum interval has elapsed ...
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                    Some(last_timestamp) => {
                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                    }
                    None => true,
                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
                //   should be written separately) ...
                } || checkpoints_iter
                    .peek()
                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                // - or, we have reached end of epoch.
                    || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            // Min interval has elapsed, we can now coalesce and build a checkpoint.
            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height_from = grouped_pending_checkpoints
                    .first()
                    .unwrap()
                    .details()
                    .checkpoint_height,
                checkpoint_commit_height_to = last_height,
                "Making checkpoint with commit height range"
            );

            let seq = self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await?;

            self.last_built.send_if_modified(|cur| {
                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            // ensure that the task can be cancelled at end of epoch, even if no other await yields
            // execution.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );

        Ok(())
    }
1441
1442    async fn maybe_build_checkpoints_v2(&mut self) -> CheckpointBuilderResult {
1443        let _scope = monitored_scope("BuildCheckpoints");
1444
1445        // Collect info about the most recently built checkpoint.
1446        let last_height = self
1447            .epoch_store
1448            .last_built_checkpoint_builder_summary()
1449            .expect("epoch should not have ended")
1450            .and_then(|s| s.checkpoint_height);
1451
1452        for (height, pending) in self
1453            .epoch_store
1454            .get_pending_checkpoints_v2(last_height)
1455            .expect("unexpected epoch store error")
1456        {
1457            debug!(checkpoint_commit_height = height, "Making checkpoint");
1458
1459            let seq = self.make_checkpoint_v2(pending).await?;
1460
1461            self.last_built.send_if_modified(|cur| {
1462                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1463                if seq > *cur {
1464                    *cur = seq;
1465                    true
1466                } else {
1467                    false
1468                }
1469            });
1470
1471            // ensure that the task can be cancelled at end of epoch, even if no other await yields
1472            // execution.
1473            tokio::task::yield_now().await;
1474        }
1475
1476        Ok(())
1477    }
1478
    /// Coalesces a group of pending checkpoints into one or more new
    /// checkpoints, writes them, and returns the highest new sequence number.
    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &mut self,
        pendings: Vec<PendingCheckpoint>,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        // Human-readable summary of the inputs for the final log line.
        let pending_ckpt_str = pendings
            .iter()
            .map(|p| {
                format!(
                    "height={}, commit={}",
                    p.details().checkpoint_height,
                    p.details().consensus_commit_ref
                )
            })
            .join("; ");

        let last_details = pendings.last().unwrap().details().clone();

        // Snapshot the executed-checkpoint watermark before resolving; used below
        // to sanity-check that resolution didn't block when execution was ahead.
        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &last_details,
                &all_roots,
            )
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        // poll_count > 1 means the resolve future yielded at least once, which
        // should not happen when execution is already past these checkpoints.
        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        let new_ckpt_str = new_checkpoints
            .iter()
            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
            .join("; ");

        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            "Made new checkpoint {} from pending checkpoint {}",
            new_ckpt_str, pending_ckpt_str
        );

        Ok(highest_sequence)
    }
1538
1539    #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
1540    async fn make_checkpoint_v2(
1541        &mut self,
1542        pending: PendingCheckpointV2,
1543    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1544        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1545
1546        let details = pending.details.clone();
1547
1548        let highest_executed_sequence = self
1549            .store
1550            .get_highest_executed_checkpoint_seq_number()
1551            .expect("db error")
1552            .unwrap_or(0);
1553
1554        let (poll_count, result) =
1555            poll_count(self.resolve_checkpoint_transactions_v2(pending)).await;
1556        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1557
1558        let new_checkpoints = self
1559            .create_checkpoints(
1560                sorted_tx_effects_included_in_checkpoint,
1561                &details,
1562                &all_roots,
1563            )
1564            .await?;
1565        assert_eq!(new_checkpoints.len(), 1, "Expected exactly one checkpoint");
1566        let sequence = *new_checkpoints.first().0.sequence_number();
1567        let digest = new_checkpoints.first().0.digest();
1568        if sequence <= highest_executed_sequence && poll_count > 1 {
1569            debug_fatal!(
1570                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1571            );
1572        }
1573
1574        self.write_checkpoints(details.checkpoint_height, new_checkpoints)
1575            .await?;
1576        info!(
1577            seq = sequence,
1578            %digest,
1579            height = details.checkpoint_height,
1580            commit = %details.consensus_commit_ref,
1581            "Made new checkpoint"
1582        );
1583
1584        Ok(sequence)
1585    }
1586
1587    async fn construct_and_execute_settlement_transactions(
1588        &self,
1589        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
1590        checkpoint_height: CheckpointHeight,
1591        checkpoint_seq: CheckpointSequenceNumber,
1592        tx_index_offset: u64,
1593    ) -> (TransactionKey, Vec<TransactionEffects>) {
1594        let _scope =
1595            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");
1596
1597        let tx_key =
1598            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);
1599
1600        let epoch = self.epoch_store.epoch();
1601        let accumulator_root_obj_initial_shared_version = self
1602            .epoch_store
1603            .epoch_start_config()
1604            .accumulator_root_obj_initial_shared_version()
1605            .expect("accumulator root object must exist");
1606
1607        let builder = AccumulatorSettlementTxBuilder::new(
1608            Some(self.effects_store.as_ref()),
1609            sorted_tx_effects_included_in_checkpoint,
1610            checkpoint_seq,
1611            tx_index_offset,
1612        );
1613
1614        let funds_changes = builder.collect_funds_changes();
1615        let num_updates = builder.num_updates();
1616        let settlement_txns = builder.build_tx(
1617            self.epoch_store.protocol_config(),
1618            epoch,
1619            accumulator_root_obj_initial_shared_version,
1620            checkpoint_height,
1621            checkpoint_seq,
1622        );
1623
1624        let settlement_txns: Vec<_> = settlement_txns
1625            .into_iter()
1626            .map(|tx| {
1627                VerifiedExecutableTransaction::new_system(
1628                    VerifiedTransaction::new_system_transaction(tx),
1629                    self.epoch_store.epoch(),
1630                )
1631            })
1632            .collect();
1633
1634        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();
1635
1636        debug!(
1637            ?settlement_digests,
1638            ?tx_key,
1639            "created settlement transactions with {num_updates} updates"
1640        );
1641
1642        self.epoch_store
1643            .notify_settlement_transactions_ready(tx_key, settlement_txns);
1644
1645        let settlement_effects = wait_for_effects_with_retry(
1646            self.effects_store.as_ref(),
1647            "CheckpointBuilder::notify_read_settlement_effects",
1648            &settlement_digests,
1649            tx_key,
1650        )
1651        .await;
1652
1653        let barrier_tx = accumulators::build_accumulator_barrier_tx(
1654            epoch,
1655            accumulator_root_obj_initial_shared_version,
1656            checkpoint_height,
1657            &settlement_effects,
1658        );
1659
1660        let barrier_tx = VerifiedExecutableTransaction::new_system(
1661            VerifiedTransaction::new_system_transaction(barrier_tx),
1662            self.epoch_store.epoch(),
1663        );
1664        let barrier_digest = *barrier_tx.digest();
1665
1666        self.epoch_store
1667            .notify_barrier_transaction_ready(tx_key, barrier_tx);
1668
1669        let barrier_effects = wait_for_effects_with_retry(
1670            self.effects_store.as_ref(),
1671            "CheckpointBuilder::notify_read_barrier_effects",
1672            &[barrier_digest],
1673            tx_key,
1674        )
1675        .await;
1676
1677        let settlement_effects: Vec<_> = settlement_effects
1678            .into_iter()
1679            .chain(barrier_effects)
1680            .collect();
1681
1682        let mut next_accumulator_version = None;
1683        for fx in settlement_effects.iter() {
1684            assert!(
1685                fx.status().is_ok(),
1686                "settlement transaction cannot fail (digest: {:?}) {:#?}",
1687                fx.transaction_digest(),
1688                fx
1689            );
1690            if let Some(version) = fx
1691                .mutated()
1692                .iter()
1693                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
1694            {
1695                assert!(
1696                    next_accumulator_version.is_none(),
1697                    "Only one settlement transaction should mutate the accumulator root object"
1698                );
1699                next_accumulator_version = Some(version);
1700            }
1701        }
1702        let settlements = FundsSettlement {
1703            next_accumulator_version: next_accumulator_version
1704                .expect("Accumulator root object should be mutated in the settlement transactions"),
1705            funds_changes,
1706        };
1707
1708        self.state
1709            .execution_scheduler()
1710            .settle_address_funds(settlements);
1711
1712        (tx_key, settlement_effects)
1713    }
1714
    // Given the root transactions of each pending checkpoint, resolve the transactions that should
    // be included in the checkpoint, and return them in the order they should be included in the
    // checkpoint, along with the set of root transaction digests.
    // May suspend across multiple awaits while waiting for root digests and executed effects.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        pending_checkpoints: Vec<PendingCheckpoint>,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        // Keeps track of the effects that are already included in the current checkpoint.
        // This is used when there are multiple pending checkpoints to create a single checkpoint
        // because in such scenarios, dependencies of a transaction may be in earlier created
        // checkpoints, or in earlier pending checkpoints.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        // Accumulated, ordered effects and root digests across all pending checkpoints.
        let mut tx_effects = Vec::new();
        let mut tx_roots = HashSet::new();

        for pending_checkpoint in pending_checkpoints.into_iter() {
            let mut pending = pending_checkpoint;
            debug!(
                checkpoint_commit_height = pending.details.checkpoint_height,
                "Resolving checkpoint transactions for pending checkpoint.",
            );

            trace!(
                "roots for pending checkpoint {:?}: {:?}",
                pending.details.checkpoint_height, pending.roots,
            );

            // When accumulators are enabled, the last root must be an AccumulatorSettlement key;
            // pop it off so the remaining roots can be resolved to digests uniformly. It is matched
            // against the settlement tx_key produced below.
            let settlement_root = if self.epoch_store.accumulators_enabled() {
                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
                    pending.roots.pop()
                else {
                    fatal!("No settlement root found");
                };
                Some(settlement_root)
            } else {
                None
            };

            let roots = &pending.roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(roots.len() as u64);

            // Resolve root TransactionKeys to digests, then wait until all roots
            // have executed effects available.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;
            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;

            assert!(
                self.epoch_store
                    .protocol_config()
                    .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
            );

            // If the roots contains consensus commit prologue transaction, we want to extract it,
            // and put it to the front of the checkpoint.
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            // Get the unincluded dependencies of the consensus commit prologue. We should expect no
            // other dependencies that haven't been included in any previous checkpoints.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    &mut effects_in_current_checkpoint,
                )?;

                // No other dependencies of this consensus commit prologue that haven't been included
                // in any previous checkpoint.
                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }

            // Expand the roots into the full set of not-yet-checkpointed dependencies.
            let unsorted =
                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
                if cfg!(debug_assertions) {
                    // When consensus_commit_prologue is extracted, it should not be included in the `unsorted`.
                    for tx in unsorted.iter() {
                        assert!(tx.transaction_digest() != &ccp_digest);
                    }
                }
                sorted.push(ccp_effects);
            }
            sorted.extend(CausalOrder::causal_sort(unsorted));

            if let Some(settlement_root) = settlement_root {
                //TODO: this is an incorrect heuristic for checkpoint seq number
                //      due to checkpoint splitting, to be fixed separately
                let last_checkpoint =
                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
                let next_checkpoint_seq = last_checkpoint
                    .as_ref()
                    .map(|(seq, _)| *seq)
                    .unwrap_or_default()
                    + 1;
                let tx_index_offset = tx_effects.len() as u64;

                // Build and execute the settlement + barrier transactions for this pending
                // checkpoint; their effects are appended after the causally sorted ones.
                let (tx_key, settlement_effects) = self
                    .construct_and_execute_settlement_transactions(
                        &sorted,
                        pending.details.checkpoint_height,
                        next_checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                debug!(?tx_key, "executed settlement transactions");

                assert_eq!(settlement_root, tx_key);

                // Note: we do not need to add the settlement digests to `tx_roots` - `tx_roots`
                // should only include the digests of transactions that were originally roots in
                // the pending checkpoint. It is later used to identify transactions which were
                // added as dependencies, so that those transactions can be waited on using
                // `consensus_messages_processed_notify()`. System transactions (such as
                // settlements) are exempt from this already.
                sorted.extend(settlement_effects);
            }

            #[cfg(msim)]
            {
                // Check consensus commit prologue invariants in sim test.
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            tx_effects.extend(sorted);
            tx_roots.extend(root_digests);
        }

        Ok((tx_effects, tx_roots))
    }
1873
    // Given the root transactions of a pending checkpoint, resolve the transactions that should be
    // included in the checkpoint, and return them in the order they should be included in the
    // checkpoint, along with the set of all root transaction digests.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions_v2(
        &self,
        pending: PendingCheckpointV2,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        // Effects already pulled into this checkpoint; prevents dependencies from being
        // included twice across the per-roots-group iterations below.
        let mut effects_in_current_checkpoint = BTreeSet::new();
        debug!(
            checkpoint_commit_height = pending.details.checkpoint_height,
            "Resolving checkpoint transactions for pending checkpoint.",
        );

        trace!(
            "roots for pending checkpoint {:?}: {:?}",
            pending.details.checkpoint_height, pending.roots,
        );

        assert!(
            self.epoch_store
                .protocol_config()
                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
        );

        let mut all_effects: Vec<TransactionEffects> = Vec::new();
        let mut all_root_digests: Vec<TransactionDigest> = Vec::new();

        // NOTE(review): computed once for the whole pending checkpoint and passed to every
        // settlement below — presumably subject to the same checkpoint-splitting caveat
        // called out by the TODO in resolve_checkpoint_transactions; confirm.
        let last_checkpoint =
            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
        let next_checkpoint_seq = last_checkpoint
            .as_ref()
            .map(|(seq, _)| *seq)
            .unwrap_or_default()
            + 1;

        // Each entry in `pending.roots` carries its own tx roots, height, and an
        // optional settlement root.
        for checkpoint_roots in &pending.roots {
            let tx_roots = &checkpoint_roots.tx_roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(tx_roots.len() as u64);

            // Resolve root TransactionKeys to digests, then wait until all roots
            // have executed effects available.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(tx_roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;

            all_root_digests.extend(root_digests.iter().cloned());

            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;

            // Extract the consensus commit prologue, if present and not already included in a
            // previous checkpoint, so that it can be placed at the front of this group.
            let consensus_commit_prologue = {
                let ccp = self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

                if let Some((ccp_digest, ccp_effects)) = &ccp {
                    let unsorted_ccp = self.complete_checkpoint_effects(
                        vec![ccp_effects.clone()],
                        &mut effects_in_current_checkpoint,
                    )?;

                    if unsorted_ccp.is_empty() {
                        // Each CheckpointRoots normally begins with a CCP, unless
                        // the commit was split across multiple checkpoints; in which case
                        // the CCP was included in a previous commit.
                        None
                    } else if unsorted_ccp.len() != 1 {
                        // The CCP must have no dependencies that are missing from
                        // previously built checkpoints.
                        fatal!(
                            "Expected 1 consensus commit prologue, got {:?}",
                            unsorted_ccp
                                .iter()
                                .map(|e| e.transaction_digest())
                                .collect::<Vec<_>>()
                        );
                    } else {
                        assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
                        ccp.clone()
                    }
                } else {
                    None
                }
            };

            // Expand the roots into the full set of not-yet-checkpointed dependencies.
            let unsorted =
                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let tx_index_offset = all_effects.len() as u64;
            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);

            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
                if cfg!(debug_assertions) {
                    // The extracted CCP must not also appear among the dependencies.
                    for tx in unsorted.iter() {
                        assert!(tx.transaction_digest() != &ccp_digest);
                    }
                }
                sorted.push(ccp_effects);
            }
            sorted.extend(CausalOrder::causal_sort(unsorted));

            // Build and execute settlement + barrier transactions for this group; their
            // effects are appended after the causally sorted ones.
            if checkpoint_roots.settlement_root.is_some() {
                let (tx_key, settlement_effects) = self
                    .construct_and_execute_settlement_transactions(
                        &sorted,
                        checkpoint_roots.height,
                        next_checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                debug!(?tx_key, "executed settlement transactions");

                sorted.extend(settlement_effects);
            }

            all_effects.extend(sorted);
        }

        #[cfg(msim)]
        {
            // Check consensus commit prologue invariants in sim test.
            self.expensive_consensus_commit_prologue_invariants_check_v2(
                &all_root_digests,
                &all_effects,
            );
        }
        Ok((all_effects, all_root_digests.into_iter().collect()))
    }
2009
2010    // Extracts the consensus commit prologue digest and effects from the root transactions.
2011    // The consensus commit prologue is expected to be the first transaction in the roots.
2012    fn extract_consensus_commit_prologue(
2013        &self,
2014        root_digests: &[TransactionDigest],
2015        root_effects: &[TransactionEffects],
2016    ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
2017        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
2018        if root_digests.is_empty() {
2019            return Ok(None);
2020        }
2021
2022        // Reads the first transaction in the roots, and checks whether it is a consensus commit
2023        // prologue transaction. The consensus commit prologue transaction should be the first
2024        // transaction in the roots written by the consensus handler.
2025        let first_tx = self
2026            .state
2027            .get_transaction_cache_reader()
2028            .get_transaction_block(&root_digests[0])
2029            .expect("Transaction block must exist");
2030
2031        Ok(first_tx
2032            .transaction_data()
2033            .is_consensus_commit_prologue()
2034            .then(|| {
2035                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
2036                (*first_tx.digest(), root_effects[0].clone())
2037            }))
2038    }
2039
2040    #[instrument(level = "debug", skip_all)]
2041    async fn write_checkpoints(
2042        &mut self,
2043        height: CheckpointHeight,
2044        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
2045    ) -> SuiResult {
2046        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
2047        let mut batch = self.store.tables.checkpoint_content.batch();
2048        let mut all_tx_digests =
2049            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
2050
2051        for (summary, contents) in &new_checkpoints {
2052            debug!(
2053                checkpoint_commit_height = height,
2054                checkpoint_seq = summary.sequence_number,
2055                contents_digest = ?contents.digest(),
2056                "writing checkpoint",
2057            );
2058
2059            if let Some(previously_computed_summary) = self
2060                .store
2061                .tables
2062                .locally_computed_checkpoints
2063                .get(&summary.sequence_number)?
2064                && previously_computed_summary.digest() != summary.digest()
2065            {
2066                fatal!(
2067                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
2068                    summary.sequence_number,
2069                    previously_computed_summary.digest(),
2070                    summary.digest()
2071                );
2072            }
2073
2074            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
2075
2076            self.metrics
2077                .transactions_included_in_checkpoint
2078                .inc_by(contents.size() as u64);
2079            let sequence_number = summary.sequence_number;
2080            self.metrics
2081                .last_constructed_checkpoint
2082                .set(sequence_number as i64);
2083
2084            batch.insert_batch(
2085                &self.store.tables.checkpoint_content,
2086                [(contents.digest(), contents)],
2087            )?;
2088
2089            batch.insert_batch(
2090                &self.store.tables.locally_computed_checkpoints,
2091                [(sequence_number, summary)],
2092            )?;
2093        }
2094
2095        batch.write()?;
2096
2097        // Send all checkpoint sigs to consensus.
2098        for (summary, contents) in &new_checkpoints {
2099            self.output
2100                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
2101                .await?;
2102        }
2103
2104        for (local_checkpoint, _) in &new_checkpoints {
2105            if let Some(certified_checkpoint) = self
2106                .store
2107                .tables
2108                .certified_checkpoints
2109                .get(local_checkpoint.sequence_number())?
2110            {
2111                self.store
2112                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
2113            }
2114        }
2115
2116        self.notify_aggregator.notify_one();
2117        self.epoch_store
2118            .process_constructed_checkpoint(height, new_checkpoints);
2119        Ok(())
2120    }
2121
2122    #[allow(clippy::type_complexity)]
2123    fn split_checkpoint_chunks(
2124        &self,
2125        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
2126        signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2127    ) -> CheckpointBuilderResult<
2128        Vec<
2129            Vec<(
2130                TransactionEffects,
2131                Vec<(GenericSignature, Option<SequenceNumber>)>,
2132            )>,
2133        >,
2134    > {
2135        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
2136
2137        // If splitting is done in consensus_handler, return everything as one chunk.
2138        if self
2139            .epoch_store
2140            .protocol_config()
2141            .split_checkpoints_in_consensus_handler()
2142        {
2143            let chunk: Vec<_> = effects_and_transaction_sizes
2144                .into_iter()
2145                .zip(signatures)
2146                .map(|((effects, _size), sigs)| (effects, sigs))
2147                .collect();
2148            return Ok(vec![chunk]);
2149        }
2150        let mut chunks = Vec::new();
2151        let mut chunk = Vec::new();
2152        let mut chunk_size: usize = 0;
2153        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
2154            .into_iter()
2155            .zip(signatures.into_iter())
2156        {
2157            // Roll over to a new chunk after either max count or max size is reached.
2158            // The size calculation here is intended to estimate the size of the
2159            // FullCheckpointContents struct. If this code is modified, that struct
2160            // should also be updated accordingly.
2161            let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
2162                bcs::serialized_size(&signatures)?
2163            } else {
2164                let signatures: Vec<&GenericSignature> =
2165                    signatures.iter().map(|(s, _)| s).collect();
2166                bcs::serialized_size(&signatures)?
2167            };
2168            let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
2169            if chunk.len() == self.max_transactions_per_checkpoint
2170                || (chunk_size + size) > self.max_checkpoint_size_bytes
2171            {
2172                if chunk.is_empty() {
2173                    // Always allow at least one tx in a checkpoint.
2174                    warn!(
2175                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
2176                        self.max_checkpoint_size_bytes
2177                    );
2178                } else {
2179                    chunks.push(chunk);
2180                    chunk = Vec::new();
2181                    chunk_size = 0;
2182                }
2183            }
2184
2185            chunk.push((effects, signatures));
2186            chunk_size += size;
2187        }
2188
2189        if !chunk.is_empty() || chunks.is_empty() {
2190            // We intentionally create an empty checkpoint if there is no content provided
2191            // to make a 'heartbeat' checkpoint.
2192            // Important: if some conditions are added here later, we need to make sure we always
2193            // have at least one chunk if last_pending_of_epoch is set
2194            chunks.push(chunk);
2195            // Note: empty checkpoints are ok - they shouldn't happen at all on a network with even
2196            // modest load. Even if they do happen, it is still useful as it allows fullnodes to
2197            // distinguish between "no transactions have happened" and "i am not receiving new
2198            // checkpoints".
2199        }
2200        Ok(chunks)
2201    }
2202
2203    fn load_last_built_checkpoint_summary(
2204        epoch_store: &AuthorityPerEpochStore,
2205        store: &CheckpointStore,
2206    ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
2207        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
2208        if last_checkpoint.is_none() {
2209            let epoch = epoch_store.epoch();
2210            if epoch > 0 {
2211                let previous_epoch = epoch - 1;
2212                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
2213                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
2214                if let Some((ref seq, _)) = last_checkpoint {
2215                    debug!(
2216                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
2217                    );
2218                } else {
2219                    // This is some serious bug with when CheckpointBuilder started so surfacing it via panic
2220                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
2221                }
2222            }
2223        }
2224        Ok(last_checkpoint)
2225    }
2226
2227    #[instrument(level = "debug", skip_all)]
2228    async fn create_checkpoints(
2229        &self,
2230        all_effects: Vec<TransactionEffects>,
2231        details: &PendingCheckpointInfo,
2232        all_roots: &HashSet<TransactionDigest>,
2233    ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
2234        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
2235
2236        let total = all_effects.len();
2237        let mut last_checkpoint =
2238            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
2239        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
2240        debug!(
2241            checkpoint_commit_height = details.checkpoint_height,
2242            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
2243            checkpoint_timestamp = details.timestamp_ms,
2244            "Creating checkpoint(s) for {} transactions",
2245            all_effects.len(),
2246        );
2247
2248        let all_digests: Vec<_> = all_effects
2249            .iter()
2250            .map(|effect| *effect.transaction_digest())
2251            .collect();
2252        let transactions_and_sizes = self
2253            .state
2254            .get_transaction_cache_reader()
2255            .get_transactions_and_serialized_sizes(&all_digests)?;
2256        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
2257        let mut transactions = Vec::with_capacity(all_effects.len());
2258        let mut transaction_keys = Vec::with_capacity(all_effects.len());
2259        let mut randomness_rounds = BTreeMap::new();
2260        {
2261            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
2262            debug!(
2263                ?last_checkpoint_seq,
2264                "Waiting for {:?} certificates to appear in consensus",
2265                all_effects.len()
2266            );
2267
2268            for (effects, transaction_and_size) in all_effects
2269                .into_iter()
2270                .zip(transactions_and_sizes.into_iter())
2271            {
2272                let (transaction, size) = transaction_and_size
2273                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
2274                match transaction.inner().transaction_data().kind() {
2275                    TransactionKind::ConsensusCommitPrologue(_)
2276                    | TransactionKind::ConsensusCommitPrologueV2(_)
2277                    | TransactionKind::ConsensusCommitPrologueV3(_)
2278                    | TransactionKind::ConsensusCommitPrologueV4(_)
2279                    | TransactionKind::AuthenticatorStateUpdate(_) => {
2280                        // ConsensusCommitPrologue and AuthenticatorStateUpdate are guaranteed to be
2281                        // processed before we reach here.
2282                    }
2283                    TransactionKind::ProgrammableSystemTransaction(_) => {
2284                        // settlement transactions are added by checkpoint builder
2285                    }
2286                    TransactionKind::ChangeEpoch(_)
2287                    | TransactionKind::Genesis(_)
2288                    | TransactionKind::EndOfEpochTransaction(_) => {
2289                        fatal!(
2290                            "unexpected transaction in checkpoint effects: {:?}",
2291                            transaction
2292                        );
2293                    }
2294                    TransactionKind::RandomnessStateUpdate(rsu) => {
2295                        randomness_rounds
2296                            .insert(*effects.transaction_digest(), rsu.randomness_round);
2297                    }
2298                    TransactionKind::ProgrammableTransaction(_) => {
2299                        // Only transactions that are not roots should be included in the call to
2300                        // `consensus_messages_processed_notify`. roots come directly from the consensus
2301                        // commit and so are known to be processed already.
2302                        let digest = *effects.transaction_digest();
2303                        if !all_roots.contains(&digest) {
2304                            transaction_keys.push(SequencedConsensusTransactionKey::External(
2305                                ConsensusTransactionKey::Certificate(digest),
2306                            ));
2307                        }
2308                    }
2309                }
2310                transactions.push(transaction);
2311                all_effects_and_transaction_sizes.push((effects, size));
2312            }
2313
2314            self.epoch_store
2315                .consensus_messages_processed_notify(transaction_keys)
2316                .await?;
2317        }
2318
2319        let signatures = self
2320            .epoch_store
2321            .user_signatures_for_checkpoint(&transactions, &all_digests);
2322        debug!(
2323            ?last_checkpoint_seq,
2324            "Received {} checkpoint user signatures from consensus",
2325            signatures.len()
2326        );
2327
2328        let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
2329            Some(
2330                transactions
2331                    .iter()
2332                    .flat_map(|tx| {
2333                        if let TransactionKind::ProgrammableTransaction(ptb) =
2334                            tx.transaction_data().kind()
2335                        {
2336                            itertools::Either::Left(
2337                                ptb.commands
2338                                    .iter()
2339                                    .map(ExecutionTimeObservationKey::from_command),
2340                            )
2341                        } else {
2342                            itertools::Either::Right(std::iter::empty())
2343                        }
2344                    })
2345                    .collect(),
2346            )
2347        } else {
2348            None
2349        };
2350
2351        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2352        let chunks_count = chunks.len();
2353
2354        let mut checkpoints = Vec::with_capacity(chunks_count);
2355        debug!(
2356            ?last_checkpoint_seq,
2357            "Creating {} checkpoints with {} transactions", chunks_count, total,
2358        );
2359
2360        let epoch = self.epoch_store.epoch();
2361        for (index, transactions) in chunks.into_iter().enumerate() {
2362            let first_checkpoint_of_epoch = index == 0
2363                && last_checkpoint
2364                    .as_ref()
2365                    .map(|(_, c)| c.epoch != epoch)
2366                    .unwrap_or(true);
2367            if first_checkpoint_of_epoch {
2368                self.epoch_store
2369                    .record_epoch_first_checkpoint_creation_time_metric();
2370            }
2371            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2372
2373            let sequence_number = last_checkpoint
2374                .as_ref()
2375                .map(|(_, c)| c.sequence_number + 1)
2376                .unwrap_or_default();
2377            let mut timestamp_ms = details.timestamp_ms;
2378            if let Some((_, last_checkpoint)) = &last_checkpoint
2379                && last_checkpoint.timestamp_ms > timestamp_ms
2380            {
2381                // First consensus commit of an epoch can have zero timestamp.
2382                debug!(
2383                    "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2384                    sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2385                );
2386                if self
2387                    .epoch_store
2388                    .protocol_config()
2389                    .enforce_checkpoint_timestamp_monotonicity()
2390                {
2391                    timestamp_ms = last_checkpoint.timestamp_ms;
2392                }
2393            }
2394
2395            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
2396            let epoch_rolling_gas_cost_summary =
2397                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2398
2399            let end_of_epoch_data = if last_checkpoint_of_epoch {
2400                let system_state_obj = self
2401                    .augment_epoch_last_checkpoint(
2402                        &epoch_rolling_gas_cost_summary,
2403                        timestamp_ms,
2404                        &mut effects,
2405                        &mut signatures,
2406                        sequence_number,
2407                        std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2408                        last_checkpoint_seq.unwrap_or_default(),
2409                    )
2410                    .await?;
2411
2412                let committee = system_state_obj
2413                    .get_current_epoch_committee()
2414                    .committee()
2415                    .clone();
2416
2417                // This must happen after the call to augment_epoch_last_checkpoint,
2418                // otherwise we will not capture the change_epoch tx.
2419                let root_state_digest = {
2420                    let state_acc = self
2421                        .global_state_hasher
2422                        .upgrade()
2423                        .expect("No checkpoints should be getting built after local configuration");
2424                    let acc = state_acc.accumulate_checkpoint(
2425                        &effects,
2426                        sequence_number,
2427                        &self.epoch_store,
2428                    )?;
2429
2430                    state_acc
2431                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2432                        .await?;
2433
2434                    state_acc.accumulate_running_root(
2435                        &self.epoch_store,
2436                        sequence_number,
2437                        Some(acc),
2438                    )?;
2439                    state_acc
2440                        .digest_epoch(self.epoch_store.clone(), sequence_number)
2441                        .await?
2442                };
2443                self.metrics.highest_accumulated_epoch.set(epoch as i64);
2444                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2445
2446                let epoch_commitments = if self
2447                    .epoch_store
2448                    .protocol_config()
2449                    .check_commit_root_state_digest_supported()
2450                {
2451                    vec![root_state_digest.into()]
2452                } else {
2453                    vec![]
2454                };
2455
2456                Some(EndOfEpochData {
2457                    next_epoch_committee: committee.voting_rights,
2458                    next_epoch_protocol_version: ProtocolVersion::new(
2459                        system_state_obj.protocol_version(),
2460                    ),
2461                    epoch_commitments,
2462                })
2463            } else {
2464                self.send_to_hasher
2465                    .send((sequence_number, effects.clone()))
2466                    .await?;
2467
2468                None
2469            };
2470            let contents = if self.epoch_store.protocol_config().address_aliases() {
2471                CheckpointContents::new_v2(&effects, signatures)
2472            } else {
2473                CheckpointContents::new_with_digests_and_signatures(
2474                    effects.iter().map(TransactionEffects::execution_digests),
2475                    signatures
2476                        .into_iter()
2477                        .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
2478                        .collect(),
2479                )
2480            };
2481
2482            let num_txns = contents.size() as u64;
2483
2484            let network_total_transactions = last_checkpoint
2485                .as_ref()
2486                .map(|(_, c)| c.network_total_transactions + num_txns)
2487                .unwrap_or(num_txns);
2488
2489            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2490
2491            let matching_randomness_rounds: Vec<_> = effects
2492                .iter()
2493                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2494                .copied()
2495                .collect();
2496
2497            let checkpoint_commitments = if self
2498                .epoch_store
2499                .protocol_config()
2500                .include_checkpoint_artifacts_digest_in_summary()
2501            {
2502                let artifacts = CheckpointArtifacts::from(&effects[..]);
2503                let artifacts_digest = artifacts.digest()?;
2504                vec![artifacts_digest.into()]
2505            } else {
2506                Default::default()
2507            };
2508
2509            let summary = CheckpointSummary::new(
2510                self.epoch_store.protocol_config(),
2511                epoch,
2512                sequence_number,
2513                network_total_transactions,
2514                &contents,
2515                previous_digest,
2516                epoch_rolling_gas_cost_summary,
2517                end_of_epoch_data,
2518                timestamp_ms,
2519                matching_randomness_rounds,
2520                checkpoint_commitments,
2521            );
2522            summary.report_checkpoint_age(
2523                &self.metrics.last_created_checkpoint_age,
2524                &self.metrics.last_created_checkpoint_age_ms,
2525            );
2526            if last_checkpoint_of_epoch {
2527                info!(
2528                    checkpoint_seq = sequence_number,
2529                    "creating last checkpoint of epoch {}", epoch
2530                );
2531                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2532                    self.epoch_store
2533                        .report_epoch_metrics_at_last_checkpoint(stats);
2534                }
2535            }
2536            last_checkpoint = Some((sequence_number, summary.clone()));
2537            checkpoints.push((summary, contents));
2538        }
2539
2540        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2541    }
2542
2543    fn get_epoch_total_gas_cost(
2544        &self,
2545        last_checkpoint: Option<&CheckpointSummary>,
2546        cur_checkpoint_effects: &[TransactionEffects],
2547    ) -> GasCostSummary {
2548        let (previous_epoch, previous_gas_costs) = last_checkpoint
2549            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2550            .unwrap_or_default();
2551        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2552        if previous_epoch == self.epoch_store.epoch() {
2553            // sum only when we are within the same epoch
2554            GasCostSummary::new(
2555                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2556                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2557                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2558                previous_gas_costs.non_refundable_storage_fee
2559                    + current_gas_costs.non_refundable_storage_fee,
2560            )
2561        } else {
2562            current_gas_costs
2563        }
2564    }
2565
2566    #[instrument(level = "error", skip_all)]
2567    async fn augment_epoch_last_checkpoint(
2568        &self,
2569        epoch_total_gas_cost: &GasCostSummary,
2570        epoch_start_timestamp_ms: CheckpointTimestamp,
2571        checkpoint_effects: &mut Vec<TransactionEffects>,
2572        signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2573        checkpoint: CheckpointSequenceNumber,
2574        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2575        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
2576        // >1 checkpoint.
2577        last_checkpoint: CheckpointSequenceNumber,
2578    ) -> CheckpointBuilderResult<SuiSystemState> {
2579        let (system_state, effects) = self
2580            .state
2581            .create_and_execute_advance_epoch_tx(
2582                &self.epoch_store,
2583                epoch_total_gas_cost,
2584                checkpoint,
2585                epoch_start_timestamp_ms,
2586                end_of_epoch_observation_keys,
2587                last_checkpoint,
2588            )
2589            .await?;
2590        checkpoint_effects.push(effects);
2591        signatures.push(vec![]);
2592        Ok(system_state)
2593    }
2594
    /// For the given roots return complete list of effects to include in checkpoint
    /// This list includes the roots and all their dependencies, which are not part of checkpoint already.
    /// Note that this function may be called multiple times to construct the checkpoint.
    /// `existing_tx_digests_in_checkpoint` is used to track the transactions that are already included in the checkpoint.
    /// Txs in `roots` that need to be included in the checkpoint will be added to `existing_tx_digests_in_checkpoint`
    /// after the call of this function.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> SuiResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        // Effects to include, in the order they were discovered.
        let mut results = vec![];
        // Digests already visited during this call; prevents reprocessing a
        // dependency shared by multiple roots.
        let mut seen = HashSet::new();
        // Breadth-first traversal over the dependency graph: each iteration
        // processes the current `roots` frontier and gathers their unseen
        // dependencies into `pending`, which becomes the next frontier.
        loop {
            let mut pending = HashSet::new();

            // Batched lookup: which of the current frontier's transactions the
            // builder has already included in some checkpoint.
            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                // Unnecessary to read effects of a dependency if the effect is already processed.
                // NOTE: marked seen even if skipped below, so we never revisit it.
                seen.insert(*digest);

                // Skip roots that are already included in the checkpoint.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Skip roots already included in checkpoints or roots from previous epochs
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                // Batched lookup: which dependencies have an effects signature
                // recorded in the epoch store.
                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    // Skip here if dependency not executed in the current epoch.
                    // Note that the existence of an effects signature in the
                    // epoch store for the given digest indicates that the transaction
                    // was locally executed in the current epoch
                    if !effects_signature_exists {
                        continue;
                    }
                    // Only enqueue dependencies we haven't visited yet.
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            if pending.is_empty() {
                break;
            }
            // Load effects for the next frontier. A missing effect here is an
            // invariant violation: a dependent transaction cannot have executed
            // locally unless its dependency's effects exist.
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self.effects_store.multi_get_executed_effects(&pending);
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
                        digest
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        // Record everything added in this call so subsequent calls skip it.
        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
2676
2677    // Checks the invariants of the consensus commit prologue transactions in the checkpoint
2678    // in simtest.
2679    #[cfg(msim)]
2680    fn expensive_consensus_commit_prologue_invariants_check(
2681        &self,
2682        root_digests: &[TransactionDigest],
2683        sorted: &[TransactionEffects],
2684    ) {
2685        // Gets all the consensus commit prologue transactions from the roots.
2686        let root_txs = self
2687            .state
2688            .get_transaction_cache_reader()
2689            .multi_get_transaction_blocks(root_digests);
2690        let ccps = root_txs
2691            .iter()
2692            .filter_map(|tx| {
2693                if let Some(tx) = tx {
2694                    if tx.transaction_data().is_consensus_commit_prologue() {
2695                        Some(tx)
2696                    } else {
2697                        None
2698                    }
2699                } else {
2700                    None
2701                }
2702            })
2703            .collect::<Vec<_>>();
2704
2705        // There should be at most one consensus commit prologue transaction in the roots.
2706        assert!(ccps.len() <= 1);
2707
2708        // Get all the transactions in the checkpoint.
2709        let txs = self
2710            .state
2711            .get_transaction_cache_reader()
2712            .multi_get_transaction_blocks(
2713                &sorted
2714                    .iter()
2715                    .map(|tx| tx.transaction_digest().clone())
2716                    .collect::<Vec<_>>(),
2717            );
2718
2719        if ccps.len() == 0 {
2720            // If there is no consensus commit prologue transaction in the roots, then there should be no
2721            // consensus commit prologue transaction in the checkpoint.
2722            for tx in txs.iter() {
2723                if let Some(tx) = tx {
2724                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2725                }
2726            }
2727        } else {
2728            // If there is one consensus commit prologue, it must be the first one in the checkpoint.
2729            assert!(
2730                txs[0]
2731                    .as_ref()
2732                    .unwrap()
2733                    .transaction_data()
2734                    .is_consensus_commit_prologue()
2735            );
2736
2737            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2738
2739            for tx in txs.iter().skip(1) {
2740                if let Some(tx) = tx {
2741                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2742                }
2743            }
2744        }
2745    }
2746
2747    #[cfg(msim)]
2748    fn expensive_consensus_commit_prologue_invariants_check_v2(
2749        &self,
2750        root_digests: &[TransactionDigest],
2751        sorted: &[TransactionEffects],
2752    ) {
2753        // Gets all the consensus commit prologue transactions from the roots.
2754        let root_txs = self
2755            .state
2756            .get_transaction_cache_reader()
2757            .multi_get_transaction_blocks(root_digests);
2758        let ccp_digests_from_roots: HashSet<_> = root_txs
2759            .iter()
2760            .filter_map(|tx| {
2761                if let Some(tx) = tx {
2762                    if tx.transaction_data().is_consensus_commit_prologue() {
2763                        Some(*tx.digest())
2764                    } else {
2765                        None
2766                    }
2767                } else {
2768                    None
2769                }
2770            })
2771            .collect();
2772
2773        // Get all the transactions in the checkpoint.
2774        let txs = self
2775            .state
2776            .get_transaction_cache_reader()
2777            .multi_get_transaction_blocks(
2778                &sorted
2779                    .iter()
2780                    .map(|tx| tx.transaction_digest().clone())
2781                    .collect::<Vec<_>>(),
2782            );
2783
2784        // Count CCPs in the checkpoint and verify they match the ones from roots.
2785        // With checkpoint merging, we can have multiple CCPs (one per merged consensus commit).
2786        let ccps_in_checkpoint: Vec<_> = txs
2787            .iter()
2788            .filter_map(|tx| {
2789                if let Some(tx) = tx {
2790                    if tx.transaction_data().is_consensus_commit_prologue() {
2791                        Some(*tx.digest())
2792                    } else {
2793                        None
2794                    }
2795                } else {
2796                    None
2797                }
2798            })
2799            .collect();
2800
2801        // All CCPs in the checkpoint must be from the roots.
2802        for ccp_digest in &ccps_in_checkpoint {
2803            assert!(
2804                ccp_digests_from_roots.contains(ccp_digest),
2805                "CCP in checkpoint not found in roots"
2806            );
2807        }
2808
2809        // If there are CCPs from roots that are in this checkpoint, the first transaction
2810        // in sorted must be a CCP.
2811        if !ccps_in_checkpoint.is_empty() {
2812            assert!(
2813                txs[0]
2814                    .as_ref()
2815                    .unwrap()
2816                    .transaction_data()
2817                    .is_consensus_commit_prologue(),
2818                "First transaction must be a CCP when CCPs are present"
2819            );
2820        }
2821    }
2822}
2823
2824async fn wait_for_effects_with_retry(
2825    effects_store: &dyn TransactionCacheRead,
2826    task_name: &'static str,
2827    digests: &[TransactionDigest],
2828    tx_key: TransactionKey,
2829) -> Vec<TransactionEffects> {
2830    let delay = if in_antithesis() {
2831        // antithesis has aggressive thread pausing, 5 seconds causes false positives
2832        15
2833    } else {
2834        5
2835    };
2836    loop {
2837        match tokio::time::timeout(Duration::from_secs(delay), async {
2838            effects_store
2839                .notify_read_executed_effects(task_name, digests)
2840                .await
2841        })
2842        .await
2843        {
2844            Ok(effects) => break effects,
2845            Err(_) => {
2846                debug_fatal!(
2847                    "Timeout waiting for transactions to be executed {:?}, retrying...",
2848                    tx_key
2849                );
2850            }
2851        }
2852    }
2853}
2854
impl CheckpointAggregator {
    /// Creates a new aggregator. `current` starts empty and is lazily
    /// populated by `run_inner` with the next checkpoint to certify.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        let current = None;
        Self {
            store: tables,
            epoch_store,
            notify,
            current,
            output,
            state,
            metrics,
        }
    }

    /// Main loop: repeatedly aggregates signatures and publishes certified
    /// checkpoints. On error, waits 1s and retries; otherwise waits for a
    /// `notify` wake-up, polling at least once per second.
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            // Wake on notification, but never sleep longer than 1s.
            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
        }
    }

    /// Runs one aggregation pass and forwards each newly certified checkpoint
    /// to the configured output.
    async fn run_and_notify(&mut self) -> SuiResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Aggregates pending signatures into certified checkpoints, in sequence
    /// order. Returns every checkpoint certified during this pass; stops (with
    /// the partial result) when the next checkpoint summary hasn't been built
    /// locally yet, or when there are not yet enough signatures for it.
    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            let current = if let Some(current) = &mut self.current {
                // It's possible that the checkpoint was already certified by
                // the rest of the network and we've already received the
                // certified checkpoint via StateSync. In this case, we reset
                // the current signature aggregator to the next checkpoint to
                // be certified
                if current.summary.sequence_number < next_to_certify {
                    assert_reachable!("skip checkpoint certification");
                    self.current = None;
                    continue;
                }
                current
            } else {
                // No aggregation in progress; start one for the next locally
                // built summary, or bail out if it hasn't been built yet.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    next_index: 0,
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let epoch_tables = self
                .epoch_store
                .tables()
                .expect("should not run past end of epoch");
            // Resume scanning pending signatures for this checkpoint from
            // where the previous pass left off (`next_index`).
            let iter = epoch_tables
                .pending_checkpoint_signatures
                .safe_iter_with_bounds(
                    Some((current.summary.sequence_number, current.next_index)),
                    None,
                );
            for item in iter {
                let ((seq, index), data) = item?;
                if seq != current.summary.sequence_number {
                    trace!(
                        checkpoint_seq =? current.summary.sequence_number,
                        "Not enough checkpoint signatures",
                    );
                    // No more signatures (yet) for this checkpoint
                    return Ok(result);
                }
                trace!(
                    checkpoint_seq = current.summary.sequence_number,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = current.summary.sequence_number,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics
                        .last_certified_checkpoint
                        .set(current.summary.sequence_number as i64);
                    current.summary.report_checkpoint_age(
                        &self.metrics.last_certified_checkpoint_age,
                        &self.metrics.last_certified_checkpoint_age_ms,
                    );
                    result.push(summary.into_inner());
                    // Certified: move on to aggregating the next checkpoint.
                    self.current = None;
                    continue 'outer;
                } else {
                    // Not certified yet; remember where to resume scanning.
                    current.next_index = index + 1;
                }
            }
            break;
        }
        Ok(result)
    }

    /// Next sequence number to certify: one past the highest checkpoint in the
    /// certified-checkpoints table, or 0 when the table is empty.
    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
3017
3018impl CheckpointSignatureAggregator {
    /// Incorporates one validator's checkpoint signature.
    ///
    /// Returns `Ok` with the aggregated quorum signature once sufficient stake
    /// has signed the digest we built locally. Returns `Err(())` in every
    /// other case: repeated signer, aggregation failure, not enough votes yet,
    /// or a quorum reached on a digest that differs from ours.
    #[allow(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        // Re-wrap the remote signature around OUR summary; signatures are
        // bucketed by the digest the remote validator actually signed
        // (`their_digest`) in `signatures_by_digest`.
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            // ignore repeated signatures
            InsertResult::Failed { error }
                if matches!(
                    error.as_inner(),
                    SuiErrorKind::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
                ) =>
            {
                Err(())
            }
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                // It is not guaranteed that signature.authority == narwhal_cert.author, but we do verify
                // the signature so we know that the author signed the message at some point.
                if their_digest != self.digest {
                    // Quorum formed on a digest different from ours: the rest
                    // of the network disagrees with our local checkpoint.
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }
3077
3078    /// Check if there is a split brain condition in checkpoint signature aggregation, defined
3079    /// as any state wherein it is no longer possible to achieve quorum on a checkpoint proposal,
3080    /// irrespective of the outcome of any outstanding votes.
3081    fn check_for_split_brain(&self) {
3082        debug!(
3083            checkpoint_seq = self.summary.sequence_number,
3084            "Checking for split brain condition"
3085        );
3086        if self.signatures_by_digest.quorum_unreachable() {
3087            // TODO: at this point we should immediately halt processing
3088            // of new transaction certificates to avoid building on top of
3089            // forked output
3090            // self.halt_all_execution();
3091
3092            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
3093            let digests_by_stake_messages = all_unique_values
3094                .iter()
3095                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
3096                .map(|(digest, (_authorities, total_stake))| {
3097                    format!("{:?} (total stake: {})", digest, total_stake)
3098                })
3099                .collect::<Vec<String>>();
3100            fail_point_arg!("kill_split_brain_node", |(
3101                checkpoint_overrides,
3102                forked_authorities,
3103            ): (
3104                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
3105                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
3106            )| {
3107                #[cfg(msim)]
3108                {
3109                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
3110                        (checkpoint_overrides.lock(), forked_authorities.lock())
3111                    {
3112                        // Find the digest produced by non-forked authorities
3113                        let correct_digest = all_unique_values
3114                            .iter()
3115                            .find(|(_, (authorities, _))| {
3116                                // Check if any authority that produced this digest is NOT in the forked set
3117                                authorities
3118                                    .iter()
3119                                    .any(|auth| !forked_authorities_set.contains(auth))
3120                            })
3121                            .map(|(digest, _)| digest.to_string())
3122                            .unwrap_or_else(|| {
3123                                // Fallback: use the digest with the highest stake
3124                                all_unique_values
3125                                    .iter()
3126                                    .max_by_key(|(_, (_, stake))| *stake)
3127                                    .map(|(digest, _)| digest.to_string())
3128                                    .unwrap_or_else(|| self.digest.to_string())
3129                            });
3130
3131                        overrides.insert(self.summary.sequence_number, correct_digest.clone());
3132
3133                        tracing::error!(
3134                            fatal = true,
3135                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
3136                            self.summary.sequence_number,
3137                            correct_digest
3138                        );
3139                    }
3140                }
3141            });
3142
3143            debug_fatal!(
3144                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
3145                self.summary.sequence_number,
3146                self.signatures_by_digest.uncommitted_stake(),
3147                digests_by_stake_messages
3148            );
3149            self.metrics.split_brain_checkpoint_forks.inc();
3150
3151            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
3152            let local_summary = self.summary.clone();
3153            let state = self.state.clone();
3154            let tables = self.store.clone();
3155
3156            tokio::spawn(async move {
3157                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
3158            });
3159        }
3160    }
3161}
3162
/// Create data dump containing relevant data for diagnosing cause of the
/// split brain by querying one disagreeing validator for full checkpoint contents.
/// To minimize peer chatter, we only query one validator at random from each
/// disagreeing faction, as all honest validators that participated in this round may
/// inevitably run the same process.
///
/// * `all_unique_values` - every observed digest with its signers and stake.
/// * `local_summary` - the checkpoint summary this node built locally.
/// * `state` / `tables` - used to look up the committee and local contents.
///
/// The resulting diff dump is written to `checkpoint_fork_dump.txt` in a fresh
/// temp directory and also emitted at debug level.
async fn diagnose_split_brain(
    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
    local_summary: CheckpointSummary,
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
) {
    debug!(
        checkpoint_seq = local_summary.sequence_number,
        "Running split brain diagnostics..."
    );
    let time = SystemTime::now();
    // collect one random disagreeing validator per differing digest
    let digest_to_validator = all_unique_values
        .iter()
        .filter_map(|(digest, (validators, _))| {
            if *digest != local_summary.digest() {
                let random_validator = validators.choose(&mut get_rng()).unwrap();
                Some((*digest, *random_validator))
            } else {
                None
            }
        })
        .collect::<HashMap<_, _>>();
    if digest_to_validator.is_empty() {
        // Caller only invokes this under a split-brain condition, so at least
        // one digest must differ from ours; anything else is a logic bug.
        panic!(
            "Given split brain condition, there should be at \
                least one validator that disagrees with local signature"
        );
    }

    let epoch_store = state.load_epoch_store_one_call_per_task();
    let committee = epoch_store
        .epoch_start_state()
        .get_sui_committee_with_network_metadata();
    let network_config = default_mysten_network_config();
    let network_clients =
        make_network_authority_clients_with_network_config(&committee, &network_config);

    // Query all disagreeing validators
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequestV2 {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint_v2(request)
        })
        .collect::<Vec<_>>();

    // NOTE(review): this zip relies on `.values()` above and `.iter()` here
    // walking the (unmodified) HashMap in the same order, which std guarantees
    // for repeated iteration of an unchanged map.
    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                // Only a pending summary plus its contents is usable for diffing.
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponseV2 {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponseV2 {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    // Produce one unified-diff section per disagreeing validator: summary,
    // contents, transaction digests, and effects digests.
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {:?}",
        time
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    // Persist the dump to a temp dir (kept, i.e. not auto-deleted) so it
    // survives the process for post-mortem inspection.
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .keep()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{}", fork_logs_text).unwrap();
    debug!("{}", fork_logs_text);
}
3342
/// Notification interface for waking the checkpoint pipeline: implemented by
/// `CheckpointService` (real) and `CheckpointServiceNoop` (tests).
pub trait CheckpointServiceNotify {
    /// Records a checkpoint signature received from another validator and
    /// wakes the signature aggregator.
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> SuiResult;

    /// Wakes the checkpoint builder to look for new pending checkpoints.
    fn notify_checkpoint(&self) -> SuiResult;
}
3352
/// Lifecycle state of a `CheckpointService`: holds the worker objects built by
/// `CheckpointService::build` until `spawn` takes them and starts their tasks.
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    /// Workers constructed but not yet running.
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    /// Workers have been taken and spawned; `take_unstarted` panics in this state.
    Started,
}
3364
3365impl CheckpointServiceState {
3366    fn take_unstarted(
3367        &mut self,
3368    ) -> (
3369        CheckpointBuilder,
3370        CheckpointAggregator,
3371        CheckpointStateHasher,
3372    ) {
3373        let mut state = CheckpointServiceState::Started;
3374        std::mem::swap(self, &mut state);
3375
3376        match state {
3377            CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3378                (builder, aggregator, hasher)
3379            }
3380            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3381        }
3382    }
3383}
3384
/// Drives checkpoint building and certification for the current epoch.
/// Constructed un-started by `build`, then activated by `spawn`.
pub struct CheckpointService {
    tables: Arc<CheckpointStore>,
    // Wakes the CheckpointBuilder task (see notify_checkpoint).
    notify_builder: Arc<Notify>,
    // Wakes the CheckpointAggregator task (see notify_checkpoint_signature).
    notify_aggregator: Arc<Notify>,
    // Monotonic index assigned to incoming checkpoint signatures. Kept behind a
    // Mutex (not an AtomicU64) so writing the signature and notifying the
    // aggregator happen without races — see notify_checkpoint_signature.
    last_signature_index: Mutex<u64>,
    // A notification for the current highest built sequence number.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // The highest sequence number that had already been built at the time CheckpointService
    // was constructed
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    state: Mutex<CheckpointServiceState>,
}
3398
impl CheckpointService {
    /// Constructs a new CheckpointService in an un-started state.
    ///
    /// Wires up the builder, aggregator, and state hasher (connected to the
    /// builder via a bounded channel) and records the highest checkpoint that
    /// had been built before a potential restart; call `spawn` to start them.
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // We may have built higher checkpoint numbers before restarting.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        // Builder -> hasher channel; bounded to apply backpressure on the builder.
        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);

        let ckpt_state_hasher = CheckpointStateHasher::new(
            epoch_store.clone(),
            global_state_hasher.clone(),
            receive_from_builder,
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            global_state_hasher,
            send_to_hasher,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        let last_signature_index = epoch_store
            .get_last_checkpoint_signature_index()
            .expect("should not cross end of epoch");
        let last_signature_index = Mutex::new(last_signature_index);

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            notify_aggregator,
            last_signature_index,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted((
                builder,
                aggregator,
                ckpt_state_hasher,
            ))),
        })
    }

    /// Starts the CheckpointService.
    ///
    /// This function blocks until the CheckpointBuilder re-builds all checkpoints that had
    /// been built before the most recent restart. You can think of this as a WAL replay
    /// operation. Upon startup, we may have a number of consensus commits and resulting
    /// checkpoints that were built but not committed to disk. We want to reprocess the
    /// commits and rebuild the checkpoints before starting normal operation.
    ///
    /// # Panics
    /// Panics (via `take_unstarted`) if called more than once.
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        // Clean up state hashes computed after the last built checkpoint
        // This prevents ECMH divergence after fork recovery restarts

        // Note: there is a rare crash recovery edge case where we write the builder
        // summary, but crash before we can bump the highest executed checkpoint.
        // If we committed the builder summary, it was certified and unforked, so there
        // is no need to clear that state hash. If we do clear it, then checkpoint executor
        // will wait forever for checkpoint builder to produce the state hash, which will
        // never happen.
        let last_persisted_builder_seq = epoch_store
            .last_persisted_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number);

        let last_executed_seq = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number());

        // Option ordering makes None compare less than Some(_), so this picks
        // the larger of the two watermarks when either exists.
        if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        // Supervisor task: run the builder until it finishes or the epoch ends,
        // then shut down its downstream consumers in dependency order.
        spawn_monitored_task!(async move {
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            // state hasher will terminate as soon as it has finished processing all messages from builder
            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            // builder must shut down before aggregator and state_hasher, since it sends
            // messages to them
            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        // If this times out, the validator may still start up. The worst that can
        // happen is that we will crash later on instead of immediately. The eventual
        // crash would occur because we may be missing transactions that are below the
        // highest_synced_checkpoint watermark, which can cause a crash in
        // `CheckpointExecutor::extract_randomness_rounds`.
        if tokio::time::timeout(Duration::from_secs(120), async move {
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
}
3582
impl CheckpointService {
    /// Waits until all checkpoints had been built before the node restarted
    /// are rebuilt. This is required to preserve the invariant that all checkpoints
    /// (and their transactions) below the highest_synced_checkpoint watermark are
    /// available. Once the checkpoints are constructed, we can be sure that the
    /// transactions have also been executed.
    pub async fn wait_for_rebuilt_checkpoints(&self) {
        let highest_previously_built_seq = self.highest_previously_built_seq;
        let mut rx = self.highest_currently_built_seq_tx.subscribe();
        let mut highest_currently_built_seq = *rx.borrow_and_update();
        info!(
            "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
        );
        loop {
            if highest_currently_built_seq >= highest_previously_built_seq {
                info!("Checkpoint rebuild complete");
                break;
            }
            // unwrap: the watch sender lives in `self`, so it cannot be dropped
            // while we hold `&self` and `changed()` cannot fail.
            rx.changed().await.unwrap();
            highest_currently_built_seq = *rx.borrow_and_update();
        }
    }

    /// Test-only helper: persists a pending checkpoint through a synthetic
    /// consensus commit output and then wakes the builder.
    #[cfg(test)]
    fn write_and_notify_checkpoint_for_testing(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: PendingCheckpoint,
    ) -> SuiResult {
        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;

        let mut output = ConsensusCommitOutput::new(0);
        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
        output.set_default_commit_stats_for_testing();
        epoch_store.push_consensus_output_for_tests(output);
        self.notify_checkpoint()?;
        Ok(())
    }
}
3622
impl CheckpointServiceNotify for CheckpointService {
    /// Stores a received checkpoint signature (with a fresh monotonic index)
    /// and wakes the aggregator. Signatures at or below the highest verified
    /// checkpoint are ignored, since that checkpoint is already certified.
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> SuiResult {
        let sequence = info.summary.sequence_number;
        let signer = info.summary.auth_sig().authority.concise();

        if let Some(highest_verified_checkpoint) = self
            .tables
            .get_highest_verified_checkpoint()?
            .map(|x| *x.sequence_number())
            && sequence <= highest_verified_checkpoint
        {
            trace!(
                checkpoint_seq = sequence,
                "Ignore checkpoint signature from {} - already certified", signer,
            );
            self.metrics
                .last_ignored_checkpoint_signature_received
                .set(sequence as i64);
            return Ok(());
        }
        trace!(
            checkpoint_seq = sequence,
            "Received checkpoint signature, digest {} from {}",
            info.summary.digest(),
            signer,
        );
        self.metrics
            .last_received_checkpoint_signatures
            .with_label_values(&[&signer.to_string()])
            .set(sequence as i64);
        // While it can be tempting to make last_signature_index into AtomicU64, this won't work
        // We need to make sure we write to `pending_signatures` and trigger `notify_aggregator` without race conditions
        let mut index = self.last_signature_index.lock();
        *index += 1;
        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
        self.notify_aggregator.notify_one();
        Ok(())
    }

    /// Wakes the checkpoint builder task.
    fn notify_checkpoint(&self) -> SuiResult {
        self.notify_builder.notify_one();
        Ok(())
    }
}
3671
// Test helper: a CheckpointServiceNotify whose notifications are all no-ops,
// for components that require the trait but don't exercise the checkpoint
// pipeline.
pub struct CheckpointServiceNoop {}
impl CheckpointServiceNotify for CheckpointServiceNoop {
    fn notify_checkpoint_signature(
        &self,
        _: &AuthorityPerEpochStore,
        _: &CheckpointSignatureMessage,
    ) -> SuiResult {
        Ok(())
    }

    fn notify_checkpoint(&self) -> SuiResult {
        Ok(())
    }
}
3687
impl PendingCheckpoint {
    /// The checkpoint height recorded in this pending checkpoint's details.
    pub fn height(&self) -> CheckpointHeight {
        self.details.checkpoint_height
    }

    /// The root transaction keys included in this pending checkpoint.
    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.roots
    }

    /// Metadata describing this pending checkpoint.
    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.details
    }
}
3701
3702impl PendingCheckpointV2 {
3703    pub fn height(&self) -> CheckpointHeight {
3704        self.details.checkpoint_height
3705    }
3706
3707    pub(crate) fn num_roots(&self) -> usize {
3708        self.roots.iter().map(|r| r.tx_roots.len()).sum()
3709    }
3710}
3711
pin_project! {
    /// Future adapter that counts how many times the inner future is polled.
    /// See `poll_count` and the `Future` impl below.
    pub struct PollCounter<Fut> {
        #[pin]
        future: Fut,
        count: usize,
    }
}
3719
3720impl<Fut> PollCounter<Fut> {
3721    pub fn new(future: Fut) -> Self {
3722        Self { future, count: 0 }
3723    }
3724
3725    pub fn count(&self) -> usize {
3726        self.count
3727    }
3728}
3729
3730impl<Fut: Future> Future for PollCounter<Fut> {
3731    type Output = (usize, Fut::Output);
3732
3733    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3734        let this = self.project();
3735        *this.count += 1;
3736        match this.future.poll(cx) {
3737            Poll::Ready(output) => Poll::Ready((*this.count, output)),
3738            Poll::Pending => Poll::Pending,
3739        }
3740    }
3741}
3742
/// Instruments `future` so that its eventual output is paired with the number
/// of times it was polled.
fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
    PollCounter::new(future)
}
3746
3747#[cfg(test)]
3748mod tests {
3749    use super::*;
3750    use crate::authority::test_authority_builder::TestAuthorityBuilder;
3751    use crate::transaction_outputs::TransactionOutputs;
3752    use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3753    use futures::FutureExt as _;
3754    use futures::future::BoxFuture;
3755    use std::collections::HashMap;
3756    use std::ops::Deref;
3757    use sui_macros::sim_test;
3758    use sui_protocol_config::{Chain, ProtocolConfig};
3759    use sui_types::accumulator_event::AccumulatorEvent;
3760    use sui_types::authenticator_state::ActiveJwk;
3761    use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3762    use sui_types::crypto::Signature;
3763    use sui_types::effects::{TransactionEffects, TransactionEvents};
3764    use sui_types::messages_checkpoint::SignedCheckpointSummary;
3765    use sui_types::transaction::VerifiedTransaction;
3766    use tokio::sync::mpsc;
3767
    // Verifies that clear_locally_computed_checkpoints_from(seq) deletes the
    // inclusive range [seq, highest] and leaves everything below seq intact.
    #[tokio::test]
    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
        let store = CheckpointStore::new_for_tests();
        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
        // Populate seqs 70..=80 with minimal single-digest summaries.
        for seq in 70u64..=80u64 {
            let contents =
                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
                    [sui_types::base_types::ExecutionDigests::new(
                        sui_types::digests::TransactionDigest::random(),
                        sui_types::digests::TransactionEffectsDigest::ZERO,
                    )],
                );
            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
                &protocol,
                0,
                seq,
                0,
                &contents,
                None,
                sui_types::gas::GasCostSummary::default(),
                None,
                0,
                Vec::new(),
                Vec::new(),
            );
            store
                .tables
                .locally_computed_checkpoints
                .insert(&seq, &summary)
                .unwrap();
        }

        store
            .clear_locally_computed_checkpoints_from(76)
            .expect("clear should succeed");

        // Explicit boundary checks: 75 must remain, 76 must be deleted
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&75)
                .unwrap()
                .is_some()
        );
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&76)
                .unwrap()
                .is_none()
        );

        // Full-range sweep: everything below the cut survives...
        for seq in 70u64..76u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_some()
            );
        }
        // ...and everything at or above the cut is gone.
        for seq in 76u64..=80u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_none()
            );
        }
    }
3843
3844    #[tokio::test]
3845    async fn test_fork_detection_storage() {
3846        let store = CheckpointStore::new_for_tests();
3847        // checkpoint fork
3848        let seq_num = 42;
3849        let digest = CheckpointDigest::random();
3850
3851        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3852
3853        store
3854            .record_checkpoint_fork_detected(seq_num, digest)
3855            .unwrap();
3856
3857        let retrieved = store.get_checkpoint_fork_detected().unwrap();
3858        assert!(retrieved.is_some());
3859        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3860        assert_eq!(retrieved_seq, seq_num);
3861        assert_eq!(retrieved_digest, digest);
3862
3863        store.clear_checkpoint_fork_detected().unwrap();
3864        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3865
3866        // txn fork
3867        let tx_digest = TransactionDigest::random();
3868        let expected_effects = TransactionEffectsDigest::random();
3869        let actual_effects = TransactionEffectsDigest::random();
3870
3871        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3872
3873        store
3874            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3875            .unwrap();
3876
3877        let retrieved = store.get_transaction_fork_detected().unwrap();
3878        assert!(retrieved.is_some());
3879        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3880        assert_eq!(retrieved_tx, tx_digest);
3881        assert_eq!(retrieved_expected, expected_effects);
3882        assert_eq!(retrieved_actual, actual_effects);
3883
3884        store.clear_transaction_fork_detected().unwrap();
3885        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3886    }
3887
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        // Configure the protocol so the builder's own splitting/coalescing
        // logic is exercised: accumulators off, no splitting in the consensus
        // handler, and a 100ms minimum interval between checkpoints (the
        // interval is what forces coalescing at the end of the test).
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.disable_accumulators_for_testing();
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(false);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // A small placeholder transaction used for most digests below.
        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
            0,
            0,
            vec![],
            SequenceNumber::new(),
        );

        // Build a JWK list whose BCS serialization exceeds 40KB so that
        // transactions carrying it are large; this drives the size-based
        // checkpoint split asserted further down.
        let jwks = {
            let mut jwks = Vec::new();
            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
                jwks.push(ActiveJwk {
                    jwk_id: JwkId::new(
                        "https://accounts.google.com".to_string(),
                        "1234567890".to_string(),
                    ),
                    jwk: JWK {
                        kty: "RSA".to_string(),
                        e: "AQAB".to_string(),
                        n: "1234567890".to_string(),
                        alg: "RS256".to_string(),
                    },
                    epoch: 0,
                });
            }
            jwks
        };

        let dummy_tx_with_data =
            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());

        // Digests d(0)..d(15) map to the small transaction, d(15)..d(20) to
        // the >40KB one.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Seed executed effects. d(1) depends on d(2), d(3); d(2) depends on
        // d(3), d(4) — these dependency edges drive the causal ordering
        // asserted for checkpoint 1 below.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 52, 51, 1),
            );
        }
        // Every committed transaction needs a (dummy) user signature for
        // checkpoint contents to be built.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![(signature, None)]);
        }

        // `result` receives each built (contents, summary) pair;
        // `certified_result` receives certified summaries once enough
        // signatures have been aggregated.
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

        // Limits under test: max 3 transactions and max 100_000 bytes per
        // checkpoint.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

        // Feed six pending checkpoints (height, root digests, timestamp_ms).
        // Heights 4 and 5 are only 1ms / 999ms apart, i.e. within the 100ms
        // minimum interval relative to height 3's output — see the coalescing
        // assertion at the end.
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        // Checkpoint 0 contains just d(4); gas totals roll up per epoch.
        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        // Checkpoint 1 orders d(1)'s dependency closure causally: d(3) before
        // d(2) before d(1) (d(4) was already checkpointed).
        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 108, 104, 4)
        );

        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
        // Verify that we split into 2 checkpoints.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K max.
        // Verify that we split into 2 checkpoints.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending at index 4 was too soon after the prior one and should be coalesced into
        // the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        // Sign checkpoints 0 and 1 and deliver the signatures OUT of order;
        // certified checkpoints must still come out in sequence order.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
4127
4128    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
4129        fn notify_read_executed_effects(
4130            &self,
4131            _: &str,
4132            digests: &[TransactionDigest],
4133        ) -> BoxFuture<'_, Vec<TransactionEffects>> {
4134            std::future::ready(
4135                digests
4136                    .iter()
4137                    .map(|d| self.get(d).expect("effects not found").clone())
4138                    .collect(),
4139            )
4140            .boxed()
4141        }
4142
4143        fn notify_read_executed_effects_digests(
4144            &self,
4145            _: &str,
4146            digests: &[TransactionDigest],
4147        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
4148            std::future::ready(
4149                digests
4150                    .iter()
4151                    .map(|d| {
4152                        self.get(d)
4153                            .map(|fx| fx.digest())
4154                            .expect("effects not found")
4155                    })
4156                    .collect(),
4157            )
4158            .boxed()
4159        }
4160
4161        fn multi_get_executed_effects(
4162            &self,
4163            digests: &[TransactionDigest],
4164        ) -> Vec<Option<TransactionEffects>> {
4165            digests.iter().map(|d| self.get(d).cloned()).collect()
4166        }
4167
4168        // Unimplemented methods - its unfortunate to have this big blob of useless code, but it wasn't
4169        // worth it to keep EffectsNotifyRead around just for these tests, as it caused a ton of
4170        // complication in non-test code. (e.g. had to implement EFfectsNotifyRead for all
4171        // ExecutionCacheRead implementors).
4172
4173        fn multi_get_transaction_blocks(
4174            &self,
4175            _: &[TransactionDigest],
4176        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
4177            unimplemented!()
4178        }
4179
4180        fn multi_get_executed_effects_digests(
4181            &self,
4182            _: &[TransactionDigest],
4183        ) -> Vec<Option<TransactionEffectsDigest>> {
4184            unimplemented!()
4185        }
4186
4187        fn multi_get_effects(
4188            &self,
4189            _: &[TransactionEffectsDigest],
4190        ) -> Vec<Option<TransactionEffects>> {
4191            unimplemented!()
4192        }
4193
4194        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
4195            unimplemented!()
4196        }
4197
4198        fn get_mysticeti_fastpath_outputs(
4199            &self,
4200            _: &TransactionDigest,
4201        ) -> Option<Arc<TransactionOutputs>> {
4202            unimplemented!()
4203        }
4204
4205        fn notify_read_fastpath_transaction_outputs<'a>(
4206            &'a self,
4207            _: &'a [TransactionDigest],
4208        ) -> BoxFuture<'a, Vec<Arc<crate::transaction_outputs::TransactionOutputs>>> {
4209            unimplemented!()
4210        }
4211
4212        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
4213            unimplemented!()
4214        }
4215
4216        fn get_unchanged_loaded_runtime_objects(
4217            &self,
4218            _digest: &TransactionDigest,
4219        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
4220            unimplemented!()
4221        }
4222
4223        fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
4224            unimplemented!()
4225        }
4226    }
4227
4228    #[async_trait::async_trait]
4229    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
4230        async fn checkpoint_created(
4231            &self,
4232            summary: &CheckpointSummary,
4233            contents: &CheckpointContents,
4234            _epoch_store: &Arc<AuthorityPerEpochStore>,
4235            _checkpoint_store: &Arc<CheckpointStore>,
4236        ) -> SuiResult {
4237            self.try_send((contents.clone(), summary.clone())).unwrap();
4238            Ok(())
4239        }
4240    }
4241
4242    #[async_trait::async_trait]
4243    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
4244        async fn certified_checkpoint_created(
4245            &self,
4246            summary: &CertifiedCheckpointSummary,
4247        ) -> SuiResult {
4248            self.try_send(summary.clone()).unwrap();
4249            Ok(())
4250        }
4251    }
4252
4253    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
4254        PendingCheckpoint {
4255            roots: t
4256                .into_iter()
4257                .map(|t| TransactionKey::Digest(d(t)))
4258                .collect(),
4259            details: PendingCheckpointInfo {
4260                timestamp_ms,
4261                last_of_epoch: false,
4262                checkpoint_height: i,
4263                consensus_commit_ref: CommitRef::default(),
4264                rejected_transactions_digest: Digest::default(),
4265            },
4266        }
4267    }
4268
4269    fn d(i: u8) -> TransactionDigest {
4270        let mut bytes: [u8; 32] = Default::default();
4271        bytes[0] = i;
4272        TransactionDigest::new(bytes)
4273    }
4274
4275    fn e(
4276        transaction_digest: TransactionDigest,
4277        dependencies: Vec<TransactionDigest>,
4278        gas_used: GasCostSummary,
4279    ) -> TransactionEffects {
4280        let mut effects = TransactionEffects::default();
4281        *effects.transaction_digest_mut_for_testing() = transaction_digest;
4282        *effects.dependencies_mut_for_testing() = dependencies;
4283        *effects.gas_cost_summary_mut_for_testing() = gas_used;
4284        effects
4285    }
4286
4287    fn commit_cert_for_test(
4288        store: &mut HashMap<TransactionDigest, TransactionEffects>,
4289        state: Arc<AuthorityState>,
4290        digest: TransactionDigest,
4291        dependencies: Vec<TransactionDigest>,
4292        gas_used: GasCostSummary,
4293    ) {
4294        let epoch_store = state.epoch_store_for_testing();
4295        let effects = e(digest, dependencies, gas_used);
4296        store.insert(digest, effects.clone());
4297        epoch_store.insert_executed_in_epoch(&digest);
4298    }
4299}