sui_core/checkpoints/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::AccumulatorSettlementTxBuilder;
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16    LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::balance_withdraw_scheduler::BalanceSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::random::get_rng;
29use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
30use mysten_common::{assert_reachable, debug_fatal, fatal};
31use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
32use nonempty::NonEmpty;
33use parking_lot::Mutex;
34use pin_project_lite::pin_project;
35use serde::{Deserialize, Serialize};
36use sui_macros::fail_point_arg;
37use sui_network::default_mysten_network_config;
38use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43    CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use tokio::sync::{mpsc, watch};
47use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
48
49use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
50use crate::authority::authority_store_pruner::PrunerWatermarks;
51use crate::consensus_handler::SequencedConsensusTransactionKey;
52use rand::seq::SliceRandom;
53use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
54use std::fs::File;
55use std::future::Future;
56use std::io::Write;
57use std::path::Path;
58use std::pin::Pin;
59use std::sync::Arc;
60use std::sync::Weak;
61use std::task::{Context, Poll};
62use std::time::{Duration, SystemTime};
63use sui_protocol_config::ProtocolVersion;
64use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
65use sui_types::committee::StakeUnit;
66use sui_types::crypto::AuthorityStrongQuorumSignInfo;
67use sui_types::digests::{
68    CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
69};
70use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
71use sui_types::error::{SuiErrorKind, SuiResult};
72use sui_types::gas::GasCostSummary;
73use sui_types::message_envelope::Message;
74use sui_types::messages_checkpoint::{
75    CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
76    CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
77    EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
78    VerifiedCheckpointContents,
79};
80use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
81use sui_types::messages_consensus::ConsensusTransactionKey;
82use sui_types::signature::GenericSignature;
83use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
84use sui_types::transaction::{
85    TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
86};
87use tokio::{sync::Notify, time::timeout};
88use tracing::{debug, error, info, instrument, trace, warn};
89use typed_store::DBMapUtils;
90use typed_store::Map;
91use typed_store::{
92    TypedStoreError,
93    rocks::{DBMap, MetricConf},
94};
95
/// Key presumably used for the single-entry `transaction_fork_detected` table, whose key type
/// is `u8` — NOTE(review): confirm against its readers/writers.
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0x00;

