// File: sui_core/checkpoints/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4pub(crate) mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16    LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::random::get_rng;
29use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
30use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
31use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
32use nonempty::NonEmpty;
33use parking_lot::Mutex;
34use pin_project_lite::pin_project;
35use serde::{Deserialize, Serialize};
36use sui_macros::fail_point_arg;
37use sui_network::default_mysten_network_config;
38use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43    CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use tokio::sync::{mpsc, watch};
47use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
48
49use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
50use crate::authority::authority_store_pruner::PrunerWatermarks;
51use crate::consensus_handler::SequencedConsensusTransactionKey;
52use rand::seq::SliceRandom;
53use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
54use std::fs::File;
55use std::future::Future;
56use std::io::Write;
57use std::path::Path;
58use std::pin::Pin;
59use std::sync::Arc;
60use std::sync::Weak;
61use std::task::{Context, Poll};
62use std::time::{Duration, SystemTime};
63use sui_protocol_config::ProtocolVersion;
64use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
65use sui_types::committee::StakeUnit;
66use sui_types::crypto::AuthorityStrongQuorumSignInfo;
67use sui_types::digests::{
68    CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
69};
70use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
71use sui_types::error::{SuiErrorKind, SuiResult};
72use sui_types::gas::GasCostSummary;
73use sui_types::message_envelope::Message;
74use sui_types::messages_checkpoint::{
75    CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
76    CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
77    EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
78    VerifiedCheckpointContents,
79};
80use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
81use sui_types::messages_consensus::ConsensusTransactionKey;
82use sui_types::signature::GenericSignature;
83use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
84use sui_types::transaction::{
85    TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
86};
87use tokio::sync::Notify;
88use tracing::{debug, error, info, instrument, trace, warn};
89use typed_store::DBMapUtils;
90use typed_store::Map;
91use typed_store::{
92    TypedStoreError,
93    rocks::{DBMap, MetricConf},
94};
95
/// Fixed key for the `transaction_fork_detected` table, which stores at most
/// one entry (the table's key type is `u8` and only this key is used).
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

/// Height of a pending checkpoint, derived from the consensus commit round.
pub type CheckpointHeight = u64;
99
/// Aggregate statistics accumulated over an epoch's checkpoints.
pub struct EpochStats {
    // Number of checkpoints in the epoch.
    pub checkpoint_count: u64,
    // Number of transactions included across those checkpoints.
    pub transaction_count: u64,
    // Total gas reward for the epoch — presumably summed from checkpoint
    // gas cost summaries; confirm against the code that populates this.
    pub total_gas_reward: u64,
}
105
/// Metadata describing a pending checkpoint produced from a consensus commit,
/// before the checkpoint summary is built.
#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    // Timestamp to stamp on the checkpoint summary.
    pub timestamp_ms: CheckpointTimestamp,
    // True if this is the final checkpoint of the epoch.
    pub last_of_epoch: bool,
    // Computed in calculate_pending_checkpoint_height() from consensus round,
    // there is no guarantee that this is increasing per checkpoint, because of checkpoint splitting.
    pub checkpoint_height: CheckpointHeight,
    // Consensus commit ref and rejected transactions digest which corresponds to this checkpoint.
    pub consensus_commit_ref: CommitRef,
    pub rejected_transactions_digest: Digest,
    // Pre-assigned checkpoint sequence number from consensus handler.
    // Only set when split_checkpoints_in_consensus_handler is enabled.
    pub checkpoint_seq: Option<CheckpointSequenceNumber>,
}
120
/// A pending checkpoint as produced by the checkpoint builder path:
/// the root transactions to include plus commit metadata.
#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    // Root transaction keys for this checkpoint.
    pub roots: Vec<TransactionKey>,
    // Commit-level metadata shared by all roots.
    pub details: PendingCheckpointInfo,
}
126
/// A group of transaction roots settled together at a given checkpoint height.
/// Used by `PendingCheckpointV2`.
#[derive(Clone, Debug, Default)]
pub struct CheckpointRoots {
    // Transaction roots in this group.
    pub tx_roots: Vec<TransactionKey>,
    // Optional settlement transaction root for this group, if any.
    pub settlement_root: Option<TransactionKey>,
    // Height at which this group was formed.
    pub height: CheckpointHeight,
}
133
/// PendingCheckpointV2 is merged and split in ConsensusHandler
/// instead of CheckpointBuilder. It contains one or more
/// `CheckpointRoots`, each of which represents a group of
/// transactions settled together.
///
/// Usage of PendingCheckpointV2 is gated by the
/// `split_checkpoints_in_consensus_handler` feature flag; both
/// implementations must be supported until that flag is removed.
#[derive(Clone, Debug)]
pub struct PendingCheckpointV2 {
    // Root groups (settled together) making up this checkpoint.
    pub roots: Vec<CheckpointRoots>,
    // Commit-level metadata shared by all root groups.
    pub details: PendingCheckpointInfo,
}
145
/// A checkpoint summary as persisted by the checkpoint builder, together with
/// the build context needed to resume/replay building deterministically.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    // The built checkpoint summary.
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for genesis checkpoint
    pub checkpoint_height: Option<CheckpointHeight>,
    // Index of this checkpoint within its consensus commit (a commit may be
    // split into several checkpoints).
    pub position_in_commit: usize,
}
153
/// Column families backing [`CheckpointStore`]. Generated table plumbing comes
/// from `DBMapUtils`; an alternative tidehunter backend is selected via the
/// `tidehunter` cfg.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence number, for
    /// efficient reads of full checkpoints. Entries from this table are deleted after state
    /// accumulation has completed.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    // TODO: Once the switch to `full_checkpoint_content_v2` is fully active on mainnet,
    // deprecate this table (and remove when possible).
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks and log useful
    /// information. Can be pruned as soon as we verify that we are in agreement with the latest
    /// certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    /// Stores transaction fork detection information.
    /// Single-entry table keyed by `TRANSACTION_FORK_DETECTED_KEY`; the value
    /// is (transaction digest, local effects digest, alternate effects digest)
    /// — confirm ordering against the writer.
    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
    /// Successor of `full_checkpoint_content`, storing versioned full
    /// checkpoint contents by sequence number.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
}
201
202fn full_checkpoint_content_table_default_config() -> DBOptions {
203    DBOptions {
204        options: default_db_options().options,
205        // We have seen potential data corruption issues in this table after forced shutdowns
206        // so we enable value hash logging to help with debugging.
207        // TODO: remove this once we have a better understanding of the root cause.
208        rw_options: ReadWriteOptions::default().set_log_value_hash(true),
209    }
210}
211
/// Constructors for the checkpoint table set. The backend (default typed-store
/// vs. tidehunter) is chosen at compile time via the `tidehunter` cfg.
impl CheckpointStoreTables {
    /// Opens all tables read-write at `path` with default options.
    /// The pruner watermarks argument is unused by this backend.
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    /// Opens all tables read-write at `path` using the tidehunter backend,
    /// building an explicit per-column-family key-space configuration.
    ///
    /// `pruner_watermarks` is wired into relocation filters so that entries
    /// at or below the relevant pruning watermark can be dropped during
    /// relocation instead of being carried forward.
    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 4;
        // 48-bit (6-byte) key prefix for u64 sequence-number keys.
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(64_000)
            .with_value_cache_size(default_value_cache_size());
        // Shared config for tables keyed by an 8-byte sequence number.
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        // Shared config for tables keyed by a 32-byte digest; the rm-prefix
        // strips the 8-byte length prefix from serialized digest keys.
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        // Watermarks are tiny and hot: keep them cached and never unloaded.
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
        // One (column family name, config) entry per table declared on
        // CheckpointStoreTables; names must match the field names.
        let configs = vec![
            (
                "checkpoint_content",
                // Never relocate: entries are simply removed.
                digest_config.clone().with_config(
                    lru_config
                        .clone()
                        .with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                // Prune by checkpoint sequence number (the stored value).
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                // Prune by the epoch of the stored checkpoint.
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
            ),
            (
                "transaction_fork_detected",
                // Single-entry table (1-byte key); never relocated.
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "full_checkpoint_content_v2",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    /// Opens a read-only handle to the tables (e.g. for tooling).
    #[cfg(not(tidehunter))]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }

    /// Tidehunter has no dedicated read-only handle; opens the tables normally
    /// with default (no-op) pruner watermarks.
    #[cfg(tidehunter)]
    pub fn open_readonly(path: &Path) -> Self {
        Self::new(path, "checkpoint", Arc::new(PrunerWatermarks::default()))
    }
}
347
/// Persistent store of checkpoints plus in-memory notify-read registries used
/// to wake tasks waiting for a given checkpoint sequence number to become
/// synced or executed.
pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    // Wakes waiters once the checkpoint at a sequence number is fully synced.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    // Wakes waiters once the checkpoint at a sequence number is executed.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
353
354impl CheckpointStore {
355    pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
356        let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
357        Arc::new(Self {
358            tables,
359            synced_checkpoint_notify_read: NotifyRead::new(),
360            executed_checkpoint_notify_read: NotifyRead::new(),
361        })
362    }
363
364    pub fn new_for_tests() -> Arc<Self> {
365        let ckpt_dir = mysten_common::tempdir().unwrap();
366        CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
367    }
368
369    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
370        let tables = CheckpointStoreTables::new(
371            path,
372            "db_checkpoint",
373            Arc::new(PrunerWatermarks::default()),
374        );
375        Arc::new(Self {
376            tables,
377            synced_checkpoint_notify_read: NotifyRead::new(),
378            executed_checkpoint_notify_read: NotifyRead::new(),
379        })
380    }
381
    /// Opens a read-only handle to the checkpoint tables (default backend).
    #[cfg(not(tidehunter))]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }
386
    /// Opens the checkpoint tables for the tidehunter backend (which has no
    /// separate read-only handle type).
    #[cfg(tidehunter)]
    pub fn open_readonly(path: &Path) -> CheckpointStoreTables {
        CheckpointStoreTables::open_readonly(path)
    }
391
    /// Inserts the genesis checkpoint (epoch 0, sequence number 0) along with
    /// its contents, marking it verified and synced.
    ///
    /// Idempotent: if checkpoint 0 already exists, only asserts the digests
    /// match. Panics if the given checkpoint is not the genesis checkpoint,
    /// or if any underlying store operation fails.
    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
        match self.get_checkpoint_by_sequence_number(0).unwrap() {
            Some(existing_checkpoint) => {
                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
            }
            None => {
                // Seed the checkpoint builder only when this validator is
                // still in the genesis epoch; otherwise the builder state for
                // epoch 0 is not needed.
                if epoch_store.epoch() == checkpoint.epoch {
                    epoch_store
                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                        .unwrap();
                } else {
                    debug!(
                        validator_epoch =% epoch_store.epoch(),
                        genesis_epoch =% checkpoint.epoch(),
                        "Not inserting checkpoint builder data for genesis checkpoint",
                    );
                }
                self.insert_checkpoint_contents(contents).unwrap();
                self.insert_verified_checkpoint(&checkpoint).unwrap();
                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
            }
        }
    }
433
434    pub fn get_checkpoint_by_digest(
435        &self,
436        digest: &CheckpointDigest,
437    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
438        self.tables
439            .checkpoint_by_digest
440            .get(digest)
441            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
442    }
443
444    pub fn get_checkpoint_by_sequence_number(
445        &self,
446        sequence_number: CheckpointSequenceNumber,
447    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
448        self.tables
449            .certified_checkpoints
450            .get(&sequence_number)
451            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
452    }
453
    /// Returns the locally computed checkpoint summary at `sequence_number`,
    /// if one has been recorded (used for fork detection).
    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.tables
            .locally_computed_checkpoints
            .get(&sequence_number)
    }
462
463    pub fn multi_get_locally_computed_checkpoints(
464        &self,
465        sequence_numbers: &[CheckpointSequenceNumber],
466    ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
467        let checkpoints = self
468            .tables
469            .locally_computed_checkpoints
470            .multi_get(sequence_numbers)?;
471
472        Ok(checkpoints)
473    }
474
    /// Maps a checkpoint contents digest back to the sequence number of the
    /// checkpoint that carries it, if known.
    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }
483
    /// Removes the contents-digest → sequence-number mapping for `digest`
    /// (e.g. during pruning). No-op if the mapping does not exist.
    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }
492
493    pub fn get_latest_certified_checkpoint(
494        &self,
495    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
496        Ok(self
497            .tables
498            .certified_checkpoints
499            .reversed_safe_iter_with_bounds(None, None)?
500            .next()
501            .transpose()?
502            .map(|(_, v)| v.into()))
503    }
504
505    pub fn get_latest_locally_computed_checkpoint(
506        &self,
507    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
508        Ok(self
509            .tables
510            .locally_computed_checkpoints
511            .reversed_safe_iter_with_bounds(None, None)?
512            .next()
513            .transpose()?
514            .map(|(_, v)| v))
515    }
516
517    pub fn multi_get_checkpoint_by_sequence_number(
518        &self,
519        sequence_numbers: &[CheckpointSequenceNumber],
520    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
521        let checkpoints = self
522            .tables
523            .certified_checkpoints
524            .multi_get(sequence_numbers)?
525            .into_iter()
526            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
527            .collect();
528
529        Ok(checkpoints)
530    }
531
    /// Batch lookup of checkpoint contents by contents digest; one entry per
    /// requested digest, `None` where absent.
    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }
