sui_core/checkpoints/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4pub(crate) mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::CertifiedCheckpointOutput;
15pub use crate::checkpoints::checkpoint_output::{
16    CheckpointOutput, LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::ZipDebugEqIteratorExt;
29use mysten_common::random::get_rng;
30use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
31use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
32use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
33use nonempty::NonEmpty;
34use parking_lot::Mutex;
35use pin_project_lite::pin_project;
36use serde::{Deserialize, Serialize};
37use sui_macros::fail_point_arg;
38use sui_network::default_mysten_network_config;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43    CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
47use tokio::sync::{mpsc, watch};
48use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
49
50use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
51use crate::authority::authority_store_pruner::PrunerWatermarks;
52use crate::consensus_handler::SequencedConsensusTransactionKey;
53use rand::seq::SliceRandom;
54use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
55use std::fs::File;
56use std::future::Future;
57use std::io::Write;
58use std::path::Path;
59use std::pin::Pin;
60use std::sync::Arc;
61use std::sync::Weak;
62use std::task::{Context, Poll};
63use std::time::{Duration, SystemTime};
64use sui_protocol_config::ProtocolVersion;
65use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
66use sui_types::committee::StakeUnit;
67use sui_types::crypto::AuthorityStrongQuorumSignInfo;
68use sui_types::digests::{
69    CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
70};
71use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
72use sui_types::error::{SuiErrorKind, SuiResult};
73use sui_types::gas::GasCostSummary;
74use sui_types::message_envelope::Message;
75use sui_types::messages_checkpoint::{
76    CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
77    CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
78    EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
79    VerifiedCheckpointContents,
80};
81use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
82use sui_types::messages_consensus::ConsensusTransactionKey;
83use sui_types::signature::GenericSignature;
84use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
85use sui_types::transaction::{
86    TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
87};
88use tokio::sync::Notify;
89use tracing::{debug, error, info, instrument, trace, warn};
90use typed_store::DBMapUtils;
91use typed_store::Map;
92use typed_store::{
93    TypedStoreError,
94    rocks::{DBMap, MetricConf},
95};
96
/// Fixed `u8` key associated with transaction fork detection.
/// NOTE(review): presumably the single key under which the fork record is stored in the
/// `transaction_fork_detected` table (whose key type is `u8`) — confirm at the use sites.
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

/// Height of a pending checkpoint; see `PendingCheckpointInfo::checkpoint_height`.
pub type CheckpointHeight = u64;
100
/// Aggregate counters describing an epoch.
/// NOTE(review): the exact accounting (e.g. units of `total_gas_reward`) is decided by the
/// code that fills this struct, which is not visible here — confirm against the producer.
pub struct EpochStats {
    /// Number of checkpoints.
    pub checkpoint_count: u64,
    /// Number of transactions.
    pub transaction_count: u64,
    /// Total gas reward.
    pub total_gas_reward: u64,
}
106
107#[derive(Clone, Debug)]
108pub struct PendingCheckpointInfo {
109    pub timestamp_ms: CheckpointTimestamp,
110    pub last_of_epoch: bool,
111    // Computed in calculate_pending_checkpoint_height() from consensus round,
112    // there is no guarantee that this is increasing per checkpoint, because of checkpoint splitting.
113    pub checkpoint_height: CheckpointHeight,
114    // Consensus commit ref and rejected transactions digest which corresponds to this checkpoint.
115    pub consensus_commit_ref: CommitRef,
116    pub rejected_transactions_digest: Digest,
117    // Pre-assigned checkpoint sequence number from consensus handler.
118    // Only set when split_checkpoints_in_consensus_handler is enabled.
119    pub checkpoint_seq: Option<CheckpointSequenceNumber>,
120}
121
122#[derive(Clone, Debug)]
123pub struct PendingCheckpoint {
124    pub roots: Vec<TransactionKey>,
125    pub details: PendingCheckpointInfo,
126}
127
128#[derive(Clone, Debug, Default)]
129pub struct CheckpointRoots {
130    pub tx_roots: Vec<TransactionKey>,
131    pub settlement_root: Option<TransactionKey>,
132    pub height: CheckpointHeight,
133}
134
135/// PendingCheckpointV2 is merged and split in ConsensusHandler
136/// instead of CheckpointBuilder. It contains 1 or more
137/// CheckpointRoots which represents a group of transactions
138/// settled together.
139/// Usage of PendingCheckpointV2 is gated by split_checkpoints_in_consensus_handler
140/// We need to support dual implementations until this feature flag is removed.
141#[derive(Clone, Debug)]
142pub struct PendingCheckpointV2 {
143    pub roots: Vec<CheckpointRoots>,
144    pub details: PendingCheckpointInfo,
145}
146
147#[derive(Clone, Debug, Serialize, Deserialize)]
148pub struct BuilderCheckpointSummary {
149    pub summary: CheckpointSummary,
150    // Height at which this checkpoint summary was built. None for genesis checkpoint
151    pub checkpoint_height: Option<CheckpointHeight>,
152    pub position_in_commit: usize,
153}
154
155#[derive(DBMapUtils)]
156#[cfg_attr(tidehunter, tidehunter)]
157pub struct CheckpointStoreTables {
158    /// Maps checkpoint contents digest to checkpoint contents
159    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,
160
161    /// Maps checkpoint contents digest to checkpoint sequence number
162    pub(crate) checkpoint_sequence_by_contents_digest:
163        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,
164
165    /// Stores entire checkpoint contents from state sync, indexed by sequence number, for
166    /// efficient reads of full checkpoints. Entries from this table are deleted after state
167    /// accumulation has completed.
168    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
169    // TODO: Once the switch to `full_checkpoint_content_v2` is fully active on mainnet,
170    // deprecate this table (and remove when possible).
171    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
172
173    /// Stores certified checkpoints
174    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
175    /// Map from checkpoint digest to certified checkpoint
176    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,
177
178    /// Store locally computed checkpoint summaries so that we can detect forks and log useful
179    /// information. Can be pruned as soon as we verify that we are in agreement with the latest
180    /// certified checkpoint.
181    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,
182
183    /// A map from epoch ID to the sequence number of the last checkpoint in that epoch.
184    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,
185
186    /// Watermarks used to determine the highest verified, fully synced, and
187    /// fully executed checkpoints
188    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
189
190    /// Stores transaction fork detection information
191    pub(crate) transaction_fork_detected: DBMap<
192        u8,
193        (
194            TransactionDigest,
195            TransactionEffectsDigest,
196            TransactionEffectsDigest,
197        ),
198    >,
199    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
200    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
201}
202
203fn full_checkpoint_content_table_default_config() -> DBOptions {
204    DBOptions {
205        options: default_db_options().options,
206        // We have seen potential data corruption issues in this table after forced shutdowns
207        // so we enable value hash logging to help with debugging.
208        // TODO: remove this once we have a better understanding of the root cause.
209        rw_options: ReadWriteOptions::default().set_log_value_hash(true),
210    }
211}
212
213impl CheckpointStoreTables {
214    #[cfg(not(tidehunter))]
215    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
216        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
217    }
218
219    #[cfg(tidehunter)]
220    pub fn new(
221        path: &Path,
222        metric_name: &'static str,
223        pruner_watermarks: Arc<PrunerWatermarks>,
224    ) -> Self {
225        tracing::warn!("Checkpoint DB using tidehunter");
226        use crate::authority::authority_store_pruner::apply_relocation_filter;
227        use typed_store::tidehunter_util::{
228            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
229            default_max_dirty_keys, default_mutex_count, default_value_cache_size,
230        };
231        let mutexes = default_mutex_count();
232        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
233        let override_dirty_keys_config = KeySpaceConfig::new()
234            .with_max_dirty_keys(16 * default_max_dirty_keys())
235            .with_value_cache_size(default_value_cache_size());
236        let config_u64 = ThConfig::new_with_config(
237            8,
238            mutexes,
239            u64_sequence_key,
240            override_dirty_keys_config.clone(),
241        );
242        let digest_config = ThConfig::new_with_rm_prefix(
243            32,
244            mutexes,
245            KeyType::uniform(default_cells_per_mutex()),
246            KeySpaceConfig::default(),
247            vec![0, 0, 0, 0, 0, 0, 0, 32],
248        );
249        let watermarks_config = KeySpaceConfig::new()
250            .with_value_cache_size(10)
251            .disable_unload();
252        let lru_config = KeySpaceConfig::new().with_value_cache_size(100);
253        let configs = vec![
254            (
255                "checkpoint_content",
256                digest_config.clone().with_config(
257                    KeySpaceConfig::new().with_relocation_filter(|_, _| Decision::Remove),
258                ),
259            ),
260            (
261                "checkpoint_sequence_by_contents_digest",
262                digest_config.clone().with_config(apply_relocation_filter(
263                    KeySpaceConfig::default(),
264                    pruner_watermarks.checkpoint_id.clone(),
265                    |sequence_number: CheckpointSequenceNumber| sequence_number,
266                    false,
267                )),
268            ),
269            (
270                "full_checkpoint_content",
271                config_u64.clone().with_config(apply_relocation_filter(
272                    override_dirty_keys_config.clone(),
273                    pruner_watermarks.checkpoint_id.clone(),
274                    |sequence_number: CheckpointSequenceNumber| sequence_number,
275                    true,
276                )),
277            ),
278            ("certified_checkpoints", config_u64.clone()),
279            (
280                "checkpoint_by_digest",
281                digest_config.clone().with_config(apply_relocation_filter(
282                    lru_config,
283                    pruner_watermarks.epoch_id.clone(),
284                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
285                    false,
286                )),
287            ),
288            (
289                "locally_computed_checkpoints",
290                config_u64.clone().with_config(apply_relocation_filter(
291                    override_dirty_keys_config.clone(),
292                    pruner_watermarks.checkpoint_id.clone(),
293                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
294                    true,
295                )),
296            ),
297            ("epoch_last_checkpoint_map", config_u64.clone()),
298            (
299                "watermarks",
300                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
301            ),
302            (
303                "transaction_fork_detected",
304                ThConfig::new_with_config(
305                    1,
306                    1,
307                    KeyType::uniform(1),
308                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
309                ),
310            ),
311            (
312                "full_checkpoint_content_v2",
313                config_u64.clone().with_config(apply_relocation_filter(
314                    override_dirty_keys_config.clone(),
315                    pruner_watermarks.checkpoint_id.clone(),
316                    |sequence_number: CheckpointSequenceNumber| sequence_number,
317                    true,
318                )),
319            ),
320        ];
321        Self::open_tables_read_write(
322            path.to_path_buf(),
323            MetricConf::new(metric_name),
324            configs
325                .into_iter()
326                .map(|(cf, config)| (cf.to_string(), config))
327                .collect(),
328        )
329    }
330
331    #[cfg(not(tidehunter))]
332    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
333        Self::get_read_only_handle(
334            path.to_path_buf(),
335            None,
336            None,
337            MetricConf::new("checkpoint_readonly"),
338        )
339    }
340
341    #[cfg(tidehunter)]
342    pub fn open_readonly(path: &Path) -> Self {
343        Self::new(path, "checkpoint", Arc::new(PrunerWatermarks::default()))
344    }
345}
346
347pub struct CheckpointStore {
348    pub(crate) tables: CheckpointStoreTables,
349    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
350    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
351}
352
353impl CheckpointStore {
354    pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
355        let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
356        Arc::new(Self {
357            tables,
358            synced_checkpoint_notify_read: NotifyRead::new(),
359            executed_checkpoint_notify_read: NotifyRead::new(),
360        })
361    }
362
363    pub fn new_for_tests() -> Arc<Self> {
364        let ckpt_dir = mysten_common::tempdir().unwrap();
365        CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
366    }
367
368    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
369        let tables = CheckpointStoreTables::new(
370            path,
371            "db_checkpoint",
372            Arc::new(PrunerWatermarks::default()),
373        );
374        Arc::new(Self {
375            tables,
376            synced_checkpoint_notify_read: NotifyRead::new(),
377            executed_checkpoint_notify_read: NotifyRead::new(),
378        })
379    }
380
381    #[cfg(not(tidehunter))]
382    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
383        CheckpointStoreTables::open_readonly(path)
384    }
385
386    #[cfg(tidehunter)]
387    pub fn open_readonly(path: &Path) -> CheckpointStoreTables {
388        CheckpointStoreTables::open_readonly(path)
389    }
390
391    #[instrument(level = "info", skip_all)]
392    pub fn insert_genesis_checkpoint(
393        &self,
394        checkpoint: VerifiedCheckpoint,
395        contents: CheckpointContents,
396        epoch_store: &AuthorityPerEpochStore,
397    ) {
398        assert_eq!(
399            checkpoint.epoch(),
400            0,
401            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
402        );
403        assert_eq!(
404            *checkpoint.sequence_number(),
405            0,
406            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
407        );
408
409        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
410        match self.get_checkpoint_by_sequence_number(0).unwrap() {
411            Some(existing_checkpoint) => {
412                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
413            }
414            None => {
415                if epoch_store.epoch() == checkpoint.epoch {
416                    epoch_store
417                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
418                        .unwrap();
419                } else {
420                    debug!(
421                        validator_epoch =% epoch_store.epoch(),
422                        genesis_epoch =% checkpoint.epoch(),
423                        "Not inserting checkpoint builder data for genesis checkpoint",
424                    );
425                }
426                self.insert_checkpoint_contents(contents).unwrap();
427                self.insert_verified_checkpoint(&checkpoint).unwrap();
428                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
429            }
430        }
431    }
432
433    pub fn get_checkpoint_by_digest(
434        &self,
435        digest: &CheckpointDigest,
436    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
437        self.tables
438            .checkpoint_by_digest
439            .get(digest)
440            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
441    }
442
443    pub fn get_checkpoint_by_sequence_number(
444        &self,
445        sequence_number: CheckpointSequenceNumber,
446    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
447        self.tables
448            .certified_checkpoints
449            .get(&sequence_number)
450            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
451    }
452
453    pub fn get_locally_computed_checkpoint(
454        &self,
455        sequence_number: CheckpointSequenceNumber,
456    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
457        self.tables
458            .locally_computed_checkpoints
459            .get(&sequence_number)
460    }
461
462    pub fn multi_get_locally_computed_checkpoints(
463        &self,
464        sequence_numbers: &[CheckpointSequenceNumber],
465    ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
466        let checkpoints = self
467            .tables
468            .locally_computed_checkpoints
469            .multi_get(sequence_numbers)?;
470
471        Ok(checkpoints)
472    }
473
474    pub fn get_sequence_number_by_contents_digest(
475        &self,
476        digest: &CheckpointContentsDigest,
477    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
478        self.tables
479            .checkpoint_sequence_by_contents_digest
480            .get(digest)
481    }
482
483    pub fn delete_contents_digest_sequence_number_mapping(
484        &self,
485        digest: &CheckpointContentsDigest,
486    ) -> Result<(), TypedStoreError> {
487        self.tables
488            .checkpoint_sequence_by_contents_digest
489            .remove(digest)
490    }
491
492    pub fn get_latest_certified_checkpoint(
493        &self,
494    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
495        Ok(self
496            .tables
497            .certified_checkpoints
498            .reversed_safe_iter_with_bounds(None, None)?
499            .next()
500            .transpose()?
501            .map(|(_, v)| v.into()))
502    }
503
504    pub fn get_latest_locally_computed_checkpoint(
505        &self,
506    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
507        Ok(self
508            .tables
509            .locally_computed_checkpoints
510            .reversed_safe_iter_with_bounds(None, None)?
511            .next()
512            .transpose()?
513            .map(|(_, v)| v))
514    }
515
516    pub fn multi_get_checkpoint_by_sequence_number(
517        &self,
518        sequence_numbers: &[CheckpointSequenceNumber],
519    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
520        let checkpoints = self
521            .tables
522            .certified_checkpoints
523            .multi_get(sequence_numbers)?
524            .into_iter()
525            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
526            .collect();
527
528        Ok(checkpoints)
529    }
530
531    pub fn multi_get_checkpoint_content(
532        &self,
533        contents_digest: &[CheckpointContentsDigest],
534    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
535        self.tables.checkpoint_content.multi_get(contents_digest)
536    }
537
538    pub fn get_highest_verified_checkpoint(
539        &self,
540    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
541        let highest_verified = if let Some(highest_verified) = self
542            .tables
543            .watermarks
544            .get(&CheckpointWatermark::HighestVerified)?
545        {
546            highest_verified
547        } else {
548            return Ok(None);
549        };
550        self.get_checkpoint_by_digest(&highest_verified.1)
551    }
552
553    pub fn get_highest_synced_checkpoint(
554        &self,
555    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
556        let highest_synced = if let Some(highest_synced) = self
557            .tables
558            .watermarks
559            .get(&CheckpointWatermark::HighestSynced)?
560        {
561            highest_synced
562        } else {
563            return Ok(None);
564        };
565        self.get_checkpoint_by_digest(&highest_synced.1)
566    }
567
568    pub fn get_highest_synced_checkpoint_seq_number(
569        &self,
570    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
571        if let Some(highest_synced) = self
572            .tables
573            .watermarks
574            .get(&CheckpointWatermark::HighestSynced)?
575        {
576            Ok(Some(highest_synced.0))
577        } else {
578            Ok(None)
579        }
580    }
581
582    pub fn get_highest_executed_checkpoint_seq_number(
583        &self,
584    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
585        if let Some(highest_executed) = self
586            .tables
587            .watermarks
588            .get(&CheckpointWatermark::HighestExecuted)?
589        {
590            Ok(Some(highest_executed.0))
591        } else {
592            Ok(None)
593        }
594    }
595
596    pub fn get_highest_executed_checkpoint(
597        &self,
598    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
599        let highest_executed = if let Some(highest_executed) = self
600            .tables
601            .watermarks
602            .get(&CheckpointWatermark::HighestExecuted)?
603        {
604            highest_executed
605        } else {
606            return Ok(None);
607        };
608        self.get_checkpoint_by_digest(&highest_executed.1)
609    }
610
611    pub fn get_highest_pruned_checkpoint_seq_number(
612        &self,
613    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
614        self.tables
615            .watermarks
616            .get(&CheckpointWatermark::HighestPruned)
617            .map(|watermark| watermark.map(|w| w.0))
618    }
619
620    pub fn get_checkpoint_contents(
621        &self,
622        digest: &CheckpointContentsDigest,
623    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
624        self.tables.checkpoint_content.get(digest)
625    }
626
627    pub fn get_full_checkpoint_contents_by_sequence_number(
628        &self,
629        seq: CheckpointSequenceNumber,
630    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
631        self.tables.full_checkpoint_content_v2.get(&seq)
632    }
633
634    fn prune_local_summaries(&self) -> SuiResult {
635        if let Some((last_local_summary, _)) = self
636            .tables
637            .locally_computed_checkpoints
638            .reversed_safe_iter_with_bounds(None, None)?
639            .next()
640            .transpose()?
641        {
642            let mut batch = self.tables.locally_computed_checkpoints.batch();
643            batch.schedule_delete_range(
644                &self.tables.locally_computed_checkpoints,
645                &0,
646                &last_local_summary,
647            )?;
648            batch.write()?;
649            info!("Pruned local summaries up to {:?}", last_local_summary);
650        }
651        Ok(())
652    }
653
654    pub fn clear_locally_computed_checkpoints_from(
655        &self,
656        from_seq: CheckpointSequenceNumber,
657    ) -> SuiResult {
658        let keys: Vec<_> = self
659            .tables
660            .locally_computed_checkpoints
661            .safe_iter_with_bounds(Some(from_seq), None)
662            .map(|r| r.map(|(k, _)| k))
663            .collect::<Result<_, _>>()?;
664        if let Some(&last_local_summary) = keys.last() {
665            let mut batch = self.tables.locally_computed_checkpoints.batch();
666            batch
667                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
668                .expect("Failed to delete locally computed checkpoints");
669            batch
670                .write()
671                .expect("Failed to delete locally computed checkpoints");
672            warn!(
673                from_seq,
674                last_local_summary,
675                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
676                from_seq,
677                last_local_summary
678            );
679        }
680        Ok(())
681    }
682
683    fn check_for_checkpoint_fork(
684        &self,
685        local_checkpoint: &CheckpointSummary,
686        verified_checkpoint: &VerifiedCheckpoint,
687    ) {
688        if local_checkpoint != verified_checkpoint.data() {
689            let verified_contents = self
690                .get_checkpoint_contents(&verified_checkpoint.content_digest)
691                .map(|opt_contents| {
692                    opt_contents
693                        .map(|contents| format!("{:?}", contents))
694                        .unwrap_or_else(|| {
695                            format!(
696                                "Verified checkpoint contents not found, digest: {:?}",
697                                verified_checkpoint.content_digest,
698                            )
699                        })
700                })
701                .map_err(|e| {
702                    format!(
703                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
704                        verified_checkpoint.content_digest, e
705                    )
706                })
707                .unwrap_or_else(|err_msg| err_msg);
708
709            let local_contents = self
710                .get_checkpoint_contents(&local_checkpoint.content_digest)
711                .map(|opt_contents| {
712                    opt_contents
713                        .map(|contents| format!("{:?}", contents))
714                        .unwrap_or_else(|| {
715                            format!(
716                                "Local checkpoint contents not found, digest: {:?}",
717                                local_checkpoint.content_digest
718                            )
719                        })
720                })
721                .map_err(|e| {
722                    format!(
723                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
724                        local_checkpoint.content_digest, e
725                    )
726                })
727                .unwrap_or_else(|err_msg| err_msg);
728
729            // checkpoint contents may be too large for panic message.
730            error!(
731                verified_checkpoint = ?verified_checkpoint.data(),
732                ?verified_contents,
733                ?local_checkpoint,
734                ?local_contents,
735                "Local checkpoint fork detected!",
736            );
737
738            // Record the fork in the database before crashing
739            if let Err(e) = self.record_checkpoint_fork_detected(
740                *local_checkpoint.sequence_number(),
741                local_checkpoint.digest(),
742            ) {
743                error!("Failed to record checkpoint fork in database: {:?}", e);
744            }
745
746            fail_point_arg!(
747                "kill_checkpoint_fork_node",
748                |checkpoint_overrides: std::sync::Arc<
749                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
750                >| {
751                    #[cfg(msim)]
752                    {
753                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
754                            overrides.insert(
755                                local_checkpoint.sequence_number,
756                                verified_checkpoint.digest().to_string(),
757                            );
758                        }
759                        tracing::error!(
760                            fatal = true,
761                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
762                            local_checkpoint.sequence_number(),
763                            verified_checkpoint.digest()
764                        );
765                        sui_simulator::task::shutdown_current_node();
766                    }
767                }
768            );
769
770            fatal!(
771                "Local checkpoint fork detected for sequence number: {}",
772                local_checkpoint.sequence_number()
773            );
774        }
775    }
776
777    // Called by consensus (ConsensusAggregator).
778    // Different from `insert_verified_checkpoint`, it does not touch
779    // the highest_verified_checkpoint watermark such that state sync
780    // will have a chance to process this checkpoint and perform some
781    // state-sync only things.
782    pub fn insert_certified_checkpoint(
783        &self,
784        checkpoint: &VerifiedCheckpoint,
785    ) -> Result<(), TypedStoreError> {
786        debug!(
787            checkpoint_seq = checkpoint.sequence_number(),
788            "Inserting certified checkpoint",
789        );
790        let mut batch = self.tables.certified_checkpoints.batch();
791        batch
792            .insert_batch(
793                &self.tables.certified_checkpoints,
794                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
795            )?
796            .insert_batch(
797                &self.tables.checkpoint_by_digest,
798                [(checkpoint.digest(), checkpoint.serializable_ref())],
799            )?;
800        if checkpoint.next_epoch_committee().is_some() {
801            batch.insert_batch(
802                &self.tables.epoch_last_checkpoint_map,
803                [(&checkpoint.epoch(), checkpoint.sequence_number())],
804            )?;
805        }
806        batch.write()?;
807
808        if let Some(local_checkpoint) = self
809            .tables
810            .locally_computed_checkpoints
811            .get(checkpoint.sequence_number())?
812        {
813            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
814        }
815
816        Ok(())
817    }
818
819    // Called by state sync, apart from inserting the checkpoint and updating
820    // related tables, it also bumps the highest_verified_checkpoint watermark.
821    #[instrument(level = "debug", skip_all)]
822    pub fn insert_verified_checkpoint(
823        &self,
824        checkpoint: &VerifiedCheckpoint,
825    ) -> Result<(), TypedStoreError> {
826        self.insert_certified_checkpoint(checkpoint)?;
827        self.update_highest_verified_checkpoint(checkpoint)
828    }
829
830    pub fn update_highest_verified_checkpoint(
831        &self,
832        checkpoint: &VerifiedCheckpoint,
833    ) -> Result<(), TypedStoreError> {
834        if Some(*checkpoint.sequence_number())
835            > self
836                .get_highest_verified_checkpoint()?
837                .map(|x| *x.sequence_number())
838        {
839            debug!(
840                checkpoint_seq = checkpoint.sequence_number(),
841                "Updating highest verified checkpoint",
842            );
843            self.tables.watermarks.insert(
844                &CheckpointWatermark::HighestVerified,
845                &(*checkpoint.sequence_number(), *checkpoint.digest()),
846            )?;
847        }
848
849        Ok(())
850    }
851
852    pub fn update_highest_synced_checkpoint(
853        &self,
854        checkpoint: &VerifiedCheckpoint,
855    ) -> Result<(), TypedStoreError> {
856        let seq = *checkpoint.sequence_number();
857        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
858        self.tables.watermarks.insert(
859            &CheckpointWatermark::HighestSynced,
860            &(seq, *checkpoint.digest()),
861        )?;
862        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
863        Ok(())
864    }
865
866    async fn notify_read_checkpoint_watermark<F>(
867        &self,
868        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
869        seq: CheckpointSequenceNumber,
870        get_watermark: F,
871    ) -> VerifiedCheckpoint
872    where
873        F: Fn() -> Option<CheckpointSequenceNumber>,
874    {
875        notify_read
876            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
877                let seq = seqs[0];
878                let Some(highest) = get_watermark() else {
879                    return vec![None];
880                };
881                if highest < seq {
882                    return vec![None];
883                }
884                let checkpoint = self
885                    .get_checkpoint_by_sequence_number(seq)
886                    .expect("db error")
887                    .expect("checkpoint not found");
888                vec![Some(checkpoint)]
889            })
890            .await
891            .into_iter()
892            .next()
893            .unwrap()
894    }
895
896    pub async fn notify_read_synced_checkpoint(
897        &self,
898        seq: CheckpointSequenceNumber,
899    ) -> VerifiedCheckpoint {
900        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
901            self.get_highest_synced_checkpoint_seq_number()
902                .expect("db error")
903        })
904        .await
905    }
906
907    pub async fn notify_read_executed_checkpoint(
908        &self,
909        seq: CheckpointSequenceNumber,
910    ) -> VerifiedCheckpoint {
911        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
912            self.get_highest_executed_checkpoint_seq_number()
913                .expect("db error")
914        })
915        .await
916    }
917
918    pub fn update_highest_executed_checkpoint(
919        &self,
920        checkpoint: &VerifiedCheckpoint,
921    ) -> Result<(), TypedStoreError> {
922        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
923            if seq_number >= *checkpoint.sequence_number() {
924                return Ok(());
925            }
926            assert_eq!(
927                seq_number + 1,
928                *checkpoint.sequence_number(),
929                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
930                checkpoint.sequence_number(),
931                seq_number
932            );
933        }
934        let seq = *checkpoint.sequence_number();
935        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
936        self.tables.watermarks.insert(
937            &CheckpointWatermark::HighestExecuted,
938            &(seq, *checkpoint.digest()),
939        )?;
940        self.executed_checkpoint_notify_read
941            .notify(&seq, checkpoint);
942        Ok(())
943    }
944
945    pub fn update_highest_pruned_checkpoint(
946        &self,
947        checkpoint: &VerifiedCheckpoint,
948    ) -> Result<(), TypedStoreError> {
949        self.tables.watermarks.insert(
950            &CheckpointWatermark::HighestPruned,
951            &(*checkpoint.sequence_number(), *checkpoint.digest()),
952        )
953    }
954
955    /// Sets highest executed checkpoint to any value.
956    ///
957    /// WARNING: This method is very subtle and can corrupt the database if used incorrectly.
958    /// It should only be used in one-off cases or tests after fully understanding the risk.
959    pub fn set_highest_executed_checkpoint_subtle(
960        &self,
961        checkpoint: &VerifiedCheckpoint,
962    ) -> Result<(), TypedStoreError> {
963        self.tables.watermarks.insert(
964            &CheckpointWatermark::HighestExecuted,
965            &(*checkpoint.sequence_number(), *checkpoint.digest()),
966        )
967    }
968
969    pub fn insert_checkpoint_contents(
970        &self,
971        contents: CheckpointContents,
972    ) -> Result<(), TypedStoreError> {
973        debug!(
974            checkpoint_seq = ?contents.digest(),
975            "Inserting checkpoint contents",
976        );
977        self.tables
978            .checkpoint_content
979            .insert(contents.digest(), &contents)
980    }
981
982    pub fn insert_verified_checkpoint_contents(
983        &self,
984        checkpoint: &VerifiedCheckpoint,
985        full_contents: VerifiedCheckpointContents,
986    ) -> Result<(), TypedStoreError> {
987        let mut batch = self.tables.full_checkpoint_content_v2.batch();
988        batch.insert_batch(
989            &self.tables.checkpoint_sequence_by_contents_digest,
990            [(&checkpoint.content_digest, checkpoint.sequence_number())],
991        )?;
992        let full_contents = full_contents.into_inner();
993        batch.insert_batch(
994            &self.tables.full_checkpoint_content_v2,
995            [(checkpoint.sequence_number(), &full_contents)],
996        )?;
997
998        let contents = full_contents.into_checkpoint_contents();
999        assert_eq!(&checkpoint.content_digest, contents.digest());
1000
1001        batch.insert_batch(
1002            &self.tables.checkpoint_content,
1003            [(contents.digest(), &contents)],
1004        )?;
1005
1006        batch.write()
1007    }
1008
1009    pub fn delete_full_checkpoint_contents(
1010        &self,
1011        seq: CheckpointSequenceNumber,
1012    ) -> Result<(), TypedStoreError> {
1013        self.tables.full_checkpoint_content.remove(&seq)?;
1014        self.tables.full_checkpoint_content_v2.remove(&seq)
1015    }
1016
1017    pub fn get_epoch_last_checkpoint(
1018        &self,
1019        epoch_id: EpochId,
1020    ) -> SuiResult<Option<VerifiedCheckpoint>> {
1021        let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1022        let checkpoint = match seq {
1023            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1024            None => None,
1025        };
1026        Ok(checkpoint)
1027    }
1028
1029    pub fn get_epoch_last_checkpoint_seq_number(
1030        &self,
1031        epoch_id: EpochId,
1032    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1033        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1034        Ok(seq)
1035    }
1036
1037    /// Returns the sequence number of the first checkpoint in the given epoch.
1038    /// For epoch 0 this is always 0; for epoch N > 0 it is last_checkpoint(N-1) + 1.
1039    pub fn get_epoch_first_checkpoint_seq(
1040        &self,
1041        epoch: EpochId,
1042    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1043        if epoch == 0 {
1044            return Ok(Some(0));
1045        }
1046        Ok(self
1047            .tables
1048            .epoch_last_checkpoint_map
1049            .get(&(epoch - 1))?
1050            .map(|s| s + 1))
1051    }
1052
1053    /// Iterate certified checkpoints starting at `start` (inclusive), up to `limit`.
1054    pub fn list_checkpoints_from_seq(
1055        &self,
1056        start: Option<CheckpointSequenceNumber>,
1057        limit: usize,
1058    ) -> Result<Vec<(CheckpointSequenceNumber, VerifiedCheckpoint)>, TypedStoreError> {
1059        self.tables
1060            .certified_checkpoints
1061            .safe_iter_with_bounds(start, None)
1062            .take(limit)
1063            .map(|r| r.map(|(seq, cp)| (seq, cp.into())))
1064            .collect()
1065    }
1066
1067    /// Iterate epoch→last-checkpoint-seq entries starting at `start` epoch, up to `limit`.
1068    pub fn list_epoch_last_checkpoints(
1069        &self,
1070        start: Option<EpochId>,
1071        limit: usize,
1072    ) -> Result<Vec<(EpochId, CheckpointSequenceNumber)>, TypedStoreError> {
1073        self.tables
1074            .epoch_last_checkpoint_map
1075            .safe_iter_with_bounds(start, None)
1076            .take(limit)
1077            .collect()
1078    }
1079
1080    /// Iterate checkpoint digests from `checkpoint_by_digest`, starting at `start`, up to `limit`.
1081    pub fn list_checkpoint_digests(
1082        &self,
1083        start: Option<CheckpointDigest>,
1084        limit: usize,
1085    ) -> Result<Vec<CheckpointDigest>, TypedStoreError> {
1086        self.tables
1087            .checkpoint_by_digest
1088            .safe_iter_with_bounds(start, None)
1089            .take(limit)
1090            .map(|r| r.map(|(d, _)| d))
1091            .collect()
1092    }
1093
1094    /// Iterate checkpoint contents digests, starting at `start`, up to `limit`.
1095    pub fn list_checkpoint_contents_digests(
1096        &self,
1097        start: Option<CheckpointContentsDigest>,
1098        limit: usize,
1099    ) -> Result<Vec<CheckpointContentsDigest>, TypedStoreError> {
1100        self.tables
1101            .checkpoint_content
1102            .safe_iter_with_bounds(start, None)
1103            .take(limit)
1104            .map(|r| r.map(|(d, _)| d))
1105            .collect()
1106    }
1107
1108    /// Iterate certified checkpoints belonging to `epoch`, starting at `start_seq`, up to `limit`.
1109    pub fn list_epoch_checkpoints(
1110        &self,
1111        epoch: EpochId,
1112        start_seq: Option<CheckpointSequenceNumber>,
1113        limit: usize,
1114    ) -> Result<Vec<(CheckpointSequenceNumber, VerifiedCheckpoint)>, TypedStoreError> {
1115        let Some(last_seq) = self.tables.epoch_last_checkpoint_map.get(&epoch)? else {
1116            return Ok(vec![]);
1117        };
1118        // Compute first seq directly to stay within TypedStoreError.
1119        let first_seq = if epoch == 0 {
1120            0
1121        } else {
1122            self.tables
1123                .epoch_last_checkpoint_map
1124                .get(&(epoch - 1))?
1125                .map(|s| s + 1)
1126                .unwrap_or(0)
1127        };
1128        let start = start_seq.map(|s| s.max(first_seq)).unwrap_or(first_seq);
1129        self.tables
1130            .certified_checkpoints
1131            .safe_iter_with_bounds(Some(start), Some(last_seq + 1))
1132            .take(limit)
1133            .map(|r| r.map(|(seq, cp)| (seq, cp.into())))
1134            .collect()
1135    }
1136
1137    pub fn insert_epoch_last_checkpoint(
1138        &self,
1139        epoch_id: EpochId,
1140        checkpoint: &VerifiedCheckpoint,
1141    ) -> SuiResult {
1142        self.tables
1143            .epoch_last_checkpoint_map
1144            .insert(&epoch_id, checkpoint.sequence_number())?;
1145        Ok(())
1146    }
1147
1148    pub fn get_epoch_state_commitments(
1149        &self,
1150        epoch: EpochId,
1151    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1152        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1153            checkpoint
1154                .end_of_epoch_data
1155                .as_ref()
1156                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1157                .epoch_commitments
1158                .clone()
1159        });
1160        Ok(commitments)
1161    }
1162
1163    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few statistics of the epoch.
1164    pub fn get_epoch_stats(
1165        &self,
1166        epoch: EpochId,
1167        last_checkpoint: &CheckpointSummary,
1168    ) -> Option<EpochStats> {
1169        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1170            (0, 0)
1171        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1172            (
1173                checkpoint.sequence_number + 1,
1174                checkpoint.network_total_transactions,
1175            )
1176        } else {
1177            return None;
1178        };
1179        Some(EpochStats {
1180            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1181            transaction_count: last_checkpoint.network_total_transactions
1182                - prev_epoch_network_transactions,
1183            total_gas_reward: last_checkpoint
1184                .epoch_rolling_gas_cost_summary
1185                .computation_cost,
1186        })
1187    }
1188
1189    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1190        // This checkpoints the entire db and not one column family
1191        self.tables
1192            .checkpoint_content
1193            .checkpoint_db(path)
1194            .map_err(Into::into)
1195    }
1196
1197    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1198        let mut wb = self.tables.watermarks.batch();
1199        wb.delete_batch(
1200            &self.tables.watermarks,
1201            std::iter::once(CheckpointWatermark::HighestExecuted),
1202        )?;
1203        wb.write()?;
1204        Ok(())
1205    }
1206
1207    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1208        self.delete_highest_executed_checkpoint_test_only()?;
1209        Ok(())
1210    }
1211
1212    pub fn record_checkpoint_fork_detected(
1213        &self,
1214        checkpoint_seq: CheckpointSequenceNumber,
1215        checkpoint_digest: CheckpointDigest,
1216    ) -> Result<(), TypedStoreError> {
1217        info!(
1218            checkpoint_seq = checkpoint_seq,
1219            checkpoint_digest = ?checkpoint_digest,
1220            "Recording checkpoint fork detection in database"
1221        );
1222        self.tables.watermarks.insert(
1223            &CheckpointWatermark::CheckpointForkDetected,
1224            &(checkpoint_seq, checkpoint_digest),
1225        )
1226    }
1227
1228    pub fn get_checkpoint_fork_detected(
1229        &self,
1230    ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1231        self.tables
1232            .watermarks
1233            .get(&CheckpointWatermark::CheckpointForkDetected)
1234    }
1235
1236    pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1237        self.tables
1238            .watermarks
1239            .remove(&CheckpointWatermark::CheckpointForkDetected)
1240    }
1241
1242    pub fn record_transaction_fork_detected(
1243        &self,
1244        tx_digest: TransactionDigest,
1245        expected_effects_digest: TransactionEffectsDigest,
1246        actual_effects_digest: TransactionEffectsDigest,
1247    ) -> Result<(), TypedStoreError> {
1248        info!(
1249            tx_digest = ?tx_digest,
1250            expected_effects_digest = ?expected_effects_digest,
1251            actual_effects_digest = ?actual_effects_digest,
1252            "Recording transaction fork detection in database"
1253        );
1254        self.tables.transaction_fork_detected.insert(
1255            &TRANSACTION_FORK_DETECTED_KEY,
1256            &(tx_digest, expected_effects_digest, actual_effects_digest),
1257        )
1258    }
1259
1260    pub fn get_transaction_fork_detected(
1261        &self,
1262    ) -> Result<
1263        Option<(
1264            TransactionDigest,
1265            TransactionEffectsDigest,
1266            TransactionEffectsDigest,
1267        )>,
1268        TypedStoreError,
1269    > {
1270        self.tables
1271            .transaction_fork_detected
1272            .get(&TRANSACTION_FORK_DETECTED_KEY)
1273    }
1274
1275    pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1276        self.tables
1277            .transaction_fork_detected
1278            .remove(&TRANSACTION_FORK_DETECTED_KEY)
1279    }
1280}
1281
/// Keys of the `watermarks` table. Each key maps to a
/// `(checkpoint sequence number, checkpoint digest)` pair.
///
/// NOTE(review): this enum is serialized as a database key; reordering or
/// removing variants presumably changes the on-disk encoding — confirm before
/// touching the variant list.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    /// Written by `update_highest_verified_checkpoint` when a checkpoint with a
    /// higher sequence number is verified.
    HighestVerified,
    /// Written unconditionally by `update_highest_synced_checkpoint`.
    HighestSynced,
    /// Written by `update_highest_executed_checkpoint` (sequentially) and by
    /// `set_highest_executed_checkpoint_subtle` (arbitrarily).
    HighestExecuted,
    /// Written by `update_highest_pruned_checkpoint`.
    HighestPruned,
    /// Marker recorded by `record_checkpoint_fork_detected`; it stores the
    /// forked checkpoint rather than a progress watermark.
    CheckpointForkDetected,
}
1290
/// Task state for accumulating built checkpoints into the global state hash.
///
/// Effects arrive over a channel from the checkpoint builder and are passed to
/// `GlobalStateHasher::accumulate_checkpoint` one checkpoint at a time.
struct CheckpointStateHasher {
    /// Epoch store handed to every `accumulate_checkpoint` call.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Weak handle to the hasher; the task stops once it can no longer be upgraded.
    hasher: Weak<GlobalStateHasher>,
    /// Receives `(checkpoint sequence number, effects)` pairs from the builder.
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
1296
1297impl CheckpointStateHasher {
1298    fn new(
1299        epoch_store: Arc<AuthorityPerEpochStore>,
1300        hasher: Weak<GlobalStateHasher>,
1301        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1302    ) -> Self {
1303        Self {
1304            epoch_store,
1305            hasher,
1306            receive_from_builder,
1307        }
1308    }
1309
1310    async fn run(self) {
1311        let Self {
1312            epoch_store,
1313            hasher,
1314            mut receive_from_builder,
1315        } = self;
1316        while let Some((seq, effects)) = receive_from_builder.recv().await {
1317            let Some(hasher) = hasher.upgrade() else {
1318                info!("Object state hasher was dropped, stopping checkpoint accumulation");
1319                break;
1320            };
1321            hasher
1322                .accumulate_checkpoint(&effects, seq, &epoch_store)
1323                .expect("epoch ended while accumulating checkpoint");
1324        }
1325    }
1326}
1327
/// Errors surfaced while building checkpoints.
///
/// `CheckpointBuilder::run` stops the builder on the first two variants and
/// retries after one second on `Retry`.
#[derive(Debug)]
pub enum CheckpointBuilderError {
    /// Terminal: the builder loop logs and returns.
    ChangeEpochTxAlreadyExecuted,
    /// Terminal: the builder loop logs and returns.
    SystemPackagesMissing,
    /// Any other error; the builder loop backs off and tries again.
    Retry(anyhow::Error),
}
1334
1335impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1336    for CheckpointBuilderError
1337{
1338    fn from(e: SuiError) -> Self {
1339        Self::Retry(e.into())
1340    }
1341}
1342
/// Result alias for checkpoint-builder operations; the success type defaults to `()`.
pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1344
/// Builds checkpoints from the pending checkpoints recorded in the epoch store.
///
/// Driven by `run`, which repeatedly calls `maybe_build_checkpoints` and then
/// waits on `notify`.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    /// Checkpoint store; `make_checkpoint` reads the highest executed sequence number from it.
    store: Arc<CheckpointStore>,
    /// Source of pending checkpoints, the last built summary and the protocol config.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Awaited by `run` between build passes.
    notify: Arc<Notify>,
    // NOTE(review): presumably wakes the checkpoint aggregator after new
    // checkpoints are written — usage is outside this view, confirm.
    notify_aggregator: Arc<Notify>,
    /// Advanced to the highest checkpoint sequence number built so far; never moved backwards.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    /// For synchronous accumulation of the end-of-epoch checkpoint.
    global_state_hasher: Weak<GlobalStateHasher>,
    /// For asynchronous/concurrent accumulation of regular checkpoints.
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    /// `checkpoint_errors` is incremented by `run` on every retryable failure.
    metrics: Arc<CheckpointMetrics>,
    // NOTE(review): the two limits below are presumably applied when splitting
    // transactions into checkpoints — enforcement is outside this view.
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
1360
/// State for aggregating checkpoint signature messages.
///
/// NOTE(review): the methods of this type are outside this view; the field
/// notes below are derived from the field types only.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    /// Unbounded channel delivering incoming checkpoint signature messages.
    receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
    /// Signature messages grouped by checkpoint sequence number.
    pending: BTreeMap<CheckpointSequenceNumber, Vec<CheckpointSignatureMessage>>,
    // Presumably the aggregator for the checkpoint currently being certified — confirm.
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1372
/// Holds the information needed to aggregate signatures for one checkpoint.
pub struct CheckpointSignatureAggregator {
    /// Summary of the checkpoint whose signatures are being aggregated.
    summary: CheckpointSummary,
    // Presumably the digest of `summary` — confirm at the construction site.
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1383
1384impl CheckpointBuilder {
1385    fn new(
1386        state: Arc<AuthorityState>,
1387        store: Arc<CheckpointStore>,
1388        epoch_store: Arc<AuthorityPerEpochStore>,
1389        notify: Arc<Notify>,
1390        effects_store: Arc<dyn TransactionCacheRead>,
1391        // for synchronous accumulation of end-of-epoch checkpoint
1392        global_state_hasher: Weak<GlobalStateHasher>,
1393        // for asynchronous/concurrent accumulation of regular checkpoints
1394        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1395        output: Box<dyn CheckpointOutput>,
1396        notify_aggregator: Arc<Notify>,
1397        last_built: watch::Sender<CheckpointSequenceNumber>,
1398        metrics: Arc<CheckpointMetrics>,
1399        max_transactions_per_checkpoint: usize,
1400        max_checkpoint_size_bytes: usize,
1401    ) -> Self {
1402        Self {
1403            state,
1404            store,
1405            epoch_store,
1406            notify,
1407            effects_store,
1408            global_state_hasher,
1409            send_to_hasher,
1410            output,
1411            notify_aggregator,
1412            last_built,
1413            metrics,
1414            max_transactions_per_checkpoint,
1415            max_checkpoint_size_bytes,
1416        }
1417    }
1418
1419    /// This function first waits for ConsensusCommitHandler to finish reprocessing
1420    /// commits that have been processed before the last restart, if consensus_replay_waiter
1421    /// is supplied. Then it starts building checkpoints in a loop.
1422    ///
1423    /// It is optional to pass in consensus_replay_waiter, to make it easier to attribute
1424    /// if slow recovery of previously built checkpoints is due to consensus replay or
1425    /// checkpoint building.
1426    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
1427        if let Some(replay_waiter) = consensus_replay_waiter {
1428            info!("Waiting for consensus commits to replay ...");
1429            replay_waiter.wait_for_replay().await;
1430            info!("Consensus commits finished replaying");
1431        }
1432        info!("Starting CheckpointBuilder");
1433        loop {
1434            match self.maybe_build_checkpoints().await {
1435                Ok(()) => {}
1436                err @ Err(
1437                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
1438                    | CheckpointBuilderError::SystemPackagesMissing,
1439                ) => {
1440                    info!("CheckpointBuilder stopping: {:?}", err);
1441                    return;
1442                }
1443                Err(CheckpointBuilderError::Retry(inner)) => {
1444                    let msg = format!("{:?}", inner);
1445                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
1446                    tokio::time::sleep(Duration::from_secs(1)).await;
1447                    self.metrics.checkpoint_errors.inc();
1448                    continue;
1449                }
1450            }
1451
1452            self.notify.notified().await;
1453        }
1454    }
1455
1456    async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1457        if self
1458            .epoch_store
1459            .protocol_config()
1460            .split_checkpoints_in_consensus_handler()
1461        {
1462            self.maybe_build_checkpoints_v2().await
1463        } else {
1464            self.maybe_build_checkpoints_v1().await
1465        }
1466    }
1467
1468    async fn maybe_build_checkpoints_v1(&mut self) -> CheckpointBuilderResult {
1469        let _scope = monitored_scope("BuildCheckpoints");
1470
1471        // Collect info about the most recently built checkpoint.
1472        let summary = self
1473            .epoch_store
1474            .last_built_checkpoint_builder_summary()
1475            .expect("epoch should not have ended");
1476        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
1477        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
1478
1479        let min_checkpoint_interval_ms = self
1480            .epoch_store
1481            .protocol_config()
1482            .min_checkpoint_interval_ms_as_option()
1483            .unwrap_or_default();
1484        let mut grouped_pending_checkpoints = Vec::new();
1485        let mut checkpoints_iter = self
1486            .epoch_store
1487            .get_pending_checkpoints(last_height)
1488            .expect("unexpected epoch store error")
1489            .into_iter()
1490            .peekable();
1491        while let Some((height, pending)) = checkpoints_iter.next() {
1492            // Group PendingCheckpoints until:
1493            // - minimum interval has elapsed ...
1494            let current_timestamp = pending.details().timestamp_ms;
1495            let can_build = match last_timestamp {
1496                    Some(last_timestamp) => {
1497                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
1498                    }
1499                    None => true,
1500                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
1501                //   should be written separately) ...
1502                } || checkpoints_iter
1503                    .peek()
1504                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
1505                // - or, we have reached end of epoch.
1506                    || pending.details().last_of_epoch;
1507            grouped_pending_checkpoints.push(pending);
1508            if !can_build {
1509                debug!(
1510                    checkpoint_commit_height = height,
1511                    ?last_timestamp,
1512                    ?current_timestamp,
1513                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
1514                );
1515                continue;
1516            }
1517
1518            // Min interval has elapsed, we can now coalesce and build a checkpoint.
1519            last_height = Some(height);
1520            last_timestamp = Some(current_timestamp);
1521            debug!(
1522                checkpoint_commit_height_from = grouped_pending_checkpoints
1523                    .first()
1524                    .unwrap()
1525                    .details()
1526                    .checkpoint_height,
1527                checkpoint_commit_height_to = last_height,
1528                "Making checkpoint with commit height range"
1529            );
1530
1531            let seq = self
1532                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
1533                .await?;
1534
1535            self.last_built.send_if_modified(|cur| {
1536                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1537                if seq > *cur {
1538                    *cur = seq;
1539                    true
1540                } else {
1541                    false
1542                }
1543            });
1544
1545            // ensure that the task can be cancelled at end of epoch, even if no other await yields
1546            // execution.
1547            tokio::task::yield_now().await;
1548        }
1549        debug!(
1550            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1551            grouped_pending_checkpoints.len(),
1552        );
1553
1554        Ok(())
1555    }
1556
1557    async fn maybe_build_checkpoints_v2(&mut self) -> CheckpointBuilderResult {
1558        let _scope = monitored_scope("BuildCheckpoints");
1559
1560        // Collect info about the most recently built checkpoint.
1561        let last_height = self
1562            .epoch_store
1563            .last_built_checkpoint_builder_summary()
1564            .expect("epoch should not have ended")
1565            .and_then(|s| s.checkpoint_height);
1566
1567        for (height, pending) in self
1568            .epoch_store
1569            .get_pending_checkpoints_v2(last_height)
1570            .expect("unexpected epoch store error")
1571        {
1572            debug!(checkpoint_commit_height = height, "Making checkpoint");
1573
1574            let seq = self.make_checkpoint_v2(pending).await?;
1575
1576            self.last_built.send_if_modified(|cur| {
1577                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1578                if seq > *cur {
1579                    *cur = seq;
1580                    true
1581                } else {
1582                    false
1583                }
1584            });
1585
1586            // ensure that the task can be cancelled at end of epoch, even if no other await yields
1587            // execution.
1588            tokio::task::yield_now().await;
1589        }
1590
1591        Ok(())
1592    }
1593
1594    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1595    async fn make_checkpoint(
1596        &mut self,
1597        pendings: Vec<PendingCheckpoint>,
1598    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1599        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1600
1601        let pending_ckpt_str = pendings
1602            .iter()
1603            .map(|p| {
1604                format!(
1605                    "height={}, commit={}",
1606                    p.details().checkpoint_height,
1607                    p.details().consensus_commit_ref
1608                )
1609            })
1610            .join("; ");
1611
1612        let last_details = pendings.last().unwrap().details().clone();
1613
1614        // Stores the transactions that should be included in the checkpoint. Transactions will be recorded in the checkpoint
1615        // in this order.
1616        let highest_executed_sequence = self
1617            .store
1618            .get_highest_executed_checkpoint_seq_number()
1619            .expect("db error")
1620            .unwrap_or(0);
1621
1622        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
1623        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1624
1625        let new_checkpoints = self
1626            .create_checkpoints(
1627                sorted_tx_effects_included_in_checkpoint,
1628                &last_details,
1629                &all_roots,
1630            )
1631            .await?;
1632        let highest_sequence = *new_checkpoints.last().0.sequence_number();
1633        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
1634            debug_fatal!(
1635                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1636            );
1637        }
1638
1639        let new_ckpt_str = new_checkpoints
1640            .iter()
1641            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
1642            .join("; ");
1643
1644        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1645            .await?;
1646        info!(
1647            "Made new checkpoint {} from pending checkpoint {}",
1648            new_ckpt_str, pending_ckpt_str
1649        );
1650
1651        Ok(highest_sequence)
1652    }
1653
1654    #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
1655    async fn make_checkpoint_v2(
1656        &mut self,
1657        pending: PendingCheckpointV2,
1658    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1659        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1660
1661        let details = pending.details.clone();
1662
1663        let highest_executed_sequence = self
1664            .store
1665            .get_highest_executed_checkpoint_seq_number()
1666            .expect("db error")
1667            .unwrap_or(0);
1668
1669        let (poll_count, result) =
1670            poll_count(self.resolve_checkpoint_transactions_v2(pending)).await;
1671        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1672
1673        let new_checkpoints = self
1674            .create_checkpoints(
1675                sorted_tx_effects_included_in_checkpoint,
1676                &details,
1677                &all_roots,
1678            )
1679            .await?;
1680        assert_eq!(new_checkpoints.len(), 1, "Expected exactly one checkpoint");
1681        let sequence = *new_checkpoints.first().0.sequence_number();
1682        let digest = new_checkpoints.first().0.digest();
1683        if sequence <= highest_executed_sequence && poll_count > 1 {
1684            debug_fatal!(
1685                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1686            );
1687        }
1688
1689        self.write_checkpoints(details.checkpoint_height, new_checkpoints)
1690            .await?;
1691        info!(
1692            seq = sequence,
1693            %digest,
1694            height = details.checkpoint_height,
1695            commit = %details.consensus_commit_ref,
1696            "Made new checkpoint"
1697        );
1698
1699        Ok(sequence)
1700    }
1701
1702    async fn construct_and_execute_settlement_transactions(
1703        &self,
1704        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
1705        checkpoint_height: CheckpointHeight,
1706        checkpoint_seq: CheckpointSequenceNumber,
1707        tx_index_offset: u64,
1708    ) -> (TransactionKey, Vec<TransactionEffects>) {
1709        let _scope =
1710            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");
1711
1712        let tx_key =
1713            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);
1714
1715        let epoch = self.epoch_store.epoch();
1716        let accumulator_root_obj_initial_shared_version = self
1717            .epoch_store
1718            .epoch_start_config()
1719            .accumulator_root_obj_initial_shared_version()
1720            .expect("accumulator root object must exist");
1721
1722        let builder = AccumulatorSettlementTxBuilder::new(
1723            Some(self.effects_store.as_ref()),
1724            sorted_tx_effects_included_in_checkpoint,
1725            checkpoint_seq,
1726            tx_index_offset,
1727        );
1728
1729        let funds_changes = builder.collect_funds_changes();
1730        let num_updates = builder.num_updates();
1731        let settlement_txns = builder.build_tx(
1732            self.epoch_store.protocol_config(),
1733            epoch,
1734            accumulator_root_obj_initial_shared_version,
1735            checkpoint_height,
1736            checkpoint_seq,
1737        );
1738
1739        let settlement_txns: Vec<_> = settlement_txns
1740            .into_iter()
1741            .map(|tx| {
1742                VerifiedExecutableTransaction::new_system(
1743                    VerifiedTransaction::new_system_transaction(tx),
1744                    self.epoch_store.epoch(),
1745                )
1746            })
1747            .collect();
1748
1749        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();
1750
1751        debug!(
1752            ?settlement_digests,
1753            ?tx_key,
1754            "created settlement transactions with {num_updates} updates"
1755        );
1756
1757        self.epoch_store
1758            .notify_settlement_transactions_ready(tx_key, settlement_txns);
1759
1760        let settlement_effects = wait_for_effects_with_retry(
1761            self.effects_store.as_ref(),
1762            "CheckpointBuilder::notify_read_settlement_effects",
1763            &settlement_digests,
1764            tx_key,
1765        )
1766        .await;
1767        let (accounts_created, accounts_deleted) =
1768            accumulators::count_accumulator_object_changes(&settlement_effects);
1769        self.metrics
1770            .report_accumulator_account_changes(accounts_created, accounts_deleted);
1771
1772        let barrier_tx = accumulators::build_accumulator_barrier_tx(
1773            epoch,
1774            accumulator_root_obj_initial_shared_version,
1775            checkpoint_height,
1776            &settlement_effects,
1777        );
1778
1779        let barrier_tx = VerifiedExecutableTransaction::new_system(
1780            VerifiedTransaction::new_system_transaction(barrier_tx),
1781            self.epoch_store.epoch(),
1782        );
1783        let barrier_digest = *barrier_tx.digest();
1784
1785        self.epoch_store
1786            .notify_barrier_transaction_ready(tx_key, barrier_tx);
1787
1788        let barrier_effects = wait_for_effects_with_retry(
1789            self.effects_store.as_ref(),
1790            "CheckpointBuilder::notify_read_barrier_effects",
1791            &[barrier_digest],
1792            tx_key,
1793        )
1794        .await;
1795
1796        let settlement_and_barrier_effects: Vec<_> = settlement_effects
1797            .into_iter()
1798            .chain(barrier_effects)
1799            .collect();
1800
1801        let mut next_accumulator_version = None;
1802        for fx in settlement_and_barrier_effects.iter() {
1803            assert!(
1804                fx.status().is_ok(),
1805                "settlement transaction cannot fail (digest: {:?}) {:#?}",
1806                fx.transaction_digest(),
1807                fx
1808            );
1809            if let Some(version) = fx
1810                .mutated()
1811                .iter()
1812                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
1813            {
1814                assert!(
1815                    next_accumulator_version.is_none(),
1816                    "Only one settlement transaction should mutate the accumulator root object"
1817                );
1818                next_accumulator_version = Some(version);
1819            }
1820        }
1821        let settlements = FundsSettlement {
1822            next_accumulator_version: next_accumulator_version
1823                .expect("Accumulator root object should be mutated in the settlement transactions"),
1824            funds_changes,
1825        };
1826
1827        self.state
1828            .execution_scheduler()
1829            .settle_address_funds(settlements);
1830
1831        (tx_key, settlement_and_barrier_effects)
1832    }
1833
1834    // Given the root transactions of a pending checkpoint, resolve the transactions should be included in
1835    // the checkpoint, and return them in the order they should be included in the checkpoint.
1836    // `effects_in_current_checkpoint` tracks the transactions that already exist in the current
1837    // checkpoint.
1838    #[instrument(level = "debug", skip_all)]
1839    async fn resolve_checkpoint_transactions(
1840        &self,
1841        pending_checkpoints: Vec<PendingCheckpoint>,
1842    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
1843        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1844
1845        // Keeps track of the effects that are already included in the current checkpoint.
1846        // This is used when there are multiple pending checkpoints to create a single checkpoint
1847        // because in such scenarios, dependencies of a transaction may in earlier created checkpoints,
1848        // or in earlier pending checkpoints.
1849        let mut effects_in_current_checkpoint = BTreeSet::new();
1850
1851        let mut tx_effects = Vec::new();
1852        let mut tx_roots = HashSet::new();
1853
1854        for pending_checkpoint in pending_checkpoints.into_iter() {
1855            let mut pending = pending_checkpoint;
1856            debug!(
1857                checkpoint_commit_height = pending.details.checkpoint_height,
1858                "Resolving checkpoint transactions for pending checkpoint.",
1859            );
1860
1861            trace!(
1862                "roots for pending checkpoint {:?}: {:?}",
1863                pending.details.checkpoint_height, pending.roots,
1864            );
1865
1866            let settlement_root = if self.epoch_store.accumulators_enabled() {
1867                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
1868                    pending.roots.pop()
1869                else {
1870                    fatal!("No settlement root found");
1871                };
1872                Some(settlement_root)
1873            } else {
1874                None
1875            };
1876
1877            let roots = &pending.roots;
1878
1879            self.metrics
1880                .checkpoint_roots_count
1881                .inc_by(roots.len() as u64);
1882
1883            let root_digests = self
1884                .epoch_store
1885                .notify_read_tx_key_to_digest(roots)
1886                .in_monitored_scope("CheckpointNotifyDigests")
1887                .await?;
1888            let root_effects = self
1889                .effects_store
1890                .notify_read_executed_effects(
1891                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
1892                    &root_digests,
1893                )
1894                .in_monitored_scope("CheckpointNotifyRead")
1895                .await;
1896
1897            assert!(
1898                self.epoch_store
1899                    .protocol_config()
1900                    .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
1901            );
1902
1903            // If the roots contains consensus commit prologue transaction, we want to extract it,
1904            // and put it to the front of the checkpoint.
1905            let consensus_commit_prologue =
1906                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
1907
1908            // Get the unincluded depdnencies of the consensus commit prologue. We should expect no
1909            // other dependencies that haven't been included in any previous checkpoints.
1910            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1911                let unsorted_ccp = self.complete_checkpoint_effects(
1912                    vec![ccp_effects.clone()],
1913                    &mut effects_in_current_checkpoint,
1914                )?;
1915
1916                // No other dependencies of this consensus commit prologue that haven't been included
1917                // in any previous checkpoint.
1918                if unsorted_ccp.len() != 1 {
1919                    fatal!(
1920                        "Expected 1 consensus commit prologue, got {:?}",
1921                        unsorted_ccp
1922                            .iter()
1923                            .map(|e| e.transaction_digest())
1924                            .collect::<Vec<_>>()
1925                    );
1926                }
1927                assert_eq!(unsorted_ccp.len(), 1);
1928                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1929            }
1930
1931            let unsorted =
1932                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;
1933
1934            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1935            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1936            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
1937                if cfg!(debug_assertions) {
1938                    // When consensus_commit_prologue is extracted, it should not be included in the `unsorted`.
1939                    for tx in unsorted.iter() {
1940                        assert!(tx.transaction_digest() != &ccp_digest);
1941                    }
1942                }
1943                sorted.push(ccp_effects);
1944            }
1945            sorted.extend(CausalOrder::causal_sort(unsorted));
1946
1947            if let Some(settlement_root) = settlement_root {
1948                //TODO: this is an incorrect heuristic for checkpoint seq number
1949                //      due to checkpoint splitting, to be fixed separately
1950                let last_checkpoint =
1951                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1952                let next_checkpoint_seq = last_checkpoint
1953                    .as_ref()
1954                    .map(|(seq, _)| *seq)
1955                    .unwrap_or_default()
1956                    + 1;
1957                let tx_index_offset = tx_effects.len() as u64;
1958
1959                let (tx_key, settlement_effects) = self
1960                    .construct_and_execute_settlement_transactions(
1961                        &sorted,
1962                        pending.details.checkpoint_height,
1963                        next_checkpoint_seq,
1964                        tx_index_offset,
1965                    )
1966                    .await;
1967                debug!(?tx_key, "executed settlement transactions");
1968
1969                assert_eq!(settlement_root, tx_key);
1970
1971                // Note: we do not need to add the settlement digests to `tx_roots` - `tx_roots`
1972                // should only include the digests of transactions that were originally roots in
1973                // the pending checkpoint. It is later used to identify transactions which were
1974                // added as dependencies, so that those transactions can be waited on using
1975                // `consensus_messages_processed_notify()`. System transactions (such as
1976                // settlements) are exempt from this already.
1977                //
1978                // However, we DO need to add them to `effects_in_current_checkpoint` so that
1979                // `complete_checkpoint_effects` won't pull them in again as dependencies when
1980                // processing later pending checkpoints in the same batch.
1981                effects_in_current_checkpoint
1982                    .extend(settlement_effects.iter().map(|e| *e.transaction_digest()));
1983                sorted.extend(settlement_effects);
1984            }
1985
1986            #[cfg(msim)]
1987            {
1988                // Check consensus commit prologue invariants in sim test.
1989                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1990            }
1991
1992            tx_effects.extend(sorted);
1993            tx_roots.extend(root_digests);
1994        }
1995
1996        Ok((tx_effects, tx_roots))
1997    }
1998
1999    // Given the root transactions of a pending checkpoint, resolve the transactions should be included in
2000    // the checkpoint, and return them in the order they should be included in the checkpoint.
2001    #[instrument(level = "debug", skip_all)]
2002    async fn resolve_checkpoint_transactions_v2(
2003        &self,
2004        pending: PendingCheckpointV2,
2005    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
2006        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
2007
2008        debug!(
2009            checkpoint_commit_height = pending.details.checkpoint_height,
2010            "Resolving checkpoint transactions for pending checkpoint.",
2011        );
2012
2013        trace!(
2014            "roots for pending checkpoint {:?}: {:?}",
2015            pending.details.checkpoint_height, pending.roots,
2016        );
2017
2018        assert!(
2019            self.epoch_store
2020                .protocol_config()
2021                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
2022        );
2023
2024        let mut all_effects: Vec<TransactionEffects> = Vec::new();
2025        let mut all_root_digests: Vec<TransactionDigest> = Vec::new();
2026
2027        for checkpoint_roots in &pending.roots {
2028            let tx_roots = &checkpoint_roots.tx_roots;
2029
2030            self.metrics
2031                .checkpoint_roots_count
2032                .inc_by(tx_roots.len() as u64);
2033
2034            let root_digests = self
2035                .epoch_store
2036                .notify_read_tx_key_to_digest(tx_roots)
2037                .in_monitored_scope("CheckpointNotifyDigests")
2038                .await?;
2039
2040            all_root_digests.extend(root_digests.iter().cloned());
2041
2042            let root_effects = self
2043                .effects_store
2044                .notify_read_executed_effects(
2045                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
2046                    &root_digests,
2047                )
2048                .in_monitored_scope("CheckpointNotifyRead")
2049                .await;
2050            let consensus_commit_prologue =
2051                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
2052
2053            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
2054            let ccp_digest = consensus_commit_prologue.map(|(d, _)| d);
2055            let mut sorted = CausalOrder::causal_sort_with_ccp(root_effects, ccp_digest);
2056
2057            if let Some(settlement_key) = &checkpoint_roots.settlement_root {
2058                let checkpoint_seq = pending
2059                    .details
2060                    .checkpoint_seq
2061                    .expect("checkpoint_seq must be set");
2062                let tx_index_offset = all_effects.len() as u64;
2063                let effects = self
2064                    .resolve_settlement_effects(
2065                        *settlement_key,
2066                        &sorted,
2067                        checkpoint_roots.height,
2068                        checkpoint_seq,
2069                        tx_index_offset,
2070                    )
2071                    .await;
2072                sorted.extend(effects);
2073            }
2074
2075            #[cfg(msim)]
2076            {
2077                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
2078            }
2079
2080            all_effects.extend(sorted);
2081        }
2082        Ok((all_effects, all_root_digests.into_iter().collect()))
2083    }
2084
2085    /// Constructs settlement transactions to compute their digests, then reads effects
2086    /// directly from the cache. If execution is ahead of the checkpoint builder, the
2087    /// effects are already cached and this returns instantly. Otherwise it waits for
2088    /// the execution scheduler's queue worker to execute them.
2089    async fn resolve_settlement_effects(
2090        &self,
2091        settlement_key: TransactionKey,
2092        sorted_root_effects: &[TransactionEffects],
2093        checkpoint_height: CheckpointHeight,
2094        checkpoint_seq: CheckpointSequenceNumber,
2095        tx_index_offset: u64,
2096    ) -> Vec<TransactionEffects> {
2097        let epoch = self.epoch_store.epoch();
2098        let accumulator_root_obj_initial_shared_version = self
2099            .epoch_store
2100            .epoch_start_config()
2101            .accumulator_root_obj_initial_shared_version()
2102            .expect("accumulator root object must exist");
2103
2104        let builder = AccumulatorSettlementTxBuilder::new(
2105            None,
2106            sorted_root_effects,
2107            checkpoint_seq,
2108            tx_index_offset,
2109        );
2110
2111        let settlement_digests: Vec<_> = builder
2112            .build_tx(
2113                self.epoch_store.protocol_config(),
2114                epoch,
2115                accumulator_root_obj_initial_shared_version,
2116                checkpoint_height,
2117                checkpoint_seq,
2118            )
2119            .into_iter()
2120            .map(|tx| *VerifiedTransaction::new_system_transaction(tx).digest())
2121            .collect();
2122
2123        debug!(
2124            ?settlement_digests,
2125            ?settlement_key,
2126            "reading settlement effects from cache"
2127        );
2128
2129        let settlement_effects = wait_for_effects_with_retry(
2130            self.effects_store.as_ref(),
2131            "CheckpointBuilder::settlement_effects",
2132            &settlement_digests,
2133            settlement_key,
2134        )
2135        .await;
2136
2137        let barrier_digest = *VerifiedTransaction::new_system_transaction(
2138            accumulators::build_accumulator_barrier_tx(
2139                epoch,
2140                accumulator_root_obj_initial_shared_version,
2141                checkpoint_height,
2142                &settlement_effects,
2143            ),
2144        )
2145        .digest();
2146
2147        let barrier_effects = wait_for_effects_with_retry(
2148            self.effects_store.as_ref(),
2149            "CheckpointBuilder::barrier_effects",
2150            &[barrier_digest],
2151            settlement_key,
2152        )
2153        .await;
2154
2155        settlement_effects
2156            .into_iter()
2157            .chain(barrier_effects)
2158            .collect()
2159    }
2160
2161    // Extracts the consensus commit prologue digest and effects from the root transactions.
2162    // The consensus commit prologue is expected to be the first transaction in the roots.
2163    fn extract_consensus_commit_prologue(
2164        &self,
2165        root_digests: &[TransactionDigest],
2166        root_effects: &[TransactionEffects],
2167    ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
2168        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
2169        if root_digests.is_empty() {
2170            return Ok(None);
2171        }
2172
2173        // Reads the first transaction in the roots, and checks whether it is a consensus commit
2174        // prologue transaction. The consensus commit prologue transaction should be the first
2175        // transaction in the roots written by the consensus handler.
2176        let first_tx = self
2177            .state
2178            .get_transaction_cache_reader()
2179            .get_transaction_block(&root_digests[0])
2180            .expect("Transaction block must exist");
2181
2182        Ok(first_tx
2183            .transaction_data()
2184            .is_consensus_commit_prologue()
2185            .then(|| {
2186                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
2187                (*first_tx.digest(), root_effects[0].clone())
2188            }))
2189    }
2190
2191    #[instrument(level = "debug", skip_all)]
2192    async fn write_checkpoints(
2193        &mut self,
2194        height: CheckpointHeight,
2195        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
2196    ) -> SuiResult {
2197        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
2198        let mut batch = self.store.tables.checkpoint_content.batch();
2199        let mut all_tx_digests =
2200            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
2201
2202        for (summary, contents) in &new_checkpoints {
2203            debug!(
2204                checkpoint_commit_height = height,
2205                checkpoint_seq = summary.sequence_number,
2206                contents_digest = ?contents.digest(),
2207                "writing checkpoint",
2208            );
2209
2210            if let Some(previously_computed_summary) = self
2211                .store
2212                .tables
2213                .locally_computed_checkpoints
2214                .get(&summary.sequence_number)?
2215                && previously_computed_summary.digest() != summary.digest()
2216            {
2217                fatal!(
2218                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
2219                    summary.sequence_number,
2220                    previously_computed_summary.digest(),
2221                    summary.digest()
2222                );
2223            }
2224
2225            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
2226
2227            self.metrics
2228                .transactions_included_in_checkpoint
2229                .inc_by(contents.size() as u64);
2230            let sequence_number = summary.sequence_number;
2231            self.metrics
2232                .last_constructed_checkpoint
2233                .set(sequence_number as i64);
2234
2235            batch.insert_batch(
2236                &self.store.tables.checkpoint_content,
2237                [(contents.digest(), contents)],
2238            )?;
2239
2240            batch.insert_batch(
2241                &self.store.tables.locally_computed_checkpoints,
2242                [(sequence_number, summary)],
2243            )?;
2244        }
2245
2246        batch.write()?;
2247
2248        // Send all checkpoint sigs to consensus.
2249        for (summary, contents) in &new_checkpoints {
2250            self.output
2251                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
2252                .await?;
2253        }
2254
2255        for (local_checkpoint, _) in &new_checkpoints {
2256            if let Some(certified_checkpoint) = self
2257                .store
2258                .tables
2259                .certified_checkpoints
2260                .get(local_checkpoint.sequence_number())?
2261            {
2262                self.store
2263                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
2264            }
2265        }
2266
2267        self.notify_aggregator.notify_one();
2268        self.epoch_store
2269            .process_constructed_checkpoint(height, new_checkpoints);
2270        Ok(())
2271    }
2272
2273    #[allow(clippy::type_complexity)]
2274    fn split_checkpoint_chunks(
2275        &self,
2276        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
2277        signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2278    ) -> CheckpointBuilderResult<
2279        Vec<
2280            Vec<(
2281                TransactionEffects,
2282                Vec<(GenericSignature, Option<SequenceNumber>)>,
2283            )>,
2284        >,
2285    > {
2286        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
2287
2288        // If splitting is done in consensus_handler, return everything as one chunk.
2289        if self
2290            .epoch_store
2291            .protocol_config()
2292            .split_checkpoints_in_consensus_handler()
2293        {
2294            let chunk: Vec<_> = effects_and_transaction_sizes
2295                .into_iter()
2296                .zip_debug_eq(signatures)
2297                .map(|((effects, _size), sigs)| (effects, sigs))
2298                .collect();
2299            return Ok(vec![chunk]);
2300        }
2301        let mut chunks = Vec::new();
2302        let mut chunk = Vec::new();
2303        let mut chunk_size: usize = 0;
2304        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
2305            .into_iter()
2306            .zip_debug_eq(signatures.into_iter())
2307        {
2308            // Roll over to a new chunk after either max count or max size is reached.
2309            // The size calculation here is intended to estimate the size of the
2310            // FullCheckpointContents struct. If this code is modified, that struct
2311            // should also be updated accordingly.
2312            let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
2313                bcs::serialized_size(&signatures)?
2314            } else {
2315                let signatures: Vec<&GenericSignature> =
2316                    signatures.iter().map(|(s, _)| s).collect();
2317                bcs::serialized_size(&signatures)?
2318            };
2319            let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
2320            if chunk.len() == self.max_transactions_per_checkpoint
2321                || (chunk_size + size) > self.max_checkpoint_size_bytes
2322            {
2323                if chunk.is_empty() {
2324                    // Always allow at least one tx in a checkpoint.
2325                    warn!(
2326                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
2327                        self.max_checkpoint_size_bytes
2328                    );
2329                } else {
2330                    chunks.push(chunk);
2331                    chunk = Vec::new();
2332                    chunk_size = 0;
2333                }
2334            }
2335
2336            chunk.push((effects, signatures));
2337            chunk_size += size;
2338        }
2339
2340        if !chunk.is_empty() || chunks.is_empty() {
2341            // We intentionally create an empty checkpoint if there is no content provided
2342            // to make a 'heartbeat' checkpoint.
2343            // Important: if some conditions are added here later, we need to make sure we always
2344            // have at least one chunk if last_pending_of_epoch is set
2345            chunks.push(chunk);
2346            // Note: empty checkpoints are ok - they shouldn't happen at all on a network with even
2347            // modest load. Even if they do happen, it is still useful as it allows fullnodes to
2348            // distinguish between "no transactions have happened" and "i am not receiving new
2349            // checkpoints".
2350        }
2351        Ok(chunks)
2352    }
2353
2354    fn load_last_built_checkpoint_summary(
2355        epoch_store: &AuthorityPerEpochStore,
2356        store: &CheckpointStore,
2357    ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
2358        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
2359        if last_checkpoint.is_none() {
2360            let epoch = epoch_store.epoch();
2361            if epoch > 0 {
2362                let previous_epoch = epoch - 1;
2363                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
2364                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
2365                if let Some((ref seq, _)) = last_checkpoint {
2366                    debug!(
2367                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
2368                    );
2369                } else {
2370                    // This is some serious bug with when CheckpointBuilder started so surfacing it via panic
2371                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
2372                }
2373            }
2374        }
2375        Ok(last_checkpoint)
2376    }
2377
2378    #[instrument(level = "debug", skip_all)]
2379    async fn create_checkpoints(
2380        &self,
2381        all_effects: Vec<TransactionEffects>,
2382        details: &PendingCheckpointInfo,
2383        all_roots: &HashSet<TransactionDigest>,
2384    ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
2385        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
2386
2387        let total = all_effects.len();
2388        let mut last_checkpoint =
2389            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
2390        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
2391        debug!(
2392            checkpoint_commit_height = details.checkpoint_height,
2393            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
2394            checkpoint_timestamp = details.timestamp_ms,
2395            "Creating checkpoint(s) for {} transactions",
2396            all_effects.len(),
2397        );
2398
2399        let all_digests: Vec<_> = all_effects
2400            .iter()
2401            .map(|effect| *effect.transaction_digest())
2402            .collect();
2403        let transactions_and_sizes = self
2404            .state
2405            .get_transaction_cache_reader()
2406            .get_transactions_and_serialized_sizes(&all_digests)?;
2407        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
2408        let mut transactions = Vec::with_capacity(all_effects.len());
2409        let mut transaction_keys = Vec::with_capacity(all_effects.len());
2410        let mut randomness_rounds = BTreeMap::new();
2411        {
2412            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
2413            debug!(
2414                ?last_checkpoint_seq,
2415                "Waiting for {:?} certificates to appear in consensus",
2416                all_effects.len()
2417            );
2418
2419            for (effects, transaction_and_size) in all_effects
2420                .into_iter()
2421                .zip_debug_eq(transactions_and_sizes.into_iter())
2422            {
2423                let (transaction, size) = transaction_and_size
2424                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
2425                match transaction.inner().transaction_data().kind() {
2426                    TransactionKind::ConsensusCommitPrologue(_)
2427                    | TransactionKind::ConsensusCommitPrologueV2(_)
2428                    | TransactionKind::ConsensusCommitPrologueV3(_)
2429                    | TransactionKind::ConsensusCommitPrologueV4(_)
2430                    | TransactionKind::AuthenticatorStateUpdate(_) => {
2431                        // ConsensusCommitPrologue and AuthenticatorStateUpdate are guaranteed to be
2432                        // processed before we reach here.
2433                    }
2434                    TransactionKind::ProgrammableSystemTransaction(_) => {
2435                        // settlement transactions are added by checkpoint builder
2436                    }
2437                    TransactionKind::ChangeEpoch(_)
2438                    | TransactionKind::Genesis(_)
2439                    | TransactionKind::EndOfEpochTransaction(_) => {
2440                        fatal!(
2441                            "unexpected transaction in checkpoint effects: {:?}",
2442                            transaction
2443                        );
2444                    }
2445                    TransactionKind::RandomnessStateUpdate(rsu) => {
2446                        randomness_rounds
2447                            .insert(*effects.transaction_digest(), rsu.randomness_round);
2448                    }
2449                    TransactionKind::ProgrammableTransaction(_) => {
2450                        // Only transactions that are not roots should be included in the call to
2451                        // `consensus_messages_processed_notify`. roots come directly from the consensus
2452                        // commit and so are known to be processed already.
2453                        let digest = *effects.transaction_digest();
2454                        if !all_roots.contains(&digest) {
2455                            transaction_keys.push(SequencedConsensusTransactionKey::External(
2456                                ConsensusTransactionKey::Certificate(digest),
2457                            ));
2458                        }
2459                    }
2460                }
2461                transactions.push(transaction);
2462                all_effects_and_transaction_sizes.push((effects, size));
2463            }
2464
2465            self.epoch_store
2466                .consensus_messages_processed_notify(transaction_keys)
2467                .await?;
2468        }
2469
2470        let signatures = self
2471            .epoch_store
2472            .user_signatures_for_checkpoint(&transactions, &all_digests);
2473        debug!(
2474            ?last_checkpoint_seq,
2475            "Received {} checkpoint user signatures from consensus",
2476            signatures.len()
2477        );
2478
2479        let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
2480            Some(
2481                transactions
2482                    .iter()
2483                    .flat_map(|tx| {
2484                        if let TransactionKind::ProgrammableTransaction(ptb) =
2485                            tx.transaction_data().kind()
2486                        {
2487                            itertools::Either::Left(
2488                                ptb.commands
2489                                    .iter()
2490                                    .map(ExecutionTimeObservationKey::from_command),
2491                            )
2492                        } else {
2493                            itertools::Either::Right(std::iter::empty())
2494                        }
2495                    })
2496                    .collect(),
2497            )
2498        } else {
2499            None
2500        };
2501
2502        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2503        let chunks_count = chunks.len();
2504
2505        let mut checkpoints = Vec::with_capacity(chunks_count);
2506        debug!(
2507            ?last_checkpoint_seq,
2508            "Creating {} checkpoints with {} transactions", chunks_count, total,
2509        );
2510
2511        let epoch = self.epoch_store.epoch();
2512        for (index, transactions) in chunks.into_iter().enumerate() {
2513            let first_checkpoint_of_epoch = index == 0
2514                && last_checkpoint
2515                    .as_ref()
2516                    .map(|(_, c)| c.epoch != epoch)
2517                    .unwrap_or(true);
2518            if first_checkpoint_of_epoch {
2519                self.epoch_store
2520                    .record_epoch_first_checkpoint_creation_time_metric();
2521            }
2522            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2523
2524            let sequence_number = if let Some(preassigned_seq) = details.checkpoint_seq {
2525                preassigned_seq
2526            } else {
2527                last_checkpoint
2528                    .as_ref()
2529                    .map(|(_, c)| c.sequence_number + 1)
2530                    .unwrap_or_default()
2531            };
2532            let mut timestamp_ms = details.timestamp_ms;
2533            if let Some((_, last_checkpoint)) = &last_checkpoint
2534                && last_checkpoint.timestamp_ms > timestamp_ms
2535            {
2536                // First consensus commit of an epoch can have zero timestamp.
2537                debug!(
2538                    "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2539                    sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2540                );
2541                if self
2542                    .epoch_store
2543                    .protocol_config()
2544                    .enforce_checkpoint_timestamp_monotonicity()
2545                {
2546                    timestamp_ms = last_checkpoint.timestamp_ms;
2547                }
2548            }
2549
2550            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
2551            let epoch_rolling_gas_cost_summary =
2552                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2553
2554            let end_of_epoch_data = if last_checkpoint_of_epoch {
2555                let system_state_obj = self
2556                    .augment_epoch_last_checkpoint(
2557                        &epoch_rolling_gas_cost_summary,
2558                        timestamp_ms,
2559                        &mut effects,
2560                        &mut signatures,
2561                        sequence_number,
2562                        std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2563                        last_checkpoint_seq.unwrap_or_default(),
2564                    )
2565                    .await?;
2566
2567                let committee = system_state_obj
2568                    .get_current_epoch_committee()
2569                    .committee()
2570                    .clone();
2571
2572                // This must happen after the call to augment_epoch_last_checkpoint,
2573                // otherwise we will not capture the change_epoch tx.
2574                let root_state_digest = {
2575                    let state_acc = self
2576                        .global_state_hasher
2577                        .upgrade()
2578                        .expect("No checkpoints should be getting built after local configuration");
2579                    let acc = state_acc.accumulate_checkpoint(
2580                        &effects,
2581                        sequence_number,
2582                        &self.epoch_store,
2583                    )?;
2584
2585                    state_acc
2586                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2587                        .await?;
2588
2589                    state_acc.accumulate_running_root(
2590                        &self.epoch_store,
2591                        sequence_number,
2592                        Some(acc),
2593                    )?;
2594                    state_acc
2595                        .digest_epoch(self.epoch_store.clone(), sequence_number)
2596                        .await?
2597                };
2598                self.metrics.highest_accumulated_epoch.set(epoch as i64);
2599                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2600
2601                let epoch_commitments = if self
2602                    .epoch_store
2603                    .protocol_config()
2604                    .check_commit_root_state_digest_supported()
2605                {
2606                    vec![root_state_digest.into()]
2607                } else {
2608                    vec![]
2609                };
2610
2611                Some(EndOfEpochData {
2612                    next_epoch_committee: committee.voting_rights,
2613                    next_epoch_protocol_version: ProtocolVersion::new(
2614                        system_state_obj.protocol_version(),
2615                    ),
2616                    epoch_commitments,
2617                })
2618            } else {
2619                self.send_to_hasher
2620                    .send((sequence_number, effects.clone()))
2621                    .await?;
2622
2623                None
2624            };
2625            let contents = if self.epoch_store.protocol_config().address_aliases() {
2626                CheckpointContents::new_v2(&effects, signatures)
2627            } else {
2628                CheckpointContents::new_with_digests_and_signatures(
2629                    effects.iter().map(TransactionEffects::execution_digests),
2630                    signatures
2631                        .into_iter()
2632                        .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
2633                        .collect(),
2634                )
2635            };
2636
2637            let num_txns = contents.size() as u64;
2638
2639            let network_total_transactions = last_checkpoint
2640                .as_ref()
2641                .map(|(_, c)| c.network_total_transactions + num_txns)
2642                .unwrap_or(num_txns);
2643
2644            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2645
2646            let matching_randomness_rounds: Vec<_> = effects
2647                .iter()
2648                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2649                .copied()
2650                .collect();
2651
2652            let checkpoint_commitments = if self
2653                .epoch_store
2654                .protocol_config()
2655                .include_checkpoint_artifacts_digest_in_summary()
2656            {
2657                let artifacts = CheckpointArtifacts::from(&effects[..]);
2658                let artifacts_digest = artifacts.digest()?;
2659                vec![artifacts_digest.into()]
2660            } else {
2661                Default::default()
2662            };
2663
2664            let summary = CheckpointSummary::new(
2665                self.epoch_store.protocol_config(),
2666                epoch,
2667                sequence_number,
2668                network_total_transactions,
2669                &contents,
2670                previous_digest,
2671                epoch_rolling_gas_cost_summary,
2672                end_of_epoch_data,
2673                timestamp_ms,
2674                matching_randomness_rounds,
2675                checkpoint_commitments,
2676            );
2677            summary.report_checkpoint_age(
2678                &self.metrics.last_created_checkpoint_age,
2679                &self.metrics.last_created_checkpoint_age_ms,
2680            );
2681            if last_checkpoint_of_epoch {
2682                info!(
2683                    checkpoint_seq = sequence_number,
2684                    "creating last checkpoint of epoch {}", epoch
2685                );
2686                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2687                    self.epoch_store
2688                        .report_epoch_metrics_at_last_checkpoint(stats);
2689                }
2690            }
2691            last_checkpoint = Some((sequence_number, summary.clone()));
2692            checkpoints.push((summary, contents));
2693        }
2694
2695        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2696    }
2697
2698    fn get_epoch_total_gas_cost(
2699        &self,
2700        last_checkpoint: Option<&CheckpointSummary>,
2701        cur_checkpoint_effects: &[TransactionEffects],
2702    ) -> GasCostSummary {
2703        let (previous_epoch, previous_gas_costs) = last_checkpoint
2704            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2705            .unwrap_or_default();
2706        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2707        if previous_epoch == self.epoch_store.epoch() {
2708            // sum only when we are within the same epoch
2709            GasCostSummary::new(
2710                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2711                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2712                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2713                previous_gas_costs.non_refundable_storage_fee
2714                    + current_gas_costs.non_refundable_storage_fee,
2715            )
2716        } else {
2717            current_gas_costs
2718        }
2719    }
2720
2721    #[instrument(level = "error", skip_all)]
2722    async fn augment_epoch_last_checkpoint(
2723        &self,
2724        epoch_total_gas_cost: &GasCostSummary,
2725        epoch_start_timestamp_ms: CheckpointTimestamp,
2726        checkpoint_effects: &mut Vec<TransactionEffects>,
2727        signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2728        checkpoint: CheckpointSequenceNumber,
2729        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2730        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
2731        // >1 checkpoint.
2732        last_checkpoint: CheckpointSequenceNumber,
2733    ) -> CheckpointBuilderResult<SuiSystemState> {
2734        let (system_state, effects) = self
2735            .state
2736            .create_and_execute_advance_epoch_tx(
2737                &self.epoch_store,
2738                epoch_total_gas_cost,
2739                checkpoint,
2740                epoch_start_timestamp_ms,
2741                end_of_epoch_observation_keys,
2742                last_checkpoint,
2743            )
2744            .await?;
2745        checkpoint_effects.push(effects);
2746        signatures.push(vec![]);
2747        Ok(system_state)
2748    }
2749
2750    /// For the given roots return complete list of effects to include in checkpoint
2751    /// This list includes the roots and all their dependencies, which are not part of checkpoint already.
2752    /// Note that this function may be called multiple times to construct the checkpoint.
2753    /// `existing_tx_digests_in_checkpoint` is used to track the transactions that are already included in the checkpoint.
2754    /// Txs in `roots` that need to be included in the checkpoint will be added to `existing_tx_digests_in_checkpoint`
2755    /// after the call of this function.
2756    #[instrument(level = "debug", skip_all)]
2757    fn complete_checkpoint_effects(
2758        &self,
2759        mut roots: Vec<TransactionEffects>,
2760        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
2761    ) -> SuiResult<Vec<TransactionEffects>> {
2762        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
2763        let mut results = vec![];
2764        let mut seen = HashSet::new();
2765        loop {
2766            let mut pending = HashSet::new();
2767
2768            let transactions_included = self
2769                .epoch_store
2770                .builder_included_transactions_in_checkpoint(
2771                    roots.iter().map(|e| e.transaction_digest()),
2772                )?;
2773
2774            for (effect, tx_included) in roots
2775                .into_iter()
2776                .zip_debug_eq(transactions_included.into_iter())
2777            {
2778                let digest = effect.transaction_digest();
2779                // Unnecessary to read effects of a dependency if the effect is already processed.
2780                seen.insert(*digest);
2781
2782                // Skip roots that are already included in the checkpoint.
2783                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
2784                    continue;
2785                }
2786
2787                // Skip roots already included in checkpoints or roots from previous epochs
2788                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
2789                    continue;
2790                }
2791
2792                let existing_effects = self
2793                    .epoch_store
2794                    .transactions_executed_in_cur_epoch(effect.dependencies())?;
2795
2796                for (dependency, effects_signature_exists) in effect
2797                    .dependencies()
2798                    .iter()
2799                    .zip_debug_eq(existing_effects.iter())
2800                {
2801                    // Skip here if dependency not executed in the current epoch.
2802                    // Note that the existence of an effects signature in the
2803                    // epoch store for the given digest indicates that the transaction
2804                    // was locally executed in the current epoch
2805                    if !effects_signature_exists {
2806                        continue;
2807                    }
2808                    if seen.insert(*dependency) {
2809                        pending.insert(*dependency);
2810                    }
2811                }
2812                results.push(effect);
2813            }
2814            if pending.is_empty() {
2815                break;
2816            }
2817            let pending = pending.into_iter().collect::<Vec<_>>();
2818            let effects = self.effects_store.multi_get_executed_effects(&pending);
2819            let effects = effects
2820                .into_iter()
2821                .zip_debug_eq(pending)
2822                .map(|(opt, digest)| match opt {
2823                    Some(x) => x,
2824                    None => panic!(
2825                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
2826                        digest
2827                    ),
2828                })
2829                .collect::<Vec<_>>();
2830            roots = effects;
2831        }
2832
2833        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
2834        Ok(results)
2835    }
2836
2837    // Checks the invariants of the consensus commit prologue transactions in the checkpoint
2838    // in simtest.
2839    #[cfg(msim)]
2840    fn expensive_consensus_commit_prologue_invariants_check(
2841        &self,
2842        root_digests: &[TransactionDigest],
2843        sorted: &[TransactionEffects],
2844    ) {
2845        // Gets all the consensus commit prologue transactions from the roots.
2846        let root_txs = self
2847            .state
2848            .get_transaction_cache_reader()
2849            .multi_get_transaction_blocks(root_digests);
2850        let ccps = root_txs
2851            .iter()
2852            .filter_map(|tx| {
2853                if let Some(tx) = tx {
2854                    if tx.transaction_data().is_consensus_commit_prologue() {
2855                        Some(tx)
2856                    } else {
2857                        None
2858                    }
2859                } else {
2860                    None
2861                }
2862            })
2863            .collect::<Vec<_>>();
2864
2865        // There should be at most one consensus commit prologue transaction in the roots.
2866        assert!(ccps.len() <= 1);
2867
2868        // Get all the transactions in the checkpoint.
2869        let txs = self
2870            .state
2871            .get_transaction_cache_reader()
2872            .multi_get_transaction_blocks(
2873                &sorted
2874                    .iter()
2875                    .map(|tx| tx.transaction_digest().clone())
2876                    .collect::<Vec<_>>(),
2877            );
2878
2879        if ccps.len() == 0 {
2880            // If there is no consensus commit prologue transaction in the roots, then there should be no
2881            // consensus commit prologue transaction in the checkpoint.
2882            for tx in txs.iter() {
2883                if let Some(tx) = tx {
2884                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2885                }
2886            }
2887        } else {
2888            // If there is one consensus commit prologue, it must be the first one in the checkpoint.
2889            assert!(
2890                txs[0]
2891                    .as_ref()
2892                    .unwrap()
2893                    .transaction_data()
2894                    .is_consensus_commit_prologue()
2895            );
2896
2897            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2898
2899            for tx in txs.iter().skip(1) {
2900                if let Some(tx) = tx {
2901                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2902                }
2903            }
2904        }
2905    }
2906}
2907
2908async fn wait_for_effects_with_retry(
2909    effects_store: &dyn TransactionCacheRead,
2910    task_name: &'static str,
2911    digests: &[TransactionDigest],
2912    tx_key: TransactionKey,
2913) -> Vec<TransactionEffects> {
2914    let delay = if in_antithesis() {
2915        // antithesis has aggressive thread pausing, 5 seconds causes false positives
2916        15
2917    } else {
2918        5
2919    };
2920    loop {
2921        match tokio::time::timeout(Duration::from_secs(delay), async {
2922            effects_store
2923                .notify_read_executed_effects(task_name, digests)
2924                .await
2925        })
2926        .await
2927        {
2928            Ok(effects) => break effects,
2929            Err(_) => {
2930                debug_fatal!(
2931                    "Timeout waiting for transactions to be executed {:?}, retrying...",
2932                    tx_key
2933                );
2934            }
2935        }
2936    }
2937}
2938
2939impl CheckpointAggregator {
2940    fn new(
2941        tables: Arc<CheckpointStore>,
2942        epoch_store: Arc<AuthorityPerEpochStore>,
2943        notify: Arc<Notify>,
2944        receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
2945        output: Box<dyn CertifiedCheckpointOutput>,
2946        state: Arc<AuthorityState>,
2947        metrics: Arc<CheckpointMetrics>,
2948    ) -> Self {
2949        Self {
2950            store: tables,
2951            epoch_store,
2952            notify,
2953            receiver,
2954            pending: BTreeMap::new(),
2955            current: None,
2956            output,
2957            state,
2958            metrics,
2959        }
2960    }
2961
2962    async fn run(mut self) {
2963        info!("Starting CheckpointAggregator");
2964        loop {
2965            // Drain all signatures that arrived since the last iteration into the pending buffer
2966            while let Ok(sig) = self.receiver.try_recv() {
2967                self.pending
2968                    .entry(sig.summary.sequence_number)
2969                    .or_default()
2970                    .push(sig);
2971            }
2972
2973            if let Err(e) = self.run_and_notify().await {
2974                error!(
2975                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
2976                    e
2977                );
2978                self.metrics.checkpoint_errors.inc();
2979                tokio::time::sleep(Duration::from_secs(1)).await;
2980                continue;
2981            }
2982
2983            tokio::select! {
2984                Some(sig) = self.receiver.recv() => {
2985                    self.pending
2986                        .entry(sig.summary.sequence_number)
2987                        .or_default()
2988                        .push(sig);
2989                }
2990                _ = self.notify.notified() => {}
2991                _ = tokio::time::sleep(Duration::from_secs(1)) => {}
2992            }
2993        }
2994    }
2995
2996    async fn run_and_notify(&mut self) -> SuiResult {
2997        let summaries = self.run_inner()?;
2998        for summary in summaries {
2999            self.output.certified_checkpoint_created(&summary).await?;
3000        }
3001        Ok(())
3002    }
3003
3004    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
3005        let _scope = monitored_scope("CheckpointAggregator");
3006        let mut result = vec![];
3007        'outer: loop {
3008            let next_to_certify = self.next_checkpoint_to_certify()?;
3009            // Discard buffered signatures for checkpoints already certified
3010            // (e.g. certified via StateSync before local aggregation completed).
3011            self.pending.retain(|&seq, _| seq >= next_to_certify);
3012            let current = if let Some(current) = &mut self.current {
3013                // It's possible that the checkpoint was already certified by
3014                // the rest of the network and we've already received the
3015                // certified checkpoint via StateSync. In this case, we reset
3016                // the current signature aggregator to the next checkpoint to
3017                // be certified
3018                if current.summary.sequence_number < next_to_certify {
3019                    assert_reachable!("skip checkpoint certification");
3020                    self.current = None;
3021                    continue;
3022                }
3023                current
3024            } else {
3025                let Some(summary) = self
3026                    .epoch_store
3027                    .get_built_checkpoint_summary(next_to_certify)?
3028                else {
3029                    return Ok(result);
3030                };
3031                self.current = Some(CheckpointSignatureAggregator {
3032                    digest: summary.digest(),
3033                    summary,
3034                    signatures_by_digest: MultiStakeAggregator::new(
3035                        self.epoch_store.committee().clone(),
3036                    ),
3037                    store: self.store.clone(),
3038                    state: self.state.clone(),
3039                    metrics: self.metrics.clone(),
3040                });
3041                self.current.as_mut().unwrap()
3042            };
3043
3044            let seq = current.summary.sequence_number;
3045            let sigs = self.pending.remove(&seq).unwrap_or_default();
3046            if sigs.is_empty() {
3047                trace!(
3048                    checkpoint_seq =? seq,
3049                    "Not enough checkpoint signatures",
3050                );
3051                return Ok(result);
3052            }
3053            for data in sigs {
3054                trace!(
3055                    checkpoint_seq = seq,
3056                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
3057                    current.summary.digest(),
3058                    data.summary.auth_sig().authority.concise()
3059                );
3060                self.metrics
3061                    .checkpoint_participation
3062                    .with_label_values(&[&format!(
3063                        "{:?}",
3064                        data.summary.auth_sig().authority.concise()
3065                    )])
3066                    .inc();
3067                if let Ok(auth_signature) = current.try_aggregate(data) {
3068                    debug!(
3069                        checkpoint_seq = seq,
3070                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
3071                        current.summary.digest(),
3072                    );
3073                    let summary = VerifiedCheckpoint::new_unchecked(
3074                        CertifiedCheckpointSummary::new_from_data_and_sig(
3075                            current.summary.clone(),
3076                            auth_signature,
3077                        ),
3078                    );
3079
3080                    self.store.insert_certified_checkpoint(&summary)?;
3081                    self.metrics.last_certified_checkpoint.set(seq as i64);
3082                    current.summary.report_checkpoint_age(
3083                        &self.metrics.last_certified_checkpoint_age,
3084                        &self.metrics.last_certified_checkpoint_age_ms,
3085                    );
3086                    result.push(summary.into_inner());
3087                    self.current = None;
3088                    continue 'outer;
3089                }
3090            }
3091            break;
3092        }
3093        Ok(result)
3094    }
3095
3096    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
3097        Ok(self
3098            .store
3099            .tables
3100            .certified_checkpoints
3101            .reversed_safe_iter_with_bounds(None, None)?
3102            .next()
3103            .transpose()?
3104            .map(|(seq, _)| seq + 1)
3105            .unwrap_or_default())
3106    }
3107}
3108
3109impl CheckpointSignatureAggregator {
3110    #[allow(clippy::result_unit_err)]
3111    pub fn try_aggregate(
3112        &mut self,
3113        data: CheckpointSignatureMessage,
3114    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
3115        let their_digest = *data.summary.digest();
3116        let (_, signature) = data.summary.into_data_and_sig();
3117        let author = signature.authority;
3118        let envelope =
3119            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
3120        match self.signatures_by_digest.insert(their_digest, envelope) {
3121            // ignore repeated signatures
3122            InsertResult::Failed { error }
3123                if matches!(
3124                    error.as_inner(),
3125                    SuiErrorKind::StakeAggregatorRepeatedSigner {
3126                        conflicting_sig: false,
3127                        ..
3128                    },
3129                ) =>
3130            {
3131                Err(())
3132            }
3133            InsertResult::Failed { error } => {
3134                warn!(
3135                    checkpoint_seq = self.summary.sequence_number,
3136                    "Failed to aggregate new signature from validator {:?}: {:?}",
3137                    author.concise(),
3138                    error
3139                );
3140                self.check_for_split_brain();
3141                Err(())
3142            }
3143            InsertResult::QuorumReached(cert) => {
3144                // It is not guaranteed that signature.authority == narwhal_cert.author, but we do verify
3145                // the signature so we know that the author signed the message at some point.
3146                if their_digest != self.digest {
3147                    self.metrics.remote_checkpoint_forks.inc();
3148                    warn!(
3149                        checkpoint_seq = self.summary.sequence_number,
3150                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
3151                        author.concise(),
3152                        their_digest,
3153                        self.digest
3154                    );
3155                    return Err(());
3156                }
3157                Ok(cert)
3158            }
3159            InsertResult::NotEnoughVotes {
3160                bad_votes: _,
3161                bad_authorities: _,
3162            } => {
3163                self.check_for_split_brain();
3164                Err(())
3165            }
3166        }
3167    }
3168
    /// Check if there is a split brain condition in checkpoint signature aggregation, defined
    /// as any state wherein it is no longer possible to achieve quorum on a checkpoint proposal,
    /// irrespective of the outcome of any outstanding votes.
    ///
    /// When `signatures_by_digest.quorum_unreachable()` reports true, this method:
    /// 1. formats every observed digest with its total stake, highest stake first;
    /// 2. runs the `kill_split_brain_node` fail point (its body is compiled only under `msim`);
    /// 3. reports the condition through `debug_fatal!` and bumps `split_brain_checkpoint_forks`;
    /// 4. spawns `diagnose_split_brain` on the tokio runtime without awaiting it.
    ///
    /// Otherwise it only emits a debug log line.
    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            // TODO: at this point we should immediately halt processing
            // of new transaction certificates to avoid building on top of
            // forked output
            // self.halt_all_execution();

            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            // Negating the stake makes the ascending sort yield the highest stake first.
            let digests_by_stake_messages = all_unique_values
                .iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{:?} (total stake: {})", digest, total_stake)
                })
                .collect::<Vec<String>>();
            // Fail point hook: the callback receives a shared map of checkpoint digest
            // overrides and a shared set of forked authorities, and records which digest
            // should be used for this sequence number.
            fail_point_arg!("kill_split_brain_node", |(
                checkpoint_overrides,
                forked_authorities,
            ): (
                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
            )| {
                #[cfg(msim)]
                {
                    // Nothing is recorded if either mutex is poisoned.
                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
                        (checkpoint_overrides.lock(), forked_authorities.lock())
                    {
                        // Find the digest produced by non-forked authorities
                        let correct_digest = all_unique_values
                            .iter()
                            .find(|(_, (authorities, _))| {
                                // Check if any authority that produced this digest is NOT in the forked set
                                authorities
                                    .iter()
                                    .any(|auth| !forked_authorities_set.contains(auth))
                            })
                            .map(|(digest, _)| digest.to_string())
                            .unwrap_or_else(|| {
                                // Fallback: use the digest with the highest stake
                                all_unique_values
                                    .iter()
                                    .max_by_key(|(_, (_, stake))| *stake)
                                    .map(|(digest, _)| digest.to_string())
                                    .unwrap_or_else(|| self.digest.to_string())
                            });

                        overrides.insert(self.summary.sequence_number, correct_digest.clone());

                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
                            self.summary.sequence_number,
                            correct_digest
                        );
                    }
                }
            });

            debug_fatal!(
                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
                self.summary.sequence_number,
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            // Owned copies of everything the detached diagnostics task needs, since the
            // task is moved into `tokio::spawn` and cannot borrow from `self`.
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.store.clone();

            // Fire-and-forget: the join handle is dropped, so the result is never observed here.
            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
3252}
3253
3254/// Create data dump containing relevant data for diagnosing cause of the
3255/// split brain by querying one disagreeing validator for full checkpoint contents.
3256/// To minimize peer chatter, we only query one validator at random from each
3257/// disagreeing faction, as all honest validators that participated in this round may
3258/// inevitably run the same process.
3259async fn diagnose_split_brain(
3260    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
3261    local_summary: CheckpointSummary,
3262    state: Arc<AuthorityState>,
3263    tables: Arc<CheckpointStore>,
3264) {
3265    debug!(
3266        checkpoint_seq = local_summary.sequence_number,
3267        "Running split brain diagnostics..."
3268    );
3269    let time = SystemTime::now();
3270    // collect one random disagreeing validator per differing digest
3271    let digest_to_validator = all_unique_values
3272        .iter()
3273        .filter_map(|(digest, (validators, _))| {
3274            if *digest != local_summary.digest() {
3275                let random_validator = validators.choose(&mut get_rng()).unwrap();
3276                Some((*digest, *random_validator))
3277            } else {
3278                None
3279            }
3280        })
3281        .collect::<HashMap<_, _>>();
3282    if digest_to_validator.is_empty() {
3283        panic!(
3284            "Given split brain condition, there should be at \
3285                least one validator that disagrees with local signature"
3286        );
3287    }
3288
3289    let epoch_store = state.load_epoch_store_one_call_per_task();
3290    let committee = epoch_store
3291        .epoch_start_state()
3292        .get_sui_committee_with_network_metadata();
3293    let network_config = default_mysten_network_config();
3294    let network_clients =
3295        make_network_authority_clients_with_network_config(&committee, &network_config);
3296
3297    // Query all disagreeing validators
3298    let response_futures = digest_to_validator
3299        .values()
3300        .cloned()
3301        .map(|validator| {
3302            let client = network_clients
3303                .get(&validator)
3304                .expect("Failed to get network client");
3305            let request = CheckpointRequestV2 {
3306                sequence_number: Some(local_summary.sequence_number),
3307                request_content: true,
3308                certified: false,
3309            };
3310            client.handle_checkpoint_v2(request)
3311        })
3312        .collect::<Vec<_>>();
3313
3314    let digest_name_pair = digest_to_validator.iter();
3315    let response_data = futures::future::join_all(response_futures)
3316        .await
3317        .into_iter()
3318        .zip_debug_eq(digest_name_pair)
3319        .filter_map(|(response, (digest, name))| match response {
3320            Ok(response) => match response {
3321                CheckpointResponseV2 {
3322                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
3323                    contents: Some(contents),
3324                } => Some((*name, *digest, summary, contents)),
3325                CheckpointResponseV2 {
3326                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
3327                    contents: _,
3328                } => {
3329                    panic!("Expected pending checkpoint, but got certified checkpoint");
3330                }
3331                CheckpointResponseV2 {
3332                    checkpoint: None,
3333                    contents: _,
3334                } => {
3335                    error!(
3336                        "Summary for checkpoint {:?} not found on validator {:?}",
3337                        local_summary.sequence_number, name
3338                    );
3339                    None
3340                }
3341                CheckpointResponseV2 {
3342                    checkpoint: _,
3343                    contents: None,
3344                } => {
3345                    error!(
3346                        "Contents for checkpoint {:?} not found on validator {:?}",
3347                        local_summary.sequence_number, name
3348                    );
3349                    None
3350                }
3351            },
3352            Err(e) => {
3353                error!(
3354                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
3355                    e
3356                );
3357                None
3358            }
3359        })
3360        .collect::<Vec<_>>();
3361
3362    let local_checkpoint_contents = tables
3363        .get_checkpoint_contents(&local_summary.content_digest)
3364        .unwrap_or_else(|_| {
3365            panic!(
3366                "Could not find checkpoint contents for digest {:?}",
3367                local_summary.digest()
3368            )
3369        })
3370        .unwrap_or_else(|| {
3371            panic!(
3372                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
3373                local_summary.sequence_number,
3374                local_summary.digest()
3375            )
3376        });
3377    let local_contents_text = format!("{local_checkpoint_contents:?}");
3378
3379    let local_summary_text = format!("{local_summary:?}");
3380    let local_validator = state.name.concise();
3381    let diff_patches = response_data
3382        .iter()
3383        .map(|(name, other_digest, other_summary, contents)| {
3384            let other_contents_text = format!("{contents:?}");
3385            let other_summary_text = format!("{other_summary:?}");
3386            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
3387                .enumerate_transactions(&local_summary)
3388                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
3389                .unzip();
3390            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
3391                .enumerate_transactions(other_summary)
3392                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
3393                .unzip();
3394            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
3395            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
3396            let local_transactions_text = format!("{local_transactions:#?}");
3397            let other_transactions_text = format!("{other_transactions:#?}");
3398            let transactions_patch =
3399                create_patch(&local_transactions_text, &other_transactions_text);
3400            let local_effects_text = format!("{local_effects:#?}");
3401            let other_effects_text = format!("{other_effects:#?}");
3402            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
3403            let seq_number = local_summary.sequence_number;
3404            let local_digest = local_summary.digest();
3405            let other_validator = name.concise();
3406            format!(
3407                "Checkpoint: {seq_number:?}\n\
3408                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
3409                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
3410                Summary Diff: \n{summary_patch}\n\n\
3411                Contents Diff: \n{contents_patch}\n\n\
3412                Transactions Diff: \n{transactions_patch}\n\n\
3413                Effects Diff: \n{effects_patch}",
3414            )
3415        })
3416        .collect::<Vec<_>>()
3417        .join("\n\n\n");
3418
3419    let header = format!(
3420        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
3421        Datetime: {:?}",
3422        time
3423    );
3424    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
3425    let path = tempfile::tempdir()
3426        .expect("Failed to create tempdir")
3427        .keep()
3428        .join(Path::new("checkpoint_fork_dump.txt"));
3429    let mut file = File::create(path).unwrap();
3430    write!(file, "{}", fork_logs_text).unwrap();
3431    debug!("{}", fork_logs_text);
3432}
3433
/// Notification interface of the checkpoint service.
///
/// Implemented in this file by `CheckpointService` (the real service) and by
/// `CheckpointServiceNoop` (which accepts every notification and does nothing).
pub trait CheckpointServiceNotify {
    /// Hands a checkpoint signature message to the service.
    ///
    /// `CheckpointService` forwards the message to its signature aggregator unless
    /// the checkpoint's sequence number is already covered by the highest verified
    /// checkpoint.
    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult;