/// Height assigned to a pending checkpoint by the checkpoint builder.
pub type CheckpointHeight = u64;
99
/// Aggregate per-epoch counters: checkpoints, transactions and gas reward.
pub struct EpochStats {
    /// Number of checkpoints counted for the epoch.
    pub checkpoint_count: u64,
    /// Number of transactions counted for the epoch.
    pub transaction_count: u64,
    /// Total gas reward accumulated for the epoch.
    pub total_gas_reward: u64,
}
105
/// Metadata carried alongside the root transactions of a [`PendingCheckpoint`].
#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    /// Timestamp assigned to the checkpoint (the `_ms` suffix suggests milliseconds).
    pub timestamp_ms: CheckpointTimestamp,
    /// Whether this pending checkpoint is flagged as the last one of its epoch.
    pub last_of_epoch: bool,
    /// Computed in `calculate_pending_checkpoint_height()` from the consensus round. It is not
    /// guaranteed to increase per checkpoint, because checkpoints may be split.
    pub checkpoint_height: CheckpointHeight,
    /// Consensus commit ref corresponding to this checkpoint.
    pub consensus_commit_ref: CommitRef,
    /// Digest of the rejected transactions corresponding to this checkpoint.
    pub rejected_transactions_digest: Digest,
}
117
/// A pending checkpoint: the keys of its root transactions plus its metadata.
#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    /// Keys of the root transactions of this checkpoint.
    pub roots: Vec<TransactionKey>,
    /// Timestamp, height and consensus-commit metadata for this checkpoint.
    pub details: PendingCheckpointInfo,
}
123
/// A checkpoint summary together with information about where the checkpoint builder built it.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`. With positional encodings such as
/// BCS the field order is part of the encoded form, so do not reorder fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    /// The built checkpoint summary.
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for genesis checkpoint
    pub checkpoint_height: Option<CheckpointHeight>,
    /// Position of this summary within its consensus commit. Presumably this is the index when
    /// one commit is split into several checkpoints — confirm with the builder.
    pub position_in_commit: usize,
}
131
/// Database tables backing [`CheckpointStore`].
///
/// `DBMapUtils` derives the open helpers used by `CheckpointStoreTables::new` /
/// `open_readonly`. The tidehunter configuration refers to these tables by their field names,
/// so renaming a field is a schema change.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence number, for
    /// efficient reads of full checkpoints. Entries from this table are deleted after state
    /// accumulation has completed.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    // TODO: Once the switch to `full_checkpoint_content_v2` is fully active on mainnet,
    // deprecate this table (and remove when possible).
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
    /// Versioned full checkpoint contents indexed by sequence number. This is the table read by
    /// `CheckpointStore::get_full_checkpoint_contents_by_sequence_number`.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks and log useful
    /// information. Can be pruned as soon as we verify that we are in agreement with the latest
    /// certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints (and the highest pruned checkpoint). Each value is the
    /// `(sequence number, digest)` of the checkpoint at that watermark.
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    /// Stores transaction fork detection information: a transaction digest and two effects
    /// digests. NOTE(review): presumably the expected vs. locally computed effects — confirm.
    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
}
179
180fn full_checkpoint_content_table_default_config() -> DBOptions {
181    DBOptions {
182        options: default_db_options().options,
183        // We have seen potential data corruption issues in this table after forced shutdowns
184        // so we enable value hash logging to help with debugging.
185        // TODO: remove this once we have a better understanding of the root cause.
186        rw_options: ReadWriteOptions::default().set_log_value_hash(true),
187    }
188}
189
impl CheckpointStoreTables {
    /// Opens the checkpoint tables read-write at `path`, reporting metrics under `metric_name`.
    ///
    /// Non-tidehunter build: the pruner watermarks argument is unused.
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    /// Opens the checkpoint tables read-write at `path` on the tidehunter backend, reporting
    /// metrics under `metric_name`.
    ///
    /// Every table listed in `configs` gets an explicit keyspace config. Several tables install
    /// relocation filters via `apply_relocation_filter`, driven by the checkpoint or epoch
    /// watermark in `pruner_watermarks`. Presumably this lets pruned entries be dropped during
    /// relocation. `checkpoint_content` and `transaction_fork_detected` use a filter that always
    /// returns `Decision::Remove`.
    ///
    /// NOTE(review): `full_checkpoint_content_v2` is a field of this struct but has no entry in
    /// `configs`. Confirm the tidehunter open path handles a table without an explicit config,
    /// or add one mirroring `full_checkpoint_content`.
    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        // Four times the default mutex count, shared by the u64- and digest-keyed configs.
        let mutexes = default_mutex_count() * 4;
        // Uses a 6-byte (48-bit) key prefix. NOTE(review): keys here are presumably 8-byte
        // big-endian sequence numbers — confirm.
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        // Raised dirty-key limit with the default value cache; reused by sequence-keyed tables.
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(64_000)
            .with_value_cache_size(default_value_cache_size());
        // Base config for tables with 8-byte keys (sequence numbers, epoch ids).
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        // Base config for tables with 32-byte digest keys. NOTE(review): the removed-prefix
        // bytes `[0, .., 32]` look like a serialized length prefix on digest keys — confirm.
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        // Small value cache with unloading disabled, for the tiny single-key tables.
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
        // Keep the entries in this order; it is passed straight to `open_tables_read_write`.
        let configs = vec![
            (
                "checkpoint_content",
                digest_config.clone().with_config(
                    lru_config
                        .clone()
                        .with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
            ),
            (
                "transaction_fork_detected",
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    /// Opens a read-only handle to the checkpoint tables at `path`, reporting metrics under
    /// `checkpoint_readonly`.
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }
}
310
311pub struct CheckpointStore {
312    pub(crate) tables: CheckpointStoreTables,
313    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
314    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
315}
316
317impl CheckpointStore {
318    pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
319        let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
320        Arc::new(Self {
321            tables,
322            synced_checkpoint_notify_read: NotifyRead::new(),
323            executed_checkpoint_notify_read: NotifyRead::new(),
324        })
325    }
326
327    pub fn new_for_tests() -> Arc<Self> {
328        let ckpt_dir = mysten_common::tempdir().unwrap();
329        CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
330    }
331
332    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
333        let tables = CheckpointStoreTables::new(
334            path,
335            "db_checkpoint",
336            Arc::new(PrunerWatermarks::default()),
337        );
338        Arc::new(Self {
339            tables,
340            synced_checkpoint_notify_read: NotifyRead::new(),
341            executed_checkpoint_notify_read: NotifyRead::new(),
342        })
343    }
344
345    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
346        CheckpointStoreTables::open_readonly(path)
347    }
348
349    #[instrument(level = "info", skip_all)]
350    pub fn insert_genesis_checkpoint(
351        &self,
352        checkpoint: VerifiedCheckpoint,
353        contents: CheckpointContents,
354        epoch_store: &AuthorityPerEpochStore,
355    ) {
356        assert_eq!(
357            checkpoint.epoch(),
358            0,
359            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
360        );
361        assert_eq!(
362            *checkpoint.sequence_number(),
363            0,
364            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
365        );
366
367        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
368        match self.get_checkpoint_by_sequence_number(0).unwrap() {
369            Some(existing_checkpoint) => {
370                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
371            }
372            None => {
373                if epoch_store.epoch() == checkpoint.epoch {
374                    epoch_store
375                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
376                        .unwrap();
377                } else {
378                    debug!(
379                        validator_epoch =% epoch_store.epoch(),
380                        genesis_epoch =% checkpoint.epoch(),
381                        "Not inserting checkpoint builder data for genesis checkpoint",
382                    );
383                }
384                self.insert_checkpoint_contents(contents).unwrap();
385                self.insert_verified_checkpoint(&checkpoint).unwrap();
386                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
387            }
388        }
389    }
390
391    pub fn get_checkpoint_by_digest(
392        &self,
393        digest: &CheckpointDigest,
394    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
395        self.tables
396            .checkpoint_by_digest
397            .get(digest)
398            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
399    }
400
401    pub fn get_checkpoint_by_sequence_number(
402        &self,
403        sequence_number: CheckpointSequenceNumber,
404    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
405        self.tables
406            .certified_checkpoints
407            .get(&sequence_number)
408            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
409    }
410
411    pub fn get_locally_computed_checkpoint(
412        &self,
413        sequence_number: CheckpointSequenceNumber,
414    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
415        self.tables
416            .locally_computed_checkpoints
417            .get(&sequence_number)
418    }
419
420    pub fn multi_get_locally_computed_checkpoints(
421        &self,
422        sequence_numbers: &[CheckpointSequenceNumber],
423    ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
424        let checkpoints = self
425            .tables
426            .locally_computed_checkpoints
427            .multi_get(sequence_numbers)?;
428
429        Ok(checkpoints)
430    }
431
432    pub fn get_sequence_number_by_contents_digest(
433        &self,
434        digest: &CheckpointContentsDigest,
435    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
436        self.tables
437            .checkpoint_sequence_by_contents_digest
438            .get(digest)
439    }
440
441    pub fn delete_contents_digest_sequence_number_mapping(
442        &self,
443        digest: &CheckpointContentsDigest,
444    ) -> Result<(), TypedStoreError> {
445        self.tables
446            .checkpoint_sequence_by_contents_digest
447            .remove(digest)
448    }
449
450    pub fn get_latest_certified_checkpoint(
451        &self,
452    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
453        Ok(self
454            .tables
455            .certified_checkpoints
456            .reversed_safe_iter_with_bounds(None, None)?
457            .next()
458            .transpose()?
459            .map(|(_, v)| v.into()))
460    }
461
462    pub fn get_latest_locally_computed_checkpoint(
463        &self,
464    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
465        Ok(self
466            .tables
467            .locally_computed_checkpoints
468            .reversed_safe_iter_with_bounds(None, None)?
469            .next()
470            .transpose()?
471            .map(|(_, v)| v))
472    }
473
474    pub fn multi_get_checkpoint_by_sequence_number(
475        &self,
476        sequence_numbers: &[CheckpointSequenceNumber],
477    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
478        let checkpoints = self
479            .tables
480            .certified_checkpoints
481            .multi_get(sequence_numbers)?
482            .into_iter()
483            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
484            .collect();
485
486        Ok(checkpoints)
487    }
488
489    pub fn multi_get_checkpoint_content(
490        &self,
491        contents_digest: &[CheckpointContentsDigest],
492    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
493        self.tables.checkpoint_content.multi_get(contents_digest)
494    }
495
496    pub fn get_highest_verified_checkpoint(
497        &self,
498    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
499        let highest_verified = if let Some(highest_verified) = self
500            .tables
501            .watermarks
502            .get(&CheckpointWatermark::HighestVerified)?
503        {
504            highest_verified
505        } else {
506            return Ok(None);
507        };
508        self.get_checkpoint_by_digest(&highest_verified.1)
509    }
510
511    pub fn get_highest_synced_checkpoint(
512        &self,
513    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
514        let highest_synced = if let Some(highest_synced) = self
515            .tables
516            .watermarks
517            .get(&CheckpointWatermark::HighestSynced)?
518        {
519            highest_synced
520        } else {
521            return Ok(None);
522        };
523        self.get_checkpoint_by_digest(&highest_synced.1)
524    }
525
526    pub fn get_highest_synced_checkpoint_seq_number(
527        &self,
528    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
529        if let Some(highest_synced) = self
530            .tables
531            .watermarks
532            .get(&CheckpointWatermark::HighestSynced)?
533        {
534            Ok(Some(highest_synced.0))
535        } else {
536            Ok(None)
537        }
538    }
539
540    pub fn get_highest_executed_checkpoint_seq_number(
541        &self,
542    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
543        if let Some(highest_executed) = self
544            .tables
545            .watermarks
546            .get(&CheckpointWatermark::HighestExecuted)?
547        {
548            Ok(Some(highest_executed.0))
549        } else {
550            Ok(None)
551        }
552    }
553
554    pub fn get_highest_executed_checkpoint(
555        &self,
556    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
557        let highest_executed = if let Some(highest_executed) = self
558            .tables
559            .watermarks
560            .get(&CheckpointWatermark::HighestExecuted)?
561        {
562            highest_executed
563        } else {
564            return Ok(None);
565        };
566        self.get_checkpoint_by_digest(&highest_executed.1)
567    }
568
569    pub fn get_highest_pruned_checkpoint_seq_number(
570        &self,
571    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
572        self.tables
573            .watermarks
574            .get(&CheckpointWatermark::HighestPruned)
575            .map(|watermark| watermark.map(|w| w.0))
576    }
577
578    pub fn get_checkpoint_contents(
579        &self,
580        digest: &CheckpointContentsDigest,
581    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
582        self.tables.checkpoint_content.get(digest)
583    }
584
585    pub fn get_full_checkpoint_contents_by_sequence_number(
586        &self,
587        seq: CheckpointSequenceNumber,
588    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
589        self.tables.full_checkpoint_content_v2.get(&seq)
590    }
591
592    fn prune_local_summaries(&self) -> SuiResult {
593        if let Some((last_local_summary, _)) = self
594            .tables
595            .locally_computed_checkpoints
596            .reversed_safe_iter_with_bounds(None, None)?
597            .next()
598            .transpose()?
599        {
600            let mut batch = self.tables.locally_computed_checkpoints.batch();
601            batch.schedule_delete_range(
602                &self.tables.locally_computed_checkpoints,
603                &0,
604                &last_local_summary,
605            )?;
606            batch.write()?;
607            info!("Pruned local summaries up to {:?}", last_local_summary);
608        }
609        Ok(())
610    }
611
612    pub fn clear_locally_computed_checkpoints_from(
613        &self,
614        from_seq: CheckpointSequenceNumber,
615    ) -> SuiResult {
616        let keys: Vec<_> = self
617            .tables
618            .locally_computed_checkpoints
619            .safe_iter_with_bounds(Some(from_seq), None)
620            .map(|r| r.map(|(k, _)| k))
621            .collect::<Result<_, _>>()?;
622        if let Some(&last_local_summary) = keys.last() {
623            let mut batch = self.tables.locally_computed_checkpoints.batch();
624            batch
625                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
626                .expect("Failed to delete locally computed checkpoints");
627            batch
628                .write()
629                .expect("Failed to delete locally computed checkpoints");
630            warn!(
631                from_seq,
632                last_local_summary,
633                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
634                from_seq,
635                last_local_summary
636            );
637        }
638        Ok(())
639    }
640
641    fn check_for_checkpoint_fork(
642        &self,
643        local_checkpoint: &CheckpointSummary,
644        verified_checkpoint: &VerifiedCheckpoint,
645    ) {
646        if local_checkpoint != verified_checkpoint.data() {
647            let verified_contents = self
648                .get_checkpoint_contents(&verified_checkpoint.content_digest)
649                .map(|opt_contents| {
650                    opt_contents
651                        .map(|contents| format!("{:?}", contents))
652                        .unwrap_or_else(|| {
653                            format!(
654                                "Verified checkpoint contents not found, digest: {:?}",
655                                verified_checkpoint.content_digest,
656                            )
657                        })
658                })
659                .map_err(|e| {
660                    format!(
661                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
662                        verified_checkpoint.content_digest, e
663                    )
664                })
665                .unwrap_or_else(|err_msg| err_msg);
666
667            let local_contents = self
668                .get_checkpoint_contents(&local_checkpoint.content_digest)
669                .map(|opt_contents| {
670                    opt_contents
671                        .map(|contents| format!("{:?}", contents))
672                        .unwrap_or_else(|| {
673                            format!(
674                                "Local checkpoint contents not found, digest: {:?}",
675                                local_checkpoint.content_digest
676                            )
677                        })
678                })
679                .map_err(|e| {
680                    format!(
681                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
682                        local_checkpoint.content_digest, e
683                    )
684                })
685                .unwrap_or_else(|err_msg| err_msg);
686
687            // checkpoint contents may be too large for panic message.
688            error!(
689                verified_checkpoint = ?verified_checkpoint.data(),
690                ?verified_contents,
691                ?local_checkpoint,
692                ?local_contents,
693                "Local checkpoint fork detected!",
694            );
695
696            // Record the fork in the database before crashing
697            if let Err(e) = self.record_checkpoint_fork_detected(
698                *local_checkpoint.sequence_number(),
699                local_checkpoint.digest(),
700            ) {
701                error!("Failed to record checkpoint fork in database: {:?}", e);
702            }
703
704            fail_point_arg!(
705                "kill_checkpoint_fork_node",
706                |checkpoint_overrides: std::sync::Arc<
707                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
708                >| {
709                    #[cfg(msim)]
710                    {
711                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
712                            overrides.insert(
713                                local_checkpoint.sequence_number,
714                                verified_checkpoint.digest().to_string(),
715                            );
716                        }
717                        tracing::error!(
718                            fatal = true,
719                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
720                            local_checkpoint.sequence_number(),
721                            verified_checkpoint.digest()
722                        );
723                        sui_simulator::task::shutdown_current_node();
724                    }
725                }
726            );
727
728            fatal!(
729                "Local checkpoint fork detected for sequence number: {}",
730                local_checkpoint.sequence_number()
731            );
732        }
733    }
734
735    // Called by consensus (ConsensusAggregator).
736    // Different from `insert_verified_checkpoint`, it does not touch
737    // the highest_verified_checkpoint watermark such that state sync
738    // will have a chance to process this checkpoint and perform some
739    // state-sync only things.
740    pub fn insert_certified_checkpoint(
741        &self,
742        checkpoint: &VerifiedCheckpoint,
743    ) -> Result<(), TypedStoreError> {
744        debug!(
745            checkpoint_seq = checkpoint.sequence_number(),
746            "Inserting certified checkpoint",
747        );
748        let mut batch = self.tables.certified_checkpoints.batch();
749        batch
750            .insert_batch(
751                &self.tables.certified_checkpoints,
752                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
753            )?
754            .insert_batch(
755                &self.tables.checkpoint_by_digest,
756                [(checkpoint.digest(), checkpoint.serializable_ref())],
757            )?;
758        if checkpoint.next_epoch_committee().is_some() {
759            batch.insert_batch(
760                &self.tables.epoch_last_checkpoint_map,
761                [(&checkpoint.epoch(), checkpoint.sequence_number())],
762            )?;
763        }
764        batch.write()?;
765
766        if let Some(local_checkpoint) = self
767            .tables
768            .locally_computed_checkpoints
769            .get(checkpoint.sequence_number())?
770        {
771            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
772        }
773
774        Ok(())
775    }
776
777    // Called by state sync, apart from inserting the checkpoint and updating
778    // related tables, it also bumps the highest_verified_checkpoint watermark.
779    #[instrument(level = "debug", skip_all)]
780    pub fn insert_verified_checkpoint(
781        &self,
782        checkpoint: &VerifiedCheckpoint,
783    ) -> Result<(), TypedStoreError> {
784        self.insert_certified_checkpoint(checkpoint)?;
785        self.update_highest_verified_checkpoint(checkpoint)
786    }
787
788    pub fn update_highest_verified_checkpoint(
789        &self,
790        checkpoint: &VerifiedCheckpoint,
791    ) -> Result<(), TypedStoreError> {
792        if Some(*checkpoint.sequence_number())
793            > self
794                .get_highest_verified_checkpoint()?
795                .map(|x| *x.sequence_number())
796        {
797            debug!(
798                checkpoint_seq = checkpoint.sequence_number(),
799                "Updating highest verified checkpoint",
800            );
801            self.tables.watermarks.insert(
802                &CheckpointWatermark::HighestVerified,
803                &(*checkpoint.sequence_number(), *checkpoint.digest()),
804            )?;
805        }
806
807        Ok(())
808    }
809
810    pub fn update_highest_synced_checkpoint(
811        &self,
812        checkpoint: &VerifiedCheckpoint,
813    ) -> Result<(), TypedStoreError> {
814        let seq = *checkpoint.sequence_number();
815        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
816        self.tables.watermarks.insert(
817            &CheckpointWatermark::HighestSynced,
818            &(seq, *checkpoint.digest()),
819        )?;
820        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
821        Ok(())
822    }
823
824    async fn notify_read_checkpoint_watermark<F>(
825        &self,
826        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
827        seq: CheckpointSequenceNumber,
828        get_watermark: F,
829    ) -> VerifiedCheckpoint
830    where
831        F: Fn() -> Option<CheckpointSequenceNumber>,
832    {
833        notify_read
834            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
835                let seq = seqs[0];
836                let Some(highest) = get_watermark() else {
837                    return vec![None];
838                };
839                if highest < seq {
840                    return vec![None];
841                }
842                let checkpoint = self
843                    .get_checkpoint_by_sequence_number(seq)
844                    .expect("db error")
845                    .expect("checkpoint not found");
846                vec![Some(checkpoint)]
847            })
848            .await
849            .into_iter()
850            .next()
851            .unwrap()
852    }
853
854    pub async fn notify_read_synced_checkpoint(
855        &self,
856        seq: CheckpointSequenceNumber,
857    ) -> VerifiedCheckpoint {
858        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
859            self.get_highest_synced_checkpoint_seq_number()
860                .expect("db error")
861        })
862        .await
863    }
864
865    pub async fn notify_read_executed_checkpoint(
866        &self,
867        seq: CheckpointSequenceNumber,
868    ) -> VerifiedCheckpoint {
869        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
870            self.get_highest_executed_checkpoint_seq_number()
871                .expect("db error")
872        })
873        .await
874    }
875
876    pub fn update_highest_executed_checkpoint(
877        &self,
878        checkpoint: &VerifiedCheckpoint,
879    ) -> Result<(), TypedStoreError> {
880        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
881            if seq_number >= *checkpoint.sequence_number() {
882                return Ok(());
883            }
884            assert_eq!(
885                seq_number + 1,
886                *checkpoint.sequence_number(),
887                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
888                checkpoint.sequence_number(),
889                seq_number
890            );
891        }
892        let seq = *checkpoint.sequence_number();
893        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
894        self.tables.watermarks.insert(
895            &CheckpointWatermark::HighestExecuted,
896            &(seq, *checkpoint.digest()),
897        )?;
898        self.executed_checkpoint_notify_read
899            .notify(&seq, checkpoint);
900        Ok(())
901    }
902
903    pub fn update_highest_pruned_checkpoint(
904        &self,
905        checkpoint: &VerifiedCheckpoint,
906    ) -> Result<(), TypedStoreError> {
907        self.tables.watermarks.insert(
908            &CheckpointWatermark::HighestPruned,
909            &(*checkpoint.sequence_number(), *checkpoint.digest()),
910        )
911    }
912
913    /// Sets highest executed checkpoint to any value.
914    ///
915    /// WARNING: This method is very subtle and can corrupt the database if used incorrectly.
916    /// It should only be used in one-off cases or tests after fully understanding the risk.
917    pub fn set_highest_executed_checkpoint_subtle(
918        &self,
919        checkpoint: &VerifiedCheckpoint,
920    ) -> Result<(), TypedStoreError> {
921        self.tables.watermarks.insert(
922            &CheckpointWatermark::HighestExecuted,
923            &(*checkpoint.sequence_number(), *checkpoint.digest()),
924        )
925    }
926
927    pub fn insert_checkpoint_contents(
928        &self,
929        contents: CheckpointContents,
930    ) -> Result<(), TypedStoreError> {
931        debug!(
932            checkpoint_seq = ?contents.digest(),
933            "Inserting checkpoint contents",
934        );
935        self.tables
936            .checkpoint_content
937            .insert(contents.digest(), &contents)
938    }
939
940    pub fn insert_verified_checkpoint_contents(
941        &self,
942        checkpoint: &VerifiedCheckpoint,
943        full_contents: VerifiedCheckpointContents,
944    ) -> Result<(), TypedStoreError> {
945        let mut batch = self.tables.full_checkpoint_content_v2.batch();
946        batch.insert_batch(
947            &self.tables.checkpoint_sequence_by_contents_digest,
948            [(&checkpoint.content_digest, checkpoint.sequence_number())],
949        )?;
950        let full_contents = full_contents.into_inner();
951        batch.insert_batch(
952            &self.tables.full_checkpoint_content_v2,
953            [(checkpoint.sequence_number(), &full_contents)],
954        )?;
955
956        let contents = full_contents.into_checkpoint_contents();
957        assert_eq!(&checkpoint.content_digest, contents.digest());
958
959        batch.insert_batch(
960            &self.tables.checkpoint_content,
961            [(contents.digest(), &contents)],
962        )?;
963
964        batch.write()
965    }
966
967    pub fn delete_full_checkpoint_contents(
968        &self,
969        seq: CheckpointSequenceNumber,
970    ) -> Result<(), TypedStoreError> {
971        self.tables.full_checkpoint_content.remove(&seq)?;
972        self.tables.full_checkpoint_content_v2.remove(&seq)
973    }
974
975    pub fn get_epoch_last_checkpoint(
976        &self,
977        epoch_id: EpochId,
978    ) -> SuiResult<Option<VerifiedCheckpoint>> {
979        let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
980        let checkpoint = match seq {
981            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
982            None => None,
983        };
984        Ok(checkpoint)
985    }
986
987    pub fn get_epoch_last_checkpoint_seq_number(
988        &self,
989        epoch_id: EpochId,
990    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
991        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
992        Ok(seq)
993    }
994
995    pub fn insert_epoch_last_checkpoint(
996        &self,
997        epoch_id: EpochId,
998        checkpoint: &VerifiedCheckpoint,
999    ) -> SuiResult {
1000        self.tables
1001            .epoch_last_checkpoint_map
1002            .insert(&epoch_id, checkpoint.sequence_number())?;
1003        Ok(())
1004    }
1005
1006    pub fn get_epoch_state_commitments(
1007        &self,
1008        epoch: EpochId,
1009    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1010        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1011            checkpoint
1012                .end_of_epoch_data
1013                .as_ref()
1014                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1015                .epoch_commitments
1016                .clone()
1017        });
1018        Ok(commitments)
1019    }
1020
1021    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few statistics of the epoch.
1022    pub fn get_epoch_stats(
1023        &self,
1024        epoch: EpochId,
1025        last_checkpoint: &CheckpointSummary,
1026    ) -> Option<EpochStats> {
1027        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1028            (0, 0)
1029        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1030            (
1031                checkpoint.sequence_number + 1,
1032                checkpoint.network_total_transactions,
1033            )
1034        } else {
1035            return None;
1036        };
1037        Some(EpochStats {
1038            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1039            transaction_count: last_checkpoint.network_total_transactions
1040                - prev_epoch_network_transactions,
1041            total_gas_reward: last_checkpoint
1042                .epoch_rolling_gas_cost_summary
1043                .computation_cost,
1044        })
1045    }
1046
1047    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1048        // This checkpoints the entire db and not one column family
1049        self.tables
1050            .checkpoint_content
1051            .checkpoint_db(path)
1052            .map_err(Into::into)
1053    }
1054
1055    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1056        let mut wb = self.tables.watermarks.batch();
1057        wb.delete_batch(
1058            &self.tables.watermarks,
1059            std::iter::once(CheckpointWatermark::HighestExecuted),
1060        )?;
1061        wb.write()?;
1062        Ok(())
1063    }
1064
1065    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1066        self.delete_highest_executed_checkpoint_test_only()?;
1067        Ok(())
1068    }
1069
1070    pub fn record_checkpoint_fork_detected(
1071        &self,
1072        checkpoint_seq: CheckpointSequenceNumber,
1073        checkpoint_digest: CheckpointDigest,
1074    ) -> Result<(), TypedStoreError> {
1075        info!(
1076            checkpoint_seq = checkpoint_seq,
1077            checkpoint_digest = ?checkpoint_digest,
1078            "Recording checkpoint fork detection in database"
1079        );
1080        self.tables.watermarks.insert(
1081            &CheckpointWatermark::CheckpointForkDetected,
1082            &(checkpoint_seq, checkpoint_digest),
1083        )
1084    }
1085
1086    pub fn get_checkpoint_fork_detected(
1087        &self,
1088    ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1089        self.tables
1090            .watermarks
1091            .get(&CheckpointWatermark::CheckpointForkDetected)
1092    }
1093
1094    pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1095        self.tables
1096            .watermarks
1097            .remove(&CheckpointWatermark::CheckpointForkDetected)
1098    }
1099
1100    pub fn record_transaction_fork_detected(
1101        &self,
1102        tx_digest: TransactionDigest,
1103        expected_effects_digest: TransactionEffectsDigest,
1104        actual_effects_digest: TransactionEffectsDigest,
1105    ) -> Result<(), TypedStoreError> {
1106        info!(
1107            tx_digest = ?tx_digest,
1108            expected_effects_digest = ?expected_effects_digest,
1109            actual_effects_digest = ?actual_effects_digest,
1110            "Recording transaction fork detection in database"
1111        );
1112        self.tables.transaction_fork_detected.insert(
1113            &TRANSACTION_FORK_DETECTED_KEY,
1114            &(tx_digest, expected_effects_digest, actual_effects_digest),
1115        )
1116    }
1117
1118    pub fn get_transaction_fork_detected(
1119        &self,
1120    ) -> Result<
1121        Option<(
1122            TransactionDigest,
1123            TransactionEffectsDigest,
1124            TransactionEffectsDigest,
1125        )>,
1126        TypedStoreError,
1127    > {
1128        self.tables
1129            .transaction_fork_detected
1130            .get(&TRANSACTION_FORK_DETECTED_KEY)
1131    }
1132
1133    pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1134        self.tables
1135            .transaction_fork_detected
1136            .remove(&TRANSACTION_FORK_DETECTED_KEY)
1137    }
1138}
1139
/// Keys of the checkpoint store's `watermarks` table. In this file every key is written with
/// a `(sequence number, checkpoint digest)` pair as its value.
///
/// NOTE(review): this enum is serialized as a DB key. If the key encoding is index-based
/// (e.g. BCS/bincode), reordering or removing variants would break existing databases — confirm
/// and only append new variants.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    /// Written by `update_highest_verified_checkpoint`, which only ever moves it forward.
    HighestVerified,
    /// Written unconditionally by `update_highest_synced_checkpoint`.
    HighestSynced,
    /// Advanced one step at a time by `update_highest_executed_checkpoint`; can be set to any
    /// value by `set_highest_executed_checkpoint_subtle`.
    HighestExecuted,
    /// Written by `update_highest_pruned_checkpoint`.
    HighestPruned,
    /// Set by `record_checkpoint_fork_detected`, removed by `clear_checkpoint_fork_detected`.
    CheckpointForkDetected,
}
1148
1149struct CheckpointStateHasher {
1150    epoch_store: Arc<AuthorityPerEpochStore>,
1151    hasher: Weak<GlobalStateHasher>,
1152    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1153}
1154
1155impl CheckpointStateHasher {
1156    fn new(
1157        epoch_store: Arc<AuthorityPerEpochStore>,
1158        hasher: Weak<GlobalStateHasher>,
1159        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1160    ) -> Self {
1161        Self {
1162            epoch_store,
1163            hasher,
1164            receive_from_builder,
1165        }
1166    }
1167
1168    async fn run(self) {
1169        let Self {
1170            epoch_store,
1171            hasher,
1172            mut receive_from_builder,
1173        } = self;
1174        while let Some((seq, effects)) = receive_from_builder.recv().await {
1175            let Some(hasher) = hasher.upgrade() else {
1176                info!("Object state hasher was dropped, stopping checkpoint accumulation");
1177                break;
1178            };
1179            hasher
1180                .accumulate_checkpoint(&effects, seq, &epoch_store)
1181                .expect("epoch ended while accumulating checkpoint");
1182        }
1183    }
1184}
1185
/// Errors returned while building checkpoints.
///
/// `CheckpointBuilder::run` stops the builder on `ChangeEpochTxAlreadyExecuted` and
/// `SystemPackagesMissing`. On `Retry` it logs, waits one second, and tries again.
#[derive(Debug)]
pub(crate) enum CheckpointBuilderError {
    ChangeEpochTxAlreadyExecuted,
    SystemPackagesMissing,
    /// Retryable failure. Every error converted through the blanket `From` impl ends up here.
    Retry(anyhow::Error),
}
1192
1193impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1194    for CheckpointBuilderError
1195{
1196    fn from(e: SuiError) -> Self {
1197        Self::Retry(e.into())
1198    }
1199}
1200
/// Result type of checkpoint builder operations; the success type defaults to `()`.
pub(crate) type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1202
/// Builds checkpoints from the pending checkpoints held in the epoch store.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Awaited by `run` between successful build rounds.
    notify: Arc<Notify>,
    // NOTE(review): presumably signals the CheckpointAggregator after checkpoints are written —
    // confirm in write_checkpoints.
    notify_aggregator: Arc<Notify>,
    // Publishes the highest checkpoint sequence number built so far; only moves forward.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    // For synchronous accumulation of the end-of-epoch checkpoint.
    global_state_hasher: Weak<GlobalStateHasher>,
    // For asynchronous/concurrent accumulation of regular checkpoints.
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    // NOTE(review): presumably per-checkpoint limits used when splitting a build into several
    // checkpoints — confirm in create_checkpoints.
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
1218
/// Drives signature aggregation for built checkpoints. It works on at most one checkpoint at a
/// time (`current`) and hands results to a `CertifiedCheckpointOutput`.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    // Aggregation state for the checkpoint currently being certified, if any.
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1228
// This holds the information needed to aggregate signatures for one checkpoint.
pub struct CheckpointSignatureAggregator {
    // NOTE(review): presumably the index of the next signature to read for this checkpoint —
    // confirm against the aggregator's read loop.
    next_index: u64,
    // The summary being certified and its digest.
    summary: CheckpointSummary,
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1240
1241impl CheckpointBuilder {
1242    fn new(
1243        state: Arc<AuthorityState>,
1244        store: Arc<CheckpointStore>,
1245        epoch_store: Arc<AuthorityPerEpochStore>,
1246        notify: Arc<Notify>,
1247        effects_store: Arc<dyn TransactionCacheRead>,
1248        // for synchronous accumulation of end-of-epoch checkpoint
1249        global_state_hasher: Weak<GlobalStateHasher>,
1250        // for asynchronous/concurrent accumulation of regular checkpoints
1251        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1252        output: Box<dyn CheckpointOutput>,
1253        notify_aggregator: Arc<Notify>,
1254        last_built: watch::Sender<CheckpointSequenceNumber>,
1255        metrics: Arc<CheckpointMetrics>,
1256        max_transactions_per_checkpoint: usize,
1257        max_checkpoint_size_bytes: usize,
1258    ) -> Self {
1259        Self {
1260            state,
1261            store,
1262            epoch_store,
1263            notify,
1264            effects_store,
1265            global_state_hasher,
1266            send_to_hasher,
1267            output,
1268            notify_aggregator,
1269            last_built,
1270            metrics,
1271            max_transactions_per_checkpoint,
1272            max_checkpoint_size_bytes,
1273        }
1274    }
1275
1276    /// This function first waits for ConsensusCommitHandler to finish reprocessing
1277    /// commits that have been processed before the last restart, if consensus_replay_waiter
1278    /// is supplied. Then it starts building checkpoints in a loop.
1279    ///
1280    /// It is optional to pass in consensus_replay_waiter, to make it easier to attribute
1281    /// if slow recovery of previously built checkpoints is due to consensus replay or
1282    /// checkpoint building.
1283    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
1284        if let Some(replay_waiter) = consensus_replay_waiter {
1285            info!("Waiting for consensus commits to replay ...");
1286            replay_waiter.wait_for_replay().await;
1287            info!("Consensus commits finished replaying");
1288        }
1289        info!("Starting CheckpointBuilder");
1290        loop {
1291            match self.maybe_build_checkpoints().await {
1292                Ok(()) => {}
1293                err @ Err(
1294                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
1295                    | CheckpointBuilderError::SystemPackagesMissing,
1296                ) => {
1297                    info!("CheckpointBuilder stopping: {:?}", err);
1298                    return;
1299                }
1300                Err(CheckpointBuilderError::Retry(inner)) => {
1301                    let msg = format!("{:?}", inner);
1302                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
1303                    tokio::time::sleep(Duration::from_secs(1)).await;
1304                    self.metrics.checkpoint_errors.inc();
1305                    continue;
1306                }
1307            }
1308
1309            self.notify.notified().await;
1310        }
1311    }
1312
1313    async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1314        let _scope = monitored_scope("BuildCheckpoints");
1315
1316        // Collect info about the most recently built checkpoint.
1317        let summary = self
1318            .epoch_store
1319            .last_built_checkpoint_builder_summary()
1320            .expect("epoch should not have ended");
1321        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
1322        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
1323
1324        let min_checkpoint_interval_ms = self
1325            .epoch_store
1326            .protocol_config()
1327            .min_checkpoint_interval_ms_as_option()
1328            .unwrap_or_default();
1329        let mut grouped_pending_checkpoints = Vec::new();
1330        let mut checkpoints_iter = self
1331            .epoch_store
1332            .get_pending_checkpoints(last_height)
1333            .expect("unexpected epoch store error")
1334            .into_iter()
1335            .peekable();
1336        while let Some((height, pending)) = checkpoints_iter.next() {
1337            // Group PendingCheckpoints until:
1338            // - minimum interval has elapsed ...
1339            let current_timestamp = pending.details().timestamp_ms;
1340            let can_build = match last_timestamp {
1341                    Some(last_timestamp) => {
1342                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
1343                    }
1344                    None => true,
1345                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
1346                //   should be written separately) ...
1347                } || checkpoints_iter
1348                    .peek()
1349                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
1350                // - or, we have reached end of epoch.
1351                    || pending.details().last_of_epoch;
1352            grouped_pending_checkpoints.push(pending);
1353            if !can_build {
1354                debug!(
1355                    checkpoint_commit_height = height,
1356                    ?last_timestamp,
1357                    ?current_timestamp,
1358                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
1359                );
1360                continue;
1361            }
1362
1363            // Min interval has elapsed, we can now coalesce and build a checkpoint.
1364            last_height = Some(height);
1365            last_timestamp = Some(current_timestamp);
1366            debug!(
1367                checkpoint_commit_height_from = grouped_pending_checkpoints
1368                    .first()
1369                    .unwrap()
1370                    .details()
1371                    .checkpoint_height,
1372                checkpoint_commit_height_to = last_height,
1373                "Making checkpoint with commit height range"
1374            );
1375
1376            let seq = self
1377                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
1378                .await?;
1379
1380            self.last_built.send_if_modified(|cur| {
1381                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1382                if seq > *cur {
1383                    *cur = seq;
1384                    true
1385                } else {
1386                    false
1387                }
1388            });
1389
1390            // ensure that the task can be cancelled at end of epoch, even if no other await yields
1391            // execution.
1392            tokio::task::yield_now().await;
1393        }
1394        debug!(
1395            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1396            grouped_pending_checkpoints.len(),
1397        );
1398
1399        Ok(())
1400    }
1401
1402    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1403    async fn make_checkpoint(
1404        &mut self,
1405        pendings: Vec<PendingCheckpoint>,
1406    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1407        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1408
1409        let pending_ckpt_str = pendings
1410            .iter()
1411            .map(|p| {
1412                format!(
1413                    "height={}, commit={}",
1414                    p.details().checkpoint_height,
1415                    p.details().consensus_commit_ref
1416                )
1417            })
1418            .join("; ");
1419
1420        let last_details = pendings.last().unwrap().details().clone();
1421
1422        // Stores the transactions that should be included in the checkpoint. Transactions will be recorded in the checkpoint
1423        // in this order.
1424        let highest_executed_sequence = self
1425            .store
1426            .get_highest_executed_checkpoint_seq_number()
1427            .expect("db error")
1428            .unwrap_or(0);
1429
1430        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
1431        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1432
1433        let new_checkpoints = self
1434            .create_checkpoints(
1435                sorted_tx_effects_included_in_checkpoint,
1436                &last_details,
1437                &all_roots,
1438            )
1439            .await?;
1440        let highest_sequence = *new_checkpoints.last().0.sequence_number();
1441        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
1442            debug_fatal!(
1443                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1444            );
1445        }
1446
1447        let new_ckpt_str = new_checkpoints
1448            .iter()
1449            .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
1450            .join("; ");
1451
1452        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1453            .await?;
1454        info!(
1455            "Made new checkpoint {} from pending checkpoint {}",
1456            new_ckpt_str, pending_ckpt_str
1457        );
1458
1459        Ok(highest_sequence)
1460    }
1461
1462    async fn construct_and_execute_settlement_transactions(
1463        &self,
1464        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
1465        checkpoint_height: CheckpointHeight,
1466        checkpoint_seq: CheckpointSequenceNumber,
1467        tx_index_offset: u64,
1468    ) -> (TransactionKey, Vec<TransactionEffects>) {
1469        let _scope =
1470            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");
1471
1472        let tx_key =
1473            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);
1474
1475        let epoch = self.epoch_store.epoch();
1476        let accumulator_root_obj_initial_shared_version = self
1477            .epoch_store
1478            .epoch_start_config()
1479            .accumulator_root_obj_initial_shared_version()
1480            .expect("accumulator root object must exist");
1481
1482        let builder = AccumulatorSettlementTxBuilder::new(
1483            Some(self.effects_store.as_ref()),
1484            sorted_tx_effects_included_in_checkpoint,
1485            checkpoint_seq,
1486            tx_index_offset,
1487        );
1488
1489        let accumulator_changes = builder.collect_accumulator_changes();
1490        let num_updates = builder.num_updates();
1491        let (settlement_txns, barrier_tx) = builder.build_tx(
1492            self.epoch_store.protocol_config(),
1493            epoch,
1494            accumulator_root_obj_initial_shared_version,
1495            checkpoint_height,
1496            checkpoint_seq,
1497        );
1498
1499        let settlement_txns: Vec<_> = settlement_txns
1500            .into_iter()
1501            .chain(std::iter::once(barrier_tx))
1502            .map(|tx| {
1503                VerifiedExecutableTransaction::new_system(
1504                    VerifiedTransaction::new_system_transaction(tx),
1505                    self.epoch_store.epoch(),
1506                )
1507            })
1508            .collect();
1509
1510        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();
1511
1512        debug!(
1513            ?settlement_digests,
1514            ?tx_key,
1515            "created settlement transactions with {num_updates} updates"
1516        );
1517
1518        self.epoch_store
1519            .notify_settlement_transactions_ready(tx_key, settlement_txns);
1520
1521        let settlement_effects = loop {
1522            match tokio::time::timeout(Duration::from_secs(5), async {
1523                self.effects_store
1524                    .notify_read_executed_effects(
1525                        "CheckpointBuilder::notify_read_settlement_effects",
1526                        &settlement_digests,
1527                    )
1528                    .await
1529            })
1530            .await
1531            {
1532                Ok(effects) => break effects,
1533                Err(_) => {
1534                    debug_fatal!(
1535                        "Timeout waiting for settlement transactions to be executed {:?}, retrying...",
1536                        tx_key
1537                    );
1538                }
1539            }
1540        };
1541
1542        let mut next_accumulator_version = None;
1543        for fx in settlement_effects.iter() {
1544            assert!(
1545                fx.status().is_ok(),
1546                "settlement transaction cannot fail (digest: {:?}) {:#?}",
1547                fx.transaction_digest(),
1548                fx
1549            );
1550            if let Some(version) = fx
1551                .mutated()
1552                .iter()
1553                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
1554            {
1555                assert!(
1556                    next_accumulator_version.is_none(),
1557                    "Only one settlement transaction should mutate the accumulator root object"
1558                );
1559                next_accumulator_version = Some(version);
1560            }
1561        }
1562        let settlements = BalanceSettlement {
1563            next_accumulator_version: next_accumulator_version
1564                .expect("Accumulator root object should be mutated in the settlement transactions"),
1565            balance_changes: accumulator_changes,
1566        };
1567
1568        self.state
1569            .execution_scheduler()
1570            .settle_balances(settlements);
1571
1572        (tx_key, settlement_effects)
1573    }
1574
    /// Resolves, for the given pending checkpoints, the transactions that should be included in the
    /// checkpoint(s) being built, and returns their effects in the order they should appear.
    ///
    /// Pending checkpoints are processed in order. For each one:
    /// - When accumulators are enabled, the last root is popped and must be an
    ///   `AccumulatorSettlement` key; anything else is `fatal!`.
    /// - The remaining roots are mapped to digests and their executed effects are awaited.
    /// - When `prepend_prologue_tx_in_consensus_commit_in_checkpoints` is enabled and the first
    ///   root is a consensus commit prologue, it is placed at the front of this pending
    ///   checkpoint's transactions.
    /// - The roots, plus their dependencies that are not yet part of a checkpoint, are causally
    ///   sorted.
    /// - When a settlement root was popped, the settlement transactions are constructed and
    ///   executed, and their effects are appended last.
    ///
    /// The local `effects_in_current_checkpoint` tracks transactions already placed in the
    /// checkpoint being built, across all pending checkpoints in this call.
    ///
    /// Returns `(effects, root_digests)`: the concatenated, ordered effects of all pending
    /// checkpoints, and the digests of the original (non-settlement) roots.
    ///
    /// # Errors
    /// Propagates errors from resolving root digests, extracting the consensus commit prologue,
    /// completing checkpoint effects, and loading the last built checkpoint summary.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        pending_checkpoints: Vec<PendingCheckpoint>,
    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");

        // Keeps track of the effects that are already included in the current checkpoint.
        // This is needed when multiple pending checkpoints are combined into a single checkpoint,
        // because in that case the dependencies of a transaction may be in earlier created
        // checkpoints, or in earlier pending checkpoints.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        let mut tx_effects = Vec::new();
        let mut tx_roots = HashSet::new();

        for pending_checkpoint in pending_checkpoints.into_iter() {
            // Mutable because the settlement root may be popped off `pending.roots` below.
            let mut pending = pending_checkpoint;
            debug!(
                checkpoint_commit_height = pending.details.checkpoint_height,
                "Resolving checkpoint transactions for pending checkpoint.",
            );

            trace!(
                "roots for pending checkpoint {:?}: {:?}",
                pending.details.checkpoint_height, pending.roots,
            );

            // With accumulators enabled, the settlement key is expected to be the last root; it is
            // removed here so it is not resolved together with the ordinary roots.
            let settlement_root = if self.epoch_store.accumulators_enabled() {
                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
                    pending.roots.pop()
                else {
                    fatal!("No settlement root found");
                };
                Some(settlement_root)
            } else {
                None
            };

            let roots = &pending.roots;

            self.metrics
                .checkpoint_roots_count
                .inc_by(roots.len() as u64);

            let root_digests = self
                .epoch_store
                .notify_read_tx_key_to_digest(roots)
                .in_monitored_scope("CheckpointNotifyDigests")
                .await?;
            let root_effects = self
                .effects_store
                .notify_read_executed_effects(
                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
                    &root_digests,
                )
                .in_monitored_scope("CheckpointNotifyRead")
                .await;

            let consensus_commit_prologue = if self
                .epoch_store
                .protocol_config()
                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
            {
                // If the roots contain a consensus commit prologue transaction, extract it so it
                // can be put at the front of the checkpoint.

                let consensus_commit_prologue =
                    self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;

                // Get the unincluded dependencies of the consensus commit prologue. We expect no
                // other dependencies that haven't been included in any previous checkpoint.
                if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                    let unsorted_ccp = self.complete_checkpoint_effects(
                        vec![ccp_effects.clone()],
                        &mut effects_in_current_checkpoint,
                    )?;

                    // The consensus commit prologue must have no other dependencies that haven't
                    // been included in any previous checkpoint.
                    if unsorted_ccp.len() != 1 {
                        fatal!(
                            "Expected 1 consensus commit prologue, got {:?}",
                            unsorted_ccp
                                .iter()
                                .map(|e| e.transaction_digest())
                                .collect::<Vec<_>>()
                        );
                    }
                    // Redundant with the `fatal!` above; kept as an additional invariant check.
                    assert_eq!(unsorted_ccp.len(), 1);
                    assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
                }
                consensus_commit_prologue
            } else {
                None
            };

            // The prologue (if extracted) was already recorded in `effects_in_current_checkpoint`
            // above, so it is not returned again here.
            let unsorted =
                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;

            // NOTE(review): this guard shadows the outer `_scope` binding and lives until the end
            // of the loop iteration, so it also covers settlement construction/execution below.
            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
                if cfg!(debug_assertions) {
                    // When consensus_commit_prologue is extracted, it should not be included in the `unsorted`.
                    for tx in unsorted.iter() {
                        assert!(tx.transaction_digest() != &ccp_digest);
                    }
                }
                sorted.push(ccp_effects);
            }
            sorted.extend(CausalOrder::causal_sort(unsorted));

            if let Some(settlement_root) = settlement_root {
                //TODO: this is an incorrect heuristic for checkpoint seq number
                //      due to checkpoint splitting, to be fixed separately
                let last_checkpoint =
                    Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
                let next_checkpoint_seq = last_checkpoint
                    .as_ref()
                    .map(|(seq, _)| *seq)
                    .unwrap_or_default()
                    + 1;
                // Position of the first settlement-related transaction within the effects
                // accumulated so far across all pending checkpoints in this call.
                let tx_index_offset = tx_effects.len() as u64;

                let (tx_key, settlement_effects) = self
                    .construct_and_execute_settlement_transactions(
                        &sorted,
                        pending.details.checkpoint_height,
                        next_checkpoint_seq,
                        tx_index_offset,
                    )
                    .await;
                debug!(?tx_key, "executed settlement transactions");

                // The executed settlement must correspond to the settlement root popped earlier.
                assert_eq!(settlement_root, tx_key);

                // Note: we do not need to add the settlement digests to `tx_roots` - `tx_roots`
                // should only include the digests of transactions that were originally roots in
                // the pending checkpoint. It is later used to identify transactions which were
                // added as dependencies, so that those transactions can be waited on using
                // `consensus_messages_processed_notify()`. System transactions (such as
                // settlements) are exempt from this already.
                sorted.extend(settlement_effects);
            }

            #[cfg(msim)]
            {
                // Check consensus commit prologue invariants in sim test.
                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
            }

            tx_effects.extend(sorted);
            tx_roots.extend(root_digests);
        }

        Ok((tx_effects, tx_roots))
    }