538
539    pub fn get_highest_verified_checkpoint(
540        &self,
541    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
542        let highest_verified = if let Some(highest_verified) = self
543            .tables
544            .watermarks
545            .get(&CheckpointWatermark::HighestVerified)?
546        {
547            highest_verified
548        } else {
549            return Ok(None);
550        };
551        self.get_checkpoint_by_digest(&highest_verified.1)
552    }
553
554    pub fn get_highest_synced_checkpoint(
555        &self,
556    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
557        let highest_synced = if let Some(highest_synced) = self
558            .tables
559            .watermarks
560            .get(&CheckpointWatermark::HighestSynced)?
561        {
562            highest_synced
563        } else {
564            return Ok(None);
565        };
566        self.get_checkpoint_by_digest(&highest_synced.1)
567    }
568
569    pub fn get_highest_synced_checkpoint_seq_number(
570        &self,
571    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
572        if let Some(highest_synced) = self
573            .tables
574            .watermarks
575            .get(&CheckpointWatermark::HighestSynced)?
576        {
577            Ok(Some(highest_synced.0))
578        } else {
579            Ok(None)
580        }
581    }
582
583    pub fn get_highest_executed_checkpoint_seq_number(
584        &self,
585    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
586        if let Some(highest_executed) = self
587            .tables
588            .watermarks
589            .get(&CheckpointWatermark::HighestExecuted)?
590        {
591            Ok(Some(highest_executed.0))
592        } else {
593            Ok(None)
594        }
595    }
596
597    pub fn get_highest_executed_checkpoint(
598        &self,
599    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
600        let highest_executed = if let Some(highest_executed) = self
601            .tables
602            .watermarks
603            .get(&CheckpointWatermark::HighestExecuted)?
604        {
605            highest_executed
606        } else {
607            return Ok(None);
608        };
609        self.get_checkpoint_by_digest(&highest_executed.1)
610    }
611
612    pub fn get_highest_pruned_checkpoint_seq_number(
613        &self,
614    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
615        self.tables
616            .watermarks
617            .get(&CheckpointWatermark::HighestPruned)
618            .map(|watermark| watermark.map(|w| w.0))
619    }
620
    /// Returns the checkpoint contents stored under `digest`, if present.
    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }
627
    /// Returns the full (versioned) checkpoint contents for `seq` from the
    /// v2 table, if present. Entries may have been pruned after state
    /// accumulation.
    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content_v2.get(&seq)
    }
634
    /// Prunes locally computed checkpoint summaries up to the most recent one
    /// using a single range delete starting at sequence 0.
    /// NOTE(review): whether `last_local_summary` itself is retained depends
    /// on `schedule_delete_range`'s end-bound semantics — confirm in
    /// typed_store.
    fn prune_local_summaries(&self) -> SuiResult {
        // Find the highest locally computed sequence number, if any.
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }
654
655    pub fn clear_locally_computed_checkpoints_from(
656        &self,
657        from_seq: CheckpointSequenceNumber,
658    ) -> SuiResult {
659        let keys: Vec<_> = self
660            .tables
661            .locally_computed_checkpoints
662            .safe_iter_with_bounds(Some(from_seq), None)
663            .map(|r| r.map(|(k, _)| k))
664            .collect::<Result<_, _>>()?;
665        if let Some(&last_local_summary) = keys.last() {
666            let mut batch = self.tables.locally_computed_checkpoints.batch();
667            batch
668                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
669                .expect("Failed to delete locally computed checkpoints");
670            batch
671                .write()
672                .expect("Failed to delete locally computed checkpoints");
673            warn!(
674                from_seq,
675                last_local_summary,
676                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
677                from_seq,
678                last_local_summary
679            );
680        }
681        Ok(())
682    }
683
    /// Compares a locally computed checkpoint summary against the certified
    /// checkpoint at the same sequence number. On mismatch this logs both
    /// summaries with their contents, records the fork in the database, and
    /// aborts the node via `fatal!` — the validator has diverged from the
    /// committee and must not continue.
    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            // Render the certified checkpoint's contents for logging, falling
            // back to a descriptive string when the contents are missing or
            // the read fails.
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // Same rendering for the locally computed contents.
            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // checkpoint contents may be too large for panic message.
            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );

            // Record the fork in the database before crashing
            if let Err(e) = self.record_checkpoint_fork_detected(
                *local_checkpoint.sequence_number(),
                local_checkpoint.digest(),
            ) {
                error!("Failed to record checkpoint fork in database: {:?}", e);
            }

            // Simulation-only failpoint: record the verified digest so fork
            // recovery tests can use it, then shut this node down.
            fail_point_arg!(
                "kill_checkpoint_fork_node",
                |checkpoint_overrides: std::sync::Arc<
                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
                >| {
                    #[cfg(msim)]
                    {
                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
                            overrides.insert(
                                local_checkpoint.sequence_number,
                                verified_checkpoint.digest().to_string(),
                            );
                        }
                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
                            local_checkpoint.sequence_number(),
                            verified_checkpoint.digest()
                        );
                        sui_simulator::task::shutdown_current_node();
                    }
                }
            );

            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }
777
    // Called by consensus (ConsensusAggregator).
    // Different from `insert_verified_checkpoint`, it does not touch
    // the highest_verified_checkpoint watermark such that state sync
    // will have a chance to process this checkpoint and perform some
    // state-sync only things.
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        // Index the checkpoint under both its sequence number and its digest,
        // atomically in a single write batch.
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        // A checkpoint carrying a next-epoch committee ends its epoch; record
        // it as that epoch's last checkpoint.
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        // If we also computed this checkpoint locally, cross-check the two
        // summaries; a mismatch is a fork and crashes the node.
        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }
819
820    // Called by state sync, apart from inserting the checkpoint and updating
821    // related tables, it also bumps the highest_verified_checkpoint watermark.
822    #[instrument(level = "debug", skip_all)]
823    pub fn insert_verified_checkpoint(
824        &self,
825        checkpoint: &VerifiedCheckpoint,
826    ) -> Result<(), TypedStoreError> {
827        self.insert_certified_checkpoint(checkpoint)?;
828        self.update_highest_verified_checkpoint(checkpoint)
829    }
830
831    pub fn update_highest_verified_checkpoint(
832        &self,
833        checkpoint: &VerifiedCheckpoint,
834    ) -> Result<(), TypedStoreError> {
835        if Some(*checkpoint.sequence_number())
836            > self
837                .get_highest_verified_checkpoint()?
838                .map(|x| *x.sequence_number())
839        {
840            debug!(
841                checkpoint_seq = checkpoint.sequence_number(),
842                "Updating highest verified checkpoint",
843            );
844            self.tables.watermarks.insert(
845                &CheckpointWatermark::HighestVerified,
846                &(*checkpoint.sequence_number(), *checkpoint.digest()),
847            )?;
848        }
849
850        Ok(())
851    }
852
853    pub fn update_highest_synced_checkpoint(
854        &self,
855        checkpoint: &VerifiedCheckpoint,
856    ) -> Result<(), TypedStoreError> {
857        let seq = *checkpoint.sequence_number();
858        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
859        self.tables.watermarks.insert(
860            &CheckpointWatermark::HighestSynced,
861            &(seq, *checkpoint.digest()),
862        )?;
863        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
864        Ok(())
865    }
866
    /// Waits until the watermark returned by `get_watermark` reaches `seq`,
    /// then returns the checkpoint at `seq`.
    ///
    /// The closure passed to `NotifyRead::read` performs the availability
    /// check; returning `vec![None]` registers this task to be woken by a
    /// later `notify` for `seq` on `notify_read`.
    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        notify_read
            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
                let seq = seqs[0];
                // No watermark yet: nothing is available, wait for notify.
                let Some(highest) = get_watermark() else {
                    return vec![None];
                };
                // Watermark has not reached the requested sequence yet.
                if highest < seq {
                    return vec![None];
                }
                // The watermark covers `seq`, so the checkpoint must exist.
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                vec![Some(checkpoint)]
            })
            .await
            .into_iter()
            .next()
            .unwrap()
    }
896
897    pub async fn notify_read_synced_checkpoint(
898        &self,
899        seq: CheckpointSequenceNumber,
900    ) -> VerifiedCheckpoint {
901        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
902            self.get_highest_synced_checkpoint_seq_number()
903                .expect("db error")
904        })
905        .await
906    }
907
908    pub async fn notify_read_executed_checkpoint(
909        &self,
910        seq: CheckpointSequenceNumber,
911    ) -> VerifiedCheckpoint {
912        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
913            self.get_highest_executed_checkpoint_seq_number()
914                .expect("db error")
915        })
916        .await
917    }
918
    /// Advances the HighestExecuted watermark to `checkpoint` and notifies
    /// waiters registered via `notify_read_executed_checkpoint`.
    ///
    /// No-op when the watermark is already at or past `checkpoint`. When
    /// advancing, asserts the watermark moves forward contiguously (exactly
    /// one sequence number at a time).
    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            // Already recorded at or beyond this sequence: nothing to do.
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            // Executed checkpoints must advance without gaps.
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(seq, *checkpoint.digest()),
        )?;
        self.executed_checkpoint_notify_read
            .notify(&seq, checkpoint);
        Ok(())
    }
945
946    pub fn update_highest_pruned_checkpoint(
947        &self,
948        checkpoint: &VerifiedCheckpoint,
949    ) -> Result<(), TypedStoreError> {
950        self.tables.watermarks.insert(
951            &CheckpointWatermark::HighestPruned,
952            &(*checkpoint.sequence_number(), *checkpoint.digest()),
953        )
954    }
955
956    /// Sets highest executed checkpoint to any value.
957    ///
958    /// WARNING: This method is very subtle and can corrupt the database if used incorrectly.
959    /// It should only be used in one-off cases or tests after fully understanding the risk.
960    pub fn set_highest_executed_checkpoint_subtle(
961        &self,
962        checkpoint: &VerifiedCheckpoint,
963    ) -> Result<(), TypedStoreError> {
964        self.tables.watermarks.insert(
965            &CheckpointWatermark::HighestExecuted,
966            &(*checkpoint.sequence_number(), *checkpoint.digest()),
967        )
968    }
969
970    pub fn insert_checkpoint_contents(
971        &self,
972        contents: CheckpointContents,
973    ) -> Result<(), TypedStoreError> {
974        debug!(
975            checkpoint_seq = ?contents.digest(),
976            "Inserting checkpoint contents",
977        );
978        self.tables
979            .checkpoint_content
980            .insert(contents.digest(), &contents)
981    }
982
    /// Atomically stores the full contents of a verified checkpoint, indexed
    /// three ways: contents-digest -> sequence number, sequence number ->
    /// full contents, and contents-digest -> contents derived from the full
    /// contents.
    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content_v2.batch();
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content_v2,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        // Sanity check: the derived contents must hash to the digest the
        // checkpoint summary committed to.
        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        batch.write()
    }
1009
1010    pub fn delete_full_checkpoint_contents(
1011        &self,
1012        seq: CheckpointSequenceNumber,
1013    ) -> Result<(), TypedStoreError> {
1014        self.tables.full_checkpoint_content.remove(&seq)?;
1015        self.tables.full_checkpoint_content_v2.remove(&seq)
1016    }
1017
1018    pub fn get_epoch_last_checkpoint(
1019        &self,
1020        epoch_id: EpochId,
1021    ) -> SuiResult<Option<VerifiedCheckpoint>> {
1022        let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1023        let checkpoint = match seq {
1024            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1025            None => None,
1026        };
1027        Ok(checkpoint)
1028    }
1029
1030    pub fn get_epoch_last_checkpoint_seq_number(
1031        &self,
1032        epoch_id: EpochId,
1033    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1034        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1035        Ok(seq)
1036    }
1037
1038    pub fn insert_epoch_last_checkpoint(
1039        &self,
1040        epoch_id: EpochId,
1041        checkpoint: &VerifiedCheckpoint,
1042    ) -> SuiResult {
1043        self.tables
1044            .epoch_last_checkpoint_map
1045            .insert(&epoch_id, checkpoint.sequence_number())?;
1046        Ok(())
1047    }
1048
1049    pub fn get_epoch_state_commitments(
1050        &self,
1051        epoch: EpochId,
1052    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1053        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1054            checkpoint
1055                .end_of_epoch_data
1056                .as_ref()
1057                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1058                .epoch_commitments
1059                .clone()
1060        });
1061        Ok(commitments)
1062    }
1063
    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few statistics of the epoch.
    ///
    /// Returns `None` when the previous epoch's last checkpoint is needed but
    /// cannot be loaded.
    pub fn get_epoch_stats(
        &self,
        epoch: EpochId,
        last_checkpoint: &CheckpointSummary,
    ) -> Option<EpochStats> {
        // Epoch 0 starts at checkpoint 0 with no prior transactions; later
        // epochs start right after the previous epoch's last checkpoint.
        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
            (0, 0)
        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
            (
                checkpoint.sequence_number + 1,
                checkpoint.network_total_transactions,
            )
        } else {
            return None;
        };
        Some(EpochStats {
            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
            // network_total_transactions is cumulative, so the epoch's count
            // is the difference from the previous epoch's running total.
            transaction_count: last_checkpoint.network_total_transactions
                - prev_epoch_network_transactions,
            total_gas_reward: last_checkpoint
                .epoch_rolling_gas_cost_summary
                .computation_cost,
        })
    }