    /// Signals the service that there may be checkpoint work to do.
    ///
    /// `CheckpointService` implements this by waking its checkpoint builder.
    fn notify_checkpoint(&self) -> SuiResult;
}
3439
/// Start-up state of a `CheckpointService`.
///
/// The service is constructed holding its three components and gives them up
/// exactly once, via `take_unstarted`, when it is spawned.
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    /// Not started yet: owns the builder, aggregator and state hasher that will
    /// be handed out by `take_unstarted`.
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    /// The components have already been taken.
    Started,
}
3451
3452impl CheckpointServiceState {
3453    fn take_unstarted(
3454        &mut self,
3455    ) -> (
3456        CheckpointBuilder,
3457        CheckpointAggregator,
3458        CheckpointStateHasher,
3459    ) {
3460        let mut state = CheckpointServiceState::Started;
3461        std::mem::swap(self, &mut state);
3462
3463        match state {
3464            CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
3465                (builder, aggregator, hasher)
3466            }
3467            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
3468        }
3469    }
3470}
3471
/// Front end of the checkpoint subsystem: owns the not-yet-started builder,
/// aggregator and state hasher, and relays notifications to them once spawned.
pub struct CheckpointService {
    // Checkpoint store; read for the highest executed / highest verified checkpoints.
    tables: Arc<CheckpointStore>,
    // Shared with the checkpoint builder; `notify_checkpoint` signals it.
    notify_builder: Arc<Notify>,
    // Sending half of the unbounded signature channel whose receiver is given to
    // the aggregator at construction time.
    signature_sender: mpsc::UnboundedSender<CheckpointSignatureMessage>,
    // A notification for the current highest built sequence number.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // The highest sequence number that had already been built at the time CheckpointService
    // was constructed
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    // Holds the builder/aggregator/hasher until `spawn` takes them.
    state: Mutex<CheckpointServiceState>,
}
3484
3485impl CheckpointService {
3486    /// Constructs a new CheckpointService in an un-started state.
3487    // The signature channel is unbounded because notify_checkpoint_signature is called from a
3488    // sync context (consensus_validator.rs implements a sync external trait) and cannot block.
3489    // The channel is consumed by a single async aggregator task that drains it continuously, so
3490    // unbounded growth is not a concern in practice.
3491    #[allow(clippy::disallowed_methods)]
3492    pub fn build(
3493        state: Arc<AuthorityState>,
3494        checkpoint_store: Arc<CheckpointStore>,
3495        epoch_store: Arc<AuthorityPerEpochStore>,
3496        effects_store: Arc<dyn TransactionCacheRead>,
3497        global_state_hasher: Weak<GlobalStateHasher>,
3498        checkpoint_output: Box<dyn CheckpointOutput>,
3499        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
3500        metrics: Arc<CheckpointMetrics>,
3501        max_transactions_per_checkpoint: usize,
3502        max_checkpoint_size_bytes: usize,
3503    ) -> Arc<Self> {
3504        info!(
3505            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
3506        );
3507        Self::initialize_accumulator_account_metrics(&state, &epoch_store, &metrics);
3508        let notify_builder = Arc::new(Notify::new());
3509        let notify_aggregator = Arc::new(Notify::new());
3510
3511        // We may have built higher checkpoint numbers before restarting.
3512        let highest_previously_built_seq = checkpoint_store
3513            .get_latest_locally_computed_checkpoint()
3514            .expect("failed to get latest locally computed checkpoint")
3515            .map(|s| s.sequence_number)
3516            .unwrap_or(0);
3517
3518        let highest_currently_built_seq =
3519            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
3520                .expect("epoch should not have ended")
3521                .map(|(seq, _)| seq)
3522                .unwrap_or(0);
3523
3524        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
3525
3526        let (signature_sender, signature_receiver) = mpsc::unbounded_channel();
3527
3528        let aggregator = CheckpointAggregator::new(
3529            checkpoint_store.clone(),
3530            epoch_store.clone(),
3531            notify_aggregator.clone(),
3532            signature_receiver,
3533            certified_checkpoint_output,
3534            state.clone(),
3535            metrics.clone(),
3536        );
3537
3538        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);
3539
3540        let ckpt_state_hasher = CheckpointStateHasher::new(
3541            epoch_store.clone(),
3542            global_state_hasher.clone(),
3543            receive_from_builder,
3544        );
3545
3546        let builder = CheckpointBuilder::new(
3547            state.clone(),
3548            checkpoint_store.clone(),
3549            epoch_store.clone(),
3550            notify_builder.clone(),
3551            effects_store,
3552            global_state_hasher,
3553            send_to_hasher,
3554            checkpoint_output,
3555            notify_aggregator.clone(),
3556            highest_currently_built_seq_tx.clone(),
3557            metrics.clone(),
3558            max_transactions_per_checkpoint,
3559            max_checkpoint_size_bytes,
3560        );
3561
3562        Arc::new(Self {
3563            tables: checkpoint_store,
3564            notify_builder,
3565            signature_sender,
3566            highest_currently_built_seq_tx,
3567            highest_previously_built_seq,
3568            metrics,
3569            state: Mutex::new(CheckpointServiceState::Unstarted((
3570                builder,
3571                aggregator,
3572                ckpt_state_hasher,
3573            ))),
3574        })
3575    }
3576
3577    fn initialize_accumulator_account_metrics(
3578        state: &AuthorityState,
3579        epoch_store: &AuthorityPerEpochStore,
3580        metrics: &CheckpointMetrics,
3581    ) {
3582        if !epoch_store.protocol_config().enable_accumulators() {
3583            return;
3584        }
3585
3586        let object_store = state.get_object_store();
3587        match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
3588            Ok(Some(count)) => metrics.initialize_accumulator_accounts_live(count),
3589            Ok(None) => {}
3590            Err(e) => fatal!("failed to initialize accumulator account metrics: {e}"),
3591        }
3592    }
3593
    /// Starts the CheckpointService.
    ///
    /// This function blocks until the CheckpointBuilder re-builds all checkpoints that had
    /// been built before the most recent restart. You can think of this as a WAL replay
    /// operation. Upon startup, we may have a number of consensus commits and resulting
    /// checkpoints that were built but not committed to disk. We want to reprocess the
    /// commits and rebuild the checkpoints before starting normal operation.
    ///
    /// The wait is bounded: after 120 seconds, or as soon as the builder task finishes,
    /// this function returns even if the rebuild has not completed (a timeout is
    /// reported through `debug_fatal!`).
    ///
    /// # Panics
    /// Panics if the service was already started, or if reading the last persisted
    /// builder summary or the highest executed checkpoint fails.
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        // Take ownership of the three components; a second call panics here.
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        // Clean up state hashes computed after the last built checkpoint
        // This prevents ECMH divergence after fork recovery restarts

        // Note: there is a rare crash recovery edge case where we write the builder
        // summary, but crash before we can bump the highest executed checkpoint.
        // If we committed the builder summary, it was certified and unforked, so there
        // is no need to clear that state hash. If we do clear it, then checkpoint executor
        // will wait forever for checkpoint builder to produce the state hash, which will
        // never happen.
        let last_persisted_builder_seq = epoch_store
            .last_persisted_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number);

        let last_executed_seq = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number());

        // `Option::max` treats `None` as smaller than any `Some`, so this picks the
        // larger of the two sequence numbers and is `None` only when both are absent.
        if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                // Best effort: a failure to clear is logged and start-up continues.
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        // Fires when `builder.run` returns, so the wait below can stop early.
        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        spawn_monitored_task!(async move {
            // Run the builder inside `within_alive_epoch`; the result of that wrapper is
            // ignored, so the shutdown sequence below runs whichever way it returns.
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            // state hasher will terminate as soon as it has finished processing all messages from builder
            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            // builder must shut down before aggregator and state_hasher, since it sends
            // messages to them
            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        // If this times out, the validator may still start up. The worst that can
        // happen is that we will crash later on instead of immediately. The eventual
        // crash would occur because we may be missing transactions that are below the
        // highest_synced_checkpoint watermark, which can cause a crash in
        // `CheckpointExecutor::extract_randomness_rounds`.
        if tokio::time::timeout(Duration::from_secs(120), async move {
            // Whichever completes first ends the wait: the builder exiting, or the
            // rebuilt sequence number catching up.
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
3687}
3688
3689impl CheckpointService {
3690    /// Waits until all checkpoints had been built before the node restarted
3691    /// are rebuilt. This is required to preserve the invariant that all checkpoints
3692    /// (and their transactions) below the highest_synced_checkpoint watermark are
3693    /// available. Once the checkpoints are constructed, we can be sure that the
3694    /// transactions have also been executed.
3695    pub async fn wait_for_rebuilt_checkpoints(&self) {
3696        let highest_previously_built_seq = self.highest_previously_built_seq;
3697        let mut rx = self.highest_currently_built_seq_tx.subscribe();
3698        let mut highest_currently_built_seq = *rx.borrow_and_update();
3699        info!(
3700            "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3701        );
3702        loop {
3703            if highest_currently_built_seq >= highest_previously_built_seq {
3704                info!("Checkpoint rebuild complete");
3705                break;
3706            }
3707            rx.changed().await.unwrap();
3708            highest_currently_built_seq = *rx.borrow_and_update();
3709        }
3710    }
3711
3712    #[cfg(test)]
3713    fn write_and_notify_checkpoint_for_testing(
3714        &self,
3715        epoch_store: &AuthorityPerEpochStore,
3716        checkpoint: PendingCheckpoint,
3717    ) -> SuiResult {
3718        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3719
3720        let mut output = ConsensusCommitOutput::new(0);
3721        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3722        output.set_default_commit_stats_for_testing();
3723        epoch_store.push_consensus_output_for_tests(output);
3724        self.notify_checkpoint()?;
3725        Ok(())
3726    }
3727}
3728
3729impl CheckpointServiceNotify for CheckpointService {
3730    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult {
3731        let sequence = info.summary.sequence_number;
3732        let signer = info.summary.auth_sig().authority.concise();
3733
3734        if let Some(highest_verified_checkpoint) = self
3735            .tables
3736            .get_highest_verified_checkpoint()?
3737            .map(|x| *x.sequence_number())
3738            && sequence <= highest_verified_checkpoint
3739        {
3740            trace!(
3741                checkpoint_seq = sequence,
3742                "Ignore checkpoint signature from {} - already certified", signer,
3743            );
3744            self.metrics
3745                .last_ignored_checkpoint_signature_received
3746                .set(sequence as i64);
3747            return Ok(());
3748        }
3749        trace!(
3750            checkpoint_seq = sequence,
3751            "Received checkpoint signature, digest {} from {}",
3752            info.summary.digest(),
3753            signer,
3754        );
3755        self.metrics
3756            .last_received_checkpoint_signatures
3757            .with_label_values(&[&signer.to_string()])
3758            .set(sequence as i64);
3759        self.signature_sender.send(info.clone()).ok();
3760        Ok(())
3761    }
3762
3763    fn notify_checkpoint(&self) -> SuiResult {
3764        self.notify_builder.notify_one();
3765        Ok(())
3766    }
3767}
3768
3769// test helper
3770pub struct CheckpointServiceNoop {}
3771impl CheckpointServiceNotify for CheckpointServiceNoop {
3772    fn notify_checkpoint_signature(&self, _: &CheckpointSignatureMessage) -> SuiResult {
3773        Ok(())
3774    }
3775
3776    fn notify_checkpoint(&self) -> SuiResult {
3777        Ok(())
3778    }
3779}
3780
3781impl PendingCheckpoint {
3782    pub fn height(&self) -> CheckpointHeight {
3783        self.details.checkpoint_height
3784    }
3785
3786    pub fn roots(&self) -> &Vec<TransactionKey> {
3787        &self.roots
3788    }
3789
3790    pub fn details(&self) -> &PendingCheckpointInfo {
3791        &self.details
3792    }
3793}
3794
3795impl PendingCheckpointV2 {
3796    pub fn height(&self) -> CheckpointHeight {
3797        self.details.checkpoint_height
3798    }
3799
3800    pub(crate) fn num_roots(&self) -> usize {
3801        self.roots.iter().map(|r| r.tx_roots.len()).sum()
3802    }
3803}
3804
pin_project! {
    /// A future wrapper that carries a counter alongside the wrapped future.
    ///
    /// The `Future` implementation for this type increments the counter on every `poll` call
    /// and returns the final count together with the inner future's output.
    pub struct PollCounter<Fut> {
        // The wrapped future; marked `#[pin]` so it is projected as a pinned reference.
        #[pin]
        future: Fut,
        // Number of times this wrapper has been polled so far.
        count: usize,
    }
}
3812
3813impl<Fut> PollCounter<Fut> {
3814    pub fn new(future: Fut) -> Self {
3815        Self { future, count: 0 }
3816    }
3817
3818    pub fn count(&self) -> usize {
3819        self.count
3820    }
3821}
3822
3823impl<Fut: Future> Future for PollCounter<Fut> {
3824    type Output = (usize, Fut::Output);
3825
3826    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3827        let this = self.project();
3828        *this.count += 1;
3829        match this.future.poll(cx) {
3830            Poll::Ready(output) => Poll::Ready((*this.count, output)),
3831            Poll::Pending => Poll::Pending,
3832        }
3833    }
3834}
3835
3836fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
3837    PollCounter::new(future)
3838}
3839
3840#[cfg(test)]
3841mod tests {
3842    use super::*;
3843    use crate::authority::test_authority_builder::TestAuthorityBuilder;
3844    use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3845    use futures::FutureExt as _;
3846    use futures::future::BoxFuture;
3847    use std::collections::HashMap;
3848    use std::ops::Deref;
3849    use sui_macros::sim_test;
3850    use sui_protocol_config::{Chain, ProtocolConfig};
3851    use sui_types::accumulator_event::AccumulatorEvent;
3852    use sui_types::authenticator_state::ActiveJwk;
3853    use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3854    use sui_types::crypto::Signature;
3855    use sui_types::effects::{TransactionEffects, TransactionEvents};
3856    use sui_types::messages_checkpoint::SignedCheckpointSummary;
3857    use sui_types::transaction::VerifiedTransaction;
3858    use tokio::sync::mpsc;
3859
3860    #[tokio::test]
3861    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
3862        let store = CheckpointStore::new_for_tests();
3863        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
3864        for seq in 70u64..=80u64 {
3865            let contents =
3866                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
3867                    [sui_types::base_types::ExecutionDigests::new(
3868                        sui_types::digests::TransactionDigest::random(),
3869                        sui_types::digests::TransactionEffectsDigest::ZERO,
3870                    )],
3871                );
3872            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
3873                &protocol,
3874                0,
3875                seq,
3876                0,
3877                &contents,
3878                None,
3879                sui_types::gas::GasCostSummary::default(),
3880                None,
3881                0,
3882                Vec::new(),
3883                Vec::new(),
3884            );
3885            store
3886                .tables
3887                .locally_computed_checkpoints
3888                .insert(&seq, &summary)
3889                .unwrap();
3890        }
3891
3892        store
3893            .clear_locally_computed_checkpoints_from(76)
3894            .expect("clear should succeed");
3895
3896        // Explicit boundary checks: 75 must remain, 76 must be deleted
3897        assert!(
3898            store
3899                .tables
3900                .locally_computed_checkpoints
3901                .get(&75)
3902                .unwrap()
3903                .is_some()
3904        );
3905        assert!(
3906            store
3907                .tables
3908                .locally_computed_checkpoints
3909                .get(&76)
3910                .unwrap()
3911                .is_none()
3912        );
3913
3914        for seq in 70u64..76u64 {
3915            assert!(
3916                store
3917                    .tables
3918                    .locally_computed_checkpoints
3919                    .get(&seq)
3920                    .unwrap()
3921                    .is_some()
3922            );
3923        }
3924        for seq in 76u64..=80u64 {
3925            assert!(
3926                store
3927                    .tables
3928                    .locally_computed_checkpoints
3929                    .get(&seq)
3930                    .unwrap()
3931                    .is_none()
3932            );
3933        }
3934    }
3935
3936    #[tokio::test]
3937    async fn test_fork_detection_storage() {
3938        let store = CheckpointStore::new_for_tests();
3939        // checkpoint fork
3940        let seq_num = 42;
3941        let digest = CheckpointDigest::random();
3942
3943        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3944
3945        store
3946            .record_checkpoint_fork_detected(seq_num, digest)
3947            .unwrap();
3948
3949        let retrieved = store.get_checkpoint_fork_detected().unwrap();
3950        assert!(retrieved.is_some());
3951        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3952        assert_eq!(retrieved_seq, seq_num);
3953        assert_eq!(retrieved_digest, digest);
3954
3955        store.clear_checkpoint_fork_detected().unwrap();
3956        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3957
3958        // txn fork
3959        let tx_digest = TransactionDigest::random();
3960        let expected_effects = TransactionEffectsDigest::random();
3961        let actual_effects = TransactionEffectsDigest::random();
3962
3963        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3964
3965        store
3966            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3967            .unwrap();
3968
3969        let retrieved = store.get_transaction_fork_detected().unwrap();
3970        assert!(retrieved.is_some());
3971        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3972        assert_eq!(retrieved_tx, tx_digest);
3973        assert_eq!(retrieved_expected, expected_effects);
3974        assert_eq!(retrieved_actual, actual_effects);
3975
3976        store.clear_transaction_fork_detected().unwrap();
3977        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3978    }
3979
3980    #[sim_test]
3981    pub async fn checkpoint_builder_test() {
3982        telemetry_subscribers::init_for_testing();
3983
3984        let mut protocol_config =
3985            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
3986        protocol_config.disable_accumulators_for_testing();
3987        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(false);
3988        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
3989        let state = TestAuthorityBuilder::new()
3990            .with_protocol_config(protocol_config)
3991            .build()
3992            .await;
3993
3994        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
3995            0,
3996            0,
3997            vec![],
3998            SequenceNumber::new(),
3999        );
4000
4001        let jwks = {
4002            let mut jwks = Vec::new();
4003            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
4004                jwks.push(ActiveJwk {
4005                    jwk_id: JwkId::new(
4006                        "https://accounts.google.com".to_string(),
4007                        "1234567890".to_string(),
4008                    ),
4009                    jwk: JWK {
4010                        kty: "RSA".to_string(),
4011                        e: "AQAB".to_string(),
4012                        n: "1234567890".to_string(),
4013                        alg: "RS256".to_string(),
4014                    },
4015                    epoch: 0,
4016                });
4017            }
4018            jwks
4019        };
4020
4021        let dummy_tx_with_data =
4022            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());
4023
4024        for i in 0..15 {
4025            state
4026                .database_for_testing()
4027                .perpetual_tables
4028                .transactions
4029                .insert(&d(i), dummy_tx.serializable_ref())
4030                .unwrap();
4031        }
4032        for i in 15..20 {
4033            state
4034                .database_for_testing()
4035                .perpetual_tables
4036                .transactions
4037                .insert(&d(i), dummy_tx_with_data.serializable_ref())
4038                .unwrap();
4039        }
4040
4041        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
4042        commit_cert_for_test(
4043            &mut store,
4044            state.clone(),
4045            d(1),
4046            vec![d(2), d(3)],
4047            GasCostSummary::new(11, 12, 11, 1),
4048        );
4049        commit_cert_for_test(
4050            &mut store,
4051            state.clone(),
4052            d(2),
4053            vec![d(3), d(4)],
4054            GasCostSummary::new(21, 22, 21, 1),
4055        );
4056        commit_cert_for_test(
4057            &mut store,
4058            state.clone(),
4059            d(3),
4060            vec![],
4061            GasCostSummary::new(31, 32, 31, 1),
4062        );
4063        commit_cert_for_test(
4064            &mut store,
4065            state.clone(),
4066            d(4),
4067            vec![],
4068            GasCostSummary::new(41, 42, 41, 1),
4069        );
4070        for i in [5, 6, 7, 10, 11, 12, 13] {
4071            commit_cert_for_test(
4072                &mut store,
4073                state.clone(),
4074                d(i),
4075                vec![],
4076                GasCostSummary::new(41, 42, 41, 1),
4077            );
4078        }
4079        for i in [15, 16, 17] {
4080            commit_cert_for_test(
4081                &mut store,
4082                state.clone(),
4083                d(i),
4084                vec![],
4085                GasCostSummary::new(51, 52, 51, 1),
4086            );
4087        }
4088        let all_digests: Vec<_> = store.keys().copied().collect();
4089        for digest in all_digests {
4090            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
4091            state
4092                .epoch_store_for_testing()
4093                .test_insert_user_signature(digest, vec![(signature, None)]);
4094        }
4095
4096        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
4097        let (certified_output, mut certified_result) =
4098            mpsc::channel::<CertifiedCheckpointSummary>(10);
4099        let store = Arc::new(store);
4100
4101        let ckpt_dir = tempfile::tempdir().unwrap();
4102        let checkpoint_store =
4103            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
4104        let epoch_store = state.epoch_store_for_testing();
4105
4106        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
4107            state.get_global_state_hash_store().clone(),
4108        ));
4109
4110        let checkpoint_service = CheckpointService::build(
4111            state.clone(),
4112            checkpoint_store,
4113            epoch_store.clone(),
4114            store,
4115            Arc::downgrade(&global_state_hasher),
4116            Box::new(output),
4117            Box::new(certified_output),
4118            CheckpointMetrics::new_for_tests(),
4119            3,
4120            100_000,
4121        );
4122        checkpoint_service.spawn(epoch_store.clone(), None).await;
4123
4124        checkpoint_service
4125            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
4126            .unwrap();
4127        checkpoint_service
4128            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
4129            .unwrap();
4130        checkpoint_service
4131            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
4132            .unwrap();
4133        checkpoint_service
4134            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
4135            .unwrap();
4136        checkpoint_service
4137            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
4138            .unwrap();
4139        checkpoint_service
4140            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
4141            .unwrap();
4142
4143        let (c1c, c1s) = result.recv().await.unwrap();
4144        let (c2c, c2s) = result.recv().await.unwrap();
4145
4146        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4147        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4148        assert_eq!(c1t, vec![d(4)]);
4149        assert_eq!(c1s.previous_digest, None);
4150        assert_eq!(c1s.sequence_number, 0);
4151        assert_eq!(
4152            c1s.epoch_rolling_gas_cost_summary,
4153            GasCostSummary::new(41, 42, 41, 1)
4154        );
4155
4156        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
4157        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
4158        assert_eq!(c2s.sequence_number, 1);
4159        assert_eq!(
4160            c2s.epoch_rolling_gas_cost_summary,
4161            GasCostSummary::new(104, 108, 104, 4)
4162        );
4163
4164        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
4165        // Verify that we split into 2 checkpoints.
4166        let (c3c, c3s) = result.recv().await.unwrap();
4167        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4168        let (c4c, c4s) = result.recv().await.unwrap();
4169        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4170        assert_eq!(c3s.sequence_number, 2);
4171        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
4172        assert_eq!(c4s.sequence_number, 3);
4173        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
4174        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
4175        assert_eq!(c4t, vec![d(13)]);
4176
4177        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K max.
4178        // Verify that we split into 2 checkpoints.
4179        let (c5c, c5s) = result.recv().await.unwrap();
4180        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4181        let (c6c, c6s) = result.recv().await.unwrap();
4182        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4183        assert_eq!(c5s.sequence_number, 4);
4184        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
4185        assert_eq!(c6s.sequence_number, 5);
4186        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
4187        assert_eq!(c5t, vec![d(15), d(16)]);
4188        assert_eq!(c6t, vec![d(17)]);
4189
4190        // Pending at index 4 was too soon after the prior one and should be coalesced into
4191        // the next one.
4192        let (c7c, c7s) = result.recv().await.unwrap();
4193        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
4194        assert_eq!(c7t, vec![d(5), d(6)]);
4195        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
4196        assert_eq!(c7s.sequence_number, 6);
4197
4198        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
4199        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);
4200
4201        checkpoint_service
4202            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c2ss })
4203            .unwrap();
4204        checkpoint_service
4205            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c1ss })
4206            .unwrap();
4207
4208        let c1sc = certified_result.recv().await.unwrap();
4209        let c2sc = certified_result.recv().await.unwrap();
4210        assert_eq!(c1sc.sequence_number, 0);
4211        assert_eq!(c2sc.sequence_number, 1);
4212    }
4213
4214    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
4215        fn notify_read_executed_effects_may_fail(
4216            &self,
4217            _: &str,
4218            digests: &[TransactionDigest],
4219        ) -> BoxFuture<'_, SuiResult<Vec<TransactionEffects>>> {
4220            std::future::ready(Ok(digests
4221                .iter()
4222                .map(|d| self.get(d).expect("effects not found").clone())
4223                .collect()))
4224            .boxed()
4225        }
4226
4227        fn notify_read_executed_effects_digests(
4228            &self,
4229            _: &str,
4230            digests: &[TransactionDigest],
4231        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
4232            std::future::ready(
4233                digests
4234                    .iter()
4235                    .map(|d| {
4236                        self.get(d)
4237                            .map(|fx| fx.digest())
4238                            .expect("effects not found")
4239                    })
4240                    .collect(),
4241            )
4242            .boxed()
4243        }
4244
4245        fn multi_get_executed_effects(
4246            &self,
4247            digests: &[TransactionDigest],
4248        ) -> Vec<Option<TransactionEffects>> {
4249            digests.iter().map(|d| self.get(d).cloned()).collect()
4250        }
4251
4252        // Unimplemented methods - its unfortunate to have this big blob of useless code, but it wasn't
4253        // worth it to keep EffectsNotifyRead around just for these tests, as it caused a ton of
4254        // complication in non-test code. (e.g. had to implement EFfectsNotifyRead for all
4255        // ExecutionCacheRead implementors).
4256
4257        fn multi_get_transaction_blocks(
4258            &self,
4259            _: &[TransactionDigest],
4260        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
4261            unimplemented!()
4262        }
4263
4264        fn multi_get_executed_effects_digests(
4265            &self,
4266            _: &[TransactionDigest],
4267        ) -> Vec<Option<TransactionEffectsDigest>> {
4268            unimplemented!()
4269        }
4270
4271        fn multi_get_effects(
4272            &self,
4273            _: &[TransactionEffectsDigest],
4274        ) -> Vec<Option<TransactionEffects>> {
4275            unimplemented!()
4276        }
4277
4278        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
4279            unimplemented!()
4280        }
4281
4282        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
4283            unimplemented!()
4284        }
4285
4286        fn get_unchanged_loaded_runtime_objects(
4287            &self,
4288            _digest: &TransactionDigest,
4289        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
4290            unimplemented!()
4291        }
4292
4293        fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
4294            unimplemented!()
4295        }
4296    }
4297
4298    #[async_trait::async_trait]
4299    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
4300        async fn checkpoint_created(
4301            &self,
4302            summary: &CheckpointSummary,
4303            contents: &CheckpointContents,
4304            _epoch_store: &Arc<AuthorityPerEpochStore>,
4305            _checkpoint_store: &Arc<CheckpointStore>,
4306        ) -> SuiResult {
4307            self.try_send((contents.clone(), summary.clone())).unwrap();
4308            Ok(())
4309        }
4310    }
4311
4312    #[async_trait::async_trait]
4313    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
4314        async fn certified_checkpoint_created(
4315            &self,
4316            summary: &CertifiedCheckpointSummary,
4317        ) -> SuiResult {
4318            self.try_send(summary.clone()).unwrap();
4319            Ok(())
4320        }
4321    }
4322
4323    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
4324        PendingCheckpoint {
4325            roots: t
4326                .into_iter()
4327                .map(|t| TransactionKey::Digest(d(t)))
4328                .collect(),
4329            details: PendingCheckpointInfo {
4330                timestamp_ms,
4331                last_of_epoch: false,
4332                checkpoint_height: i,
4333                consensus_commit_ref: CommitRef::default(),
4334                rejected_transactions_digest: Digest::default(),
4335                checkpoint_seq: None,
4336            },
4337        }
4338    }
4339
4340    fn d(i: u8) -> TransactionDigest {
4341        let mut bytes: [u8; 32] = Default::default();
4342        bytes[0] = i;
4343        TransactionDigest::new(bytes)
4344    }
4345
4346    fn e(
4347        transaction_digest: TransactionDigest,
4348        dependencies: Vec<TransactionDigest>,
4349        gas_used: GasCostSummary,
4350    ) -> TransactionEffects {
4351        let mut effects = TransactionEffects::default();
4352        *effects.transaction_digest_mut_for_testing() = transaction_digest;
4353        *effects.dependencies_mut_for_testing() = dependencies;
4354        *effects.gas_cost_summary_mut_for_testing() = gas_used;
4355        effects
4356    }
4357
4358    fn commit_cert_for_test(
4359        store: &mut HashMap<TransactionDigest, TransactionEffects>,
4360        state: Arc<AuthorityState>,
4361        digest: TransactionDigest,
4362        dependencies: Vec<TransactionDigest>,
4363        gas_used: GasCostSummary,
4364    ) {
4365        let epoch_store = state.epoch_store_for_testing();
4366        let effects = e(digest, dependencies, gas_used);
4367        store.insert(digest, effects.clone());
4368        epoch_store.insert_executed_in_epoch(&digest);
4369    }
4370}