1737
1738    // This function is used to extract the consensus commit prologue digest and effects from the root
1739    // transactions.
1740    // This function can only be used when prepend_prologue_tx_in_consensus_commit_in_checkpoints is enabled.
1741    // The consensus commit prologue is expected to be the first transaction in the roots.
1742    fn extract_consensus_commit_prologue(
1743        &self,
1744        root_digests: &[TransactionDigest],
1745        root_effects: &[TransactionEffects],
1746    ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
1747        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1748        if root_digests.is_empty() {
1749            return Ok(None);
1750        }
1751
1752        // Reads the first transaction in the roots, and checks whether it is a consensus commit prologue
1753        // transaction.
1754        // When prepend_prologue_tx_in_consensus_commit_in_checkpoints is enabled, the consensus commit prologue
1755        // transaction should be the first transaction in the roots written by the consensus handler.
1756        let first_tx = self
1757            .state
1758            .get_transaction_cache_reader()
1759            .get_transaction_block(&root_digests[0])
1760            .expect("Transaction block must exist");
1761
1762        Ok(first_tx
1763            .transaction_data()
1764            .is_consensus_commit_prologue()
1765            .then(|| {
1766                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1767                (*first_tx.digest(), root_effects[0].clone())
1768            }))
1769    }
1770
1771    #[instrument(level = "debug", skip_all)]
1772    async fn write_checkpoints(
1773        &mut self,
1774        height: CheckpointHeight,
1775        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1776    ) -> SuiResult {
1777        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1778        let mut batch = self.store.tables.checkpoint_content.batch();
1779        let mut all_tx_digests =
1780            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1781
1782        for (summary, contents) in &new_checkpoints {
1783            debug!(
1784                checkpoint_commit_height = height,
1785                checkpoint_seq = summary.sequence_number,
1786                contents_digest = ?contents.digest(),
1787                "writing checkpoint",
1788            );
1789
1790            if let Some(previously_computed_summary) = self
1791                .store
1792                .tables
1793                .locally_computed_checkpoints
1794                .get(&summary.sequence_number)?
1795                && previously_computed_summary.digest() != summary.digest()
1796            {
1797                fatal!(
1798                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
1799                    summary.sequence_number,
1800                    previously_computed_summary.digest(),
1801                    summary.digest()
1802                );
1803            }
1804
1805            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1806
1807            self.metrics
1808                .transactions_included_in_checkpoint
1809                .inc_by(contents.size() as u64);
1810            let sequence_number = summary.sequence_number;
1811            self.metrics
1812                .last_constructed_checkpoint
1813                .set(sequence_number as i64);
1814
1815            batch.insert_batch(
1816                &self.store.tables.checkpoint_content,
1817                [(contents.digest(), contents)],
1818            )?;
1819
1820            batch.insert_batch(
1821                &self.store.tables.locally_computed_checkpoints,
1822                [(sequence_number, summary)],
1823            )?;
1824        }
1825
1826        batch.write()?;
1827
1828        // Send all checkpoint sigs to consensus.
1829        for (summary, contents) in &new_checkpoints {
1830            self.output
1831                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1832                .await?;
1833        }
1834
1835        for (local_checkpoint, _) in &new_checkpoints {
1836            if let Some(certified_checkpoint) = self
1837                .store
1838                .tables
1839                .certified_checkpoints
1840                .get(local_checkpoint.sequence_number())?
1841            {
1842                self.store
1843                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1844            }
1845        }
1846
1847        self.notify_aggregator.notify_one();
1848        self.epoch_store
1849            .process_constructed_checkpoint(height, new_checkpoints);
1850        Ok(())
1851    }
1852
1853    #[allow(clippy::type_complexity)]
1854    fn split_checkpoint_chunks(
1855        &self,
1856        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1857        signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
1858    ) -> CheckpointBuilderResult<
1859        Vec<
1860            Vec<(
1861                TransactionEffects,
1862                Vec<(GenericSignature, Option<SequenceNumber>)>,
1863            )>,
1864        >,
1865    > {
1866        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1867        let mut chunks = Vec::new();
1868        let mut chunk = Vec::new();
1869        let mut chunk_size: usize = 0;
1870        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1871            .into_iter()
1872            .zip(signatures.into_iter())
1873        {
1874            // Roll over to a new chunk after either max count or max size is reached.
1875            // The size calculation here is intended to estimate the size of the
1876            // FullCheckpointContents struct. If this code is modified, that struct
1877            // should also be updated accordingly.
1878            let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
1879                bcs::serialized_size(&signatures)?
1880            } else {
1881                let signatures: Vec<&GenericSignature> =
1882                    signatures.iter().map(|(s, _)| s).collect();
1883                bcs::serialized_size(&signatures)?
1884            };
1885            let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
1886            if chunk.len() == self.max_transactions_per_checkpoint
1887                || (chunk_size + size) > self.max_checkpoint_size_bytes
1888            {
1889                if chunk.is_empty() {
1890                    // Always allow at least one tx in a checkpoint.
1891                    warn!(
1892                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1893                        self.max_checkpoint_size_bytes
1894                    );
1895                } else {
1896                    chunks.push(chunk);
1897                    chunk = Vec::new();
1898                    chunk_size = 0;
1899                }
1900            }
1901
1902            chunk.push((effects, signatures));
1903            chunk_size += size;
1904        }
1905
1906        if !chunk.is_empty() || chunks.is_empty() {
1907            // We intentionally create an empty checkpoint if there is no content provided
1908            // to make a 'heartbeat' checkpoint.
1909            // Important: if some conditions are added here later, we need to make sure we always
1910            // have at least one chunk if last_pending_of_epoch is set
1911            chunks.push(chunk);
1912            // Note: empty checkpoints are ok - they shouldn't happen at all on a network with even
1913            // modest load. Even if they do happen, it is still useful as it allows fullnodes to
1914            // distinguish between "no transactions have happened" and "i am not receiving new
1915            // checkpoints".
1916        }
1917        Ok(chunks)
1918    }
1919
1920    fn load_last_built_checkpoint_summary(
1921        epoch_store: &AuthorityPerEpochStore,
1922        store: &CheckpointStore,
1923    ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1924        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1925        if last_checkpoint.is_none() {
1926            let epoch = epoch_store.epoch();
1927            if epoch > 0 {
1928                let previous_epoch = epoch - 1;
1929                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1930                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1931                if let Some((ref seq, _)) = last_checkpoint {
1932                    debug!(
1933                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1934                    );
1935                } else {
1936                    // This is some serious bug with when CheckpointBuilder started so surfacing it via panic
1937                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1938                }
1939            }
1940        }
1941        Ok(last_checkpoint)
1942    }
1943
    /// Builds one or more checkpoints (summary + contents) from the effects resolved for a
    /// pending checkpoint.
    ///
    /// - Loads the last built checkpoint, which supplies the previous sequence number, timestamp,
    ///   running transaction total, rolling gas cost and previous digest.
    /// - Loads every transaction. Non-root programmable transactions are then awaited via
    ///   `consensus_messages_processed_notify`; roots are not awaited.
    /// - Splits the effects into count/size-bounded chunks with `split_checkpoint_chunks`; each
    ///   chunk becomes one checkpoint with consecutive sequence numbers.
    /// - If `details.last_of_epoch` is set, the last chunk gets the advance-epoch transaction and
    ///   an `EndOfEpochData` carrying the epoch's root state digest. Every other checkpoint's
    ///   effects are sent to the state hasher.
    ///
    /// # Errors
    /// Propagates store, consensus-notification, state-hashing and advance-epoch errors.
    ///
    /// # Panics
    /// If an executed transaction cannot be found. Also panics (via `fatal!`) if a `ChangeEpoch`,
    /// `Genesis` or `EndOfEpochTransaction` transaction appears among `all_effects`.
    #[instrument(level = "debug", skip_all)]
    async fn create_checkpoints(
        &self,
        all_effects: Vec<TransactionEffects>,
        details: &PendingCheckpointInfo,
        all_roots: &HashSet<TransactionDigest>,
    ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");

        let total = all_effects.len();
        // Updated at the end of every loop iteration below, so each chunk chains onto the one
        // before it.
        let mut last_checkpoint =
            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
        // NOTE(review): when there is no previous checkpoint this logs 1, but the first sequence
        // number actually assigned below is 0 (`unwrap_or_default()` on `c.sequence_number + 1`).
        info!(
            checkpoint_commit_height = details.checkpoint_height,
            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
            checkpoint_timestamp = details.timestamp_ms,
            "Creating checkpoint(s) for {} transactions",
            all_effects.len(),
        );

        let all_digests: Vec<_> = all_effects
            .iter()
            .map(|effect| *effect.transaction_digest())
            .collect();
        let transactions_and_sizes = self
            .state
            .get_transaction_cache_reader()
            .get_transactions_and_serialized_sizes(&all_digests)?;
        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
        let mut transactions = Vec::with_capacity(all_effects.len());
        let mut transaction_keys = Vec::with_capacity(all_effects.len());
        // Maps randomness-state-update transaction digests to their randomness round, so each
        // checkpoint can list the rounds whose updates it contains.
        let mut randomness_rounds = BTreeMap::new();
        {
            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
            debug!(
                ?last_checkpoint_seq,
                "Waiting for {:?} certificates to appear in consensus",
                all_effects.len()
            );

            for (effects, transaction_and_size) in all_effects
                .into_iter()
                .zip(transactions_and_sizes.into_iter())
            {
                let (transaction, size) = transaction_and_size
                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
                match transaction.inner().transaction_data().kind() {
                    TransactionKind::ConsensusCommitPrologue(_)
                    | TransactionKind::ConsensusCommitPrologueV2(_)
                    | TransactionKind::ConsensusCommitPrologueV3(_)
                    | TransactionKind::ConsensusCommitPrologueV4(_)
                    | TransactionKind::AuthenticatorStateUpdate(_) => {
                        // ConsensusCommitPrologue and AuthenticatorStateUpdate are guaranteed to be
                        // processed before we reach here.
                    }
                    TransactionKind::ProgrammableSystemTransaction(_) => {
                        // settlement transactions are added by checkpoint builder
                    }
                    TransactionKind::ChangeEpoch(_)
                    | TransactionKind::Genesis(_)
                    | TransactionKind::EndOfEpochTransaction(_) => {
                        fatal!(
                            "unexpected transaction in checkpoint effects: {:?}",
                            transaction
                        );
                    }
                    TransactionKind::RandomnessStateUpdate(rsu) => {
                        randomness_rounds
                            .insert(*effects.transaction_digest(), rsu.randomness_round);
                    }
                    TransactionKind::ProgrammableTransaction(_) => {
                        // Only transactions that are not roots should be included in the call to
                        // `consensus_messages_processed_notify`. roots come directly from the consensus
                        // commit and so are known to be processed already.
                        let digest = *effects.transaction_digest();
                        if !all_roots.contains(&digest) {
                            transaction_keys.push(SequencedConsensusTransactionKey::External(
                                ConsensusTransactionKey::Certificate(digest),
                            ));
                        }
                    }
                }
                transactions.push(transaction);
                all_effects_and_transaction_sizes.push((effects, size));
            }

            self.epoch_store
                .consensus_messages_processed_notify(transaction_keys)
                .await?;
        }

        let signatures = self
            .epoch_store
            .user_signatures_for_checkpoint(&transactions, &all_digests);
        debug!(
            ?last_checkpoint_seq,
            "Received {} checkpoint user signatures from consensus",
            signatures.len()
        );

        // For the last checkpoint of the epoch, collect one observation key per PTB command of
        // every programmable transaction; they are handed to the advance-epoch transaction.
        let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
            Some(
                transactions
                    .iter()
                    .flat_map(|tx| {
                        if let TransactionKind::ProgrammableTransaction(ptb) =
                            tx.transaction_data().kind()
                        {
                            itertools::Either::Left(
                                ptb.commands
                                    .iter()
                                    .map(ExecutionTimeObservationKey::from_command),
                            )
                        } else {
                            itertools::Either::Right(std::iter::empty())
                        }
                    })
                    .collect(),
            )
        } else {
            None
        };

        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
        let chunks_count = chunks.len();

        let mut checkpoints = Vec::with_capacity(chunks_count);
        debug!(
            ?last_checkpoint_seq,
            "Creating {} checkpoints with {} transactions", chunks_count, total,
        );

        let epoch = self.epoch_store.epoch();
        for (index, transactions) in chunks.into_iter().enumerate() {
            let first_checkpoint_of_epoch = index == 0
                && last_checkpoint
                    .as_ref()
                    .map(|(_, c)| c.epoch != epoch)
                    .unwrap_or(true);
            if first_checkpoint_of_epoch {
                self.epoch_store
                    .record_epoch_first_checkpoint_creation_time_metric();
            }
            // `split_checkpoint_chunks` always returns at least one chunk, so this cannot underflow.
            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;

            let sequence_number = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.sequence_number + 1)
                .unwrap_or_default();
            let mut timestamp_ms = details.timestamp_ms;
            if let Some((_, last_checkpoint)) = &last_checkpoint
                && last_checkpoint.timestamp_ms > timestamp_ms
            {
                // First consensus commit of an epoch can have zero timestamp.
                debug!(
                    "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
                    sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
                );
                if self
                    .epoch_store
                    .protocol_config()
                    .enforce_checkpoint_timestamp_monotonicity()
                {
                    timestamp_ms = last_checkpoint.timestamp_ms;
                }
            }

            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
            let epoch_rolling_gas_cost_summary =
                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);

            let end_of_epoch_data = if last_checkpoint_of_epoch {
                // Appends the advance-epoch transaction's effects (and an empty signature list)
                // to this checkpoint.
                let system_state_obj = self
                    .augment_epoch_last_checkpoint(
                        &epoch_rolling_gas_cost_summary,
                        timestamp_ms,
                        &mut effects,
                        &mut signatures,
                        sequence_number,
                        std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
                        last_checkpoint_seq.unwrap_or_default(),
                    )
                    .await?;

                let committee = system_state_obj
                    .get_current_epoch_committee()
                    .committee()
                    .clone();

                // This must happen after the call to augment_epoch_last_checkpoint,
                // otherwise we will not capture the change_epoch tx.
                let root_state_digest = {
                    let state_acc = self
                        .global_state_hasher
                        .upgrade()
                        .expect("No checkpoints should be getting built after local configuration");
                    let acc = state_acc.accumulate_checkpoint(
                        &effects,
                        sequence_number,
                        &self.epoch_store,
                    )?;

                    state_acc
                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
                        .await?;

                    state_acc.accumulate_running_root(
                        &self.epoch_store,
                        sequence_number,
                        Some(acc),
                    )?;
                    state_acc
                        .digest_epoch(self.epoch_store.clone(), sequence_number)
                        .await?
                };
                self.metrics.highest_accumulated_epoch.set(epoch as i64);
                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

                let epoch_commitments = if self
                    .epoch_store
                    .protocol_config()
                    .check_commit_root_state_digest_supported()
                {
                    vec![root_state_digest.into()]
                } else {
                    vec![]
                };

                Some(EndOfEpochData {
                    next_epoch_committee: committee.voting_rights,
                    next_epoch_protocol_version: ProtocolVersion::new(
                        system_state_obj.protocol_version(),
                    ),
                    epoch_commitments,
                })
            } else {
                // Non-final checkpoints are hashed asynchronously by the state hasher.
                self.send_to_hasher
                    .send((sequence_number, effects.clone()))
                    .await?;

                None
            };
            // The contents encoding depends on whether address aliases are enabled: with them,
            // the per-signature Option<SequenceNumber> is kept; without them it is dropped.
            let contents = if self.epoch_store.protocol_config().address_aliases() {
                CheckpointContents::new_v2(&effects, signatures)
            } else {
                CheckpointContents::new_with_digests_and_signatures(
                    effects.iter().map(TransactionEffects::execution_digests),
                    signatures
                        .into_iter()
                        .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
                        .collect(),
                )
            };

            let num_txns = contents.size() as u64;

            let network_total_transactions = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.network_total_transactions + num_txns)
                .unwrap_or(num_txns);

            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());

            let matching_randomness_rounds: Vec<_> = effects
                .iter()
                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
                .copied()
                .collect();

            let checkpoint_commitments = if self
                .epoch_store
                .protocol_config()
                .include_checkpoint_artifacts_digest_in_summary()
            {
                let artifacts = CheckpointArtifacts::from(&effects[..]);
                let artifacts_digest = artifacts.digest()?;
                vec![artifacts_digest.into()]
            } else {
                Default::default()
            };

            let summary = CheckpointSummary::new(
                self.epoch_store.protocol_config(),
                epoch,
                sequence_number,
                network_total_transactions,
                &contents,
                previous_digest,
                epoch_rolling_gas_cost_summary,
                end_of_epoch_data,
                timestamp_ms,
                matching_randomness_rounds,
                checkpoint_commitments,
            );
            summary.report_checkpoint_age(
                &self.metrics.last_created_checkpoint_age,
                &self.metrics.last_created_checkpoint_age_ms,
            );
            if last_checkpoint_of_epoch {
                info!(
                    checkpoint_seq = sequence_number,
                    "creating last checkpoint of epoch {}", epoch
                );
                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
                    self.epoch_store
                        .report_epoch_metrics_at_last_checkpoint(stats);
                }
            }
            // The next chunk chains onto the checkpoint just built.
            last_checkpoint = Some((sequence_number, summary.clone()));
            checkpoints.push((summary, contents));
        }

        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
    }