1089
1090    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1091        // This checkpoints the entire db and not one column family
1092        self.tables
1093            .checkpoint_content
1094            .checkpoint_db(path)
1095            .map_err(Into::into)
1096    }
1097
1098    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1099        let mut wb = self.tables.watermarks.batch();
1100        wb.delete_batch(
1101            &self.tables.watermarks,
1102            std::iter::once(CheckpointWatermark::HighestExecuted),
1103        )?;
1104        wb.write()?;
1105        Ok(())
1106    }
1107
1108    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1109        self.delete_highest_executed_checkpoint_test_only()?;
1110        Ok(())
1111    }
1112
1113    pub fn record_checkpoint_fork_detected(
1114        &self,
1115        checkpoint_seq: CheckpointSequenceNumber,
1116        checkpoint_digest: CheckpointDigest,
1117    ) -> Result<(), TypedStoreError> {
1118        info!(
1119            checkpoint_seq = checkpoint_seq,
1120            checkpoint_digest = ?checkpoint_digest,
1121            "Recording checkpoint fork detection in database"
1122        );
1123        self.tables.watermarks.insert(
1124            &CheckpointWatermark::CheckpointForkDetected,
1125            &(checkpoint_seq, checkpoint_digest),
1126        )
1127    }
1128
1129    pub fn get_checkpoint_fork_detected(
1130        &self,
1131    ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1132        self.tables
1133            .watermarks
1134            .get(&CheckpointWatermark::CheckpointForkDetected)
1135    }
1136
1137    pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1138        self.tables
1139            .watermarks
1140            .remove(&CheckpointWatermark::CheckpointForkDetected)
1141    }
1142
1143    pub fn record_transaction_fork_detected(
1144        &self,
1145        tx_digest: TransactionDigest,
1146        expected_effects_digest: TransactionEffectsDigest,
1147        actual_effects_digest: TransactionEffectsDigest,
1148    ) -> Result<(), TypedStoreError> {
1149        info!(
1150            tx_digest = ?tx_digest,
1151            expected_effects_digest = ?expected_effects_digest,
1152            actual_effects_digest = ?actual_effects_digest,
1153            "Recording transaction fork detection in database"
1154        );
1155        self.tables.transaction_fork_detected.insert(
1156            &TRANSACTION_FORK_DETECTED_KEY,
1157            &(tx_digest, expected_effects_digest, actual_effects_digest),
1158        )
1159    }
1160
1161    pub fn get_transaction_fork_detected(
1162        &self,
1163    ) -> Result<
1164        Option<(
1165            TransactionDigest,
1166            TransactionEffectsDigest,
1167            TransactionEffectsDigest,
1168        )>,
1169        TypedStoreError,
1170    > {
1171        self.tables
1172            .transaction_fork_detected
1173            .get(&TRANSACTION_FORK_DETECTED_KEY)
1174    }
1175
1176    pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1177        self.tables
1178            .transaction_fork_detected
1179            .remove(&TRANSACTION_FORK_DETECTED_KEY)
1180    }
1181}
1182
/// Keys for the entries stored in the `watermarks` table; each maps to a
/// (sequence number, digest) pair (except CheckpointForkDetected, which maps
/// to the forked checkpoint's sequence number and digest).
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    /// Highest checkpoint verified so far (bumped by state sync).
    HighestVerified,
    /// Highest checkpoint whose contents have been synced.
    HighestSynced,
    /// Highest checkpoint fully executed locally.
    HighestExecuted,
    /// Highest checkpoint whose data has been pruned.
    HighestPruned,
    /// Marker recording a detected checkpoint fork, if any.
    CheckpointForkDetected,
}
1191
/// Background task state: receives (sequence, effects) pairs from the
/// checkpoint builder and feeds them to the global state hasher.
struct CheckpointStateHasher {
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Weak so the task stops once the hasher is dropped elsewhere.
    hasher: Weak<GlobalStateHasher>,
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
1197
1198impl CheckpointStateHasher {
1199    fn new(
1200        epoch_store: Arc<AuthorityPerEpochStore>,
1201        hasher: Weak<GlobalStateHasher>,
1202        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1203    ) -> Self {
1204        Self {
1205            epoch_store,
1206            hasher,
1207            receive_from_builder,
1208        }
1209    }
1210
1211    async fn run(self) {
1212        let Self {
1213            epoch_store,
1214            hasher,
1215            mut receive_from_builder,
1216        } = self;
1217        while let Some((seq, effects)) = receive_from_builder.recv().await {
1218            let Some(hasher) = hasher.upgrade() else {
1219                info!("Object state hasher was dropped, stopping checkpoint accumulation");
1220                break;
1221            };
1222            hasher
1223                .accumulate_checkpoint(&effects, seq, &epoch_store)
1224                .expect("epoch ended while accumulating checkpoint");
1225        }
1226    }
1227}
1228
/// Errors surfaced by the checkpoint builder loop. The first two variants
/// stop the builder; `Retry` causes the loop to sleep and try again.
#[derive(Debug)]
pub enum CheckpointBuilderError {
    /// The change-epoch transaction was already executed; nothing left to build.
    ChangeEpochTxAlreadyExecuted,
    /// Required system packages are unavailable.
    SystemPackagesMissing,
    /// Transient failure; the builder retries after a delay.
    Retry(anyhow::Error),
}
1235
1236impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1237    for CheckpointBuilderError
1238{
1239    fn from(e: SuiError) -> Self {
1240        Self::Retry(e.into())
1241    }
1242}
1243
/// Result alias used throughout checkpoint building; defaults to `()` on success.
pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1245
/// Builds checkpoints from pending checkpoints produced by consensus.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Wakes the builder loop when new pending checkpoints are available.
    notify: Arc<Notify>,
    /// Wakes the aggregator after checkpoints are written.
    notify_aggregator: Arc<Notify>,
    /// Watch channel carrying the sequence number of the last built checkpoint.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    /// For synchronous accumulation of the end-of-epoch checkpoint.
    global_state_hasher: Weak<GlobalStateHasher>,
    /// For asynchronous/concurrent accumulation of regular checkpoints.
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
1261
/// Aggregates checkpoint signatures from validators into certified checkpoints.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    /// Incoming signature messages from other validators.
    receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
    /// Signature messages buffered per sequence number until processed.
    pending: BTreeMap<CheckpointSequenceNumber, Vec<CheckpointSignatureMessage>>,
    /// Aggregation state for the checkpoint currently being certified, if any.
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1273
/// Holds the information needed to aggregate signatures for one checkpoint.
pub struct CheckpointSignatureAggregator {
    /// The checkpoint summary being certified.
    summary: CheckpointSummary,
    /// Digest of `summary`.
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1284
1285impl CheckpointBuilder {
    /// Constructs a `CheckpointBuilder`; pure field assembly with no side
    /// effects. See the struct's field documentation for what each piece does.
    fn new(
        state: Arc<AuthorityState>,
        store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        // for synchronous accumulation of end-of-epoch checkpoint
        global_state_hasher: Weak<GlobalStateHasher>,
        // for asynchronous/concurrent accumulation of regular checkpoints
        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            store,
            epoch_store,
            notify,
            effects_store,
            global_state_hasher,
            send_to_hasher,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }
1319
    /// This function first waits for ConsensusCommitHandler to finish reprocessing
    /// commits that have been processed before the last restart, if consensus_replay_waiter
    /// is supplied. Then it starts building checkpoints in a loop.
    ///
    /// It is optional to pass in consensus_replay_waiter, to make it easier to attribute
    /// if slow recovery of previously built checkpoints is due to consensus replay or
    /// checkpoint building.
    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
        if let Some(replay_waiter) = consensus_replay_waiter {
            info!("Waiting for consensus commits to replay ...");
            replay_waiter.wait_for_replay().await;
            info!("Consensus commits finished replaying");
        }
        info!("Starting CheckpointBuilder");
        loop {
            match self.maybe_build_checkpoints().await {
                Ok(()) => {}
                // Terminal conditions: stop the builder entirely.
                err @ Err(
                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
                    | CheckpointBuilderError::SystemPackagesMissing,
                ) => {
                    info!("CheckpointBuilder stopping: {:?}", err);
                    return;
                }
                // Transient errors: back off briefly and retry. Note the
                // `continue` deliberately skips the `notified()` wait below,
                // so the retry does not depend on a new notification.
                Err(CheckpointBuilderError::Retry(inner)) => {
                    let msg = format!("{:?}", inner);
                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    continue;
                }
            }

            // Sleep until more pending checkpoints arrive.
            self.notify.notified().await;
        }
    }
1356
1357    async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1358        if self
1359            .epoch_store
1360            .protocol_config()
1361            .split_checkpoints_in_consensus_handler()
1362        {
1363            self.maybe_build_checkpoints_v2().await
1364        } else {
1365            self.maybe_build_checkpoints_v1().await
1366        }
1367    }
1368
    /// Builds checkpoints from pending checkpoints, grouping consecutive
    /// pending checkpoints until the minimum checkpoint interval has elapsed
    /// (or end-of-epoch is reached) before coalescing each group into one
    /// `make_checkpoint` call.
    async fn maybe_build_checkpoints_v1(&mut self) -> CheckpointBuilderResult {
        let _scope = monitored_scope("BuildCheckpoints");

        // Collect info about the most recently built checkpoint.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            // Group PendingCheckpoints until:
            // - minimum interval has elapsed ...
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                    Some(last_timestamp) => {
                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                    }
                    None => true,
                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
                //   should be written separately) ...
                } || checkpoints_iter
                    .peek()
                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                // - or, we have reached end of epoch.
                    || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            // Min interval has elapsed, we can now coalesce and build a checkpoint.
            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height_from = grouped_pending_checkpoints
                    .first()
                    .unwrap()
                    .details()
                    .checkpoint_height,
                checkpoint_commit_height_to = last_height,
                "Making checkpoint with commit height range"
            );

            let seq = self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await?;

            self.last_built.send_if_modified(|cur| {
                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
                if seq > *cur {
                    *cur = seq;
                    true
                } else {
                    false
                }
            });

            // ensure that the task can be cancelled at end of epoch, even if no other await yields
            // execution.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );

        Ok(())
    }
1457
1458    async fn maybe_build_checkpoints_v2(&mut self) -> CheckpointBuilderResult {
1459        let _scope = monitored_scope("BuildCheckpoints");
1460
1461        // Collect info about the most recently built checkpoint.
1462        let last_height = self
1463            .epoch_store
1464            .last_built_checkpoint_builder_summary()
1465            .expect("epoch should not have ended")
1466            .and_then(|s| s.checkpoint_height);
1467
1468        for (height, pending) in self
1469            .epoch_store
1470            .get_pending_checkpoints_v2(last_height)
1471            .expect("unexpected epoch store error")
1472        {
1473            debug!(checkpoint_commit_height = height, "Making checkpoint");
1474
1475            let seq = self.make_checkpoint_v2(pending).await?;
1476
1477            self.last_built.send_if_modified(|cur| {
1478                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1479                if seq > *cur {
1480                    *cur = seq;
1481                    true
1482                } else {
1483                    false
1484                }
1485            });
1486
1487            // ensure that the task can be cancelled at end of epoch, even if no other await yields
1488            // execution.
1489            tokio::task::yield_now().await;
1490        }
1491
1492        Ok(())
1493    }
1494
    /// Coalesces a group of pending checkpoints into one or more checkpoints,
    /// writes them, and returns the highest sequence number produced.
    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &mut self,
        pendings: Vec<PendingCheckpoint>,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        // Human-readable description of the inputs, for the final log line.
        let pending_ckpt_str = pendings
            .iter()
            .map(|p| {
                format!(
                    "height={}, commit={}",
                    p.details().checkpoint_height,
                    p.details().consensus_commit_ref
                )
            })
            .join("; ");

        let last_details = pendings.last().unwrap().details().clone();

        // Snapshot the executed watermark before resolving, to validate below
        // that resolution did not block when execution was already ahead.
        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        // Resolve the transactions (and effects) to include; these are
        // recorded in the checkpoint in this order.
        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &last_details,
                &all_roots,
            )
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        // If execution is already past these checkpoints, resolution should
        // have completed on the first poll; more polls indicate it waited.
        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        let new_ckpt_str = new_checkpoints
            .iter()
            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
            .join("; ");

        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            "Made new checkpoint {} from pending checkpoint {}",
            new_ckpt_str, pending_ckpt_str
        );

        Ok(highest_sequence)
    }
