sui_core/checkpoints/mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

mod causal_order;
pub mod checkpoint_executor;
mod checkpoint_output;
mod metrics;

use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
use crate::authority::AuthorityState;
use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
use crate::checkpoints::causal_order::CausalOrder;
use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
pub use crate::checkpoints::checkpoint_output::{
    LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
};
pub use crate::checkpoints::metrics::CheckpointMetrics;
use crate::consensus_manager::ReplayWaiter;
use crate::execution_cache::TransactionCacheRead;

use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
use crate::global_state_hasher::GlobalStateHasher;
use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
use consensus_core::CommitRef;
use diffy::create_patch;
use itertools::Itertools;
use mysten_common::random::get_rng;
use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
use mysten_common::{assert_reachable, debug_fatal, fatal};
use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
use nonempty::NonEmpty;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
use serde::{Deserialize, Serialize};
use sui_macros::fail_point_arg;
use sui_network::default_mysten_network_config;
use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
use sui_types::base_types::{ConciseableName, SequenceNumber};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::execution::ExecutionTimeObservationKey;
use sui_types::messages_checkpoint::{
    CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
};
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use tokio::sync::{mpsc, watch};
use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::authority_store_pruner::PrunerWatermarks;
use crate::consensus_handler::SequencedConsensusTransactionKey;
use rand::seq::SliceRandom;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fs::File;
use std::future::Future;
use std::io::Write;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Weak;
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};
use sui_protocol_config::ProtocolVersion;
use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
use sui_types::committee::StakeUnit;
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
use sui_types::digests::{
    CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
};
use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
use sui_types::error::{SuiErrorKind, SuiResult};
use sui_types::gas::GasCostSummary;
use sui_types::message_envelope::Message;
use sui_types::messages_checkpoint::{
    CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
    CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
    EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
    VerifiedCheckpointContents,
};
use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
use sui_types::messages_consensus::ConsensusTransactionKey;
use sui_types::signature::GenericSignature;
use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
use sui_types::transaction::{
    TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
};
use tokio::{sync::Notify, time::timeout};
use tracing::{debug, error, info, instrument, trace, warn};
use typed_store::DBMapUtils;
use typed_store::Map;
use typed_store::{
    TypedStoreError,
    rocks::{DBMap, MetricConf},
};

const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

pub type CheckpointHeight = u64;

pub struct EpochStats {
    pub checkpoint_count: u64,
    pub transaction_count: u64,
    pub total_gas_reward: u64,
}

#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    pub timestamp_ms: CheckpointTimestamp,
    pub last_of_epoch: bool,
    // Computed in calculate_pending_checkpoint_height() from the consensus round.
    // Because of checkpoint splitting, this is not guaranteed to increase
    // monotonically from one checkpoint to the next.
    pub checkpoint_height: CheckpointHeight,
    // Consensus commit ref and rejected-transactions digest that correspond to this checkpoint.
    pub consensus_commit_ref: CommitRef,
    pub rejected_transactions_digest: Digest,
}

#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    pub roots: Vec<TransactionKey>,
    pub details: PendingCheckpointInfo,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for the genesis checkpoint.
    pub checkpoint_height: Option<CheckpointHeight>,
    pub position_in_commit: usize,
}

#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence number, for
    /// efficient reads of full checkpoints. Entries from this table are deleted after state
    /// accumulation has completed.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    // TODO: Once the switch to `full_checkpoint_content_v2` is fully active on mainnet,
    // deprecate this table (and remove when possible).
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks and log useful
    /// information. Can be pruned as soon as we verify that we are in agreement with the latest
    /// certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    /// Stores transaction fork detection information
    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
}

fn full_checkpoint_content_table_default_config() -> DBOptions {
    DBOptions {
        options: default_db_options().options,
        // We have seen potential data corruption issues in this table after forced shutdowns,
        // so we enable value hash logging to help with debugging.
        // TODO: remove this once we have a better understanding of the root cause.
        rw_options: ReadWriteOptions::default().set_log_value_hash(true),
    }
}

impl CheckpointStoreTables {
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 4;
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(64_000)
            .with_value_cache_size(default_value_cache_size());
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
        let configs = vec![
            (
                "checkpoint_content",
                digest_config.clone().with_config(
                    lru_config
                        .clone()
                        .with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
            ),
            (
                "transaction_fork_detected",
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "full_checkpoint_content_v2",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }
}

pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}

impl CheckpointStore {
    pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
        let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
        Arc::new(Self {
            tables,
            synced_checkpoint_notify_read: NotifyRead::new(),
            executed_checkpoint_notify_read: NotifyRead::new(),
        })
    }

    pub fn new_for_tests() -> Arc<Self> {
        let ckpt_dir = mysten_common::tempdir().unwrap();
        CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
    }

    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
        let tables = CheckpointStoreTables::new(
            path,
            "db_checkpoint",
            Arc::new(PrunerWatermarks::default()),
        );
        Arc::new(Self {
            tables,
            synced_checkpoint_notify_read: NotifyRead::new(),
            executed_checkpoint_notify_read: NotifyRead::new(),
        })
    }

    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }
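
    // Usage sketch (illustrative only): open a throwaway store backed by a
    // temporary directory and read back the latest certified checkpoint, if any.
    //
    //     let store = CheckpointStore::new_for_tests();
    //     assert!(store.get_latest_certified_checkpoint().unwrap().is_none());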

    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
        match self.get_checkpoint_by_sequence_number(0).unwrap() {
            Some(existing_checkpoint) => {
                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
            }
            None => {
                if epoch_store.epoch() == checkpoint.epoch {
                    epoch_store
                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                        .unwrap();
                } else {
                    debug!(
                        validator_epoch =% epoch_store.epoch(),
                        genesis_epoch =% checkpoint.epoch(),
                        "Not inserting checkpoint builder data for genesis checkpoint",
                    );
                }
                self.insert_checkpoint_contents(contents).unwrap();
                self.insert_verified_checkpoint(&checkpoint).unwrap();
                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
            }
        }
    }

    pub fn get_checkpoint_by_digest(
        &self,
        digest: &CheckpointDigest,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.tables
            .checkpoint_by_digest
            .get(digest)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.tables
            .certified_checkpoints
            .get(&sequence_number)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.tables
            .locally_computed_checkpoints
            .get(&sequence_number)
    }

    pub fn multi_get_locally_computed_checkpoints(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
        let checkpoints = self
            .tables
            .locally_computed_checkpoints
            .multi_get(sequence_numbers)?;

        Ok(checkpoints)
    }

    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }

    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }

    pub fn get_latest_certified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        Ok(self
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(_, v)| v.into()))
    }

    pub fn get_latest_locally_computed_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        Ok(self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(_, v)| v))
    }

    pub fn multi_get_checkpoint_by_sequence_number(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
        let checkpoints = self
            .tables
            .certified_checkpoints
            .multi_get(sequence_numbers)?
            .into_iter()
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
            .collect();

        Ok(checkpoints)
    }

    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }

    pub fn get_highest_verified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_verified = if let Some(highest_verified) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestVerified)?
        {
            highest_verified
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_verified.1)
    }

    pub fn get_highest_synced_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_synced = if let Some(highest_synced) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestSynced)?
        {
            highest_synced
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_synced.1)
    }

    pub fn get_highest_synced_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_synced) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestSynced)?
        {
            Ok(Some(highest_synced.0))
        } else {
            Ok(None)
        }
    }

    pub fn get_highest_executed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_executed) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestExecuted)?
        {
            Ok(Some(highest_executed.0))
        } else {
            Ok(None)
        }
    }

    pub fn get_highest_executed_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_executed = if let Some(highest_executed) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestExecuted)?
        {
            highest_executed
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_executed.1)
    }

    pub fn get_highest_pruned_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .watermarks
            .get(&CheckpointWatermark::HighestPruned)
            .map(|watermark| watermark.map(|w| w.0))
    }

    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }

    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content_v2.get(&seq)
    }

    fn prune_local_summaries(&self) -> SuiResult {
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }

    pub fn clear_locally_computed_checkpoints_from(
        &self,
        from_seq: CheckpointSequenceNumber,
    ) -> SuiResult {
        let keys: Vec<_> = self
            .tables
            .locally_computed_checkpoints
            .safe_iter_with_bounds(Some(from_seq), None)
            .map(|r| r.map(|(k, _)| k))
            .collect::<Result<_, _>>()?;
        if let Some(&last_local_summary) = keys.last() {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch
                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
                .expect("Failed to delete locally computed checkpoints");
            batch
                .write()
                .expect("Failed to delete locally computed checkpoints");
            warn!(
                from_seq,
                last_local_summary,
                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
                from_seq,
                last_local_summary
            );
        }
        Ok(())
    }

    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // checkpoint contents may be too large for panic message.
            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );

            // Record the fork in the database before crashing
            if let Err(e) = self.record_checkpoint_fork_detected(
                *local_checkpoint.sequence_number(),
                local_checkpoint.digest(),
            ) {
                error!("Failed to record checkpoint fork in database: {:?}", e);
            }

            fail_point_arg!(
                "kill_checkpoint_fork_node",
                |checkpoint_overrides: std::sync::Arc<
                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
                >| {
                    #[cfg(msim)]
                    {
                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
                            overrides.insert(
                                local_checkpoint.sequence_number,
                                verified_checkpoint.digest().to_string(),
                            );
                        }
                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
                            local_checkpoint.sequence_number(),
                            verified_checkpoint.digest()
                        );
                        sui_simulator::task::shutdown_current_node();
                    }
                }
            );

            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }

    // Called by the checkpoint aggregation path (CheckpointAggregator).
    // Unlike `insert_verified_checkpoint`, this does not touch the
    // highest_verified_checkpoint watermark, so that state sync still has a
    // chance to process this checkpoint and perform its state-sync-only work.
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }

    // Called by state sync. In addition to inserting the checkpoint and updating
    // related tables, this also bumps the highest_verified_checkpoint watermark.
    #[instrument(level = "debug", skip_all)]
    pub fn insert_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.insert_certified_checkpoint(checkpoint)?;
        self.update_highest_verified_checkpoint(checkpoint)
    }

    pub fn update_highest_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if Some(*checkpoint.sequence_number())
            > self
                .get_highest_verified_checkpoint()?
                .map(|x| *x.sequence_number())
        {
            debug!(
                checkpoint_seq = checkpoint.sequence_number(),
                "Updating highest verified checkpoint",
            );
            self.tables.watermarks.insert(
                &CheckpointWatermark::HighestVerified,
                &(*checkpoint.sequence_number(), *checkpoint.digest()),
            )?;
        }

        Ok(())
    }

    pub fn update_highest_synced_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestSynced,
            &(seq, *checkpoint.digest()),
        )?;
        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
        Ok(())
    }

    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        notify_read
            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
                let seq = seqs[0];
                let Some(highest) = get_watermark() else {
                    return vec![None];
                };
                if highest < seq {
                    return vec![None];
                }
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                vec![Some(checkpoint)]
            })
            .await
            .into_iter()
            .next()
            .unwrap()
    }

    pub async fn notify_read_synced_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
            self.get_highest_synced_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }

    pub async fn notify_read_executed_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
            self.get_highest_executed_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }

    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(seq, *checkpoint.digest()),
        )?;
        self.executed_checkpoint_notify_read
            .notify(&seq, checkpoint);
        Ok(())
    }
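
    // Note on the assert above: the highest-executed watermark may only advance
    // contiguously. For example, if the current watermark is 41, re-submitting
    // checkpoint 40 or 41 is a no-op, checkpoint 42 advances the watermark, and
    // anything above 42 indicates a skipped checkpoint and fails the assert.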

    pub fn update_highest_pruned_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestPruned,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    /// Sets highest executed checkpoint to any value.
    ///
    /// WARNING: This method is very subtle and can corrupt the database if used incorrectly.
    /// It should only be used in one-off cases or tests after fully understanding the risk.
    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn insert_checkpoint_contents(
        &self,
        contents: CheckpointContents,
    ) -> Result<(), TypedStoreError> {
        debug!(
            contents_digest = ?contents.digest(),
            "Inserting checkpoint contents",
        );
        self.tables
            .checkpoint_content
            .insert(contents.digest(), &contents)
    }

    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content_v2.batch();
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content_v2,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        batch.write()
    }

    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.tables.full_checkpoint_content.remove(&seq)?;
        self.tables.full_checkpoint_content_v2.remove(&seq)
    }

    pub fn get_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
    ) -> SuiResult<Option<VerifiedCheckpoint>> {
        let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
        let checkpoint = match seq {
            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
            None => None,
        };
        Ok(checkpoint)
    }

    pub fn get_epoch_last_checkpoint_seq_number(
        &self,
        epoch_id: EpochId,
    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
        Ok(seq)
    }

    pub fn insert_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
        checkpoint: &VerifiedCheckpoint,
    ) -> SuiResult {
        self.tables
            .epoch_last_checkpoint_map
            .insert(&epoch_id, checkpoint.sequence_number())?;
        Ok(())
    }

    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
            checkpoint
                .end_of_epoch_data
                .as_ref()
                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
                .epoch_commitments
                .clone()
        });
        Ok(commitments)
    }

    /// Given the epoch ID and the last checkpoint of the epoch, derive a few statistics of the epoch.
    pub fn get_epoch_stats(
        &self,
        epoch: EpochId,
        last_checkpoint: &CheckpointSummary,
    ) -> Option<EpochStats> {
        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
            (0, 0)
        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
            (
                checkpoint.sequence_number + 1,
                checkpoint.network_total_transactions,
            )
        } else {
            return None;
        };
        Some(EpochStats {
            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
            transaction_count: last_checkpoint.network_total_transactions
                - prev_epoch_network_transactions,
            total_gas_reward: last_checkpoint
                .epoch_rolling_gas_cost_summary
                .computation_cost,
        })
    }
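
    // Worked example with made-up numbers: if epoch 4 ended at checkpoint 1_000
    // with network_total_transactions = 50_000, and the last checkpoint of epoch
    // 5 has sequence_number = 1_750 and network_total_transactions = 80_000,
    // then checkpoint_count = 1_750 - 1_001 + 1 = 750 and transaction_count =
    // 80_000 - 50_000 = 30_000.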

    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
        // This checkpoints the entire db and not one column family
        self.tables
            .checkpoint_content
            .checkpoint_db(path)
            .map_err(Into::into)
    }

    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
        let mut wb = self.tables.watermarks.batch();
        wb.delete_batch(
            &self.tables.watermarks,
            std::iter::once(CheckpointWatermark::HighestExecuted),
        )?;
        wb.write()?;
        Ok(())
    }

    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
        self.delete_highest_executed_checkpoint_test_only()?;
        Ok(())
    }

    pub fn record_checkpoint_fork_detected(
        &self,
        checkpoint_seq: CheckpointSequenceNumber,
        checkpoint_digest: CheckpointDigest,
    ) -> Result<(), TypedStoreError> {
        info!(
            checkpoint_seq = checkpoint_seq,
            checkpoint_digest = ?checkpoint_digest,
            "Recording checkpoint fork detection in database"
        );
        self.tables.watermarks.insert(
            &CheckpointWatermark::CheckpointForkDetected,
            &(checkpoint_seq, checkpoint_digest),
        )
    }

    pub fn get_checkpoint_fork_detected(
        &self,
    ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
        self.tables
            .watermarks
            .get(&CheckpointWatermark::CheckpointForkDetected)
    }

    pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
        self.tables
            .watermarks
            .remove(&CheckpointWatermark::CheckpointForkDetected)
    }

    pub fn record_transaction_fork_detected(
        &self,
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
    ) -> Result<(), TypedStoreError> {
        info!(
            tx_digest = ?tx_digest,
            expected_effects_digest = ?expected_effects_digest,
            actual_effects_digest = ?actual_effects_digest,
            "Recording transaction fork detection in database"
        );
        self.tables.transaction_fork_detected.insert(
            &TRANSACTION_FORK_DETECTED_KEY,
            &(tx_digest, expected_effects_digest, actual_effects_digest),
        )
    }

    pub fn get_transaction_fork_detected(
        &self,
    ) -> Result<
        Option<(
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        )>,
        TypedStoreError,
    > {
        self.tables
            .transaction_fork_detected
            .get(&TRANSACTION_FORK_DETECTED_KEY)
    }

    pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
        self.tables
            .transaction_fork_detected
            .remove(&TRANSACTION_FORK_DETECTED_KEY)
    }
}

#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
    CheckpointForkDetected,
}
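
// Under normal operation these watermarks are expected to satisfy
// HighestVerified >= HighestSynced >= HighestExecuted >= HighestPruned, since a
// checkpoint is verified before its contents are synced, synced before it is
// executed, and executed before it can be pruned.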

struct CheckpointStateHasher {
    epoch_store: Arc<AuthorityPerEpochStore>,
    hasher: Weak<GlobalStateHasher>,
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}

impl CheckpointStateHasher {
    fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        hasher: Weak<GlobalStateHasher>,
        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    ) -> Self {
        Self {
            epoch_store,
            hasher,
            receive_from_builder,
        }
    }

    async fn run(self) {
        let Self {
            epoch_store,
            hasher,
            mut receive_from_builder,
        } = self;
        while let Some((seq, effects)) = receive_from_builder.recv().await {
            let Some(hasher) = hasher.upgrade() else {
                info!("Object state hasher was dropped, stopping checkpoint accumulation");
                break;
            };
            hasher
                .accumulate_checkpoint(&effects, seq, &epoch_store)
                .expect("epoch ended while accumulating checkpoint");
        }
    }
}
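
// A minimal sketch (names and channel capacity are illustrative) of how a
// CheckpointStateHasher is wired up: the builder keeps the sender half of the
// channel, and the hasher runs as a background task that exits once the
// GlobalStateHasher it observes is dropped.
//
//     let (send_to_hasher, receive_from_builder) = mpsc::channel(128);
//     let state_hasher = CheckpointStateHasher::new(
//         epoch_store.clone(),
//         Arc::downgrade(&global_state_hasher),
//         receive_from_builder,
//     );
//     spawn_monitored_task!(state_hasher.run());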

#[derive(Debug)]
pub(crate) enum CheckpointBuilderError {
    ChangeEpochTxAlreadyExecuted,
    SystemPackagesMissing,
    Retry(anyhow::Error),
}

impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
    for CheckpointBuilderError
{
    fn from(e: SuiError) -> Self {
        Self::Retry(e.into())
    }
}

pub(crate) type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
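
// A minimal sketch of how the blanket `From` impl above is used: any error type
// implementing `std::error::Error + Send + Sync + 'static` can be propagated
// with `?` and is classified as a retryable `CheckpointBuilderError::Retry`.
// (Hypothetical helper, for illustration only.)
//
//     fn latest_local_summary(
//         store: &CheckpointStore,
//     ) -> CheckpointBuilderResult<Option<CheckpointSummary>> {
//         // `TypedStoreError` implements `std::error::Error`, so `?` converts
//         // it into `CheckpointBuilderError::Retry` via the blanket impl.
//         Ok(store.get_latest_locally_computed_checkpoint()?)
//     }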

pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    notify_aggregator: Arc<Notify>,
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    global_state_hasher: Weak<GlobalStateHasher>,
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}

pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
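
// Together these two components form the checkpoint pipeline: CheckpointBuilder
// turns batches of PendingCheckpoints into signed CheckpointSummary values and
// hands them to its `output` (e.g. SubmitCheckpointToConsensus), while
// CheckpointAggregator collects checkpoint signatures from other validators
// until a quorum of stake is reached and emits the certified checkpoint through
// its own `output`.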

// This holds information to aggregate signatures for one checkpoint
pub struct CheckpointSignatureAggregator {
    next_index: u64,
    summary: CheckpointSummary,
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}

impl CheckpointBuilder {
    fn new(
        state: Arc<AuthorityState>,
        store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        // for synchronous accumulation of end-of-epoch checkpoint
        global_state_hasher: Weak<GlobalStateHasher>,
        // for asynchronous/concurrent accumulation of regular checkpoints
        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            store,
            epoch_store,
            notify,
            effects_store,
            global_state_hasher,
            send_to_hasher,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }

    /// If `consensus_replay_waiter` is supplied, this function first waits for
    /// ConsensusCommitHandler to finish reprocessing commits that were processed
    /// before the last restart, and then starts building checkpoints in a loop.
    ///
    /// The `consensus_replay_waiter` is optional so that it is easier to attribute
    /// slow recovery of previously built checkpoints to either consensus replay or
    /// checkpoint building.
    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
        if let Some(replay_waiter) = consensus_replay_waiter {
            info!("Waiting for consensus commits to replay ...");
            replay_waiter.wait_for_replay().await;
            info!("Consensus commits finished replaying");
        }
        info!("Starting CheckpointBuilder");
        loop {
            match self.maybe_build_checkpoints().await {
                Ok(()) => {}
                err @ Err(
                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
                    | CheckpointBuilderError::SystemPackagesMissing,
                ) => {
                    info!("CheckpointBuilder stopping: {:?}", err);
                    return;
                }
                Err(CheckpointBuilderError::Retry(inner)) => {
                    let msg = format!("{:?}", inner);
                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    continue;
                }
            }

            self.notify.notified().await;
        }
    }

    async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        // Collect info about the most recently built checkpoint.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            // Group PendingCheckpoints until:
            // - minimum interval has elapsed ...
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                    Some(last_timestamp) => {
                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                    }
                    None => true,
                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
                //   should be written separately) ...
                } || checkpoints_iter
                    .peek()
                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                // - or, we have reached end of epoch.
                    || pending.details().last_of_epoch;
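            // Worked example with made-up numbers: with
            // min_checkpoint_interval_ms = 200 and the last built checkpoint
            // timestamped at t=0ms, a pending checkpoint at t=100ms cannot
            // build (100 < 0 + 200) and is merely grouped, while the next one
            // at t=250ms can (250 >= 0 + 200), so both are coalesced into a
            // single build. The last-of-epoch clauses override the interval so
            // that the end-of-epoch checkpoint is never delayed or grouped
            // with its predecessors.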
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            // Min interval has elapsed, we can now coalesce and build a checkpoint.
            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height_from = grouped_pending_checkpoints
                    .first()
                    .unwrap()
                    .details()
                    .checkpoint_height,
                checkpoint_commit_height_to = last_height,
                "Making checkpoint with commit height range"
            );

            let seq = self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await?;

            self.last_built.send_if_modified(|cur| {
                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            // ensure that the task can be cancelled at end of epoch, even if no other await yields
            // execution.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );

        Ok(())
    }

    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &mut self,
        pendings: Vec<PendingCheckpoint>,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        let pending_ckpt_str = pendings
            .iter()
            .map(|p| {
                format!(
                    "height={}, commit={}",
                    p.details().checkpoint_height,
                    p.details().consensus_commit_ref
                )
            })
            .join("; ");

        let last_details = pendings.last().unwrap().details().clone();

        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        // Resolves the transactions that should be included in the checkpoint.
        // Transactions will be recorded in the checkpoint in this order.
        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1441
1442        let new_checkpoints = self
1443            .create_checkpoints(
1444                sorted_tx_effects_included_in_checkpoint,
1445                &last_details,
1446                &all_roots,
1447            )
1448            .await?;
1449        let highest_sequence = *new_checkpoints.last().0.sequence_number();
1450        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
1451            debug_fatal!(
1452                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1453            );
1454        }
1455
1456        let new_ckpt_str = new_checkpoints
1457            .iter()
1458            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
1459            .join("; ");
1460
1461        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1462            .await?;
1463        info!(
1464            "Made new checkpoint {} from pending checkpoint {}",
1465            new_ckpt_str, pending_ckpt_str
1466        );
1467
1468        Ok(highest_sequence)
1469    }
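
    // Editorial sketch (not from the original source): `poll_count` (defined elsewhere in this
    // file) conceptually wraps a future and tallies how many times it is polled; a single poll
    // means the future completed without ever waiting. Roughly:
    //
    //     pin_project! { struct Counted<F> { #[pin] inner: F, polls: u64 } }
    //     impl<F: Future> Future for Counted<F> {
    //         type Output = (u64, F::Output);
    //         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    //             let this = self.project();
    //             *this.polls += 1;
    //             this.inner.poll(cx).map(|out| (*this.polls, out))
    //         }
    //     }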
1470
1471    async fn construct_and_execute_settlement_transactions(
1472        &self,
1473        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
1474        checkpoint_height: CheckpointHeight,
1475        checkpoint_seq: CheckpointSequenceNumber,
1476        tx_index_offset: u64,
1477    ) -> (TransactionKey, Vec<TransactionEffects>) {
1478        let _scope =
1479            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");
1480
1481        let tx_key =
1482            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);
1483
1484        let epoch = self.epoch_store.epoch();
1485        let accumulator_root_obj_initial_shared_version = self
1486            .epoch_store
1487            .epoch_start_config()
1488            .accumulator_root_obj_initial_shared_version()
1489            .expect("accumulator root object must exist");
1490
1491        let builder = AccumulatorSettlementTxBuilder::new(
1492            Some(self.effects_store.as_ref()),
1493            sorted_tx_effects_included_in_checkpoint,
1494            checkpoint_seq,
1495            tx_index_offset,
1496        );
1497
1498        let funds_changes = builder.collect_funds_changes();
1499        let num_updates = builder.num_updates();
1500        let settlement_txns = builder.build_tx(
1501            self.epoch_store.protocol_config(),
1502            epoch,
1503            accumulator_root_obj_initial_shared_version,
1504            checkpoint_height,
1505            checkpoint_seq,
1506        );
1507
1508        let settlement_txns: Vec<_> = settlement_txns
1509            .into_iter()
1510            .map(|tx| {
1511                VerifiedExecutableTransaction::new_system(
1512                    VerifiedTransaction::new_system_transaction(tx),
1513                    self.epoch_store.epoch(),
1514                )
1515            })
1516            .collect();
1517
1518        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();
1519
1520        debug!(
1521            ?settlement_digests,
1522            ?tx_key,
1523            "created settlement transactions with {num_updates} updates"
1524        );
1525
1526        self.epoch_store
1527            .notify_settlement_transactions_ready(tx_key, settlement_txns);
1528
1529        let settlement_effects = wait_for_effects_with_retry(
1530            self.effects_store.as_ref(),
1531            "CheckpointBuilder::notify_read_settlement_effects",
1532            &settlement_digests,
1533            tx_key,
1534        )
1535        .await;
1536
1537        let barrier_tx = accumulators::build_accumulator_barrier_tx(
1538            epoch,
1539            accumulator_root_obj_initial_shared_version,
1540            checkpoint_height,
1541            &settlement_effects,
1542        );
1543
1544        let barrier_tx = VerifiedExecutableTransaction::new_system(
1545            VerifiedTransaction::new_system_transaction(barrier_tx),
1546            self.epoch_store.epoch(),
1547        );
1548        let barrier_digest = *barrier_tx.digest();
1549
1550        self.epoch_store
1551            .notify_barrier_transaction_ready(tx_key, barrier_tx);
1552
1553        let barrier_effects = wait_for_effects_with_retry(
1554            self.effects_store.as_ref(),
1555            "CheckpointBuilder::notify_read_barrier_effects",
1556            &[barrier_digest],
1557            tx_key,
1558        )
1559        .await;
1560
1561        let settlement_effects: Vec<_> = settlement_effects
1562            .into_iter()
1563            .chain(barrier_effects)
1564            .collect();
1565
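        // Scan the combined settlement + barrier effects for the single mutation of the
        // accumulator root object; its new version seeds the next settlement cycle.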
1566        let mut next_accumulator_version = None;
1567        for fx in settlement_effects.iter() {
1568            assert!(
1569                fx.status().is_ok(),
1570                "settlement transaction cannot fail (digest: {:?}) {:#?}",
1571                fx.transaction_digest(),
1572                fx
1573            );
1574            if let Some(version) = fx
1575                .mutated()
1576                .iter()
1577                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
1578            {
1579                assert!(
1580                    next_accumulator_version.is_none(),
1581                    "Only one settlement transaction should mutate the accumulator root object"
1582                );
1583                next_accumulator_version = Some(version);
1584            }
1585        }
1586        let settlements = FundsSettlement {
1587            next_accumulator_version: next_accumulator_version
1588                .expect("Accumulator root object should be mutated in the settlement transactions"),
1589            funds_changes,
1590        };
1591
1592        self.state.execution_scheduler().settle_funds(settlements);
1593
1594        (tx_key, settlement_effects)
1595    }
1596
1597    // Given the root transactions of a pending checkpoint, resolve the transactions that should be
1598    // included in the checkpoint, and return them in the order in which they should appear.
1599    // The local `effects_in_current_checkpoint` set tracks the transactions that are already part
1600    // of the checkpoint being built.
1601    #[instrument(level = "debug", skip_all)]
1602    async fn resolve_checkpoint_transactions(
1603        &self,
1604        pending_checkpoints: Vec<PendingCheckpoint>,
1605    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
1606        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1607
1608        // Keeps track of the effects that are already included in the current checkpoint.
1609        // This is used when multiple pending checkpoints are combined into a single checkpoint,
1610        // because in such scenarios the dependencies of a transaction may be in earlier-created
1611        // checkpoints or in earlier pending checkpoints.
1612        let mut effects_in_current_checkpoint = BTreeSet::new();
1613
1614        let mut tx_effects = Vec::new();
1615        let mut tx_roots = HashSet::new();
1616
1617        for pending_checkpoint in pending_checkpoints.into_iter() {
1618            let mut pending = pending_checkpoint;
1619            debug!(
1620                checkpoint_commit_height = pending.details.checkpoint_height,
1621                "Resolving checkpoint transactions for pending checkpoint.",
1622            );
1623
1624            trace!(
1625                "roots for pending checkpoint {:?}: {:?}",
1626                pending.details.checkpoint_height, pending.roots,
1627            );
1628
1629            let settlement_root = if self.epoch_store.accumulators_enabled() {
1630                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
1631                    pending.roots.pop()
1632                else {
1633                    fatal!("No settlement root found");
1634                };
1635                Some(settlement_root)
1636            } else {
1637                None
1638            };
1639
1640            let roots = &pending.roots;
1641
1642            self.metrics
1643                .checkpoint_roots_count
1644                .inc_by(roots.len() as u64);
1645
1646            let root_digests = self
1647                .epoch_store
1648                .notify_read_tx_key_to_digest(roots)
1649                .in_monitored_scope("CheckpointNotifyDigests")
1650                .await?;
1651            let root_effects = self
1652                .effects_store
1653                .notify_read_executed_effects(
1654                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
1655                    &root_digests,
1656                )
1657                .in_monitored_scope("CheckpointNotifyRead")
1658                .await;
1659
1660            let consensus_commit_prologue = if self
1661                .epoch_store
1662                .protocol_config()
1663                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
1664            {
1665                // If the roots contain a consensus commit prologue transaction, we want to extract
1666                // it and put it at the front of the checkpoint.
1667
1668                let consensus_commit_prologue =
1669                    self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
1670
1671                // Get the not-yet-included dependencies of the consensus commit prologue. We expect
1672                // no other dependencies that haven't been included in any previous checkpoint.
1673                if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1674                    let unsorted_ccp = self.complete_checkpoint_effects(
1675                        vec![ccp_effects.clone()],
1676                        &mut effects_in_current_checkpoint,
1677                    )?;
1678
1679                    // The prologue must be the only returned effect: all of its dependencies
1680                    // should already be included in previous checkpoints.
1681                    if unsorted_ccp.len() != 1 {
1682                        fatal!(
1683                            "Expected 1 consensus commit prologue, got {:?}",
1684                            unsorted_ccp
1685                                .iter()
1686                                .map(|e| e.transaction_digest())
1687                                .collect::<Vec<_>>()
1688                        );
1689                    }
1690                    assert_eq!(unsorted_ccp.len(), 1);
1691                    assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1692                }
1693                consensus_commit_prologue
1694            } else {
1695                None
1696            };
1697
1698            let unsorted =
1699                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;
1700
1701            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1702            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1703            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
1704                if cfg!(debug_assertions) {
1705                    // When the consensus commit prologue is extracted, it must not also appear in `unsorted`.
1706                    for tx in unsorted.iter() {
1707                        assert!(tx.transaction_digest() != &ccp_digest);
1708                    }
1709                }
1710                sorted.push(ccp_effects);
1711            }
1712            sorted.extend(CausalOrder::causal_sort(unsorted));
1713
1714            if let Some(settlement_root) = settlement_root {
1715                // TODO: this is an incorrect heuristic for the checkpoint sequence number
1716                //       due to checkpoint splitting; to be fixed separately.
1717                let last_checkpoint =
1718                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1719                let next_checkpoint_seq = last_checkpoint
1720                    .as_ref()
1721                    .map(|(seq, _)| *seq)
1722                    .unwrap_or_default()
1723                    + 1;
1724                let tx_index_offset = tx_effects.len() as u64;
1725
1726                let (tx_key, settlement_effects) = self
1727                    .construct_and_execute_settlement_transactions(
1728                        &sorted,
1729                        pending.details.checkpoint_height,
1730                        next_checkpoint_seq,
1731                        tx_index_offset,
1732                    )
1733                    .await;
1734                debug!(?tx_key, "executed settlement transactions");
1735
1736                assert_eq!(settlement_root, tx_key);
1737
1738                // Note: we do not need to add the settlement digests to `tx_roots` - `tx_roots`
1739                // should only include the digests of transactions that were originally roots in
1740                // the pending checkpoint. It is later used to identify transactions which were
1741                // added as dependencies, so that those transactions can be waited on using
1742                // `consensus_messages_processed_notify()`. System transactions (such as
1743                // settlements) are exempt from this already.
1744                sorted.extend(settlement_effects);
1745            }
1746
1747            #[cfg(msim)]
1748            {
1749                // Check consensus commit prologue invariants in sim test.
1750                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1751            }
1752
1753            tx_effects.extend(sorted);
1754            tx_roots.extend(root_digests);
1755        }
1756
1757        Ok((tx_effects, tx_roots))
1758    }
1759
1760    // This function is used to extract the consensus commit prologue digest and effects from the root
1761    // transactions.
1762    // This function can only be used when prepend_prologue_tx_in_consensus_commit_in_checkpoints is enabled.
1763    // The consensus commit prologue is expected to be the first transaction in the roots.
1764    fn extract_consensus_commit_prologue(
1765        &self,
1766        root_digests: &[TransactionDigest],
1767        root_effects: &[TransactionEffects],
1768    ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
1769        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1770        if root_digests.is_empty() {
1771            return Ok(None);
1772        }
1773
1774        // Reads the first transaction in the roots, and checks whether it is a consensus commit prologue
1775        // transaction.
1776        // When prepend_prologue_tx_in_consensus_commit_in_checkpoints is enabled, the consensus commit prologue
1777        // transaction should be the first transaction in the roots written by the consensus handler.
1778        let first_tx = self
1779            .state
1780            .get_transaction_cache_reader()
1781            .get_transaction_block(&root_digests[0])
1782            .expect("Transaction block must exist");
1783
1784        Ok(first_tx
1785            .transaction_data()
1786            .is_consensus_commit_prologue()
1787            .then(|| {
1788                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1789                (*first_tx.digest(), root_effects[0].clone())
1790            }))
1791    }
1792
1793    #[instrument(level = "debug", skip_all)]
1794    async fn write_checkpoints(
1795        &mut self,
1796        height: CheckpointHeight,
1797        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1798    ) -> SuiResult {
1799        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1800        let mut batch = self.store.tables.checkpoint_content.batch();
1801        let mut all_tx_digests =
1802            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1803
1804        for (summary, contents) in &new_checkpoints {
1805            debug!(
1806                checkpoint_commit_height = height,
1807                checkpoint_seq = summary.sequence_number,
1808                contents_digest = ?contents.digest(),
1809                "writing checkpoint",
1810            );
1811
1812            if let Some(previously_computed_summary) = self
1813                .store
1814                .tables
1815                .locally_computed_checkpoints
1816                .get(&summary.sequence_number)?
1817                && previously_computed_summary.digest() != summary.digest()
1818            {
1819                fatal!(
1820                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
1821                    summary.sequence_number,
1822                    previously_computed_summary.digest(),
1823                    summary.digest()
1824                );
1825            }
1826
1827            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1828
1829            self.metrics
1830                .transactions_included_in_checkpoint
1831                .inc_by(contents.size() as u64);
1832            let sequence_number = summary.sequence_number;
1833            self.metrics
1834                .last_constructed_checkpoint
1835                .set(sequence_number as i64);
1836
1837            batch.insert_batch(
1838                &self.store.tables.checkpoint_content,
1839                [(contents.digest(), contents)],
1840            )?;
1841
1842            batch.insert_batch(
1843                &self.store.tables.locally_computed_checkpoints,
1844                [(sequence_number, summary)],
1845            )?;
1846        }
1847
1848        batch.write()?;
1849
1850        // Send all checkpoint sigs to consensus.
1851        for (summary, contents) in &new_checkpoints {
1852            self.output
1853                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1854                .await?;
1855        }
1856
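        // Cross-check each locally built summary against any checkpoint already certified at the
        // same sequence number; a digest mismatch means this validator has forked from the network.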
1857        for (local_checkpoint, _) in &new_checkpoints {
1858            if let Some(certified_checkpoint) = self
1859                .store
1860                .tables
1861                .certified_checkpoints
1862                .get(local_checkpoint.sequence_number())?
1863            {
1864                self.store
1865                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1866            }
1867        }
1868
1869        self.notify_aggregator.notify_one();
1870        self.epoch_store
1871            .process_constructed_checkpoint(height, new_checkpoints);
1872        Ok(())
1873    }
1874
1875    #[allow(clippy::type_complexity)]
1876    fn split_checkpoint_chunks(
1877        &self,
1878        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1879        signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
1880    ) -> CheckpointBuilderResult<
1881        Vec<
1882            Vec<(
1883                TransactionEffects,
1884                Vec<(GenericSignature, Option<SequenceNumber>)>,
1885            )>,
1886        >,
1887    > {
1888        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1889        let mut chunks = Vec::new();
1890        let mut chunk = Vec::new();
1891        let mut chunk_size: usize = 0;
1892        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1893            .into_iter()
1894            .zip(signatures.into_iter())
1895        {
1896            // Roll over to a new chunk after either max count or max size is reached.
1897            // The size calculation here is intended to estimate the size of the
1898            // FullCheckpointContents struct. If this code is modified, that struct
1899            // should also be updated accordingly.
1900            let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
1901                bcs::serialized_size(&signatures)?
1902            } else {
1903                let signatures: Vec<&GenericSignature> =
1904                    signatures.iter().map(|(s, _)| s).collect();
1905                bcs::serialized_size(&signatures)?
1906            };
1907            let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
1908            if chunk.len() == self.max_transactions_per_checkpoint
1909                || (chunk_size + size) > self.max_checkpoint_size_bytes
1910            {
1911                if chunk.is_empty() {
1912                    // Always allow at least one tx in a checkpoint.
1913                    warn!(
1914                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1915                        self.max_checkpoint_size_bytes
1916                    );
1917                } else {
1918                    chunks.push(chunk);
1919                    chunk = Vec::new();
1920                    chunk_size = 0;
1921                }
1922            }
1923
1924            chunk.push((effects, signatures));
1925            chunk_size += size;
1926        }
1927
1928        if !chunk.is_empty() || chunks.is_empty() {
1929            // We intentionally create an empty checkpoint if there is no content provided
1930            // to make a 'heartbeat' checkpoint.
1931            // Important: if some conditions are added here later, we need to make sure we always
1932            // have at least one chunk if last_pending_of_epoch is set
1933            chunks.push(chunk);
1934            // Note: empty checkpoints are ok - they shouldn't happen at all on a network with even
1935            // modest load. Even if they do happen, they are still useful, as they allow fullnodes to
1936            // distinguish between "no transactions have happened" and "I am not receiving new
1937            // checkpoints".
1938        }
1939        Ok(chunks)
1940    }
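
    // Editorial worked example (not from the original source): with max count 3 and max size
    // 100 bytes, transaction sizes [120, 60, 50] chunk as follows: 120 arrives at an empty
    // chunk and exceeds the size cap, so it is allowed through alone (with the warning above);
    // 60 forces a rollover and starts a new chunk; 50 would push that chunk to 110 > 100, so
    // it rolls over again. Result: [120], [60], [50].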
1941
1942    fn load_last_built_checkpoint_summary(
1943        epoch_store: &AuthorityPerEpochStore,
1944        store: &CheckpointStore,
1945    ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1946        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1947        if last_checkpoint.is_none() {
1948            let epoch = epoch_store.epoch();
1949            if epoch > 0 {
1950                let previous_epoch = epoch - 1;
1951                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1952                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1953                if let Some((ref seq, _)) = last_checkpoint {
1954                    debug!(
1955                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1956                    );
1957                } else {
1958                    // This indicates a serious bug in when the CheckpointBuilder was started, so surface it via a panic.
1959                    panic!("Cannot find last checkpoint for previous epoch {previous_epoch}");
1960                }
1961            }
1962        }
1963        Ok(last_checkpoint)
1964    }
1965
1966    #[instrument(level = "debug", skip_all)]
1967    async fn create_checkpoints(
1968        &self,
1969        all_effects: Vec<TransactionEffects>,
1970        details: &PendingCheckpointInfo,
1971        all_roots: &HashSet<TransactionDigest>,
1972    ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1973        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1974
1975        let total = all_effects.len();
1976        let mut last_checkpoint =
1977            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1978        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1979        info!(
1980            checkpoint_commit_height = details.checkpoint_height,
1981            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1982            checkpoint_timestamp = details.timestamp_ms,
1983            "Creating checkpoint(s) for {} transactions",
1984            all_effects.len(),
1985        );
1986
1987        let all_digests: Vec<_> = all_effects
1988            .iter()
1989            .map(|effect| *effect.transaction_digest())
1990            .collect();
1991        let transactions_and_sizes = self
1992            .state
1993            .get_transaction_cache_reader()
1994            .get_transactions_and_serialized_sizes(&all_digests)?;
1995        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1996        let mut transactions = Vec::with_capacity(all_effects.len());
1997        let mut transaction_keys = Vec::with_capacity(all_effects.len());
1998        let mut randomness_rounds = BTreeMap::new();
1999        {
2000            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
2001            debug!(
2002                ?last_checkpoint_seq,
2003                "Waiting for {:?} certificates to appear in consensus",
2004                all_effects.len()
2005            );
2006
2007            for (effects, transaction_and_size) in all_effects
2008                .into_iter()
2009                .zip(transactions_and_sizes.into_iter())
2010            {
2011                let (transaction, size) = transaction_and_size
2012                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
2013                match transaction.inner().transaction_data().kind() {
2014                    TransactionKind::ConsensusCommitPrologue(_)
2015                    | TransactionKind::ConsensusCommitPrologueV2(_)
2016                    | TransactionKind::ConsensusCommitPrologueV3(_)
2017                    | TransactionKind::ConsensusCommitPrologueV4(_)
2018                    | TransactionKind::AuthenticatorStateUpdate(_) => {
2019                        // ConsensusCommitPrologue and AuthenticatorStateUpdate are guaranteed to be
2020                        // processed before we reach here.
2021                    }
2022                    TransactionKind::ProgrammableSystemTransaction(_) => {
2023                        // Settlement transactions are added by the checkpoint builder.
2024                    }
2025                    TransactionKind::ChangeEpoch(_)
2026                    | TransactionKind::Genesis(_)
2027                    | TransactionKind::EndOfEpochTransaction(_) => {
2028                        fatal!(
2029                            "unexpected transaction in checkpoint effects: {:?}",
2030                            transaction
2031                        );
2032                    }
2033                    TransactionKind::RandomnessStateUpdate(rsu) => {
2034                        randomness_rounds
2035                            .insert(*effects.transaction_digest(), rsu.randomness_round);
2036                    }
2037                    TransactionKind::ProgrammableTransaction(_) => {
2038                        // Only transactions that are not roots should be included in the call to
2039                        // `consensus_messages_processed_notify`. Roots come directly from the consensus
2040                        // commit and so are known to be processed already.
2041                        let digest = *effects.transaction_digest();
2042                        if !all_roots.contains(&digest) {
2043                            transaction_keys.push(SequencedConsensusTransactionKey::External(
2044                                ConsensusTransactionKey::Certificate(digest),
2045                            ));
2046                        }
2047                    }
2048                }
2049                transactions.push(transaction);
2050                all_effects_and_transaction_sizes.push((effects, size));
2051            }
2052
2053            self.epoch_store
2054                .consensus_messages_processed_notify(transaction_keys)
2055                .await?;
2056        }
2057
2058        let signatures = self
2059            .epoch_store
2060            .user_signatures_for_checkpoint(&transactions, &all_digests);
2061        debug!(
2062            ?last_checkpoint_seq,
2063            "Received {} checkpoint user signatures from consensus",
2064            signatures.len()
2065        );
2066
2067        let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
2068            Some(
2069                transactions
2070                    .iter()
2071                    .flat_map(|tx| {
2072                        if let TransactionKind::ProgrammableTransaction(ptb) =
2073                            tx.transaction_data().kind()
2074                        {
2075                            itertools::Either::Left(
2076                                ptb.commands
2077                                    .iter()
2078                                    .map(ExecutionTimeObservationKey::from_command),
2079                            )
2080                        } else {
2081                            itertools::Either::Right(std::iter::empty())
2082                        }
2083                    })
2084                    .collect(),
2085            )
2086        } else {
2087            None
2088        };
2089
2090        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2091        let chunks_count = chunks.len();
2092
2093        let mut checkpoints = Vec::with_capacity(chunks_count);
2094        debug!(
2095            ?last_checkpoint_seq,
2096            "Creating {} checkpoints with {} transactions", chunks_count, total,
2097        );
2098
2099        let epoch = self.epoch_store.epoch();
2100        for (index, transactions) in chunks.into_iter().enumerate() {
2101            let first_checkpoint_of_epoch = index == 0
2102                && last_checkpoint
2103                    .as_ref()
2104                    .map(|(_, c)| c.epoch != epoch)
2105                    .unwrap_or(true);
2106            if first_checkpoint_of_epoch {
2107                self.epoch_store
2108                    .record_epoch_first_checkpoint_creation_time_metric();
2109            }
2110            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2111
2112            let sequence_number = last_checkpoint
2113                .as_ref()
2114                .map(|(_, c)| c.sequence_number + 1)
2115                .unwrap_or_default();
2116            let mut timestamp_ms = details.timestamp_ms;
2117            if let Some((_, last_checkpoint)) = &last_checkpoint
2118                && last_checkpoint.timestamp_ms > timestamp_ms
2119            {
2120                // The first consensus commit of an epoch can have a zero timestamp.
2121                debug!(
2122                    "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2123                    sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2124                );
2125                if self
2126                    .epoch_store
2127                    .protocol_config()
2128                    .enforce_checkpoint_timestamp_monotonicity()
2129                {
2130                    timestamp_ms = last_checkpoint.timestamp_ms;
2131                }
2132            }
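
            // Editorial note (not from the original source): with monotonicity enforced this is
            // effectively timestamp_ms = max(details.timestamp_ms, last_checkpoint.timestamp_ms),
            // e.g. a commit timestamped 990 after a checkpoint at 1000 yields a checkpoint
            // timestamped 1000.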
2133
2134            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
2135            let epoch_rolling_gas_cost_summary =
2136                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2137
2138            let end_of_epoch_data = if last_checkpoint_of_epoch {
2139                let system_state_obj = self
2140                    .augment_epoch_last_checkpoint(
2141                        &epoch_rolling_gas_cost_summary,
2142                        timestamp_ms,
2143                        &mut effects,
2144                        &mut signatures,
2145                        sequence_number,
2146                        std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2147                        last_checkpoint_seq.unwrap_or_default(),
2148                    )
2149                    .await?;
2150
2151                let committee = system_state_obj
2152                    .get_current_epoch_committee()
2153                    .committee()
2154                    .clone();
2155
2156                // This must happen after the call to augment_epoch_last_checkpoint,
2157                // otherwise we will not capture the change_epoch tx.
2158                let root_state_digest = {
2159                    let state_acc = self
2160                        .global_state_hasher
2161                        .upgrade()
2162                        .expect("No checkpoints should be getting built after local configuration");
2163                    let acc = state_acc.accumulate_checkpoint(
2164                        &effects,
2165                        sequence_number,
2166                        &self.epoch_store,
2167                    )?;
2168
2169                    state_acc
2170                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2171                        .await?;
2172
2173                    state_acc.accumulate_running_root(
2174                        &self.epoch_store,
2175                        sequence_number,
2176                        Some(acc),
2177                    )?;
2178                    state_acc
2179                        .digest_epoch(self.epoch_store.clone(), sequence_number)
2180                        .await?
2181                };
2182                self.metrics.highest_accumulated_epoch.set(epoch as i64);
2183                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2184
2185                let epoch_commitments = if self
2186                    .epoch_store
2187                    .protocol_config()
2188                    .check_commit_root_state_digest_supported()
2189                {
2190                    vec![root_state_digest.into()]
2191                } else {
2192                    vec![]
2193                };
2194
2195                Some(EndOfEpochData {
2196                    next_epoch_committee: committee.voting_rights,
2197                    next_epoch_protocol_version: ProtocolVersion::new(
2198                        system_state_obj.protocol_version(),
2199                    ),
2200                    epoch_commitments,
2201                })
2202            } else {
2203                self.send_to_hasher
2204                    .send((sequence_number, effects.clone()))
2205                    .await?;
2206
2207                None
2208            };
2209            let contents = if self.epoch_store.protocol_config().address_aliases() {
2210                CheckpointContents::new_v2(&effects, signatures)
2211            } else {
2212                CheckpointContents::new_with_digests_and_signatures(
2213                    effects.iter().map(TransactionEffects::execution_digests),
2214                    signatures
2215                        .into_iter()
2216                        .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
2217                        .collect(),
2218                )
2219            };
2220
2221            let num_txns = contents.size() as u64;
2222
2223            let network_total_transactions = last_checkpoint
2224                .as_ref()
2225                .map(|(_, c)| c.network_total_transactions + num_txns)
2226                .unwrap_or(num_txns);
2227
2228            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2229
2230            let matching_randomness_rounds: Vec<_> = effects
2231                .iter()
2232                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2233                .copied()
2234                .collect();
2235
2236            let checkpoint_commitments = if self
2237                .epoch_store
2238                .protocol_config()
2239                .include_checkpoint_artifacts_digest_in_summary()
2240            {
2241                let artifacts = CheckpointArtifacts::from(&effects[..]);
2242                let artifacts_digest = artifacts.digest()?;
2243                vec![artifacts_digest.into()]
2244            } else {
2245                Default::default()
2246            };
2247
2248            let summary = CheckpointSummary::new(
2249                self.epoch_store.protocol_config(),
2250                epoch,
2251                sequence_number,
2252                network_total_transactions,
2253                &contents,
2254                previous_digest,
2255                epoch_rolling_gas_cost_summary,
2256                end_of_epoch_data,
2257                timestamp_ms,
2258                matching_randomness_rounds,
2259                checkpoint_commitments,
2260            );
2261            summary.report_checkpoint_age(
2262                &self.metrics.last_created_checkpoint_age,
2263                &self.metrics.last_created_checkpoint_age_ms,
2264            );
2265            if last_checkpoint_of_epoch {
2266                info!(
2267                    checkpoint_seq = sequence_number,
2268                    "creating last checkpoint of epoch {}", epoch
2269                );
2270                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2271                    self.epoch_store
2272                        .report_epoch_metrics_at_last_checkpoint(stats);
2273                }
2274            }
2275            last_checkpoint = Some((sequence_number, summary.clone()));
2276            checkpoints.push((summary, contents));
2277        }
2278
2279        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2280    }
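
    // Editorial sketch (not from the original source): each chunk above extends a hash-linked
    // chain of summaries, roughly:
    //
    //     seq        = last.seq + 1                   (or 0 for the very first checkpoint)
    //     total_txns = last.total_txns + contents.len (or contents.len)
    //     prev       = Some(last.digest())            (or None)
    //
    // which is why `last_checkpoint` is updated at the bottom of every iteration.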
2281
2282    fn get_epoch_total_gas_cost(
2283        &self,
2284        last_checkpoint: Option<&CheckpointSummary>,
2285        cur_checkpoint_effects: &[TransactionEffects],
2286    ) -> GasCostSummary {
2287        let (previous_epoch, previous_gas_costs) = last_checkpoint
2288            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2289            .unwrap_or_default();
2290        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2291        if previous_epoch == self.epoch_store.epoch() {
2292            // sum only when we are within the same epoch
2293            GasCostSummary::new(
2294                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2295                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2296                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2297                previous_gas_costs.non_refundable_storage_fee
2298                    + current_gas_costs.non_refundable_storage_fee,
2299            )
2300        } else {
2301            current_gas_costs
2302        }
2303    }
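
    // Editorial worked example (not from the original source): if the previous checkpoint in the
    // same epoch carried a rolling summary of (computation: 10, storage: 20, rebate: 5,
    // non-refundable: 1) and the current effects sum to (2, 4, 1, 0), the new rolling summary is
    // (12, 24, 6, 1). If the previous checkpoint belongs to an earlier epoch, the rolling summary
    // restarts at (2, 4, 1, 0).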
2304
2305    #[instrument(level = "error", skip_all)]
2306    async fn augment_epoch_last_checkpoint(
2307        &self,
2308        epoch_total_gas_cost: &GasCostSummary,
2309        epoch_start_timestamp_ms: CheckpointTimestamp,
2310        checkpoint_effects: &mut Vec<TransactionEffects>,
2311        signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2312        checkpoint: CheckpointSequenceNumber,
2313        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2314        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
2315        // >1 checkpoint.
2316        last_checkpoint: CheckpointSequenceNumber,
2317    ) -> CheckpointBuilderResult<SuiSystemState> {
2318        let (system_state, effects) = self
2319            .state
2320            .create_and_execute_advance_epoch_tx(
2321                &self.epoch_store,
2322                epoch_total_gas_cost,
2323                checkpoint,
2324                epoch_start_timestamp_ms,
2325                end_of_epoch_observation_keys,
2326                last_checkpoint,
2327            )
2328            .await?;
2329        checkpoint_effects.push(effects);
2330        signatures.push(vec![]);
2331        Ok(system_state)
2332    }
2333
2334    /// For the given roots, returns the complete list of effects to include in the checkpoint.
2335    /// This list includes the roots and all of their dependencies that are not already part of a checkpoint.
2336    /// Note that this function may be called multiple times while constructing the checkpoint.
2337    /// `existing_tx_digests_in_checkpoint` tracks the transactions that are already included in the
2338    /// checkpoint; txs from `roots` that need to be included in the checkpoint are added to it
2339    /// before this function returns.
2340    #[instrument(level = "debug", skip_all)]
2341    fn complete_checkpoint_effects(
2342        &self,
2343        mut roots: Vec<TransactionEffects>,
2344        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
2345    ) -> SuiResult<Vec<TransactionEffects>> {
2346        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
2347        let mut results = vec![];
2348        let mut seen = HashSet::new();
2349        loop {
2350            let mut pending = HashSet::new();
2351
2352            let transactions_included = self
2353                .epoch_store
2354                .builder_included_transactions_in_checkpoint(
2355                    roots.iter().map(|e| e.transaction_digest()),
2356                )?;
2357
2358            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
2359                let digest = effect.transaction_digest();
2360                // Mark as seen, so that we don't needlessly re-read its effects if it later appears as a dependency.
2361                seen.insert(*digest);
2362
2363                // Skip roots that are already included in the checkpoint.
2364                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
2365                    continue;
2366                }
2367
2368                // Skip roots already included in checkpoints or roots from previous epochs
2369                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
2370                    continue;
2371                }
2372
2373                let existing_effects = self
2374                    .epoch_store
2375                    .transactions_executed_in_cur_epoch(effect.dependencies())?;
2376
2377                for (dependency, effects_signature_exists) in
2378                    effect.dependencies().iter().zip(existing_effects.iter())
2379                {
2380                    // Skip if the dependency was not executed in the current epoch.
2381                    // Note that the existence of an effects signature in the
2382                    // epoch store for the given digest indicates that the transaction
2383                    // was locally executed in the current epoch.
2384                    if !effects_signature_exists {
2385                        continue;
2386                    }
2387                    if seen.insert(*dependency) {
2388                        pending.insert(*dependency);
2389                    }
2390                }
2391                results.push(effect);
2392            }
2393            if pending.is_empty() {
2394                break;
2395            }
2396            let pending = pending.into_iter().collect::<Vec<_>>();
2397            let effects = self.effects_store.multi_get_executed_effects(&pending);
2398            let effects = effects
2399                .into_iter()
2400                .zip(pending)
2401                .map(|(opt, digest)| match opt {
2402                    Some(x) => x,
2403                    None => panic!(
2404                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
2405                        digest
2406                    ),
2407                })
2408                .collect::<Vec<_>>();
2409            roots = effects;
2410        }
2411
2412        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
2413        Ok(results)
2414    }
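
    // Editorial sketch (not from the original source): the loop above is a breadth-first closure
    // over effect dependencies, the classic frontier pattern:
    //
    //     let mut seen: HashSet<Digest> = HashSet::new();
    //     let mut frontier = roots;
    //     while !frontier.is_empty() {
    //         let mut next = Vec::new();
    //         for effect in frontier {
    //             for dep in deps(&effect) {               // hypothetical accessor
    //                 if seen.insert(dep) { next.push(dep); }
    //             }
    //             results.push(effect);
    //         }
    //         frontier = load_effects(next);               // hypothetical batch lookup
    //     }
    //
    // with extra filters for transactions already checkpointed or from earlier epochs.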
2415
2416    // This function is used to check the invariants of the consensus commit prologue transactions in the checkpoint
2417    // in simtest.
2418    #[cfg(msim)]
2419    fn expensive_consensus_commit_prologue_invariants_check(
2420        &self,
2421        root_digests: &[TransactionDigest],
2422        sorted: &[TransactionEffects],
2423    ) {
2424        if !self
2425            .epoch_store
2426            .protocol_config()
2427            .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
2428        {
2429            return;
2430        }
2431
2432        // Gets all the consensus commit prologue transactions from the roots.
2433        let root_txs = self
2434            .state
2435            .get_transaction_cache_reader()
2436            .multi_get_transaction_blocks(root_digests);
2437        let ccps = root_txs
2438            .iter()
2439            .filter_map(|tx| {
2440                if let Some(tx) = tx {
2441                    if tx.transaction_data().is_consensus_commit_prologue() {
2442                        Some(tx)
2443                    } else {
2444                        None
2445                    }
2446                } else {
2447                    None
2448                }
2449            })
2450            .collect::<Vec<_>>();
2451
2452        // There should be at most one consensus commit prologue transaction in the roots.
2453        assert!(ccps.len() <= 1);
2454
2455        // Get all the transactions in the checkpoint.
2456        let txs = self
2457            .state
2458            .get_transaction_cache_reader()
2459            .multi_get_transaction_blocks(
2460                &sorted
2461                    .iter()
2462                    .map(|tx| *tx.transaction_digest())
2463                    .collect::<Vec<_>>(),
2464            );
2465
2466        if ccps.is_empty() {
2467            // If there is no consensus commit prologue transaction in the roots, then there should be no
2468            // consensus commit prologue transaction in the checkpoint.
2469            for tx in txs.iter() {
2470                if let Some(tx) = tx {
2471                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2472                }
2473            }
2474        } else {
2475            // If there is one consensus commit prologue, it must be the first one in the checkpoint.
2476            assert!(
2477                txs[0]
2478                    .as_ref()
2479                    .unwrap()
2480                    .transaction_data()
2481                    .is_consensus_commit_prologue()
2482            );
2483
2484            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2485
2486            for tx in txs.iter().skip(1) {
2487                if let Some(tx) = tx {
2488                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2489                }
2490            }
2491        }
2492    }
2493}
2494
2495async fn wait_for_effects_with_retry(
2496    effects_store: &dyn TransactionCacheRead,
2497    task_name: &'static str,
2498    digests: &[TransactionDigest],
2499    tx_key: TransactionKey,
2500) -> Vec<TransactionEffects> {
2501    loop {
2502        match tokio::time::timeout(Duration::from_secs(5), async {
2503            effects_store
2504                .notify_read_executed_effects(task_name, digests)
2505                .await
2506        })
2507        .await
2508        {
2509            Ok(effects) => break effects,
2510            Err(_) => {
2511                debug_fatal!(
2512                    "Timeout waiting for transactions to be executed {:?}, retrying...",
2513                    tx_key
2514                );
2515            }
2516        }
2517    }
2518}
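
// Editorial sketch (not from the original source): the function above is the generic
// "timeout, log, retry" idiom around an awaited notification. A minimal, self-contained
// version of the same pattern, assuming tokio's `time` and `macros` features:
#[cfg(test)]
mod wait_with_retry_sketch {
    use std::time::Duration;

    /// Retries `make_fut` until one attempt completes within `per_attempt`.
    async fn wait_with_retry<F, Fut, T>(per_attempt: Duration, mut make_fut: F) -> T
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = T>,
    {
        loop {
            if let Ok(out) = tokio::time::timeout(per_attempt, make_fut()).await {
                break out;
            }
            // The real code logs via `debug_fatal!` here before retrying.
        }
    }

    #[tokio::test]
    async fn second_attempt_succeeds() {
        let mut attempts = 0u32;
        let out = wait_with_retry(Duration::from_millis(10), || {
            attempts += 1;
            let n = attempts;
            async move {
                if n == 1 {
                    // First attempt stalls past the per-attempt timeout.
                    tokio::time::sleep(Duration::from_millis(50)).await;
                }
                n
            }
        })
        .await;
        assert_eq!(out, 2);
    }
}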
2519
2520impl CheckpointAggregator {
2521    fn new(
2522        tables: Arc<CheckpointStore>,
2523        epoch_store: Arc<AuthorityPerEpochStore>,
2524        notify: Arc<Notify>,
2525        output: Box<dyn CertifiedCheckpointOutput>,
2526        state: Arc<AuthorityState>,
2527        metrics: Arc<CheckpointMetrics>,
2528    ) -> Self {
2529        let current = None;
2530        Self {
2531            store: tables,
2532            epoch_store,
2533            notify,
2534            current,
2535            output,
2536            state,
2537            metrics,
2538        }
2539    }
2540
2541    async fn run(mut self) {
2542        info!("Starting CheckpointAggregator");
2543        loop {
2544            if let Err(e) = self.run_and_notify().await {
2545                error!(
2546                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
2547                    e
2548                );
2549                self.metrics.checkpoint_errors.inc();
2550                tokio::time::sleep(Duration::from_secs(1)).await;
2551                continue;
2552            }
2553
2554            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
2555        }
2556    }
2557
2558    async fn run_and_notify(&mut self) -> SuiResult {
2559        let summaries = self.run_inner()?;
2560        for summary in summaries {
2561            self.output.certified_checkpoint_created(&summary).await?;
2562        }
2563        Ok(())
2564    }
2565
2566    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
2567        let _scope = monitored_scope("CheckpointAggregator");
2568        let mut result = vec![];
2569        'outer: loop {
2570            let next_to_certify = self.next_checkpoint_to_certify()?;
2571            let current = if let Some(current) = &mut self.current {
2572                // It's possible that the checkpoint was already certified by
2573                // the rest of the network and we've already received the
2574                // certified checkpoint via StateSync. In this case, we reset
2575                // the current signature aggregator to the next checkpoint to
2576                // be certified
2577                if current.summary.sequence_number < next_to_certify {
2578                    assert_reachable!("skip checkpoint certification");
2579                    self.current = None;
2580                    continue;
2581                }
2582                current
2583            } else {
2584                let Some(summary) = self
2585                    .epoch_store
2586                    .get_built_checkpoint_summary(next_to_certify)?
2587                else {
2588                    return Ok(result);
2589                };
2590                self.current = Some(CheckpointSignatureAggregator {
2591                    next_index: 0,
2592                    digest: summary.digest(),
2593                    summary,
2594                    signatures_by_digest: MultiStakeAggregator::new(
2595                        self.epoch_store.committee().clone(),
2596                    ),
2597                    store: self.store.clone(),
2598                    state: self.state.clone(),
2599                    metrics: self.metrics.clone(),
2600                });
2601                self.current.as_mut().unwrap()
2602            };
2603
2604            let epoch_tables = self
2605                .epoch_store
2606                .tables()
2607                .expect("should not run past end of epoch");
2608            let iter = epoch_tables
2609                .pending_checkpoint_signatures
2610                .safe_iter_with_bounds(
2611                    Some((current.summary.sequence_number, current.next_index)),
2612                    None,
2613                );
2614            for item in iter {
2615                let ((seq, index), data) = item?;
2616                if seq != current.summary.sequence_number {
2617                    trace!(
2618                        checkpoint_seq =? current.summary.sequence_number,
2619                        "Not enough checkpoint signatures",
2620                    );
2621                    // No more signatures (yet) for this checkpoint
2622                    return Ok(result);
2623                }
2624                trace!(
2625                    checkpoint_seq = current.summary.sequence_number,
2626                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
2627                    current.summary.digest(),
2628                    data.summary.auth_sig().authority.concise()
2629                );
2630                self.metrics
2631                    .checkpoint_participation
2632                    .with_label_values(&[&format!(
2633                        "{:?}",
2634                        data.summary.auth_sig().authority.concise()
2635                    )])
2636                    .inc();
2637                if let Ok(auth_signature) = current.try_aggregate(data) {
2638                    debug!(
2639                        checkpoint_seq = current.summary.sequence_number,
2640                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
2641                        current.summary.digest(),
2642                    );
2643                    let summary = VerifiedCheckpoint::new_unchecked(
2644                        CertifiedCheckpointSummary::new_from_data_and_sig(
2645                            current.summary.clone(),
2646                            auth_signature,
2647                        ),
2648                    );
2649
2650                    self.store.insert_certified_checkpoint(&summary)?;
2651                    self.metrics
2652                        .last_certified_checkpoint
2653                        .set(current.summary.sequence_number as i64);
2654                    current.summary.report_checkpoint_age(
2655                        &self.metrics.last_certified_checkpoint_age,
2656                        &self.metrics.last_certified_checkpoint_age_ms,
2657                    );
2658                    result.push(summary.into_inner());
2659                    self.current = None;
2660                    continue 'outer;
2661                } else {
2662                    current.next_index = index + 1;
2663                }
2664            }
2665            break;
2666        }
2667        Ok(result)
2668    }
2669
2670    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
2671        Ok(self
2672            .store
2673            .tables
2674            .certified_checkpoints
2675            .reversed_safe_iter_with_bounds(None, None)?
2676            .next()
2677            .transpose()?
2678            .map(|(seq, _)| seq + 1)
2679            .unwrap_or_default())
2680    }
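
    // Editorial worked example (not from the original source): if the highest certified
    // checkpoint on disk has sequence 41, the reversed iterator yields it first and the next
    // checkpoint to certify is 42; with no certified checkpoints yet, the default of 0 is used.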
2681}
2682
2683impl CheckpointSignatureAggregator {
    #[allow(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            // ignore repeated signatures
            InsertResult::Failed { error }
                if matches!(
                    error.as_inner(),
                    SuiErrorKind::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
                ) =>
            {
                Err(())
            }
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                // It is not guaranteed that signature.authority == narwhal_cert.author, but we do verify
                // the signature so we know that the author signed the message at some point.
                if their_digest != self.digest {
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }

    /// Check if there is a split brain condition in checkpoint signature aggregation, defined
    /// as any state wherein it is no longer possible to achieve quorum on a checkpoint proposal,
    /// irrespective of the outcome of any outstanding votes.
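    ///
    /// For example (illustrative numbers): with total stake 10 and a quorum
    /// threshold of 7, once stake 4 has signed digest A and stake 4 has signed
    /// digest B, the remaining stake 2 cannot bring either digest to 7, so no
    /// checkpoint can be certified at this sequence number.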
    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            // TODO: at this point we should immediately halt processing
            // of new transaction certificates to avoid building on top of
            // forked output
            // self.halt_all_execution();

            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let digests_by_stake_messages = all_unique_values
                .iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{:?} (total stake: {})", digest, total_stake)
                })
                .collect::<Vec<String>>();
            fail_point_arg!("kill_split_brain_node", |(
                checkpoint_overrides,
                forked_authorities,
            ): (
                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
            )| {
                #[cfg(msim)]
                {
                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
                        (checkpoint_overrides.lock(), forked_authorities.lock())
                    {
                        // Find the digest produced by non-forked authorities
                        let correct_digest = all_unique_values
                            .iter()
                            .find(|(_, (authorities, _))| {
                                // Check if any authority that produced this digest is NOT in the forked set
                                authorities
                                    .iter()
                                    .any(|auth| !forked_authorities_set.contains(auth))
                            })
                            .map(|(digest, _)| digest.to_string())
                            .unwrap_or_else(|| {
                                // Fallback: use the digest with the highest stake
                                all_unique_values
                                    .iter()
                                    .max_by_key(|(_, (_, stake))| *stake)
                                    .map(|(digest, _)| digest.to_string())
                                    .unwrap_or_else(|| self.digest.to_string())
                            });

                        overrides.insert(self.summary.sequence_number, correct_digest.clone());

                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
                            self.summary.sequence_number,
                            correct_digest
                        );
                    }
                }
            });

            debug_fatal!(
                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
                self.summary.sequence_number,
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.store.clone();

            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
}

/// Creates a data dump containing relevant data for diagnosing the cause of a
/// split brain by querying one disagreeing validator for full checkpoint contents.
/// To minimize peer chatter, we only query one validator at random from each
/// disagreeing faction, since every honest validator that participated in this
/// round will likely run the same process.
async fn diagnose_split_brain(
    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
    local_summary: CheckpointSummary,
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
) {
    debug!(
        checkpoint_seq = local_summary.sequence_number,
        "Running split brain diagnostics..."
    );
    let time = SystemTime::now();
    // collect one random disagreeing validator per differing digest
    let digest_to_validator = all_unique_values
        .iter()
        .filter_map(|(digest, (validators, _))| {
            if *digest != local_summary.digest() {
                let random_validator = validators.choose(&mut get_rng()).unwrap();
                Some((*digest, *random_validator))
            } else {
                None
            }
        })
        .collect::<HashMap<_, _>>();
    if digest_to_validator.is_empty() {
        panic!(
            "Given the split brain condition, there should be at \
                least one validator that disagrees with the local signature"
        );
    }

    let epoch_store = state.load_epoch_store_one_call_per_task();
    let committee = epoch_store
        .epoch_start_state()
        .get_sui_committee_with_network_metadata();
    let network_config = default_mysten_network_config();
    let network_clients =
        make_network_authority_clients_with_network_config(&committee, &network_config);

    // Query all disagreeing validators
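    // Ask each of them for the pending (uncertified) checkpoint it built at our
    // sequence number, including full contents, so that we can diff it against ours.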
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequestV2 {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint_v2(request)
        })
        .collect::<Vec<_>>();

    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponseV2 {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponseV2 {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {:?}",
        time
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .keep()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{}", fork_logs_text).unwrap();
    debug!("{}", fork_logs_text);
}

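/// Notification interface through which consensus handling and test code wake up
/// the checkpoint builder and the checkpoint signature aggregator.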
pub trait CheckpointServiceNotify {
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> SuiResult;

    fn notify_checkpoint(&self) -> SuiResult;
}

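/// Worker tasks are held here between `build` and `spawn`: the service is
/// constructed in the `Unstarted` state, and `spawn` takes the workers out
/// exactly once, moving the state to `Started`.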
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    Started,
}

impl CheckpointServiceState {
    fn take_unstarted(
        &mut self,
    ) -> (
        CheckpointBuilder,
        CheckpointAggregator,
        CheckpointStateHasher,
    ) {
        let mut state = CheckpointServiceState::Started;
        std::mem::swap(self, &mut state);

        match state {
            CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
                (builder, aggregator, hasher)
            }
            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
        }
    }
}

pub struct CheckpointService {
    tables: Arc<CheckpointStore>,
    notify_builder: Arc<Notify>,
    notify_aggregator: Arc<Notify>,
    last_signature_index: Mutex<u64>,
    // A notification for the current highest built sequence number.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // The highest sequence number that had already been built at the time CheckpointService
    // was constructed.
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    state: Mutex<CheckpointServiceState>,
}

impl CheckpointService {
    /// Constructs a new CheckpointService in an unstarted state.
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // We may have built higher checkpoint numbers before restarting.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

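        // Seed the watch channel with the builder's current progress; the builder
        // advances this watermark as it (re)builds checkpoints, and
        // `wait_for_rebuilt_checkpoints` compares it against
        // `highest_previously_built_seq`.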
        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);

        let ckpt_state_hasher = CheckpointStateHasher::new(
            epoch_store.clone(),
            global_state_hasher.clone(),
            receive_from_builder,
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            global_state_hasher,
            send_to_hasher,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        let last_signature_index = epoch_store
            .get_last_checkpoint_signature_index()
            .expect("should not cross end of epoch");
        let last_signature_index = Mutex::new(last_signature_index);

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            notify_aggregator,
            last_signature_index,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted((
                builder,
                aggregator,
                ckpt_state_hasher,
            ))),
        })
    }

    /// Starts the CheckpointService.
    ///
    /// This function blocks until the CheckpointBuilder re-builds all checkpoints that had
    /// been built before the most recent restart. You can think of this as a WAL replay
    /// operation. Upon startup, we may have a number of consensus commits and resulting
    /// checkpoints that were built but not committed to disk. We want to reprocess the
    /// commits and rebuild the checkpoints before starting normal operation.
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        // Clean up state hashes computed after the last committed checkpoint.
        // This prevents ECMH divergence after fork-recovery restarts.
        if let Some(last_committed_seq) = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number())
        {
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        spawn_monitored_task!(async move {
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            // The state hasher will terminate as soon as it has finished processing all
            // messages from the builder.
            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            // The builder must shut down before the aggregator and state hasher, since it
            // sends messages to them.
            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        // If this times out, the validator may still start up. The worst that can
        // happen is that we will crash later on instead of immediately. The eventual
        // crash would occur because we may be missing transactions that are below the
        // highest_synced_checkpoint watermark, which can cause a crash in
        // `CheckpointExecutor::extract_randomness_rounds`.
        if tokio::time::timeout(Duration::from_secs(120), async move {
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
}

impl CheckpointService {
    /// Waits until all checkpoints that had been built before the node restarted
    /// have been rebuilt. This is required to preserve the invariant that all checkpoints
    /// (and their transactions) below the highest_synced_checkpoint watermark are
    /// available. Once the checkpoints are constructed, we can be sure that the
    /// transactions have also been executed.
    pub async fn wait_for_rebuilt_checkpoints(&self) {
        let highest_previously_built_seq = self.highest_previously_built_seq;
        let mut rx = self.highest_currently_built_seq_tx.subscribe();
        let mut highest_currently_built_seq = *rx.borrow_and_update();
        info!(
            "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
        );
        loop {
            if highest_currently_built_seq >= highest_previously_built_seq {
                info!("Checkpoint rebuild complete");
                break;
            }
            rx.changed().await.unwrap();
            highest_currently_built_seq = *rx.borrow_and_update();
        }
    }

    #[cfg(test)]
    fn write_and_notify_checkpoint_for_testing(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: PendingCheckpoint,
    ) -> SuiResult {
        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;

        let mut output = ConsensusCommitOutput::new(0);
        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
        output.set_default_commit_stats_for_testing();
        epoch_store.push_consensus_output_for_tests(output);
        self.notify_checkpoint()?;
        Ok(())
    }
}

impl CheckpointServiceNotify for CheckpointService {
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> SuiResult {
        let sequence = info.summary.sequence_number;
        let signer = info.summary.auth_sig().authority.concise();

        if let Some(highest_verified_checkpoint) = self
            .tables
            .get_highest_verified_checkpoint()?
            .map(|x| *x.sequence_number())
            && sequence <= highest_verified_checkpoint
        {
            trace!(
                checkpoint_seq = sequence,
                "Ignoring checkpoint signature from {} - already certified", signer,
            );
            self.metrics
                .last_ignored_checkpoint_signature_received
                .set(sequence as i64);
            return Ok(());
        }
        trace!(
            checkpoint_seq = sequence,
            "Received checkpoint signature, digest {} from {}",
            info.summary.digest(),
            signer,
        );
        self.metrics
            .last_received_checkpoint_signatures
            .with_label_values(&[&signer.to_string()])
            .set(sequence as i64);
        // While it can be tempting to make last_signature_index into an AtomicU64, this
        // won't work: we need to make sure we write to `pending_signatures` and trigger
        // `notify_aggregator` without race conditions.
        let mut index = self.last_signature_index.lock();
        *index += 1;
        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
        self.notify_aggregator.notify_one();
        Ok(())
    }

    fn notify_checkpoint(&self) -> SuiResult {
        self.notify_builder.notify_one();
        Ok(())
    }
}

// Test helper: a no-op CheckpointServiceNotify implementation.
pub struct CheckpointServiceNoop {}
impl CheckpointServiceNotify for CheckpointServiceNoop {
    fn notify_checkpoint_signature(
        &self,
        _: &AuthorityPerEpochStore,
        _: &CheckpointSignatureMessage,
    ) -> SuiResult {
        Ok(())
    }

    fn notify_checkpoint(&self) -> SuiResult {
        Ok(())
    }
}

impl PendingCheckpoint {
    pub fn height(&self) -> CheckpointHeight {
        self.details.checkpoint_height
    }

    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.roots
    }

    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.details
    }
}

pin_project! {
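    /// Wraps a future and counts how many times it is polled; awaiting the
    /// wrapper yields `(poll_count, output)`.
    ///
    /// A minimal usage sketch (illustrative only, not compiled as a doctest):
    /// ```ignore
    /// let (polls, value) = poll_count(async { 42 }).await;
    /// assert_eq!(value, 42);
    /// assert!(polls >= 1);
    /// ```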
    pub struct PollCounter<Fut> {
        #[pin]
        future: Fut,
        count: usize,
    }
}

impl<Fut> PollCounter<Fut> {
    pub fn new(future: Fut) -> Self {
        Self { future, count: 0 }
    }

    pub fn count(&self) -> usize {
        self.count
    }
}

impl<Fut: Future> Future for PollCounter<Fut> {
    type Output = (usize, Fut::Output);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        *this.count += 1;
        match this.future.poll(cx) {
            Poll::Ready(output) => Poll::Ready((*this.count, output)),
            Poll::Pending => Poll::Pending,
        }
    }
}

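/// Convenience constructor: wraps `future` so that awaiting it also reports the
/// number of polls it took to complete.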
fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
    PollCounter::new(future)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;
    use crate::transaction_outputs::TransactionOutputs;
    use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
    use futures::FutureExt as _;
    use futures::future::BoxFuture;
    use std::collections::HashMap;
    use std::ops::Deref;
    use sui_macros::sim_test;
    use sui_protocol_config::{Chain, ProtocolConfig};
    use sui_types::accumulator_event::AccumulatorEvent;
    use sui_types::authenticator_state::ActiveJwk;
    use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
    use sui_types::crypto::Signature;
    use sui_types::effects::{TransactionEffects, TransactionEvents};
    use sui_types::messages_checkpoint::SignedCheckpointSummary;
    use sui_types::transaction::VerifiedTransaction;
    use tokio::sync::mpsc;

    #[tokio::test]
    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
        let store = CheckpointStore::new_for_tests();
        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
        for seq in 70u64..=80u64 {
            let contents =
                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
                    [sui_types::base_types::ExecutionDigests::new(
                        sui_types::digests::TransactionDigest::random(),
                        sui_types::digests::TransactionEffectsDigest::ZERO,
                    )],
                );
            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
                &protocol,
                0,
                seq,
                0,
                &contents,
                None,
                sui_types::gas::GasCostSummary::default(),
                None,
                0,
                Vec::new(),
                Vec::new(),
            );
            store
                .tables
                .locally_computed_checkpoints
                .insert(&seq, &summary)
                .unwrap();
        }

        store
            .clear_locally_computed_checkpoints_from(76)
            .expect("clear should succeed");

        // Explicit boundary checks: 75 must remain, 76 must be deleted
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&75)
                .unwrap()
                .is_some()
        );
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&76)
                .unwrap()
                .is_none()
        );

        for seq in 70u64..76u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_some()
            );
        }
        for seq in 76u64..=80u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_none()
            );
        }
    }

    #[tokio::test]
    async fn test_fork_detection_storage() {
        let store = CheckpointStore::new_for_tests();
        // checkpoint fork
        let seq_num = 42;
        let digest = CheckpointDigest::random();

        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let retrieved = store.get_checkpoint_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
        assert_eq!(retrieved_seq, seq_num);
        assert_eq!(retrieved_digest, digest);

        store.clear_checkpoint_fork_detected().unwrap();
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        // txn fork
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();

        assert!(store.get_transaction_fork_detected().unwrap().is_none());

        store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let retrieved = store.get_transaction_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
        assert_eq!(retrieved_tx, tx_digest);
        assert_eq!(retrieved_expected, expected_effects);
        assert_eq!(retrieved_actual, actual_effects);

        store.clear_transaction_fork_detected().unwrap();
        assert!(store.get_transaction_fork_detected().unwrap().is_none());
    }

    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.disable_accumulators_for_testing();
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
            0,
            0,
            vec![],
            SequenceNumber::new(),
        );

        let jwks = {
            let mut jwks = Vec::new();
            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
                jwks.push(ActiveJwk {
                    jwk_id: JwkId::new(
                        "https://accounts.google.com".to_string(),
                        "1234567890".to_string(),
                    ),
                    jwk: JWK {
                        kty: "RSA".to_string(),
                        e: "AQAB".to_string(),
                        n: "1234567890".to_string(),
                        alg: "RS256".to_string(),
                    },
                    epoch: 0,
                });
            }
            jwks
        };

        let dummy_tx_with_data =
            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());

        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 52, 51, 1),
            );
        }
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![(signature, None)]);
        }

        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 108, 104, 4)
        );

        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
        // Verify that we split into 2 checkpoints.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending at index 3 had 3 transactions of 40K size each, and we configured 100K max.
        // Verify that we split into 2 checkpoints.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending at index 4 came too soon after the prior one and should be coalesced into
        // the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }

    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn notify_read_executed_effects(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, Vec<TransactionEffects>> {
            std::future::ready(
                digests
                    .iter()
                    .map(|d| self.get(d).expect("effects not found").clone())
                    .collect(),
            )
            .boxed()
        }

        fn notify_read_executed_effects_digests(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
            std::future::ready(
                digests
                    .iter()
                    .map(|d| {
                        self.get(d)
                            .map(|fx| fx.digest())
                            .expect("effects not found")
                    })
                    .collect(),
            )
            .boxed()
        }

        fn multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffects>> {
            digests.iter().map(|d| self.get(d).cloned()).collect()
        }

        // Unimplemented methods - it's unfortunate to have this big blob of useless code, but it
        // wasn't worth keeping EffectsNotifyRead around just for these tests, as it caused a ton
        // of complication in non-test code (e.g. we would have had to implement EffectsNotifyRead
        // for all ExecutionCacheRead implementors).

        fn multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
            unimplemented!()
        }

        fn multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffectsDigest>> {
            unimplemented!()
        }

        fn multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> Vec<Option<TransactionEffects>> {
            unimplemented!()
        }

        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
            unimplemented!()
        }

        fn get_mysticeti_fastpath_outputs(
            &self,
            _: &TransactionDigest,
        ) -> Option<Arc<TransactionOutputs>> {
            unimplemented!()
        }

        fn notify_read_fastpath_transaction_outputs<'a>(
            &'a self,
            _: &'a [TransactionDigest],
        ) -> BoxFuture<'a, Vec<Arc<crate::transaction_outputs::TransactionOutputs>>> {
            unimplemented!()
        }

        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
            unimplemented!()
        }

        fn get_unchanged_loaded_runtime_objects(
            &self,
            _digest: &TransactionDigest,
        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
            unimplemented!()
        }

        fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> SuiResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }

    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> SuiResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }

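    /// Test helper: builds a `PendingCheckpoint` at height `i` whose roots are
    /// the digests `d(t)` for each `t`, with the given commit timestamp.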
    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
        PendingCheckpoint {
            roots: t
                .into_iter()
                .map(|t| TransactionKey::Digest(d(t)))
                .collect(),
            details: PendingCheckpointInfo {
                timestamp_ms,
                last_of_epoch: false,
                checkpoint_height: i,
                consensus_commit_ref: CommitRef::default(),
                rejected_transactions_digest: Digest::default(),
            },
        }
    }

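    /// Test helper: a deterministic 32-byte transaction digest whose first byte is `i`.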
    fn d(i: u8) -> TransactionDigest {
        let mut bytes: [u8; 32] = Default::default();
        bytes[0] = i;
        TransactionDigest::new(bytes)
    }

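    /// Test helper: builds `TransactionEffects` with the given transaction digest,
    /// dependency list, and gas summary.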
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }

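    /// Test helper: registers effects for `digest` in the in-memory effects store
    /// and marks the digest as executed in the current epoch.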
    fn commit_cert_for_test(
        store: &mut HashMap<TransactionDigest, TransactionEffects>,
        state: Arc<AuthorityState>,
        digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) {
        let epoch_store = state.epoch_store_for_testing();
        let effects = e(digest, dependencies, gas_used);
        store.insert(digest, effects.clone());
        epoch_store.insert_executed_in_epoch(&digest);
    }
}