2259
2260    fn get_epoch_total_gas_cost(
2261        &self,
2262        last_checkpoint: Option<&CheckpointSummary>,
2263        cur_checkpoint_effects: &[TransactionEffects],
2264    ) -> GasCostSummary {
2265        let (previous_epoch, previous_gas_costs) = last_checkpoint
2266            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2267            .unwrap_or_default();
2268        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2269        if previous_epoch == self.epoch_store.epoch() {
2270            // sum only when we are within the same epoch
2271            GasCostSummary::new(
2272                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2273                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2274                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2275                previous_gas_costs.non_refundable_storage_fee
2276                    + current_gas_costs.non_refundable_storage_fee,
2277            )
2278        } else {
2279            current_gas_costs
2280        }
2281    }
2282
2283    #[instrument(level = "error", skip_all)]
2284    async fn augment_epoch_last_checkpoint(
2285        &self,
2286        epoch_total_gas_cost: &GasCostSummary,
2287        epoch_start_timestamp_ms: CheckpointTimestamp,
2288        checkpoint_effects: &mut Vec<TransactionEffects>,
2289        signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2290        checkpoint: CheckpointSequenceNumber,
2291        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2292        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
2293        // >1 checkpoint.
2294        last_checkpoint: CheckpointSequenceNumber,
2295    ) -> CheckpointBuilderResult<SuiSystemState> {
2296        let (system_state, effects) = self
2297            .state
2298            .create_and_execute_advance_epoch_tx(
2299                &self.epoch_store,
2300                epoch_total_gas_cost,
2301                checkpoint,
2302                epoch_start_timestamp_ms,
2303                end_of_epoch_observation_keys,
2304                last_checkpoint,
2305            )
2306            .await?;
2307        checkpoint_effects.push(effects);
2308        signatures.push(vec![]);
2309        Ok(system_state)
2310    }
2311
    /// For the given roots, returns the complete list of effects to include in the checkpoint.
    ///
    /// The result contains the roots plus, transitively, each dependency that was executed in
    /// the current epoch (per `transactions_executed_in_cur_epoch`). A root or dependency is
    /// skipped when any of these holds:
    /// - it is already in `existing_tx_digests_in_checkpoint`;
    /// - the epoch store reports it as already included in a built checkpoint;
    /// - it was executed in an earlier epoch.
    ///
    /// Dependencies are resolved in batches, one batch per loop iteration, and each digest is
    /// examined at most once (`seen`).
    ///
    /// Note that this function may be called multiple times to construct one checkpoint.
    /// `existing_tx_digests_in_checkpoint` tracks the transactions already included in the
    /// checkpoint. Every transaction returned by this call (roots and pulled-in dependencies) is
    /// added to it before returning.
    ///
    /// NOTE(review): each dependency batch is drawn from a `HashSet`, so the order of the
    /// returned effects is not causal. Presumably callers sort them afterwards — confirm.
    ///
    /// # Errors
    /// Propagates errors from the epoch-store lookups.
    ///
    /// # Panics
    /// Panics if the executed effects of a current-epoch dependency cannot be found in
    /// `effects_store`.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> SuiResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        let mut results = vec![];
        // Every digest already examined (as a root or as a queued dependency).
        let mut seen = HashSet::new();
        loop {
            // Dependencies discovered in this round; they become the next round's roots.
            let mut pending = HashSet::new();

            // One flag per root, in the same order as `roots`.
            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                // Unnecessary to read effects of a dependency if the effect is already processed.
                seen.insert(*digest);

                // Skip roots that are already included in the checkpoint.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Skip roots already included in checkpoints or roots from previous epochs
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                // One flag per dependency, in the same order as `effect.dependencies()`.
                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    // Skip here if dependency not executed in the current epoch.
                    // Note that the existence of an effects signature in the
                    // epoch store for the given digest indicates that the transaction
                    // was locally executed in the current epoch
                    if !effects_signature_exists {
                        continue;
                    }
                    // Queue each dependency only the first time it is encountered.
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            if pending.is_empty() {
                break;
            }
            // Load the effects of the newly discovered dependencies and process them as roots.
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self.effects_store.multi_get_executed_effects(&pending);
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
                        digest
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
2393
2394    // This function is used to check the invariants of the consensus commit prologue transactions in the checkpoint
2395    // in simtest.
2396    #[cfg(msim)]
2397    fn expensive_consensus_commit_prologue_invariants_check(
2398        &self,
2399        root_digests: &[TransactionDigest],
2400        sorted: &[TransactionEffects],
2401    ) {
2402        if !self
2403            .epoch_store
2404            .protocol_config()
2405            .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
2406        {
2407            return;
2408        }
2409
2410        // Gets all the consensus commit prologue transactions from the roots.
2411        let root_txs = self
2412            .state
2413            .get_transaction_cache_reader()
2414            .multi_get_transaction_blocks(root_digests);
2415        let ccps = root_txs
2416            .iter()
2417            .filter_map(|tx| {
2418                if let Some(tx) = tx {
2419                    if tx.transaction_data().is_consensus_commit_prologue() {
2420                        Some(tx)
2421                    } else {
2422                        None
2423                    }
2424                } else {
2425                    None
2426                }
2427            })
2428            .collect::<Vec<_>>();
2429
2430        // There should be at most one consensus commit prologue transaction in the roots.
2431        assert!(ccps.len() <= 1);
2432
2433        // Get all the transactions in the checkpoint.
2434        let txs = self
2435            .state
2436            .get_transaction_cache_reader()
2437            .multi_get_transaction_blocks(
2438                &sorted
2439                    .iter()
2440                    .map(|tx| tx.transaction_digest().clone())
2441                    .collect::<Vec<_>>(),
2442            );
2443
2444        if ccps.len() == 0 {
2445            // If there is no consensus commit prologue transaction in the roots, then there should be no
2446            // consensus commit prologue transaction in the checkpoint.
2447            for tx in txs.iter() {
2448                if let Some(tx) = tx {
2449                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2450                }
2451            }
2452        } else {
2453            // If there is one consensus commit prologue, it must be the first one in the checkpoint.
2454            assert!(
2455                txs[0]
2456                    .as_ref()
2457                    .unwrap()
2458                    .transaction_data()
2459                    .is_consensus_commit_prologue()
2460            );
2461
2462            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2463
2464            for tx in txs.iter().skip(1) {
2465                if let Some(tx) = tx {
2466                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2467                }
2468            }
2469        }
2470    }
2471}
2472
impl CheckpointAggregator {
    /// Creates an aggregator over `tables` for the given epoch, with no checkpoint currently
    /// being aggregated.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        // No signature aggregation is in progress until `run_inner` loads a built summary.
        let current = None;
        Self {
            store: tables,
            epoch_store,
            notify,
            current,
            output,
            state,
            metrics,
        }
    }

    /// Main loop of the aggregator; never returns.
    ///
    /// Each pass calls `run_and_notify`. On error, the error is logged, `checkpoint_errors` is
    /// incremented and the pass is retried after 1s. On success, the loop waits for `notify`
    /// for at most 1s, so pending signatures are re-scanned periodically even without a
    /// notification.
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
        }
    }

    /// Certifies every checkpoint that currently has enough signatures, then forwards each new
    /// certified summary, in the order produced, to `output`.
    ///
    /// NOTE(review): `run_inner` persists the certified summaries before they are forwarded. If
    /// forwarding one fails, `?` returns early, and the remaining summaries from this pass are
    /// not forwarded by this call.
    async fn run_and_notify(&mut self) -> SuiResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Aggregates pending checkpoint signatures into certificates, starting from the next
    /// uncertified sequence number.
    ///
    /// For each checkpoint in sequence, signatures are read from `pending_checkpoint_signatures`
    /// starting at `current.next_index`. When quorum is reached, the certified checkpoint is
    /// stored, metrics are updated, and the loop moves on to the next sequence number.
    ///
    /// Returns the summaries certified during this call (possibly empty). The call stops when:
    /// - the next checkpoint has not been built locally yet; or
    /// - the stored signatures for it are exhausted without reaching quorum. `next_index`
    ///   records how far the scan got, so the next call resumes there.
    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            let current = if let Some(current) = &mut self.current {
                // It's possible that the checkpoint was already certified by
                // the rest of the network and we've already received the
                // certified checkpoint via StateSync. In this case, we reset
                // the current signature aggregator to the next checkpoint to
                // be certified
                if current.summary.sequence_number < next_to_certify {
                    assert_reachable!("skip checkpoint certification");
                    self.current = None;
                    continue;
                }
                current
            } else {
                // Nothing to aggregate until the next checkpoint has been built locally.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    next_index: 0,
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let epoch_tables = self
                .epoch_store
                .tables()
                .expect("should not run past end of epoch");
            // Resume from the first signature index not yet fed into `current`.
            let iter = epoch_tables
                .pending_checkpoint_signatures
                .safe_iter_with_bounds(
                    Some((current.summary.sequence_number, current.next_index)),
                    None,
                );
            for item in iter {
                let ((seq, index), data) = item?;
                if seq != current.summary.sequence_number {
                    trace!(
                        checkpoint_seq =? current.summary.sequence_number,
                        "Not enough checkpoint signatures",
                    );
                    // No more signatures (yet) for this checkpoint
                    return Ok(result);
                }
                trace!(
                    checkpoint_seq = current.summary.sequence_number,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = current.summary.sequence_number,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics
                        .last_certified_checkpoint
                        .set(current.summary.sequence_number as i64);
                    current.summary.report_checkpoint_age(
                        &self.metrics.last_certified_checkpoint_age,
                        &self.metrics.last_certified_checkpoint_age_ms,
                    );
                    result.push(summary.into_inner());
                    // This checkpoint is done; start over with the next sequence number.
                    self.current = None;
                    continue 'outer;
                } else {
                    // Not certified yet; skip this signature on the next scan.
                    current.next_index = index + 1;
                }
            }
            break;
        }
        Ok(result)
    }

    /// Returns one past the highest sequence number in `certified_checkpoints`, or 0 when that
    /// table is empty.
    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