1554
    /// Builds exactly one checkpoint from a single v2 pending checkpoint,
    /// writes it, and returns its sequence number.
    #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
    async fn make_checkpoint_v2(
        &mut self,
        pending: PendingCheckpointV2,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        let details = pending.details.clone();

        // Snapshot the executed watermark before resolving, to validate below
        // that resolution did not block when execution was already ahead.
        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        let (poll_count, result) =
            poll_count(self.resolve_checkpoint_transactions_v2(pending)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoints = self
            .create_checkpoints(
                sorted_tx_effects_included_in_checkpoint,
                &details,
                &all_roots,
            )
            .await?;
        // v2 pending checkpoints are pre-split, so creation must yield one.
        assert_eq!(new_checkpoints.len(), 1, "Expected exactly one checkpoint");
        let sequence = *new_checkpoints.first().0.sequence_number();
        let digest = new_checkpoints.first().0.digest();
        // If execution is already past this checkpoint, resolution should
        // have completed on the first poll; more polls indicate it waited.
        if sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        self.write_checkpoints(details.checkpoint_height, new_checkpoints)
            .await?;
        info!(
            seq = sequence,
            %digest,
            height = details.checkpoint_height,
            commit = %details.consensus_commit_ref,
            "Made new checkpoint"
        );

        Ok(sequence)
    }
1602
    /// Builds the accumulator settlement transactions for the given checkpoint contents,
    /// hands them (plus a trailing barrier transaction) off for execution, waits for all
    /// of their effects, and returns the settlement `TransactionKey` together with the
    /// combined settlement + barrier effects.
    ///
    /// The returned effects are appended to the checkpoint by the caller; the settlement
    /// transactions are system transactions created here rather than coming from consensus.
    async fn construct_and_execute_settlement_transactions(
        &self,
        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> (TransactionKey, Vec<TransactionEffects>) {
        let _scope =
            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");

        // Key under which the settlement transactions are registered, uniquely identified
        // by (epoch, checkpoint height).
        let tx_key =
            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);

        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.effects_store.as_ref()),
            sorted_tx_effects_included_in_checkpoint,
            checkpoint_seq,
            tx_index_offset,
        );

        // Collect the funds changes before `builder` is consumed by `build_tx`; they are
        // forwarded to the execution scheduler at the end of this function.
        let funds_changes = builder.collect_funds_changes();
        let num_updates = builder.num_updates();
        let settlement_txns = builder.build_tx(
            self.epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            checkpoint_seq,
        );

        // Wrap the raw transactions as verified executable system transactions for this epoch.
        let settlement_txns: Vec<_> = settlement_txns
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    self.epoch_store.epoch(),
                )
            })
            .collect();

        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();

        debug!(
            ?settlement_digests,
            ?tx_key,
            "created settlement transactions with {num_updates} updates"
        );

        // Publish the settlement transactions under `tx_key` so the execution side can pick
        // them up, then block until all of their effects are available.
        self.epoch_store
            .notify_settlement_transactions_ready(tx_key, settlement_txns);

        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_settlement_effects",
            &settlement_digests,
            tx_key,
        )
        .await;

        // The barrier transaction is derived from the settlement effects, so it can only be
        // constructed after the settlements have executed.
        let barrier_tx = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            checkpoint_height,
            &settlement_effects,
        );

        let barrier_tx = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier_tx),
            self.epoch_store.epoch(),
        );
        let barrier_digest = *barrier_tx.digest();

        // Same publish-then-wait pattern for the barrier transaction.
        self.epoch_store
            .notify_barrier_transaction_ready(tx_key, barrier_tx);

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::notify_read_barrier_effects",
            &[barrier_digest],
            tx_key,
        )
        .await;

        // Combined effects returned to the caller: settlements first, then the barrier.
        let settlement_effects: Vec<_> = settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect();

        // Settlement transactions must never fail, and exactly one of them must mutate the
        // accumulator root object; extract the root object's new version for the scheduler.
        let mut next_accumulator_version = None;
        for fx in settlement_effects.iter() {
            assert!(
                fx.status().is_ok(),
                "settlement transaction cannot fail (digest: {:?}) {:#?}",
                fx.transaction_digest(),
                fx
            );
            if let Some(version) = fx
                .mutated()
                .iter()
                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
            {
                assert!(
                    next_accumulator_version.is_none(),
                    "Only one settlement transaction should mutate the accumulator root object"
                );
                next_accumulator_version = Some(version);
            }
        }
        let settlements = FundsSettlement {
            next_accumulator_version: next_accumulator_version
                .expect("Accumulator root object should be mutated in the settlement transactions"),
            funds_changes,
        };

        // Tell the execution scheduler that address funds can be settled up to the new
        // accumulator root version.
        self.state
            .execution_scheduler()
            .settle_address_funds(settlements);

        (tx_key, settlement_effects)
    }
1730
    /// Given the root transactions of a batch of pending checkpoints, resolves the full set of
    /// transactions that should be included in the checkpoint, and returns them in the order
    /// they should be included, together with the set of all root transaction digests.
    ///
    /// Multiple pending checkpoints may be combined into a single checkpoint; a root of a
    /// later pending checkpoint may depend on transactions already pulled in by an earlier
    /// one, which is tracked via `effects_in_current_checkpoint`.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        pending_checkpoints: Vec<PendingCheckpoint>,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        // Keeps track of the effects that are already included in the current checkpoint.
        // This is used when there are multiple pending checkpoints to create a single checkpoint
        // because in such scenarios, dependencies of a transaction may be in earlier created
        // checkpoints, or in earlier pending checkpoints.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        // Accumulated outputs across all pending checkpoints in this batch.
        let mut tx_effects = Vec::new();
        let mut tx_roots = HashSet::new();

        for pending_checkpoint in pending_checkpoints.into_iter() {
            let mut pending = pending_checkpoint;
            debug!(
                checkpoint_commit_height = pending.details.checkpoint_height,
                "Resolving checkpoint transactions for pending checkpoint.",
            );

            trace!(
                "roots for pending checkpoint {:?}: {:?}",
                pending.details.checkpoint_height, pending.roots,
            );

            // When accumulators are enabled, the settlement root must be the last root; pop
            // it off so it is not treated as a regular root below.
            let settlement_root = if self.epoch_store.accumulators_enabled() {
                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
                    pending.roots.pop()
                else {
                    fatal!("No settlement root found");
                };
                Some(settlement_root)
            } else {
                None
            };

            let roots = &pending.roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(roots.len() as u64);

            // Resolve root keys to digests, then wait for all root effects to be executed.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;
            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;

            assert!(
                self.epoch_store
                    .protocol_config()
                    .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
            );

            // If the roots contains consensus commit prologue transaction, we want to extract it,
            // and put it to the front of the checkpoint.
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            // Get the unincluded dependencies of the consensus commit prologue. We should expect no
            // other dependencies that haven't been included in any previous checkpoints.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    &mut effects_in_current_checkpoint,
                )?;

                // No other dependencies of this consensus commit prologue that haven't been included
                // in any previous checkpoint.
                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }

            // Expand the roots to include all of their not-yet-included dependencies.
            let unsorted =
                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
                if cfg!(debug_assertions) {
                    // When consensus_commit_prologue is extracted, it should not be included in the `unsorted`.
                    for tx in unsorted.iter() {
                        assert!(tx.transaction_digest() != &ccp_digest);
                    }
                }
                sorted.push(ccp_effects);
            }
            sorted.extend(CausalOrder::causal_sort(unsorted));

            if let Some(settlement_root) = settlement_root {
                //TODO: this is an incorrect heuristic for checkpoint seq number
                //      due to checkpoint splitting, to be fixed separately
                let last_checkpoint =
                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
                let next_checkpoint_seq = last_checkpoint
                    .as_ref()
                    .map(|(seq, _)| *seq)
                    .unwrap_or_default()
                    + 1;
                let tx_index_offset = tx_effects.len() as u64;

                // Build and execute the settlement (and barrier) transactions for this pending
                // checkpoint; their effects are appended after the causally sorted set.
                let (tx_key, settlement_effects) = self
                    .construct_and_execute_settlement_transactions(
                        &sorted,
                        pending.details.checkpoint_height,
                        next_checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                debug!(?tx_key, "executed settlement transactions");

                assert_eq!(settlement_root, tx_key);

                // Note: we do not need to add the settlement digests to `tx_roots` - `tx_roots`
                // should only include the digests of transactions that were originally roots in
                // the pending checkpoint. It is later used to identify transactions which were
                // added as dependencies, so that those transactions can be waited on using
                // `consensus_messages_processed_notify()`. System transactions (such as
                // settlements) are exempt from this already.
                //
                // However, we DO need to add them to `effects_in_current_checkpoint` so that
                // `complete_checkpoint_effects` won't pull them in again as dependencies when
                // processing later pending checkpoints in the same batch.
                effects_in_current_checkpoint
                    .extend(settlement_effects.iter().map(|e| *e.transaction_digest()));
                sorted.extend(settlement_effects);
            }

            #[cfg(msim)]
            {
                // Check consensus commit prologue invariants in sim test.
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            tx_effects.extend(sorted);
            tx_roots.extend(root_digests);
        }

        Ok((tx_effects, tx_roots))
    }
1895
    /// Given the root transactions of a pending checkpoint, resolves the transactions that
    /// should be included in the checkpoint, and returns them in the order they should be
    /// included, together with the set of all root digests.
    ///
    /// Unlike `resolve_checkpoint_transactions`, a `PendingCheckpointV2` may carry several
    /// groups of roots (each with its own height), and settlement effects are resolved by
    /// recomputing their digests and reading the effects cache (`resolve_settlement_effects`)
    /// rather than by scheduling the transactions from here.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions_v2(
        &self,
        pending: PendingCheckpointV2,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        debug!(
            checkpoint_commit_height = pending.details.checkpoint_height,
            "Resolving checkpoint transactions for pending checkpoint.",
        );

        trace!(
            "roots for pending checkpoint {:?}: {:?}",
            pending.details.checkpoint_height, pending.roots,
        );

        assert!(
            self.epoch_store
                .protocol_config()
                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
        );

        // Accumulated outputs across all root groups of this pending checkpoint.
        let mut all_effects: Vec<TransactionEffects> = Vec::new();
        let mut all_root_digests: Vec<TransactionDigest> = Vec::new();

        for checkpoint_roots in &pending.roots {
            let tx_roots = &checkpoint_roots.tx_roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(tx_roots.len() as u64);

            // Resolve root keys to digests, then wait for all root effects to be executed.
            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(tx_roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;

            all_root_digests.extend(root_digests.iter().cloned());

            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;
            // The consensus commit prologue (if present) is extracted and its digest passed to
            // the causal sort so ordering can account for it.
            let consensus_commit_prologue =
                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let ccp_digest = consensus_commit_prologue.map(|(d, _)| d);
            let mut sorted = CausalOrder::causal_sort_with_ccp(root_effects, ccp_digest);

            if let Some(settlement_key) = &checkpoint_roots.settlement_root {
                let checkpoint_seq = pending
                    .details
                    .checkpoint_seq
                    .expect("checkpoint_seq must be set");
                // Number of effects already added for earlier root groups; used as the
                // settlement transaction index offset.
                let tx_index_offset = all_effects.len() as u64;
                let effects = self
                    .resolve_settlement_effects(
                        *settlement_key,
                        &sorted,
                        checkpoint_roots.height,
                        checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                sorted.extend(effects);
            }

            #[cfg(msim)]
            {
                // Check consensus commit prologue invariants in sim test.
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            all_effects.extend(sorted);
        }
        Ok((all_effects, all_root_digests.into_iter().collect()))
    }
1981
1982    /// Constructs settlement transactions to compute their digests, then reads effects
1983    /// directly from the cache. If execution is ahead of the checkpoint builder, the
1984    /// effects are already cached and this returns instantly. Otherwise it waits for
1985    /// the execution scheduler's queue worker to execute them.
1986    async fn resolve_settlement_effects(
1987        &self,
1988        settlement_key: TransactionKey,
1989        sorted_root_effects: &[TransactionEffects],
1990        checkpoint_height: CheckpointHeight,
1991        checkpoint_seq: CheckpointSequenceNumber,
1992        tx_index_offset: u64,
1993    ) -> Vec<TransactionEffects> {
1994        let epoch = self.epoch_store.epoch();
1995        let accumulator_root_obj_initial_shared_version = self
1996            .epoch_store
1997            .epoch_start_config()
1998            .accumulator_root_obj_initial_shared_version()
1999            .expect("accumulator root object must exist");
2000
2001        let builder = AccumulatorSettlementTxBuilder::new(
2002            None,
2003            sorted_root_effects,
2004            checkpoint_seq,
2005            tx_index_offset,
2006        );
2007
2008        let settlement_digests: Vec<_> = builder
2009            .build_tx(
2010                self.epoch_store.protocol_config(),
2011                epoch,
2012                accumulator_root_obj_initial_shared_version,
2013                checkpoint_height,
2014                checkpoint_seq,
2015            )
2016            .into_iter()
2017            .map(|tx| *VerifiedTransaction::new_system_transaction(tx).digest())
2018            .collect();
2019
2020        debug!(
2021            ?settlement_digests,
2022            ?settlement_key,
2023            "fallback: reading settlement effects from cache"
2024        );
2025
2026        let settlement_effects = wait_for_effects_with_retry(
2027            self.effects_store.as_ref(),
2028            "CheckpointBuilder::fallback_settlement_effects",
2029            &settlement_digests,
2030            settlement_key,
2031        )
2032        .await;
2033
2034        let barrier_digest = *VerifiedTransaction::new_system_transaction(
2035            accumulators::build_accumulator_barrier_tx(
2036                epoch,
2037                accumulator_root_obj_initial_shared_version,
2038                checkpoint_height,
2039                &settlement_effects,
2040            ),
2041        )
2042        .digest();
2043
2044        let barrier_effects = wait_for_effects_with_retry(
2045            self.effects_store.as_ref(),
2046            "CheckpointBuilder::fallback_barrier_effects",
2047            &[barrier_digest],
2048            settlement_key,
2049        )
2050        .await;
2051
2052        settlement_effects
2053            .into_iter()
2054            .chain(barrier_effects)
2055            .collect()
2056    }
2057
2058    // Extracts the consensus commit prologue digest and effects from the root transactions.
2059    // The consensus commit prologue is expected to be the first transaction in the roots.
2060    fn extract_consensus_commit_prologue(
2061        &self,
2062        root_digests: &[TransactionDigest],
2063        root_effects: &[TransactionEffects],
2064    ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
2065        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
2066        if root_digests.is_empty() {
2067            return Ok(None);
2068        }
2069
2070        // Reads the first transaction in the roots, and checks whether it is a consensus commit
2071        // prologue transaction. The consensus commit prologue transaction should be the first
2072        // transaction in the roots written by the consensus handler.
2073        let first_tx = self
2074            .state
2075            .get_transaction_cache_reader()
2076            .get_transaction_block(&root_digests[0])
2077            .expect("Transaction block must exist");
2078
2079        Ok(first_tx
2080            .transaction_data()
2081            .is_consensus_commit_prologue()
2082            .then(|| {
2083                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
2084                (*first_tx.digest(), root_effects[0].clone())
2085            }))
2086    }
2087
2088    #[instrument(level = "debug", skip_all)]
2089    async fn write_checkpoints(
2090        &mut self,
2091        height: CheckpointHeight,
2092        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
2093    ) -> SuiResult {
2094        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
2095        let mut batch = self.store.tables.checkpoint_content.batch();
2096        let mut all_tx_digests =
2097            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
2098
2099        for (summary, contents) in &new_checkpoints {
2100            debug!(
2101                checkpoint_commit_height = height,
2102                checkpoint_seq = summary.sequence_number,
2103                contents_digest = ?contents.digest(),
2104                "writing checkpoint",
2105            );
2106
2107            if let Some(previously_computed_summary) = self
2108                .store
2109                .tables
2110                .locally_computed_checkpoints
2111                .get(&summary.sequence_number)?
2112                && previously_computed_summary.digest() != summary.digest()
2113            {
2114                fatal!(
2115                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
2116                    summary.sequence_number,
2117                    previously_computed_summary.digest(),
2118                    summary.digest()
2119                );
2120            }
2121
2122            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
2123
2124            self.metrics
2125                .transactions_included_in_checkpoint
2126                .inc_by(contents.size() as u64);
2127            let sequence_number = summary.sequence_number;
2128            self.metrics
2129                .last_constructed_checkpoint
2130                .set(sequence_number as i64);
2131
2132            batch.insert_batch(
2133                &self.store.tables.checkpoint_content,
2134                [(contents.digest(), contents)],
2135            )?;
2136
2137            batch.insert_batch(
2138                &self.store.tables.locally_computed_checkpoints,
2139                [(sequence_number, summary)],
2140            )?;
2141        }
2142
2143        batch.write()?;
2144
2145        // Send all checkpoint sigs to consensus.
2146        for (summary, contents) in &new_checkpoints {
2147            self.output
2148                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
2149                .await?;
2150        }
2151
2152        for (local_checkpoint, _) in &new_checkpoints {
2153            if let Some(certified_checkpoint) = self
2154                .store
2155                .tables
2156                .certified_checkpoints
2157                .get(local_checkpoint.sequence_number())?
2158            {
2159                self.store
2160                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
2161            }
2162        }
2163
2164        self.notify_aggregator.notify_one();
2165        self.epoch_store
2166            .process_constructed_checkpoint(height, new_checkpoints);
2167        Ok(())
2168    }
2169
2170    #[allow(clippy::type_complexity)]
2171    fn split_checkpoint_chunks(
2172        &self,
2173        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
2174        signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2175    ) -> CheckpointBuilderResult<
2176        Vec<
2177            Vec<(
2178                TransactionEffects,
2179                Vec<(GenericSignature, Option<SequenceNumber>)>,
2180            )>,
2181        >,
2182    > {
2183        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
2184
2185        // If splitting is done in consensus_handler, return everything as one chunk.
2186        if self
2187            .epoch_store
2188            .protocol_config()
2189            .split_checkpoints_in_consensus_handler()
2190        {
2191            let chunk: Vec<_> = effects_and_transaction_sizes
2192                .into_iter()
2193                .zip(signatures)
2194                .map(|((effects, _size), sigs)| (effects, sigs))
2195                .collect();
2196            return Ok(vec![chunk]);
2197        }
2198        let mut chunks = Vec::new();
2199        let mut chunk = Vec::new();
2200        let mut chunk_size: usize = 0;
2201        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
2202            .into_iter()
2203            .zip(signatures.into_iter())
2204        {
2205            // Roll over to a new chunk after either max count or max size is reached.
2206            // The size calculation here is intended to estimate the size of the
2207            // FullCheckpointContents struct. If this code is modified, that struct
2208            // should also be updated accordingly.
2209            let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
2210                bcs::serialized_size(&signatures)?
2211            } else {
2212                let signatures: Vec<&GenericSignature> =
2213                    signatures.iter().map(|(s, _)| s).collect();
2214                bcs::serialized_size(&signatures)?
2215            };
2216            let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
2217            if chunk.len() == self.max_transactions_per_checkpoint
2218                || (chunk_size + size) > self.max_checkpoint_size_bytes
2219            {
2220                if chunk.is_empty() {
2221                    // Always allow at least one tx in a checkpoint.
2222                    warn!(
2223                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
2224                        self.max_checkpoint_size_bytes
2225                    );
2226                } else {
2227                    chunks.push(chunk);
2228                    chunk = Vec::new();
2229                    chunk_size = 0;
2230                }
2231            }
2232
2233            chunk.push((effects, signatures));
2234            chunk_size += size;
2235        }
2236
2237        if !chunk.is_empty() || chunks.is_empty() {
2238            // We intentionally create an empty checkpoint if there is no content provided
2239            // to make a 'heartbeat' checkpoint.
2240            // Important: if some conditions are added here later, we need to make sure we always
2241            // have at least one chunk if last_pending_of_epoch is set
2242            chunks.push(chunk);
2243            // Note: empty checkpoints are ok - they shouldn't happen at all on a network with even
2244            // modest load. Even if they do happen, it is still useful as it allows fullnodes to
2245            // distinguish between "no transactions have happened" and "i am not receiving new
2246            // checkpoints".
2247        }
2248        Ok(chunks)
2249    }
2250
2251    fn load_last_built_checkpoint_summary(
2252        epoch_store: &AuthorityPerEpochStore,
2253        store: &CheckpointStore,
2254    ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
2255        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
2256        if last_checkpoint.is_none() {
2257            let epoch = epoch_store.epoch();
2258            if epoch > 0 {
2259                let previous_epoch = epoch - 1;
2260                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
2261                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
2262                if let Some((ref seq, _)) = last_checkpoint {
2263                    debug!(
2264                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
2265                    );
2266                } else {
2267                    // This is some serious bug with when CheckpointBuilder started so surfacing it via panic
2268                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
2269                }
2270            }
2271        }
2272        Ok(last_checkpoint)
2273    }
2274
2275    #[instrument(level = "debug", skip_all)]
2276    async fn create_checkpoints(
2277        &self,
2278        all_effects: Vec<TransactionEffects>,
2279        details: &PendingCheckpointInfo,
2280        all_roots: &HashSet<TransactionDigest>,
2281    ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
2282        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
2283
2284        let total = all_effects.len();
2285        let mut last_checkpoint =
2286            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
2287        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
2288        debug!(
2289            checkpoint_commit_height = details.checkpoint_height,
2290            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
2291            checkpoint_timestamp = details.timestamp_ms,
2292            "Creating checkpoint(s) for {} transactions",
2293            all_effects.len(),
2294        );
2295
2296        let all_digests: Vec<_> = all_effects
2297            .iter()
2298            .map(|effect| *effect.transaction_digest())
2299            .collect();
2300        let transactions_and_sizes = self
2301            .state
2302            .get_transaction_cache_reader()
2303            .get_transactions_and_serialized_sizes(&all_digests)?;
2304        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
2305        let mut transactions = Vec::with_capacity(all_effects.len());
2306        let mut transaction_keys = Vec::with_capacity(all_effects.len());
2307        let mut randomness_rounds = BTreeMap::new();
2308        {
2309            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
2310            debug!(
2311                ?last_checkpoint_seq,
2312                "Waiting for {:?} certificates to appear in consensus",
2313                all_effects.len()
2314            );
2315
2316            for (effects, transaction_and_size) in all_effects
2317                .into_iter()
2318                .zip(transactions_and_sizes.into_iter())
2319            {
2320                let (transaction, size) = transaction_and_size
2321                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
2322                match transaction.inner().transaction_data().kind() {
2323                    TransactionKind::ConsensusCommitPrologue(_)
2324                    | TransactionKind::ConsensusCommitPrologueV2(_)
2325                    | TransactionKind::ConsensusCommitPrologueV3(_)
2326                    | TransactionKind::ConsensusCommitPrologueV4(_)
2327                    | TransactionKind::AuthenticatorStateUpdate(_) => {
2328                        // ConsensusCommitPrologue and AuthenticatorStateUpdate are guaranteed to be
2329                        // processed before we reach here.
2330                    }
2331                    TransactionKind::ProgrammableSystemTransaction(_) => {
2332                        // settlement transactions are added by checkpoint builder
2333                    }
2334                    TransactionKind::ChangeEpoch(_)
2335                    | TransactionKind::Genesis(_)
2336                    | TransactionKind::EndOfEpochTransaction(_) => {
2337                        fatal!(
2338                            "unexpected transaction in checkpoint effects: {:?}",
2339                            transaction
2340                        );
2341                    }
2342                    TransactionKind::RandomnessStateUpdate(rsu) => {
2343                        randomness_rounds
2344                            .insert(*effects.transaction_digest(), rsu.randomness_round);
2345                    }
2346                    TransactionKind::ProgrammableTransaction(_) => {
2347                        // Only transactions that are not roots should be included in the call to
2348                        // `consensus_messages_processed_notify`. roots come directly from the consensus
2349                        // commit and so are known to be processed already.
2350                        let digest = *effects.transaction_digest();
2351                        if !all_roots.contains(&digest) {
2352                            transaction_keys.push(SequencedConsensusTransactionKey::External(
2353                                ConsensusTransactionKey::Certificate(digest),
2354                            ));
2355                        }
2356                    }
2357                }
2358                transactions.push(transaction);
2359                all_effects_and_transaction_sizes.push((effects, size));
2360            }
2361
2362            self.epoch_store
2363                .consensus_messages_processed_notify(transaction_keys)
2364                .await?;
2365        }
2366
2367        let signatures = self
2368            .epoch_store
2369            .user_signatures_for_checkpoint(&transactions, &all_digests);
2370        debug!(
2371            ?last_checkpoint_seq,
2372            "Received {} checkpoint user signatures from consensus",
2373            signatures.len()
2374        );
2375
2376        let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
2377            Some(
2378                transactions
2379                    .iter()
2380                    .flat_map(|tx| {
2381                        if let TransactionKind::ProgrammableTransaction(ptb) =
2382                            tx.transaction_data().kind()
2383                        {
2384                            itertools::Either::Left(
2385                                ptb.commands
2386                                    .iter()
2387                                    .map(ExecutionTimeObservationKey::from_command),
2388                            )
2389                        } else {
2390                            itertools::Either::Right(std::iter::empty())
2391                        }
2392                    })
2393                    .collect(),
2394            )
2395        } else {
2396            None
2397        };
2398
2399        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2400        let chunks_count = chunks.len();
2401
2402        let mut checkpoints = Vec::with_capacity(chunks_count);
2403        debug!(
2404            ?last_checkpoint_seq,
2405            "Creating {} checkpoints with {} transactions", chunks_count, total,
2406        );
2407
2408        let epoch = self.epoch_store.epoch();
2409        for (index, transactions) in chunks.into_iter().enumerate() {
2410            let first_checkpoint_of_epoch = index == 0
2411                && last_checkpoint
2412                    .as_ref()
2413                    .map(|(_, c)| c.epoch != epoch)
2414                    .unwrap_or(true);
2415            if first_checkpoint_of_epoch {
2416                self.epoch_store
2417                    .record_epoch_first_checkpoint_creation_time_metric();
2418            }
2419            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2420
2421            let sequence_number = if let Some(preassigned_seq) = details.checkpoint_seq {
2422                preassigned_seq
2423            } else {
2424                last_checkpoint
2425                    .as_ref()
2426                    .map(|(_, c)| c.sequence_number + 1)
2427                    .unwrap_or_default()
2428            };
2429            let mut timestamp_ms = details.timestamp_ms;
2430            if let Some((_, last_checkpoint)) = &last_checkpoint
2431                && last_checkpoint.timestamp_ms > timestamp_ms
2432            {
2433                // First consensus commit of an epoch can have zero timestamp.
2434                debug!(
2435                    "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2436                    sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2437                );
2438                if self
2439                    .epoch_store
2440                    .protocol_config()
2441                    .enforce_checkpoint_timestamp_monotonicity()
2442                {
2443                    timestamp_ms = last_checkpoint.timestamp_ms;
2444                }
2445            }
2446
2447            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
2448            let epoch_rolling_gas_cost_summary =
2449                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2450
2451            let end_of_epoch_data = if last_checkpoint_of_epoch {
2452                let system_state_obj = self
2453                    .augment_epoch_last_checkpoint(
2454                        &epoch_rolling_gas_cost_summary,
2455                        timestamp_ms,
2456                        &mut effects,
2457                        &mut signatures,
2458                        sequence_number,
2459                        std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2460                        last_checkpoint_seq.unwrap_or_default(),
2461                    )
2462                    .await?;
2463
2464                let committee = system_state_obj
2465                    .get_current_epoch_committee()
2466                    .committee()
2467                    .clone();
2468
2469                // This must happen after the call to augment_epoch_last_checkpoint,
2470                // otherwise we will not capture the change_epoch tx.
2471                let root_state_digest = {
2472                    let state_acc = self
2473                        .global_state_hasher
2474                        .upgrade()
2475                        .expect("No checkpoints should be getting built after local configuration");
2476                    let acc = state_acc.accumulate_checkpoint(
2477                        &effects,
2478                        sequence_number,
2479                        &self.epoch_store,
2480                    )?;
2481
2482                    state_acc
2483                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2484                        .await?;
2485
2486                    state_acc.accumulate_running_root(
2487                        &self.epoch_store,
2488                        sequence_number,
2489                        Some(acc),
2490                    )?;
2491                    state_acc
2492                        .digest_epoch(self.epoch_store.clone(), sequence_number)
2493                        .await?
2494                };
2495                self.metrics.highest_accumulated_epoch.set(epoch as i64);
2496                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2497
2498                let epoch_commitments = if self
2499                    .epoch_store
2500                    .protocol_config()
2501                    .check_commit_root_state_digest_supported()
2502                {
2503                    vec![root_state_digest.into()]
2504                } else {
2505                    vec![]
2506                };
2507
2508                Some(EndOfEpochData {
2509                    next_epoch_committee: committee.voting_rights,
2510                    next_epoch_protocol_version: ProtocolVersion::new(
2511                        system_state_obj.protocol_version(),
2512                    ),
2513                    epoch_commitments,
2514                })
2515            } else {
2516                self.send_to_hasher
2517                    .send((sequence_number, effects.clone()))
2518                    .await?;
2519
2520                None
2521            };
2522            let contents = if self.epoch_store.protocol_config().address_aliases() {
2523                CheckpointContents::new_v2(&effects, signatures)
2524            } else {
2525                CheckpointContents::new_with_digests_and_signatures(
2526                    effects.iter().map(TransactionEffects::execution_digests),
2527                    signatures
2528                        .into_iter()
2529                        .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
2530                        .collect(),
2531                )
2532            };
2533
2534            let num_txns = contents.size() as u64;
2535
2536            let network_total_transactions = last_checkpoint
2537                .as_ref()
2538                .map(|(_, c)| c.network_total_transactions + num_txns)
2539                .unwrap_or(num_txns);
2540
2541            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2542
2543            let matching_randomness_rounds: Vec<_> = effects
2544                .iter()
2545                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2546                .copied()
2547                .collect();
2548
2549            let checkpoint_commitments = if self
2550                .epoch_store
2551                .protocol_config()
2552                .include_checkpoint_artifacts_digest_in_summary()
2553            {
2554                let artifacts = CheckpointArtifacts::from(&effects[..]);
2555                let artifacts_digest = artifacts.digest()?;
2556                vec![artifacts_digest.into()]
2557            } else {
2558                Default::default()
2559            };
2560
2561            let summary = CheckpointSummary::new(
2562                self.epoch_store.protocol_config(),
2563                epoch,
2564                sequence_number,
2565                network_total_transactions,
2566                &contents,
2567                previous_digest,
2568                epoch_rolling_gas_cost_summary,
2569                end_of_epoch_data,
2570                timestamp_ms,
2571                matching_randomness_rounds,
2572                checkpoint_commitments,
2573            );
2574            summary.report_checkpoint_age(
2575                &self.metrics.last_created_checkpoint_age,
2576                &self.metrics.last_created_checkpoint_age_ms,
2577            );
2578            if last_checkpoint_of_epoch {
2579                info!(
2580                    checkpoint_seq = sequence_number,
2581                    "creating last checkpoint of epoch {}", epoch
2582                );
2583                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2584                    self.epoch_store
2585                        .report_epoch_metrics_at_last_checkpoint(stats);
2586                }
2587            }
2588            last_checkpoint = Some((sequence_number, summary.clone()));
2589            checkpoints.push((summary, contents));
2590        }
2591
2592        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2593    }
2594
2595    fn get_epoch_total_gas_cost(
2596        &self,
2597        last_checkpoint: Option<&CheckpointSummary>,
2598        cur_checkpoint_effects: &[TransactionEffects],
2599    ) -> GasCostSummary {
2600        let (previous_epoch, previous_gas_costs) = last_checkpoint
2601            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2602            .unwrap_or_default();
2603        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2604        if previous_epoch == self.epoch_store.epoch() {
2605            // sum only when we are within the same epoch
2606            GasCostSummary::new(
2607                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2608                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2609                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2610                previous_gas_costs.non_refundable_storage_fee
2611                    + current_gas_costs.non_refundable_storage_fee,
2612            )
2613        } else {
2614            current_gas_costs
2615        }
2616    }
2617
2618    #[instrument(level = "error", skip_all)]
2619    async fn augment_epoch_last_checkpoint(
2620        &self,
2621        epoch_total_gas_cost: &GasCostSummary,
2622        epoch_start_timestamp_ms: CheckpointTimestamp,
2623        checkpoint_effects: &mut Vec<TransactionEffects>,
2624        signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2625        checkpoint: CheckpointSequenceNumber,
2626        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2627        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
2628        // >1 checkpoint.
2629        last_checkpoint: CheckpointSequenceNumber,
2630    ) -> CheckpointBuilderResult<SuiSystemState> {
2631        let (system_state, effects) = self
2632            .state
2633            .create_and_execute_advance_epoch_tx(
2634                &self.epoch_store,
2635                epoch_total_gas_cost,
2636                checkpoint,
2637                epoch_start_timestamp_ms,
2638                end_of_epoch_observation_keys,
2639                last_checkpoint,
2640            )
2641            .await?;
2642        checkpoint_effects.push(effects);
2643        signatures.push(vec![]);
2644        Ok(system_state)
2645    }
2646
    /// For the given roots, returns the complete list of effects to include in the checkpoint:
    /// the roots themselves plus all of their transitive dependencies that were executed in the
    /// current epoch and are not already part of a checkpoint.
    ///
    /// The traversal is breadth-first: each iteration of the outer `loop` processes the current
    /// frontier (`roots`) and gathers the next frontier (`pending`) from not-yet-seen
    /// dependencies, until no new dependencies are discovered.
    ///
    /// Note that this function may be called multiple times to construct the checkpoint.
    /// `existing_tx_digests_in_checkpoint` is used to track the transactions that are already
    /// included in the checkpoint. Txs in `roots` that need to be included in the checkpoint
    /// will be added to `existing_tx_digests_in_checkpoint` after the call of this function.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> SuiResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        // Effects accepted for inclusion, in discovery order.
        let mut results = vec![];
        // Digests already visited in this call; prevents re-expanding a transaction's
        // dependencies more than once.
        let mut seen = HashSet::new();
        loop {
            // Dependencies discovered in this pass that become the next frontier.
            let mut pending = HashSet::new();

            // Batched lookup: which of the current frontier were already placed in a
            // (previously built) checkpoint by the builder.
            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                // Unnecessary to read effects of a dependency if the effect is already processed.
                seen.insert(*digest);

                // Skip roots that are already included in the checkpoint.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Skip roots already included in checkpoints or roots from previous epochs
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                // Batched lookup of which dependencies have effects signatures in the
                // current epoch's store.
                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    // Skip here if dependency not executed in the current epoch.
                    // Note that the existence of an effects signature in the
                    // epoch store for the given digest indicates that the transaction
                    // was locally executed in the current epoch
                    if !effects_signature_exists {
                        continue;
                    }
                    // Only enqueue a dependency the first time we encounter it.
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            // Fixed point reached: no new dependencies to expand.
            if pending.is_empty() {
                break;
            }
            // Load effects for the next frontier. A missing entry is an invariant
            // violation: a dependent transaction cannot have executed without its
            // dependency's effects being available.
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self.effects_store.multi_get_executed_effects(&pending);
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
                        digest
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        // Record everything we accepted so later calls for this checkpoint skip them.
        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
2728
2729    // Checks the invariants of the consensus commit prologue transactions in the checkpoint
2730    // in simtest.
2731    #[cfg(msim)]
2732    fn expensive_consensus_commit_prologue_invariants_check(
2733        &self,
2734        root_digests: &[TransactionDigest],
2735        sorted: &[TransactionEffects],
2736    ) {
2737        // Gets all the consensus commit prologue transactions from the roots.
2738        let root_txs = self
2739            .state
2740            .get_transaction_cache_reader()
2741            .multi_get_transaction_blocks(root_digests);
2742        let ccps = root_txs
2743            .iter()
2744            .filter_map(|tx| {
2745                if let Some(tx) = tx {
2746                    if tx.transaction_data().is_consensus_commit_prologue() {
2747                        Some(tx)
2748                    } else {
2749                        None
2750                    }
2751                } else {
2752                    None
2753                }
2754            })
2755            .collect::<Vec<_>>();
2756
2757        // There should be at most one consensus commit prologue transaction in the roots.
2758        assert!(ccps.len() <= 1);
2759
2760        // Get all the transactions in the checkpoint.
2761        let txs = self
2762            .state
2763            .get_transaction_cache_reader()
2764            .multi_get_transaction_blocks(
2765                &sorted
2766                    .iter()
2767                    .map(|tx| tx.transaction_digest().clone())
2768                    .collect::<Vec<_>>(),
2769            );
2770
2771        if ccps.len() == 0 {
2772            // If there is no consensus commit prologue transaction in the roots, then there should be no
2773            // consensus commit prologue transaction in the checkpoint.
2774            for tx in txs.iter() {
2775                if let Some(tx) = tx {
2776                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2777                }
2778            }
2779        } else {
2780            // If there is one consensus commit prologue, it must be the first one in the checkpoint.
2781            assert!(
2782                txs[0]
2783                    .as_ref()
2784                    .unwrap()
2785                    .transaction_data()
2786                    .is_consensus_commit_prologue()
2787            );
2788
2789            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2790
2791            for tx in txs.iter().skip(1) {
2792                if let Some(tx) = tx {
2793                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2794                }
2795            }
2796        }
2797    }
2798}
2799
2800async fn wait_for_effects_with_retry(
2801    effects_store: &dyn TransactionCacheRead,
2802    task_name: &'static str,
2803    digests: &[TransactionDigest],
2804    tx_key: TransactionKey,
2805) -> Vec<TransactionEffects> {
2806    let delay = if in_antithesis() {
2807        // antithesis has aggressive thread pausing, 5 seconds causes false positives
2808        15
2809    } else {
2810        5
2811    };
2812    loop {
2813        match tokio::time::timeout(Duration::from_secs(delay), async {
2814            effects_store
2815                .notify_read_executed_effects(task_name, digests)
2816                .await
2817        })
2818        .await
2819        {
2820            Ok(effects) => break effects,
2821            Err(_) => {
2822                debug_fatal!(
2823                    "Timeout waiting for transactions to be executed {:?}, retrying...",
2824                    tx_key
2825                );
2826            }
2827        }
2828    }
2829}
2830
impl CheckpointAggregator {
    /// Creates a new aggregator. `receiver` delivers checkpoint signature
    /// messages from other validators; `output` is notified whenever a
    /// checkpoint reaches a quorum of signatures and becomes certified.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        Self {
            store: tables,
            epoch_store,
            notify,
            receiver,
            // Signatures buffered per checkpoint sequence number until processed.
            pending: BTreeMap::new(),
            // Aggregation state for the checkpoint currently being certified.
            current: None,
            output,
            state,
            metrics,
        }
    }

    /// Main loop: buffer incoming signatures, attempt aggregation, then sleep
    /// until new input arrives (a signature, an explicit notify, or a 1s tick).
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            // Drain all signatures that arrived since the last iteration into the pending buffer
            while let Ok(sig) = self.receiver.try_recv() {
                self.pending
                    .entry(sig.summary.sequence_number)
                    .or_default()
                    .push(sig);
            }

            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            // Wait for the next reason to re-run aggregation.
            tokio::select! {
                Some(sig) = self.receiver.recv() => {
                    self.pending
                        .entry(sig.summary.sequence_number)
                        .or_default()
                        .push(sig);
                }
                _ = self.notify.notified() => {}
                _ = tokio::time::sleep(Duration::from_secs(1)) => {}
            }
        }
    }

    /// Runs one aggregation pass and forwards every newly certified
    /// checkpoint to the configured output.
    async fn run_and_notify(&mut self) -> SuiResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// One aggregation pass. Certifies as many consecutive checkpoints as the
    /// buffered signatures allow, returning the checkpoints certified during
    /// this pass. Returns early (with whatever was certified so far) when the
    /// next checkpoint's summary has not been built yet or when its buffered
    /// signatures are insufficient to reach quorum.
    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            // Discard buffered signatures for checkpoints already certified
            // (e.g. certified via StateSync before local aggregation completed).
            self.pending.retain(|&seq, _| seq >= next_to_certify);
            let current = if let Some(current) = &mut self.current {
                // It's possible that the checkpoint was already certified by
                // the rest of the network and we've already received the
                // certified checkpoint via StateSync. In this case, we reset
                // the current signature aggregator to the next checkpoint to
                // be certified
                if current.summary.sequence_number < next_to_certify {
                    assert_reachable!("skip checkpoint certification");
                    self.current = None;
                    continue;
                }
                current
            } else {
                // No in-progress aggregator: start one for the next checkpoint,
                // or bail out if that checkpoint has not been built locally yet.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let seq = current.summary.sequence_number;
            // Take all buffered signatures for this checkpoint; if none arrived
            // yet, there is nothing more to do this pass.
            let sigs = self.pending.remove(&seq).unwrap_or_default();
            if sigs.is_empty() {
                trace!(
                    checkpoint_seq =? seq,
                    "Not enough checkpoint signatures",
                );
                return Ok(result);
            }
            for data in sigs {
                trace!(
                    checkpoint_seq = seq,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    // Quorum reached: persist the certified checkpoint and move
                    // on to the next sequence number.
                    debug!(
                        checkpoint_seq = seq,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics.last_certified_checkpoint.set(seq as i64);
                    current.summary.report_checkpoint_age(
                        &self.metrics.last_certified_checkpoint_age,
                        &self.metrics.last_certified_checkpoint_age_ms,
                    );
                    result.push(summary.into_inner());
                    self.current = None;
                    continue 'outer;
                }
            }
            // Signatures exhausted without reaching quorum for this checkpoint.
            break;
        }
        Ok(result)
    }

    /// Returns the sequence number following the highest certified checkpoint
    /// in the store (0 if no checkpoint has been certified yet).
    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