2635
2636impl CheckpointSignatureAggregator {
2637    #[allow(clippy::result_unit_err)]
2638    pub fn try_aggregate(
2639        &mut self,
2640        data: CheckpointSignatureMessage,
2641    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2642        let their_digest = *data.summary.digest();
2643        let (_, signature) = data.summary.into_data_and_sig();
2644        let author = signature.authority;
2645        let envelope =
2646            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2647        match self.signatures_by_digest.insert(their_digest, envelope) {
2648            // ignore repeated signatures
2649            InsertResult::Failed { error }
2650                if matches!(
2651                    error.as_inner(),
2652                    SuiErrorKind::StakeAggregatorRepeatedSigner {
2653                        conflicting_sig: false,
2654                        ..
2655                    },
2656                ) =>
2657            {
2658                Err(())
2659            }
2660            InsertResult::Failed { error } => {
2661                warn!(
2662                    checkpoint_seq = self.summary.sequence_number,
2663                    "Failed to aggregate new signature from validator {:?}: {:?}",
2664                    author.concise(),
2665                    error
2666                );
2667                self.check_for_split_brain();
2668                Err(())
2669            }
2670            InsertResult::QuorumReached(cert) => {
2671                // It is not guaranteed that signature.authority == narwhal_cert.author, but we do verify
2672                // the signature so we know that the author signed the message at some point.
2673                if their_digest != self.digest {
2674                    self.metrics.remote_checkpoint_forks.inc();
2675                    warn!(
2676                        checkpoint_seq = self.summary.sequence_number,
2677                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2678                        author.concise(),
2679                        their_digest,
2680                        self.digest
2681                    );
2682                    return Err(());
2683                }
2684                Ok(cert)
2685            }
2686            InsertResult::NotEnoughVotes {
2687                bad_votes: _,
2688                bad_authorities: _,
2689            } => {
2690                self.check_for_split_brain();
2691                Err(())
2692            }
2693        }
2694    }
2695
2696    /// Check if there is a split brain condition in checkpoint signature aggregation, defined
2697    /// as any state wherein it is no longer possible to achieve quorum on a checkpoint proposal,
2698    /// irrespective of the outcome of any outstanding votes.
2699    fn check_for_split_brain(&self) {
2700        debug!(
2701            checkpoint_seq = self.summary.sequence_number,
2702            "Checking for split brain condition"
2703        );
2704        if self.signatures_by_digest.quorum_unreachable() {
2705            // TODO: at this point we should immediately halt processing
2706            // of new transaction certificates to avoid building on top of
2707            // forked output
2708            // self.halt_all_execution();
2709
2710            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2711            let digests_by_stake_messages = all_unique_values
2712                .iter()
2713                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2714                .map(|(digest, (_authorities, total_stake))| {
2715                    format!("{:?} (total stake: {})", digest, total_stake)
2716                })
2717                .collect::<Vec<String>>();
2718            fail_point_arg!("kill_split_brain_node", |(
2719                checkpoint_overrides,
2720                forked_authorities,
2721            ): (
2722                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
2723                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
2724            )| {
2725                #[cfg(msim)]
2726                {
2727                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
2728                        (checkpoint_overrides.lock(), forked_authorities.lock())
2729                    {
2730                        // Find the digest produced by non-forked authorities
2731                        let correct_digest = all_unique_values
2732                            .iter()
2733                            .find(|(_, (authorities, _))| {
2734                                // Check if any authority that produced this digest is NOT in the forked set
2735                                authorities
2736                                    .iter()
2737                                    .any(|auth| !forked_authorities_set.contains(auth))
2738                            })
2739                            .map(|(digest, _)| digest.to_string())
2740                            .unwrap_or_else(|| {
2741                                // Fallback: use the digest with the highest stake
2742                                all_unique_values
2743                                    .iter()
2744                                    .max_by_key(|(_, (_, stake))| *stake)
2745                                    .map(|(digest, _)| digest.to_string())
2746                                    .unwrap_or_else(|| self.digest.to_string())
2747                            });
2748
2749                        overrides.insert(self.summary.sequence_number, correct_digest.clone());
2750
2751                        tracing::error!(
2752                            fatal = true,
2753                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
2754                            self.summary.sequence_number,
2755                            correct_digest
2756                        );
2757                    }
2758                }
2759            });
2760
2761            debug_fatal!(
2762                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
2763                self.summary.sequence_number,
2764                self.signatures_by_digest.uncommitted_stake(),
2765                digests_by_stake_messages
2766            );
2767            self.metrics.split_brain_checkpoint_forks.inc();
2768
2769            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2770            let local_summary = self.summary.clone();
2771            let state = self.state.clone();
2772            let tables = self.store.clone();
2773
2774            tokio::spawn(async move {
2775                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2776            });
2777        }
2778    }
2779}
2780
2781/// Create data dump containing relevant data for diagnosing cause of the
2782/// split brain by querying one disagreeing validator for full checkpoint contents.
2783/// To minimize peer chatter, we only query one validator at random from each
2784/// disagreeing faction, as all honest validators that participated in this round may
2785/// inevitably run the same process.
2786async fn diagnose_split_brain(
2787    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2788    local_summary: CheckpointSummary,
2789    state: Arc<AuthorityState>,
2790    tables: Arc<CheckpointStore>,
2791) {
2792    debug!(
2793        checkpoint_seq = local_summary.sequence_number,
2794        "Running split brain diagnostics..."
2795    );
2796    let time = SystemTime::now();
2797    // collect one random disagreeing validator per differing digest
2798    let digest_to_validator = all_unique_values
2799        .iter()
2800        .filter_map(|(digest, (validators, _))| {
2801            if *digest != local_summary.digest() {
2802                let random_validator = validators.choose(&mut get_rng()).unwrap();
2803                Some((*digest, *random_validator))
2804            } else {
2805                None
2806            }
2807        })
2808        .collect::<HashMap<_, _>>();
2809    if digest_to_validator.is_empty() {
2810        panic!(
2811            "Given split brain condition, there should be at \
2812                least one validator that disagrees with local signature"
2813        );
2814    }
2815
2816    let epoch_store = state.load_epoch_store_one_call_per_task();
2817    let committee = epoch_store
2818        .epoch_start_state()
2819        .get_sui_committee_with_network_metadata();
2820    let network_config = default_mysten_network_config();
2821    let network_clients =
2822        make_network_authority_clients_with_network_config(&committee, &network_config);
2823
2824    // Query all disagreeing validators
2825    let response_futures = digest_to_validator
2826        .values()
2827        .cloned()
2828        .map(|validator| {
2829            let client = network_clients
2830                .get(&validator)
2831                .expect("Failed to get network client");
2832            let request = CheckpointRequestV2 {
2833                sequence_number: Some(local_summary.sequence_number),
2834                request_content: true,
2835                certified: false,
2836            };
2837            client.handle_checkpoint_v2(request)
2838        })
2839        .collect::<Vec<_>>();
2840
2841    let digest_name_pair = digest_to_validator.iter();
2842    let response_data = futures::future::join_all(response_futures)
2843        .await
2844        .into_iter()
2845        .zip(digest_name_pair)
2846        .filter_map(|(response, (digest, name))| match response {
2847            Ok(response) => match response {
2848                CheckpointResponseV2 {
2849                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2850                    contents: Some(contents),
2851                } => Some((*name, *digest, summary, contents)),
2852                CheckpointResponseV2 {
2853                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2854                    contents: _,
2855                } => {
2856                    panic!("Expected pending checkpoint, but got certified checkpoint");
2857                }
2858                CheckpointResponseV2 {
2859                    checkpoint: None,
2860                    contents: _,
2861                } => {
2862                    error!(
2863                        "Summary for checkpoint {:?} not found on validator {:?}",
2864                        local_summary.sequence_number, name
2865                    );
2866                    None
2867                }
2868                CheckpointResponseV2 {
2869                    checkpoint: _,
2870                    contents: None,
2871                } => {
2872                    error!(
2873                        "Contents for checkpoint {:?} not found on validator {:?}",
2874                        local_summary.sequence_number, name
2875                    );
2876                    None
2877                }
2878            },
2879            Err(e) => {
2880                error!(
2881                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2882                    e
2883                );
2884                None
2885            }
2886        })
2887        .collect::<Vec<_>>();
2888
2889    let local_checkpoint_contents = tables
2890        .get_checkpoint_contents(&local_summary.content_digest)
2891        .unwrap_or_else(|_| {
2892            panic!(
2893                "Could not find checkpoint contents for digest {:?}",
2894                local_summary.digest()
2895            )
2896        })
2897        .unwrap_or_else(|| {
2898            panic!(
2899                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2900                local_summary.sequence_number,
2901                local_summary.digest()
2902            )
2903        });
2904    let local_contents_text = format!("{local_checkpoint_contents:?}");
2905
2906    let local_summary_text = format!("{local_summary:?}");
2907    let local_validator = state.name.concise();
2908    let diff_patches = response_data
2909        .iter()
2910        .map(|(name, other_digest, other_summary, contents)| {
2911            let other_contents_text = format!("{contents:?}");
2912            let other_summary_text = format!("{other_summary:?}");
2913            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2914                .enumerate_transactions(&local_summary)
2915                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2916                .unzip();
2917            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2918                .enumerate_transactions(other_summary)
2919                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2920                .unzip();
2921            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2922            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2923            let local_transactions_text = format!("{local_transactions:#?}");
2924            let other_transactions_text = format!("{other_transactions:#?}");
2925            let transactions_patch =
2926                create_patch(&local_transactions_text, &other_transactions_text);
2927            let local_effects_text = format!("{local_effects:#?}");
2928            let other_effects_text = format!("{other_effects:#?}");
2929            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2930            let seq_number = local_summary.sequence_number;
2931            let local_digest = local_summary.digest();
2932            let other_validator = name.concise();
2933            format!(
2934                "Checkpoint: {seq_number:?}\n\
2935                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2936                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2937                Summary Diff: \n{summary_patch}\n\n\
2938                Contents Diff: \n{contents_patch}\n\n\
2939                Transactions Diff: \n{transactions_patch}\n\n\
2940                Effects Diff: \n{effects_patch}",
2941            )
2942        })
2943        .collect::<Vec<_>>()
2944        .join("\n\n\n");
2945
2946    let header = format!(
2947        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2948        Datetime: {:?}",
2949        time
2950    );
2951    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2952    let path = tempfile::tempdir()
2953        .expect("Failed to create tempdir")
2954        .keep()
2955        .join(Path::new("checkpoint_fork_dump.txt"));
2956    let mut file = File::create(path).unwrap();
2957    write!(file, "{}", fork_logs_text).unwrap();
2958    debug!("{}", fork_logs_text);
2959}
2960
/// Hooks through which other components notify the checkpoint service.
pub trait CheckpointServiceNotify {
    /// Hands a checkpoint signature message, for the epoch of `epoch_store`, to the service.
    ///
    /// NOTE(review): the effect (e.g. persisting the signature and waking the aggregator) is
    /// defined by the implementation, which is not visible here — confirm there.
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> SuiResult;