3000
3001impl CheckpointSignatureAggregator {
    /// Folds one validator's checkpoint signature into the stake aggregator.
    ///
    /// Returns `Ok` with the quorum signature exactly once — when this
    /// signature pushes the locally-built digest over the stake threshold.
    /// Every other outcome (repeated signer, invalid signature, not enough
    /// votes yet, or quorum reached on a digest that differs from ours)
    /// returns `Err(())`. Non-quorum outcomes also run the split-brain check.
    #[allow(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        // Re-wrap their signature around OUR summary data; the aggregator
        // keys votes by the digest the signer actually signed (`their_digest`).
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            // ignore repeated signatures (same signer, same digest — harmless duplicate)
            InsertResult::Failed { error }
                if matches!(
                    error.as_inner(),
                    SuiErrorKind::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
                ) =>
            {
                Err(())
            }
            // Any other insertion failure (e.g. conflicting signature) is logged
            // and may indicate a split-brain condition.
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                // It is not guaranteed that signature.authority == narwhal_cert.author, but we do verify
                // the signature so we know that the author signed the message at some point.
                if their_digest != self.digest {
                    // Quorum formed on a digest different from the one we built:
                    // the network certified a fork relative to us.
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }
3060
    /// Check if there is a split brain condition in checkpoint signature aggregation, defined
    /// as any state wherein it is no longer possible to achieve quorum on a checkpoint proposal,
    /// irrespective of the outcome of any outstanding votes.
    ///
    /// On detection this logs a fatal diagnostic, bumps the split-brain metric, and
    /// spawns a background task to dump detailed fork diagnostics.
    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            // TODO: at this point we should immediately halt processing
            // of new transaction certificates to avoid building on top of
            // forked output
            // self.halt_all_execution();

            // Summarize every digest seen so far, highest stake first, for the
            // debug_fatal message below.
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let digests_by_stake_messages = all_unique_values
                .iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{:?} (total stake: {})", digest, total_stake)
                })
                .collect::<Vec<String>>();
            // Test-only hook (active under msim fork-recovery tests): record which
            // digest the non-forked authorities produced so the test harness can
            // restart this node with a checkpoint override.
            fail_point_arg!("kill_split_brain_node", |(
                checkpoint_overrides,
                forked_authorities,
            ): (
                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
            )| {
                #[cfg(msim)]
                {
                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
                        (checkpoint_overrides.lock(), forked_authorities.lock())
                    {
                        // Find the digest produced by non-forked authorities
                        let correct_digest = all_unique_values
                            .iter()
                            .find(|(_, (authorities, _))| {
                                // Check if any authority that produced this digest is NOT in the forked set
                                authorities
                                    .iter()
                                    .any(|auth| !forked_authorities_set.contains(auth))
                            })
                            .map(|(digest, _)| digest.to_string())
                            .unwrap_or_else(|| {
                                // Fallback: use the digest with the highest stake
                                all_unique_values
                                    .iter()
                                    .max_by_key(|(_, (_, stake))| *stake)
                                    .map(|(digest, _)| digest.to_string())
                                    .unwrap_or_else(|| self.digest.to_string())
                            });

                        overrides.insert(self.summary.sequence_number, correct_digest.clone());

                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
                            self.summary.sequence_number,
                            correct_digest
                        );
                    }
                }
            });

            debug_fatal!(
                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
                self.summary.sequence_number,
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            // Clone everything the async diagnostics task needs, since it must own
            // its data to outlive this call.
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.store.clone();

            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
3144}
3145
/// Create data dump containing relevant data for diagnosing cause of the
/// split brain by querying one disagreeing validator for full checkpoint contents.
/// To minimize peer chatter, we only query one validator at random from each
/// disagreeing faction, as all honest validators that participated in this round may
/// inevitably run the same process.
///
/// The resulting diff report is written to a temp file and also emitted at
/// debug level.
async fn diagnose_split_brain(
    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
    local_summary: CheckpointSummary,
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
) {
    debug!(
        checkpoint_seq = local_summary.sequence_number,
        "Running split brain diagnostics..."
    );
    // Timestamp used in the dump header below.
    let time = SystemTime::now();
    // collect one random disagreeing validator per differing digest
    let digest_to_validator = all_unique_values
        .iter()
        .filter_map(|(digest, (validators, _))| {
            if *digest != local_summary.digest() {
                // assumes each digest has at least one signing validator, so
                // `choose` cannot return None — TODO confirm upstream invariant
                let random_validator = validators.choose(&mut get_rng()).unwrap();
                Some((*digest, *random_validator))
            } else {
                None
            }
        })
        .collect::<HashMap<_, _>>();
    if digest_to_validator.is_empty() {
        panic!(
            "Given split brain condition, there should be at \
                least one validator that disagrees with local signature"
        );
    }

    let epoch_store = state.load_epoch_store_one_call_per_task();
    let committee = epoch_store
        .epoch_start_state()
        .get_sui_committee_with_network_metadata();
    let network_config = default_mysten_network_config();
    let network_clients =
        make_network_authority_clients_with_network_config(&committee, &network_config);

    // Query all disagreeing validators
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            // Ask for the pending (uncertified) checkpoint at our sequence,
            // including full contents.
            let request = CheckpointRequestV2 {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint_v2(request)
        })
        .collect::<Vec<_>>();

    // NOTE(review): `response_futures` was built from `.values()` and is zipped
    // with `.iter()` of the same map here; this appears to rely on HashMap
    // iteration order being identical across two iterations of an unmodified
    // map. That holds in practice for the same map instance, but collecting
    // (digest, validator, future) triples once would be more robust — confirm.
    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponseV2 {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponseV2 {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponseV2 {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    // Load our own contents for the same checkpoint; both lookup failures are
    // fatal because without local contents no diff can be produced.
    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    // Build one diff section per disagreeing validator: summary, contents,
    // transaction digests, and effects digests.
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {:?}",
        time
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    // `keep()` detaches the tempdir so the dump survives process exit.
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .keep()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{}", fork_logs_text).unwrap();
    debug!("{}", fork_logs_text);
}
3325
/// Notification interface into the checkpoint service, implemented both by the
/// real [`CheckpointService`] and by a no-op test double.
pub trait CheckpointServiceNotify {
    /// Delivers a checkpoint signature message for aggregation.
    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult;

    /// Signals that a new pending checkpoint may be available to build.
    fn notify_checkpoint(&self) -> SuiResult;
}
3331
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    /// Holds the not-yet-running builder, aggregator, and state hasher
    /// until `CheckpointService::spawn` consumes them.
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    /// The background tasks have been taken and spawned.
    Started,
}
3343
3344impl CheckpointServiceState {
3345    fn take_unstarted(
3346        &mut self,
3347    ) -> (
3348        CheckpointBuilder,
3349        CheckpointAggregator,
3350        CheckpointStateHasher,
3351    ) {
3352        let mut state = CheckpointServiceState::Started;
3353        std::mem::swap(self, &mut state);
3354
3355        match state {
3356            CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3357                (builder, aggregator, hasher)
3358            }
3359            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3360        }
3361    }
3362}
3363
/// Orchestrates checkpoint building, signature aggregation, and state hashing
/// for the current epoch. Constructed un-started via [`CheckpointService::build`];
/// background tasks are launched by `spawn`.
pub struct CheckpointService {
    /// Checkpoint store, used e.g. to read the highest verified/executed watermarks.
    tables: Arc<CheckpointStore>,
    /// Wakes the checkpoint builder task.
    notify_builder: Arc<Notify>,
    /// Feeds received checkpoint signatures to the aggregator task.
    signature_sender: mpsc::UnboundedSender<CheckpointSignatureMessage>,
    // A notification for the current highest built sequence number.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // The highest sequence number that had already been built at the time CheckpointService
    // was constructed
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    /// Holds the un-spawned background tasks until `spawn` is called.
    state: Mutex<CheckpointServiceState>,
}
3376
impl CheckpointService {
    /// Constructs a new CheckpointService in an un-started state.
    ///
    /// Wires up the builder, aggregator, and state hasher (connected by
    /// channels) but does not spawn any tasks; call `spawn` to start them.
    // The signature channel is unbounded because notify_checkpoint_signature is called from a
    // sync context (consensus_validator.rs implements a sync external trait) and cannot block.
    // The channel is consumed by a single async aggregator task that drains it continuously, so
    // unbounded growth is not a concern in practice.
    #[allow(clippy::disallowed_methods)]
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // We may have built higher checkpoint numbers before restarting.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        // Seed the watch channel with the last checkpoint the builder persisted
        // (may lag highest_previously_built_seq — see the replay in `spawn`).
        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let (signature_sender, signature_receiver) = mpsc::unbounded_channel();

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            signature_receiver,
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        // Bounded channel: builder -> state hasher.
        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);

        let ckpt_state_hasher = CheckpointStateHasher::new(
            epoch_store.clone(),
            global_state_hasher.clone(),
            receive_from_builder,
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            global_state_hasher,
            send_to_hasher,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            signature_sender,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted((
                builder,
                aggregator,
                ckpt_state_hasher,
            ))),
        })
    }

    /// Starts the CheckpointService.
    ///
    /// This function blocks until the CheckpointBuilder re-builds all checkpoints that had
    /// been built before the most recent restart. You can think of this as a WAL replay
    /// operation. Upon startup, we may have a number of consensus commits and resulting
    /// checkpoints that were built but not committed to disk. We want to reprocess the
    /// commits and rebuild the checkpoints before starting normal operation.
    ///
    /// Panics if called more than once (the unstarted state is consumed).
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        // Clean up state hashes computed after the last built checkpoint
        // This prevents ECMH divergence after fork recovery restarts

        // Note: there is a rare crash recovery edge case where we write the builder
        // summary, but crash before we can bump the highest executed checkpoint.
        // If we committed the builder summary, it was certified and unforked, so there
        // is no need to clear that state hash. If we do clear it, then checkpoint executor
        // will wait forever for checkpoint builder to produce the state hash, which will
        // never happen.
        let last_persisted_builder_seq = epoch_store
            .last_persisted_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number);

        let last_executed_seq = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number());

        // Clear from the later of the two watermarks (Option::max treats None as lowest).
        if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        // Lets us observe builder exit (e.g. at epoch end) so the wait below
        // does not hang if the builder stops before catching up.
        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        spawn_monitored_task!(async move {
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            // state hasher will terminate as soon as it has finished processing all messages from builder
            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            // builder must shut down before aggregator and state_hasher, since it sends
            // messages to them
            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        // If this times out, the validator may still start up. The worst that can
        // happen is that we will crash later on instead of immediately. The eventual
        // crash would occur because we may be missing transactions that are below the
        // highest_synced_checkpoint watermark, which can cause a crash in
        // `CheckpointExecutor::extract_randomness_rounds`.
        if tokio::time::timeout(Duration::from_secs(120), async move {
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
}
3562
3563impl CheckpointService {
3564    /// Waits until all checkpoints had been built before the node restarted
3565    /// are rebuilt. This is required to preserve the invariant that all checkpoints
3566    /// (and their transactions) below the highest_synced_checkpoint watermark are
3567    /// available. Once the checkpoints are constructed, we can be sure that the
3568    /// transactions have also been executed.
3569    pub async fn wait_for_rebuilt_checkpoints(&self) {
3570        let highest_previously_built_seq = self.highest_previously_built_seq;
3571        let mut rx = self.highest_currently_built_seq_tx.subscribe();
3572        let mut highest_currently_built_seq = *rx.borrow_and_update();
3573        info!(
3574            "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3575        );
3576        loop {
3577            if highest_currently_built_seq >= highest_previously_built_seq {
3578                info!("Checkpoint rebuild complete");
3579                break;
3580            }
3581            rx.changed().await.unwrap();
3582            highest_currently_built_seq = *rx.borrow_and_update();
3583        }
3584    }
3585
3586    #[cfg(test)]
3587    fn write_and_notify_checkpoint_for_testing(
3588        &self,
3589        epoch_store: &AuthorityPerEpochStore,
3590        checkpoint: PendingCheckpoint,
3591    ) -> SuiResult {
3592        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3593
3594        let mut output = ConsensusCommitOutput::new(0);
3595        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3596        output.set_default_commit_stats_for_testing();
3597        epoch_store.push_consensus_output_for_tests(output);
3598        self.notify_checkpoint()?;
3599        Ok(())
3600    }
3601}
3602
3603impl CheckpointServiceNotify for CheckpointService {
3604    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult {
3605        let sequence = info.summary.sequence_number;
3606        let signer = info.summary.auth_sig().authority.concise();
3607
3608        if let Some(highest_verified_checkpoint) = self
3609            .tables
3610            .get_highest_verified_checkpoint()?
3611            .map(|x| *x.sequence_number())
3612            && sequence <= highest_verified_checkpoint
3613        {
3614            trace!(
3615                checkpoint_seq = sequence,
3616                "Ignore checkpoint signature from {} - already certified", signer,
3617            );
3618            self.metrics
3619                .last_ignored_checkpoint_signature_received
3620                .set(sequence as i64);
3621            return Ok(());
3622        }
3623        trace!(
3624            checkpoint_seq = sequence,
3625            "Received checkpoint signature, digest {} from {}",
3626            info.summary.digest(),
3627            signer,
3628        );
3629        self.metrics
3630            .last_received_checkpoint_signatures
3631            .with_label_values(&[&signer.to_string()])
3632            .set(sequence as i64);
3633        self.signature_sender.send(info.clone()).ok();
3634        Ok(())
3635    }
3636
3637    fn notify_checkpoint(&self) -> SuiResult {
3638        self.notify_builder.notify_one();
3639        Ok(())
3640    }
3641}
3642
// test helper
/// A no-op [`CheckpointServiceNotify`] implementation for tests that need a
/// notifier but do not run a real checkpoint service.
pub struct CheckpointServiceNoop {}
impl CheckpointServiceNotify for CheckpointServiceNoop {
    fn notify_checkpoint_signature(&self, _: &CheckpointSignatureMessage) -> SuiResult {
        Ok(())
    }

    fn notify_checkpoint(&self) -> SuiResult {
        Ok(())
    }
}
3654
impl PendingCheckpoint {
    /// The checkpoint height recorded in this pending checkpoint's details.
    pub fn height(&self) -> CheckpointHeight {
        self.details.checkpoint_height
    }

    /// The root transaction keys of this pending checkpoint.
    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.roots
    }

    /// Metadata describing this pending checkpoint.
    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.details
    }
}
3668
3669impl PendingCheckpointV2 {
3670    pub fn height(&self) -> CheckpointHeight {
3671        self.details.checkpoint_height
3672    }
3673
3674    pub(crate) fn num_roots(&self) -> usize {
3675        self.roots.iter().map(|r| r.tx_roots.len()).sum()
3676    }
3677}
3678
pin_project! {
    /// Future adapter that counts how many times the inner future is polled;
    /// on completion it yields `(poll_count, inner_output)`.
    pub struct PollCounter<Fut> {
        #[pin]
        future: Fut,
        // Number of times `poll` has been invoked so far.
        count: usize,
    }
}
3686
impl<Fut> PollCounter<Fut> {
    /// Wraps `future` with the poll count starting at zero.
    pub fn new(future: Fut) -> Self {
        Self { future, count: 0 }
    }

    /// Number of times the wrapped future has been polled so far.
    pub fn count(&self) -> usize {
        self.count
    }
}
3696
3697impl<Fut: Future> Future for PollCounter<Fut> {
3698    type Output = (usize, Fut::Output);
3699
3700    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3701        let this = self.project();
3702        *this.count += 1;
3703        match this.future.poll(cx) {
3704            Poll::Ready(output) => Poll::Ready((*this.count, output)),
3705            Poll::Pending => Poll::Pending,
3706        }
3707    }
3708}
3709
/// Wraps `future` in a [`PollCounter`] so the number of polls can be observed.
fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
    PollCounter::new(future)
}
3713
3714#[cfg(test)]
3715mod tests {
3716    use super::*;
3717    use crate::authority::test_authority_builder::TestAuthorityBuilder;
3718    use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3719    use futures::FutureExt as _;
3720    use futures::future::BoxFuture;
3721    use std::collections::HashMap;
3722    use std::ops::Deref;
3723    use sui_macros::sim_test;
3724    use sui_protocol_config::{Chain, ProtocolConfig};
3725    use sui_types::accumulator_event::AccumulatorEvent;
3726    use sui_types::authenticator_state::ActiveJwk;
3727    use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3728    use sui_types::crypto::Signature;
3729    use sui_types::effects::{TransactionEffects, TransactionEvents};
3730    use sui_types::messages_checkpoint::SignedCheckpointSummary;
3731    use sui_types::transaction::VerifiedTransaction;
3732    use tokio::sync::mpsc;
3733
    #[tokio::test]
    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
        let store = CheckpointStore::new_for_tests();
        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
        // Seed locally computed checkpoints for sequences 70..=80, each with
        // minimal single-transaction contents.
        for seq in 70u64..=80u64 {
            let contents =
                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
                    [sui_types::base_types::ExecutionDigests::new(
                        sui_types::digests::TransactionDigest::random(),
                        sui_types::digests::TransactionEffectsDigest::ZERO,
                    )],
                );
            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
                &protocol,
                0,
                seq,
                0,
                &contents,
                None,
                sui_types::gas::GasCostSummary::default(),
                None,
                0,
                Vec::new(),
                Vec::new(),
            );
            store
                .tables
                .locally_computed_checkpoints
                .insert(&seq, &summary)
                .unwrap();
        }

        // Clear from 76 onward; the cut itself must be deleted (inclusive range).
        store
            .clear_locally_computed_checkpoints_from(76)
            .expect("clear should succeed");

        // Explicit boundary checks: 75 must remain, 76 must be deleted
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&75)
                .unwrap()
                .is_some()
        );
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&76)
                .unwrap()
                .is_none()
        );

        // Everything below the cut remains; the cut and above are gone.
        for seq in 70u64..76u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_some()
            );
        }
        for seq in 76u64..=80u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_none()
            );
        }
    }