    /// Signals the service that there may be new checkpoint work.
    ///
    /// NOTE(review): presumably this wakes the checkpoint builder; confirm against the
    /// implementation.
    fn notify_checkpoint(&self) -> SuiResult;
}
2970
/// Lifecycle of the background components owned by `CheckpointService`.
///
/// `Unstarted` holds the checkpoint builder, aggregator and state hasher until
/// `take_unstarted` moves them out; after that the state is `Started` and holds nothing.
// `Unstarted` carries the three components by value, which makes it far larger than
// `Started`; the size lint is silenced rather than boxing them.
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    Started,
}
2982
2983impl CheckpointServiceState {
2984    fn take_unstarted(
2985        &mut self,
2986    ) -> (
2987        CheckpointBuilder,
2988        CheckpointAggregator,
2989        CheckpointStateHasher,
2990    ) {
2991        let mut state = CheckpointServiceState::Started;
2992        std::mem::swap(self, &mut state);
2993
2994        match state {
2995            CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
2996                (builder, aggregator, hasher)
2997            }
2998            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2999        }
3000    }
3001}
3002
/// Owns the checkpoint pipeline (builder, aggregator and state hasher).
///
/// It is constructed by [`CheckpointService::build`] with its worker components parked in
/// `state`, started by [`CheckpointService::spawn`], and fed through
/// [`CheckpointServiceNotify`].
pub struct CheckpointService {
    /// Checkpoint store; read for the highest executed and highest verified checkpoints.
    tables: Arc<CheckpointStore>,
    /// Shared with the `CheckpointBuilder`; signalled by `notify_checkpoint`.
    notify_builder: Arc<Notify>,
    /// Shared with the `CheckpointAggregator` (and the builder); signalled after each
    /// checkpoint signature is stored.
    notify_aggregator: Arc<Notify>,
    /// Index assigned to the most recently stored checkpoint signature. The lock is held
    /// while incrementing it, storing the signature and notifying the aggregator, so those
    /// steps happen as one critical section.
    last_signature_index: Mutex<u64>,
    /// Publishes the current highest built checkpoint sequence number. A clone of this
    /// sender is given to the `CheckpointBuilder`, and `wait_for_rebuilt_checkpoints`
    /// subscribes to it.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    /// The highest sequence number that had already been built (locally computed) at the
    /// time CheckpointService was constructed; 0 if none.
    highest_previously_built_seq: CheckpointSequenceNumber,
    /// Metrics updated when checkpoint signatures are received or ignored.
    metrics: Arc<CheckpointMetrics>,
    /// `Unstarted` until `spawn` takes the components out.
    state: Mutex<CheckpointServiceState>,
}
3016
impl CheckpointService {
    /// Constructs a new CheckpointService in an un-started state.
    ///
    /// Creates the `CheckpointAggregator`, `CheckpointStateHasher` and `CheckpointBuilder`
    /// and stores them until [`CheckpointService::spawn`] is called. The builder sends to
    /// the state hasher over a bounded channel of capacity 16.
    ///
    /// # Panics
    ///
    /// Panics if reading the latest locally computed checkpoint, the last built checkpoint
    /// summary, or the last checkpoint signature index fails.
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // We may have built higher checkpoint numbers before restarting.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        // Starting point for the "currently built" watermark in this process; it may be
        // below highest_previously_built_seq until the builder catches up.
        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        // The initial receiver is dropped; receivers are created on demand via `subscribe()`
        // (see `wait_for_rebuilt_checkpoints`).
        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        // Bounded builder -> state hasher channel. The sender is moved into the builder; the
        // state hasher is expected to finish once the builder side is gone (see `spawn`).
        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);

        let ckpt_state_hasher = CheckpointStateHasher::new(
            epoch_store.clone(),
            global_state_hasher.clone(),
            receive_from_builder,
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            global_state_hasher,
            send_to_hasher,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        let last_signature_index = epoch_store
            .get_last_checkpoint_signature_index()
            .expect("should not cross end of epoch");
        let last_signature_index = Mutex::new(last_signature_index);

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            notify_aggregator,
            last_signature_index,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted((
                builder,
                aggregator,
                ckpt_state_hasher,
            ))),
        })
    }

    /// Starts the CheckpointService.
    ///
    /// This function blocks until the CheckpointBuilder re-builds all checkpoints that had
    /// been built before the most recent restart. You can think of this as a WAL replay
    /// operation. Upon startup, we may have a number of consensus commits and resulting
    /// checkpoints that were built but not committed to disk. We want to reprocess the
    /// commits and rebuild the checkpoints before starting normal operation.
    ///
    /// The wait is bounded by a 120 second timeout; on timeout `debug_fatal!` is invoked.
    ///
    /// # Panics
    ///
    /// Panics if called more than once (the un-started components can only be taken once)
    /// or if reading the highest executed checkpoint fails.
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        // Clean up state hashes computed after the last committed checkpoint (taken here to
        // be the highest executed checkpoint).
        // This prevents ECMH divergence after fork recovery restarts.
        // Best effort: a failure is logged and startup continues.
        if let Some(last_committed_seq) = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number())
        {
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        // Fired when the builder's run loop returns normally. If the builder future is
        // dropped instead (epoch ended), the sender is dropped and the receiver still resolves.
        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        // Supervisor task: runs the builder for as long as the epoch is alive, then shuts
        // down the state hasher (by draining) and the aggregator (by aborting).
        spawn_monitored_task!(async move {
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            // state hasher will terminate as soon as it has finished processing all messages from builder
            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            // builder must shut down before aggregator and state_hasher, since it sends
            // messages to them
            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        // If this times out, the validator may still start up. The worst that can
        // happen is that we will crash later on instead of immediately. The eventual
        // crash would occur because we may be missing transactions that are below the
        // highest_synced_checkpoint watermark, which can cause a crash in
        // `CheckpointExecutor::extract_randomness_rounds`.
        //
        // Whichever happens first ends the wait: the builder task finishing (or its oneshot
        // sender being dropped), or the rebuild watermark being reached.
        if tokio::time::timeout(Duration::from_secs(120), async move {
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
}
3187
3188impl CheckpointService {
3189    /// Waits until all checkpoints had been built before the node restarted
3190    /// are rebuilt. This is required to preserve the invariant that all checkpoints
3191    /// (and their transactions) below the highest_synced_checkpoint watermark are
3192    /// available. Once the checkpoints are constructed, we can be sure that the
3193    /// transactions have also been executed.
3194    pub async fn wait_for_rebuilt_checkpoints(&self) {
3195        let highest_previously_built_seq = self.highest_previously_built_seq;
3196        let mut rx = self.highest_currently_built_seq_tx.subscribe();
3197        let mut highest_currently_built_seq = *rx.borrow_and_update();
3198        info!(
3199            "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3200        );
3201        loop {
3202            if highest_currently_built_seq >= highest_previously_built_seq {
3203                info!("Checkpoint rebuild complete");
3204                break;
3205            }
3206            rx.changed().await.unwrap();
3207            highest_currently_built_seq = *rx.borrow_and_update();
3208        }
3209    }
3210
3211    #[cfg(test)]
3212    fn write_and_notify_checkpoint_for_testing(
3213        &self,
3214        epoch_store: &AuthorityPerEpochStore,
3215        checkpoint: PendingCheckpoint,
3216    ) -> SuiResult {
3217        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3218
3219        let mut output = ConsensusCommitOutput::new(0);
3220        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3221        output.set_default_commit_stats_for_testing();
3222        epoch_store.push_consensus_output_for_tests(output);
3223        self.notify_checkpoint()?;
3224        Ok(())
3225    }
3226}
3227
3228impl CheckpointServiceNotify for CheckpointService {
3229    fn notify_checkpoint_signature(
3230        &self,
3231        epoch_store: &AuthorityPerEpochStore,
3232        info: &CheckpointSignatureMessage,
3233    ) -> SuiResult {
3234        let sequence = info.summary.sequence_number;
3235        let signer = info.summary.auth_sig().authority.concise();
3236
3237        if let Some(highest_verified_checkpoint) = self
3238            .tables
3239            .get_highest_verified_checkpoint()?
3240            .map(|x| *x.sequence_number())
3241            && sequence <= highest_verified_checkpoint
3242        {
3243            trace!(
3244                checkpoint_seq = sequence,
3245                "Ignore checkpoint signature from {} - already certified", signer,
3246            );
3247            self.metrics
3248                .last_ignored_checkpoint_signature_received
3249                .set(sequence as i64);
3250            return Ok(());
3251        }
3252        trace!(
3253            checkpoint_seq = sequence,
3254            "Received checkpoint signature, digest {} from {}",
3255            info.summary.digest(),
3256            signer,
3257        );
3258        self.metrics
3259            .last_received_checkpoint_signatures
3260            .with_label_values(&[&signer.to_string()])
3261            .set(sequence as i64);
3262        // While it can be tempting to make last_signature_index into AtomicU64, this won't work
3263        // We need to make sure we write to `pending_signatures` and trigger `notify_aggregator` without race conditions
3264        let mut index = self.last_signature_index.lock();
3265        *index += 1;
3266        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
3267        self.notify_aggregator.notify_one();
3268        Ok(())
3269    }
3270
3271    fn notify_checkpoint(&self) -> SuiResult {
3272        self.notify_builder.notify_one();
3273        Ok(())
3274    }
3275}
3276
3277// test helper
3278pub struct CheckpointServiceNoop {}
3279impl CheckpointServiceNotify for CheckpointServiceNoop {
3280    fn notify_checkpoint_signature(
3281        &self,
3282        _: &AuthorityPerEpochStore,
3283        _: &CheckpointSignatureMessage,
3284    ) -> SuiResult {
3285        Ok(())
3286    }
3287
3288    fn notify_checkpoint(&self) -> SuiResult {
3289        Ok(())
3290    }
3291}
3292
3293impl PendingCheckpoint {
3294    pub fn height(&self) -> CheckpointHeight {
3295        self.details.checkpoint_height
3296    }
3297
3298    pub fn roots(&self) -> &Vec<TransactionKey> {
3299        &self.roots
3300    }
3301
3302    pub fn details(&self) -> &PendingCheckpointInfo {
3303        &self.details
3304    }
3305}
3306
pin_project! {
    // Future adapter that counts how many times the wrapped future has been polled and
    // yields that count together with the inner future's output.
    pub struct PollCounter<Fut> {
        // The wrapped future; structurally pinned (`#[pin]`) so it can be polled in place.
        #[pin]
        future: Fut,
        // Number of `poll` calls made through this wrapper so far, including the final one.
        count: usize,
    }
}
3314
3315impl<Fut> PollCounter<Fut> {
3316    pub fn new(future: Fut) -> Self {
3317        Self { future, count: 0 }
3318    }
3319
3320    pub fn count(&self) -> usize {
3321        self.count
3322    }
3323}
3324
3325impl<Fut: Future> Future for PollCounter<Fut> {
3326    type Output = (usize, Fut::Output);
3327
3328    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3329        let this = self.project();
3330        *this.count += 1;
3331        match this.future.poll(cx) {
3332            Poll::Ready(output) => Poll::Ready((*this.count, output)),
3333            Poll::Pending => Poll::Pending,
3334        }
3335    }
3336}
3337
3338fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
3339    PollCounter::new(future)
3340}
3341
3342#[cfg(test)]
3343mod tests {
3344    use super::*;
3345    use crate::authority::test_authority_builder::TestAuthorityBuilder;
3346    use crate::transaction_outputs::TransactionOutputs;
3347    use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3348    use futures::FutureExt as _;
3349    use futures::future::BoxFuture;
3350    use std::collections::HashMap;
3351    use std::ops::Deref;
3352    use sui_macros::sim_test;
3353    use sui_protocol_config::{Chain, ProtocolConfig};
3354    use sui_types::accumulator_event::AccumulatorEvent;
3355    use sui_types::authenticator_state::ActiveJwk;
3356    use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3357    use sui_types::crypto::Signature;
3358    use sui_types::effects::{TransactionEffects, TransactionEvents};
3359    use sui_types::messages_checkpoint::SignedCheckpointSummary;
3360    use sui_types::transaction::VerifiedTransaction;
3361    use tokio::sync::mpsc;
3362
3363    #[tokio::test]
3364    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
3365        let store = CheckpointStore::new_for_tests();
3366        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
3367        for seq in 70u64..=80u64 {
3368            let contents =
3369                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
3370                    [sui_types::base_types::ExecutionDigests::new(
3371                        sui_types::digests::TransactionDigest::random(),
3372                        sui_types::digests::TransactionEffectsDigest::ZERO,
3373                    )],
3374                );
3375            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
3376                &protocol,
3377                0,
3378                seq,
3379                0,
3380                &contents,
3381                None,
3382                sui_types::gas::GasCostSummary::default(),
3383                None,
3384                0,
3385                Vec::new(),
3386                Vec::new(),
3387            );
3388            store
3389                .tables
3390                .locally_computed_checkpoints
3391                .insert(&seq, &summary)
3392                .unwrap();
3393        }
3394
3395        store
3396            .clear_locally_computed_checkpoints_from(76)
3397            .expect("clear should succeed");
3398
3399        // Explicit boundary checks: 75 must remain, 76 must be deleted
3400        assert!(
3401            store
3402                .tables
3403                .locally_computed_checkpoints
3404                .get(&75)
3405                .unwrap()
3406                .is_some()
3407        );
3408        assert!(
3409            store
3410                .tables
3411                .locally_computed_checkpoints
3412                .get(&76)
3413                .unwrap()
3414                .is_none()
3415        );
3416
3417        for seq in 70u64..76u64 {
3418            assert!(
3419                store
3420                    .tables
3421                    .locally_computed_checkpoints
3422                    .get(&seq)
3423                    .unwrap()
3424                    .is_some()
3425            );
3426        }
3427        for seq in 76u64..=80u64 {
3428            assert!(
3429                store
3430                    .tables
3431                    .locally_computed_checkpoints
3432                    .get(&seq)
3433                    .unwrap()
3434                    .is_none()
3435            );
3436        }
3437    }
3438
3439    #[tokio::test]
3440    async fn test_fork_detection_storage() {
3441        let store = CheckpointStore::new_for_tests();
3442        // checkpoint fork
3443        let seq_num = 42;
3444        let digest = CheckpointDigest::random();
3445
3446        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3447
3448        store
3449            .record_checkpoint_fork_detected(seq_num, digest)
3450            .unwrap();
3451
3452        let retrieved = store.get_checkpoint_fork_detected().unwrap();
3453        assert!(retrieved.is_some());
3454        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3455        assert_eq!(retrieved_seq, seq_num);
3456        assert_eq!(retrieved_digest, digest);
3457
3458        store.clear_checkpoint_fork_detected().unwrap();
3459        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3460
3461        // txn fork
3462        let tx_digest = TransactionDigest::random();
3463        let expected_effects = TransactionEffectsDigest::random();
3464        let actual_effects = TransactionEffectsDigest::random();
3465
3466        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3467
3468        store
3469            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3470            .unwrap();
3471
3472        let retrieved = store.get_transaction_fork_detected().unwrap();
3473        assert!(retrieved.is_some());
3474        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3475        assert_eq!(retrieved_tx, tx_digest);
3476        assert_eq!(retrieved_expected, expected_effects);
3477        assert_eq!(retrieved_actual, actual_effects);
3478
3479        store.clear_transaction_fork_detected().unwrap();
3480        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3481    }
3482
    // End-to-end test of the checkpoint pipeline. Pending checkpoints are written and
    // notified. The built checkpoints are checked for ordering, splitting (by transaction
    // count and by size), coalescing and rolling gas totals. Signatures are then fed back
    // in to obtain certified checkpoints.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.disable_accumulators_for_testing();
        // Presumably chosen so that pending checkpoint 4 (timestamp 4001, 1ms after pending
        // checkpoint 3) is too soon to form its own checkpoint and gets coalesced into the
        // next one (see the c7 assertions below) — TODO confirm.
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
            0,
            0,
            vec![],
            SequenceNumber::new(),
        );

        // Grow the JWK list until its BCS encoding is at least 40,000 bytes. Transactions
        // built from it are then large relative to the 100_000-byte checkpoint size limit
        // configured below.
        let jwks = {
            let mut jwks = Vec::new();
            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
                jwks.push(ActiveJwk {
                    jwk_id: JwkId::new(
                        "https://accounts.google.com".to_string(),
                        "1234567890".to_string(),
                    ),
                    jwk: JWK {
                        kty: "RSA".to_string(),
                        e: "AQAB".to_string(),
                        n: "1234567890".to_string(),
                        alg: "RS256".to_string(),
                    },
                    epoch: 0,
                });
            }
            jwks
        };

        let dummy_tx_with_data =
            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());

        // d(0)..d(14) are stored as small transactions, d(15)..d(19) with the large JWK payload.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Record effects for the transactions used below. The digest vector presumably lists
        // each transaction's dependencies (d(1) on d(2) and d(3), d(2) on d(3) and d(4)),
        // which would explain the order d(3), d(2), d(1) in checkpoint 1 — TODO confirm
        // against commit_cert_for_test.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 52, 51, 1),
            );
        }
        // Register a placeholder user signature for every transaction that has effects.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![(signature, None)]);
        }

        // Channels standing in for the built and certified checkpoint outputs.
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

        // Limits: max_transactions_per_checkpoint = 3, max_checkpoint_size_bytes = 100_000.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

        // p(height, root transaction indices, timestamp_ms) builds each pending checkpoint.
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        // The epoch rolling gas summary accumulates d(4) + d(3) + d(2) + d(1):
        // 41+31+21+11 = 104, 42+32+22+12 = 108, 41+31+21+11 = 104, 1+1+1+1 = 4.
        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 108, 104, 4)
        );

        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
        // Verify that we split into 2 checkpoints.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K max.
        // Verify that we split into 2 checkpoints.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending at index 4 was too soon after the prior one and should be coalesced into
        // the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        // Sign checkpoints 0 and 1 with this authority's key and submit them out of order
        // (1 before 0). The certified outputs are still expected in sequence order.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