3809
3810    #[tokio::test]
3811    async fn test_fork_detection_storage() {
3812        let store = CheckpointStore::new_for_tests();
3813        // checkpoint fork
3814        let seq_num = 42;
3815        let digest = CheckpointDigest::random();
3816
3817        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3818
3819        store
3820            .record_checkpoint_fork_detected(seq_num, digest)
3821            .unwrap();
3822
3823        let retrieved = store.get_checkpoint_fork_detected().unwrap();
3824        assert!(retrieved.is_some());
3825        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3826        assert_eq!(retrieved_seq, seq_num);
3827        assert_eq!(retrieved_digest, digest);
3828
3829        store.clear_checkpoint_fork_detected().unwrap();
3830        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3831
3832        // txn fork
3833        let tx_digest = TransactionDigest::random();
3834        let expected_effects = TransactionEffectsDigest::random();
3835        let actual_effects = TransactionEffectsDigest::random();
3836
3837        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3838
3839        store
3840            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3841            .unwrap();
3842
3843        let retrieved = store.get_transaction_fork_detected().unwrap();
3844        assert!(retrieved.is_some());
3845        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3846        assert_eq!(retrieved_tx, tx_digest);
3847        assert_eq!(retrieved_expected, expected_effects);
3848        assert_eq!(retrieved_actual, actual_effects);
3849
3850        store.clear_transaction_fork_detected().unwrap();
3851        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3852    }
3853
    /// End-to-end exercise of the checkpoint builder pipeline:
    /// - builds checkpoints from pending roots and verifies causal ordering
    ///   of transaction dependencies within a checkpoint,
    /// - verifies splitting by max transaction count (configured as 3 below)
    ///   and by max serialized size (configured as 100_000 bytes below),
    /// - verifies that a pending checkpoint arriving too soon after the prior
    ///   one (min interval 100ms) is coalesced into the next checkpoint,
    /// - verifies signature aggregation produces certified checkpoints in
    ///   sequence order even when signatures arrive out of order.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        // Protocol config tuned for this test: accumulators off, checkpoint
        // splitting done by the builder (not consensus handler), and a 100ms
        // minimum interval between checkpoints (drives the coalescing case).
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.disable_accumulators_for_testing();
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(false);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // Small placeholder transaction stored under digests d(0)..d(14).
        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
            0,
            0,
            vec![],
            SequenceNumber::new(),
        );

        // Build a JWK list whose BCS encoding is at least 40_000 bytes, so the
        // transactions carrying it are ~40K each — three of them exceed the
        // 100K checkpoint size limit, forcing a size-based split below.
        let jwks = {
            let mut jwks = Vec::new();
            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
                jwks.push(ActiveJwk {
                    jwk_id: JwkId::new(
                        "https://accounts.google.com".to_string(),
                        "1234567890".to_string(),
                    ),
                    jwk: JWK {
                        kty: "RSA".to_string(),
                        e: "AQAB".to_string(),
                        n: "1234567890".to_string(),
                        alg: "RS256".to_string(),
                    },
                    epoch: 0,
                });
            }
            jwks
        };

        // Large (~40K) placeholder transaction stored under digests d(15)..d(19).
        let dummy_tx_with_data =
            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());

        // Persist the transaction bodies so the builder can look them up by digest.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Fabricate executed effects (with dependency edges and gas) for each
        // digest. d(1) depends on d(2), d(3); d(2) depends on d(3), d(4) — this
        // dependency chain drives the causal-order assertions below.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 52, 51, 1),
            );
        }
        // Every transaction needs a user signature for contents construction.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![(signature, None)]);
        }

        // Channels through which the service emits built and certified
        // checkpoints (see the CheckpointOutput impls below).
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

        // Limits under test: max 3 transactions and max 100_000 bytes per checkpoint.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

        // Feed six pending checkpoints (height, root digests, timestamp_ms).
        // Heights 4 and 5 are only 1ms apart — below the 100ms minimum
        // interval — so height 4 should be coalesced into height 5's checkpoint.
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        // Checkpoint 0 contains only d(4); checkpoint 1 must include d(1)'s
        // transitive dependencies d(3), d(2) before d(1) itself (causal order).
        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        // Rolling gas = sum of all gas committed so far across checkpoints 0 and 1.
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 108, 104, 4)
        );

        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
        // Verify that we split into 2 checkpoints.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K max.
        // Verify that we split into 2 checkpoints.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending at index 4 was too soon after the prior one and should be coalesced into
        // the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        // Sign the first two summaries and deliver the signatures OUT of order
        // (c2 before c1); certification must still come out in sequence order.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c2ss })
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c1ss })
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
4087
4088    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
4089        fn notify_read_executed_effects_may_fail(
4090            &self,
4091            _: &str,
4092            digests: &[TransactionDigest],
4093        ) -> BoxFuture<'_, SuiResult<Vec<TransactionEffects>>> {
4094            std::future::ready(Ok(digests
4095                .iter()
4096                .map(|d| self.get(d).expect("effects not found").clone())
4097                .collect()))
4098            .boxed()
4099        }
4100
4101        fn notify_read_executed_effects_digests(
4102            &self,
4103            _: &str,
4104            digests: &[TransactionDigest],
4105        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
4106            std::future::ready(
4107                digests
4108                    .iter()
4109                    .map(|d| {
4110                        self.get(d)
4111                            .map(|fx| fx.digest())
4112                            .expect("effects not found")
4113                    })
4114                    .collect(),
4115            )
4116            .boxed()
4117        }
4118
4119        fn multi_get_executed_effects(
4120            &self,
4121            digests: &[TransactionDigest],
4122        ) -> Vec<Option<TransactionEffects>> {
4123            digests.iter().map(|d| self.get(d).cloned()).collect()
4124        }
4125
4126        // Unimplemented methods - its unfortunate to have this big blob of useless code, but it wasn't
4127        // worth it to keep EffectsNotifyRead around just for these tests, as it caused a ton of
4128        // complication in non-test code. (e.g. had to implement EFfectsNotifyRead for all
4129        // ExecutionCacheRead implementors).
4130
4131        fn multi_get_transaction_blocks(
4132            &self,
4133            _: &[TransactionDigest],
4134        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
4135            unimplemented!()
4136        }
4137
4138        fn multi_get_executed_effects_digests(
4139            &self,
4140            _: &[TransactionDigest],
4141        ) -> Vec<Option<TransactionEffectsDigest>> {
4142            unimplemented!()
4143        }
4144
4145        fn multi_get_effects(
4146            &self,
4147            _: &[TransactionEffectsDigest],
4148        ) -> Vec<Option<TransactionEffects>> {
4149            unimplemented!()
4150        }
4151
4152        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
4153            unimplemented!()
4154        }
4155
4156        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
4157            unimplemented!()
4158        }
4159
4160        fn get_unchanged_loaded_runtime_objects(
4161            &self,
4162            _digest: &TransactionDigest,
4163        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
4164            unimplemented!()
4165        }
4166
4167        fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
4168            unimplemented!()
4169        }
4170    }
4171
4172    #[async_trait::async_trait]
4173    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
4174        async fn checkpoint_created(
4175            &self,
4176            summary: &CheckpointSummary,
4177            contents: &CheckpointContents,
4178            _epoch_store: &Arc<AuthorityPerEpochStore>,
4179            _checkpoint_store: &Arc<CheckpointStore>,
4180        ) -> SuiResult {
4181            self.try_send((contents.clone(), summary.clone())).unwrap();
4182            Ok(())
4183        }
4184    }
4185
4186    #[async_trait::async_trait]
4187    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
4188        async fn certified_checkpoint_created(
4189            &self,
4190            summary: &CertifiedCheckpointSummary,
4191        ) -> SuiResult {
4192            self.try_send(summary.clone()).unwrap();
4193            Ok(())
4194        }
4195    }
4196
4197    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
4198        PendingCheckpoint {
4199            roots: t
4200                .into_iter()
4201                .map(|t| TransactionKey::Digest(d(t)))
4202                .collect(),
4203            details: PendingCheckpointInfo {
4204                timestamp_ms,
4205                last_of_epoch: false,
4206                checkpoint_height: i,
4207                consensus_commit_ref: CommitRef::default(),
4208                rejected_transactions_digest: Digest::default(),
4209                checkpoint_seq: None,
4210            },
4211        }
4212    }
4213
4214    fn d(i: u8) -> TransactionDigest {
4215        let mut bytes: [u8; 32] = Default::default();
4216        bytes[0] = i;
4217        TransactionDigest::new(bytes)
4218    }
4219
4220    fn e(
4221        transaction_digest: TransactionDigest,
4222        dependencies: Vec<TransactionDigest>,
4223        gas_used: GasCostSummary,
4224    ) -> TransactionEffects {
4225        let mut effects = TransactionEffects::default();
4226        *effects.transaction_digest_mut_for_testing() = transaction_digest;
4227        *effects.dependencies_mut_for_testing() = dependencies;
4228        *effects.gas_cost_summary_mut_for_testing() = gas_used;
4229        effects
4230    }
4231
4232    fn commit_cert_for_test(
4233        store: &mut HashMap<TransactionDigest, TransactionEffects>,
4234        state: Arc<AuthorityState>,
4235        digest: TransactionDigest,
4236        dependencies: Vec<TransactionDigest>,
4237        gas_used: GasCostSummary,
4238    ) {
4239        let epoch_store = state.epoch_store_for_testing();
4240        let effects = e(digest, dependencies, gas_used);
4241        store.insert(digest, effects.clone());
4242        epoch_store.insert_executed_in_epoch(&digest);
4243    }
4244}