3721
3722    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
3723        fn notify_read_executed_effects(
3724            &self,
3725            _: &str,
3726            digests: &[TransactionDigest],
3727        ) -> BoxFuture<'_, Vec<TransactionEffects>> {
3728            std::future::ready(
3729                digests
3730                    .iter()
3731                    .map(|d| self.get(d).expect("effects not found").clone())
3732                    .collect(),
3733            )
3734            .boxed()
3735        }
3736
3737        fn notify_read_executed_effects_digests(
3738            &self,
3739            _: &str,
3740            digests: &[TransactionDigest],
3741        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
3742            std::future::ready(
3743                digests
3744                    .iter()
3745                    .map(|d| {
3746                        self.get(d)
3747                            .map(|fx| fx.digest())
3748                            .expect("effects not found")
3749                    })
3750                    .collect(),
3751            )
3752            .boxed()
3753        }
3754
3755        fn multi_get_executed_effects(
3756            &self,
3757            digests: &[TransactionDigest],
3758        ) -> Vec<Option<TransactionEffects>> {
3759            digests.iter().map(|d| self.get(d).cloned()).collect()
3760        }
3761
3762        // Unimplemented methods - its unfortunate to have this big blob of useless code, but it wasn't
3763        // worth it to keep EffectsNotifyRead around just for these tests, as it caused a ton of
3764        // complication in non-test code. (e.g. had to implement EFfectsNotifyRead for all
3765        // ExecutionCacheRead implementors).
3766
3767        fn multi_get_transaction_blocks(
3768            &self,
3769            _: &[TransactionDigest],
3770        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
3771            unimplemented!()
3772        }
3773
3774        fn multi_get_executed_effects_digests(
3775            &self,
3776            _: &[TransactionDigest],
3777        ) -> Vec<Option<TransactionEffectsDigest>> {
3778            unimplemented!()
3779        }
3780
3781        fn multi_get_effects(
3782            &self,
3783            _: &[TransactionEffectsDigest],
3784        ) -> Vec<Option<TransactionEffects>> {
3785            unimplemented!()
3786        }
3787
3788        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
3789            unimplemented!()
3790        }
3791
3792        fn get_mysticeti_fastpath_outputs(
3793            &self,
3794            _: &TransactionDigest,
3795        ) -> Option<Arc<TransactionOutputs>> {
3796            unimplemented!()
3797        }
3798
3799        fn notify_read_fastpath_transaction_outputs<'a>(
3800            &'a self,
3801            _: &'a [TransactionDigest],
3802        ) -> BoxFuture<'a, Vec<Arc<crate::transaction_outputs::TransactionOutputs>>> {
3803            unimplemented!()
3804        }
3805
3806        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
3807            unimplemented!()
3808        }
3809
3810        fn get_unchanged_loaded_runtime_objects(
3811            &self,
3812            _digest: &TransactionDigest,
3813        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
3814            unimplemented!()
3815        }
3816    }
3817
3818    #[async_trait::async_trait]
3819    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
3820        async fn checkpoint_created(
3821            &self,
3822            summary: &CheckpointSummary,
3823            contents: &CheckpointContents,
3824            _epoch_store: &Arc<AuthorityPerEpochStore>,
3825            _checkpoint_store: &Arc<CheckpointStore>,
3826        ) -> SuiResult {
3827            self.try_send((contents.clone(), summary.clone())).unwrap();
3828            Ok(())
3829        }
3830    }
3831
3832    #[async_trait::async_trait]
3833    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
3834        async fn certified_checkpoint_created(
3835            &self,
3836            summary: &CertifiedCheckpointSummary,
3837        ) -> SuiResult {
3838            self.try_send(summary.clone()).unwrap();
3839            Ok(())
3840        }
3841    }
3842
3843    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
3844        PendingCheckpoint {
3845            roots: t
3846                .into_iter()
3847                .map(|t| TransactionKey::Digest(d(t)))
3848                .collect(),
3849            details: PendingCheckpointInfo {
3850                timestamp_ms,
3851                last_of_epoch: false,
3852                checkpoint_height: i,
3853                consensus_commit_ref: CommitRef::default(),
3854                rejected_transactions_digest: Digest::default(),
3855            },
3856        }
3857    }
3858
3859    fn d(i: u8) -> TransactionDigest {
3860        let mut bytes: [u8; 32] = Default::default();
3861        bytes[0] = i;
3862        TransactionDigest::new(bytes)
3863    }
3864
3865    fn e(
3866        transaction_digest: TransactionDigest,
3867        dependencies: Vec<TransactionDigest>,
3868        gas_used: GasCostSummary,
3869    ) -> TransactionEffects {
3870        let mut effects = TransactionEffects::default();
3871        *effects.transaction_digest_mut_for_testing() = transaction_digest;
3872        *effects.dependencies_mut_for_testing() = dependencies;
3873        *effects.gas_cost_summary_mut_for_testing() = gas_used;
3874        effects
3875    }
3876
3877    fn commit_cert_for_test(
3878        store: &mut HashMap<TransactionDigest, TransactionEffects>,
3879        state: Arc<AuthorityState>,
3880        digest: TransactionDigest,
3881        dependencies: Vec<TransactionDigest>,
3882        gas_used: GasCostSummary,
3883    ) {
3884        let epoch_store = state.epoch_store_for_testing();
3885        let effects = e(digest, dependencies, gas_used);
3886        store.insert(digest, effects.clone());
3887        epoch_store.insert_executed_in_epoch(&digest);
3888    }
3889}