sui_core/checkpoints/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4pub(crate) mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::CertifiedCheckpointOutput;
15pub use crate::checkpoints::checkpoint_output::{
16    CheckpointOutput, LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::global_state_hasher::GlobalStateHasher;
23use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
24use consensus_core::CommitRef;
25use diffy::create_patch;
26use itertools::Itertools;
27use mysten_common::ZipDebugEqIteratorExt;
28use mysten_common::random::get_rng;
29use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
30use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
31use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
32use parking_lot::Mutex;
33use pin_project_lite::pin_project;
34use serde::{Deserialize, Serialize};
35use sui_macros::fail_point_arg;
36use sui_network::default_mysten_network_config;
37use sui_types::accumulator_metadata;
38use sui_types::base_types::{ConciseableName, SequenceNumber};
39use sui_types::execution::ExecutionTimeObservationKey;
40use sui_types::messages_checkpoint::{
41    CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
42};
43use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
44use tokio::sync::{mpsc, watch};
45use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
46
47use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
48use crate::authority::authority_store_pruner::PrunerWatermarks;
49use crate::consensus_handler::SequencedConsensusTransactionKey;
50use rand::seq::SliceRandom;
51use std::collections::{BTreeMap, HashMap, HashSet};
52use std::fs::File;
53use std::future::Future;
54use std::io::Write;
55use std::path::Path;
56use std::pin::Pin;
57use std::sync::Arc;
58use std::sync::Weak;
59use std::task::{Context, Poll};
60use std::time::{Duration, SystemTime};
61use sui_protocol_config::ProtocolVersion;
62use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
63use sui_types::committee::StakeUnit;
64use sui_types::crypto::AuthorityStrongQuorumSignInfo;
65use sui_types::digests::{
66    CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
67};
68use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
69use sui_types::error::{SuiErrorKind, SuiResult};
70use sui_types::gas::GasCostSummary;
71use sui_types::message_envelope::Message;
72use sui_types::messages_checkpoint::{
73    CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
74    CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
75    EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
76    VerifiedCheckpointContents,
77};
78use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
79use sui_types::messages_consensus::ConsensusTransactionKey;
80use sui_types::signature::GenericSignature;
81use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
82use sui_types::transaction::{
83    TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
84};
85use tokio::sync::Notify;
86use tracing::{debug, error, info, instrument, trace, warn};
87use typed_store::DBMapUtils;
88use typed_store::Map;
89use typed_store::{
90    TypedStoreError,
91    rocks::{DBMap, MetricConf},
92};
93
/// Fixed key for the `transaction_fork_detected` table, whose key type is `u8`.
/// NOTE(review): presumably that table only ever holds this single row — confirm
/// against its readers/writers.
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0u8;

/// Height at which a checkpoint is built (see `PendingCheckpointInfo::checkpoint_height`).
pub type CheckpointHeight = u64;
97
/// Summary counters for one epoch.
///
/// How each counter is computed is not shown in this section; the field names
/// describe the intended meaning.
pub struct EpochStats {
    /// Number of checkpoints in the epoch.
    pub checkpoint_count: u64,
    /// Number of transactions in the epoch.
    pub transaction_count: u64,
    /// Total gas reward for the epoch.
    pub total_gas_reward: u64,
}
103
104#[derive(Clone, Debug)]
105pub struct PendingCheckpointInfo {
106    pub timestamp_ms: CheckpointTimestamp,
107    pub last_of_epoch: bool,
108    pub checkpoint_height: CheckpointHeight,
109    // Consensus commit ref and rejected transactions digest which corresponds to this checkpoint.
110    pub consensus_commit_ref: CommitRef,
111    pub rejected_transactions_digest: Digest,
112    // Pre-assigned checkpoint sequence number from consensus handler.
113    pub checkpoint_seq: CheckpointSequenceNumber,
114}
115
116#[derive(Clone, Debug, Default)]
117pub struct CheckpointRoots {
118    pub tx_roots: Vec<TransactionKey>,
119    pub settlement_root: Option<TransactionKey>,
120    pub height: CheckpointHeight,
121}
122
123/// Consensus commits are merged and split into PendingCheckpoints in ConsensusHandler.
124/// Each CheckpointRoots represents a group of transactions settled together.
125#[derive(Clone, Debug)]
126pub struct PendingCheckpoint {
127    pub roots: Vec<CheckpointRoots>,
128    pub details: PendingCheckpointInfo,
129}
130
131#[derive(Clone, Debug, Serialize, Deserialize)]
132pub struct BuilderCheckpointSummary {
133    pub summary: CheckpointSummary,
134    // Height at which this checkpoint summary was built. None for genesis checkpoint
135    pub checkpoint_height: Option<CheckpointHeight>,
136    // Always 0: each height now maps to exactly one checkpoint. Kept for DB format
137    // compatibility; old rows may contain nonzero values from builder-side splitting.
138    pub position_in_commit: usize,
139}
140
141#[derive(DBMapUtils)]
142#[cfg_attr(tidehunter, tidehunter)]
143pub struct CheckpointStoreTables {
144    /// Maps checkpoint contents digest to checkpoint contents
145    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,
146
147    /// Maps checkpoint contents digest to checkpoint sequence number
148    pub(crate) checkpoint_sequence_by_contents_digest:
149        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,
150
151    /// Stores entire checkpoint contents from state sync, indexed by sequence number, for
152    /// efficient reads of full checkpoints. Entries from this table are deleted after state
153    /// accumulation has completed.
154    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
155    // TODO: Once the switch to `full_checkpoint_content_v2` is fully active on mainnet,
156    // deprecate this table (and remove when possible).
157    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
158
159    /// Stores certified checkpoints
160    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
161    /// Map from checkpoint digest to certified checkpoint
162    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,
163
164    /// Store locally computed checkpoint summaries so that we can detect forks and log useful
165    /// information. Can be pruned as soon as we verify that we are in agreement with the latest
166    /// certified checkpoint.
167    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,
168
169    /// A map from epoch ID to the sequence number of the last checkpoint in that epoch.
170    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,
171
172    /// Watermarks used to determine the highest verified, fully synced, and
173    /// fully executed checkpoints
174    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
175
176    /// Stores transaction fork detection information
177    pub(crate) transaction_fork_detected: DBMap<
178        u8,
179        (
180            TransactionDigest,
181            TransactionEffectsDigest,
182            TransactionEffectsDigest,
183        ),
184    >,
185    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
186    full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
187}
188
189fn full_checkpoint_content_table_default_config() -> DBOptions {
190    DBOptions {
191        options: default_db_options().options,
192        // We have seen potential data corruption issues in this table after forced shutdowns
193        // so we enable value hash logging to help with debugging.
194        // TODO: remove this once we have a better understanding of the root cause.
195        rw_options: ReadWriteOptions::default().set_log_value_hash(true),
196    }
197}
198
199impl CheckpointStoreTables {
200    #[cfg(not(tidehunter))]
201    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
202        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
203    }
204
205    #[cfg(tidehunter)]
206    pub fn new(
207        path: &Path,
208        metric_name: &'static str,
209        pruner_watermarks: Arc<PrunerWatermarks>,
210    ) -> Self {
211        tracing::warn!("Checkpoint DB using tidehunter");
212        use crate::authority::authority_store_pruner::apply_relocation_filter;
213        use typed_store::tidehunter_util::{
214            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
215            default_max_dirty_keys, default_mutex_count, default_value_cache_size,
216        };
217        let mutexes = default_mutex_count();
218        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
219        let override_dirty_keys_config = KeySpaceConfig::new()
220            .with_max_dirty_keys(16 * default_max_dirty_keys())
221            .with_value_cache_size(default_value_cache_size());
222        let config_u64 = ThConfig::new_with_config(
223            8,
224            mutexes,
225            u64_sequence_key,
226            override_dirty_keys_config.clone(),
227        );
228        let digest_config = ThConfig::new_with_rm_prefix(
229            32,
230            mutexes,
231            KeyType::uniform(default_cells_per_mutex()),
232            KeySpaceConfig::default(),
233            vec![0, 0, 0, 0, 0, 0, 0, 32],
234        );
235        let watermarks_config = KeySpaceConfig::new()
236            .with_value_cache_size(10)
237            .disable_unload();
238        let lru_config = KeySpaceConfig::new().with_value_cache_size(100);
239        let configs = vec![
240            (
241                "checkpoint_content",
242                digest_config.clone().with_config(
243                    KeySpaceConfig::new().with_relocation_filter(|_, _| Decision::Remove),
244                ),
245            ),
246            (
247                "checkpoint_sequence_by_contents_digest",
248                digest_config.clone().with_config(apply_relocation_filter(
249                    KeySpaceConfig::default(),
250                    pruner_watermarks.checkpoint_id.clone(),
251                    |sequence_number: CheckpointSequenceNumber| sequence_number,
252                    false,
253                )),
254            ),
255            (
256                "full_checkpoint_content",
257                config_u64.clone().with_config(apply_relocation_filter(
258                    override_dirty_keys_config.clone(),
259                    pruner_watermarks.checkpoint_id.clone(),
260                    |sequence_number: CheckpointSequenceNumber| sequence_number,
261                    true,
262                )),
263            ),
264            ("certified_checkpoints", config_u64.clone()),
265            (
266                "checkpoint_by_digest",
267                digest_config.clone().with_config(apply_relocation_filter(
268                    lru_config,
269                    pruner_watermarks.epoch_id.clone(),
270                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
271                    false,
272                )),
273            ),
274            (
275                "locally_computed_checkpoints",
276                config_u64.clone().with_config(apply_relocation_filter(
277                    override_dirty_keys_config.clone(),
278                    pruner_watermarks.checkpoint_id.clone(),
279                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
280                    true,
281                )),
282            ),
283            ("epoch_last_checkpoint_map", config_u64.clone()),
284            (
285                "watermarks",
286                ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
287            ),
288            (
289                "transaction_fork_detected",
290                ThConfig::new_with_config(
291                    1,
292                    1,
293                    KeyType::uniform(1),
294                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
295                ),
296            ),
297            (
298                "full_checkpoint_content_v2",
299                config_u64.clone().with_config(apply_relocation_filter(
300                    override_dirty_keys_config.clone(),
301                    pruner_watermarks.checkpoint_id.clone(),
302                    |sequence_number: CheckpointSequenceNumber| sequence_number,
303                    true,
304                )),
305            ),
306        ];
307        Self::open_tables_read_write(
308            path.to_path_buf(),
309            MetricConf::new(metric_name),
310            configs
311                .into_iter()
312                .map(|(cf, config)| (cf.to_string(), config))
313                .collect(),
314        )
315    }
316
317    #[cfg(not(tidehunter))]
318    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
319        Self::get_read_only_handle(
320            path.to_path_buf(),
321            None,
322            None,
323            MetricConf::new("checkpoint_readonly"),
324        )
325    }
326
327    #[cfg(tidehunter)]
328    pub fn open_readonly(path: &Path) -> Self {
329        Self::new(path, "checkpoint", Arc::new(PrunerWatermarks::default()))
330    }
331}
332
333pub struct CheckpointStore {
334    pub(crate) tables: CheckpointStoreTables,
335    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
336    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
337}
338
339impl CheckpointStore {
340    pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
341        let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
342        Arc::new(Self {
343            tables,
344            synced_checkpoint_notify_read: NotifyRead::new(),
345            executed_checkpoint_notify_read: NotifyRead::new(),
346        })
347    }
348
349    pub fn new_for_tests() -> Arc<Self> {
350        let ckpt_dir = mysten_common::tempdir().unwrap();
351        CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
352    }
353
354    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
355        let tables = CheckpointStoreTables::new(
356            path,
357            "db_checkpoint",
358            Arc::new(PrunerWatermarks::default()),
359        );
360        Arc::new(Self {
361            tables,
362            synced_checkpoint_notify_read: NotifyRead::new(),
363            executed_checkpoint_notify_read: NotifyRead::new(),
364        })
365    }
366
367    #[cfg(not(tidehunter))]
368    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
369        CheckpointStoreTables::open_readonly(path)
370    }
371
372    #[cfg(tidehunter)]
373    pub fn open_readonly(path: &Path) -> CheckpointStoreTables {
374        CheckpointStoreTables::open_readonly(path)
375    }
376
377    #[instrument(level = "info", skip_all)]
378    pub fn insert_genesis_checkpoint(
379        &self,
380        checkpoint: VerifiedCheckpoint,
381        contents: CheckpointContents,
382        epoch_store: &AuthorityPerEpochStore,
383    ) {
384        assert_eq!(
385            checkpoint.epoch(),
386            0,
387            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
388        );
389        assert_eq!(
390            *checkpoint.sequence_number(),
391            0,
392            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
393        );
394
395        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
396        match self.get_checkpoint_by_sequence_number(0).unwrap() {
397            Some(existing_checkpoint) => {
398                assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
399            }
400            None => {
401                if epoch_store.epoch() == checkpoint.epoch {
402                    epoch_store
403                        .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
404                        .unwrap();
405                } else {
406                    debug!(
407                        validator_epoch =% epoch_store.epoch(),
408                        genesis_epoch =% checkpoint.epoch(),
409                        "Not inserting checkpoint builder data for genesis checkpoint",
410                    );
411                }
412                self.insert_checkpoint_contents(contents).unwrap();
413                self.insert_verified_checkpoint(&checkpoint).unwrap();
414                self.update_highest_synced_checkpoint(&checkpoint).unwrap();
415            }
416        }
417    }
418
419    pub fn get_checkpoint_by_digest(
420        &self,
421        digest: &CheckpointDigest,
422    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
423        self.tables
424            .checkpoint_by_digest
425            .get(digest)
426            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
427    }
428
429    pub fn get_checkpoint_by_sequence_number(
430        &self,
431        sequence_number: CheckpointSequenceNumber,
432    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
433        self.tables
434            .certified_checkpoints
435            .get(&sequence_number)
436            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
437    }
438
439    pub fn get_locally_computed_checkpoint(
440        &self,
441        sequence_number: CheckpointSequenceNumber,
442    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
443        self.tables
444            .locally_computed_checkpoints
445            .get(&sequence_number)
446    }
447
448    pub fn multi_get_locally_computed_checkpoints(
449        &self,
450        sequence_numbers: &[CheckpointSequenceNumber],
451    ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
452        let checkpoints = self
453            .tables
454            .locally_computed_checkpoints
455            .multi_get(sequence_numbers)?;
456
457        Ok(checkpoints)
458    }
459
460    pub fn get_sequence_number_by_contents_digest(
461        &self,
462        digest: &CheckpointContentsDigest,
463    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
464        self.tables
465            .checkpoint_sequence_by_contents_digest
466            .get(digest)
467    }
468
469    pub fn delete_contents_digest_sequence_number_mapping(
470        &self,
471        digest: &CheckpointContentsDigest,
472    ) -> Result<(), TypedStoreError> {
473        self.tables
474            .checkpoint_sequence_by_contents_digest
475            .remove(digest)
476    }
477
478    pub fn get_latest_certified_checkpoint(
479        &self,
480    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
481        Ok(self
482            .tables
483            .certified_checkpoints
484            .reversed_safe_iter_with_bounds(None, None)?
485            .next()
486            .transpose()?
487            .map(|(_, v)| v.into()))
488    }
489
490    pub fn get_latest_locally_computed_checkpoint(
491        &self,
492    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
493        Ok(self
494            .tables
495            .locally_computed_checkpoints
496            .reversed_safe_iter_with_bounds(None, None)?
497            .next()
498            .transpose()?
499            .map(|(_, v)| v))
500    }
501
502    pub fn multi_get_checkpoint_by_sequence_number(
503        &self,
504        sequence_numbers: &[CheckpointSequenceNumber],
505    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
506        let checkpoints = self
507            .tables
508            .certified_checkpoints
509            .multi_get(sequence_numbers)?
510            .into_iter()
511            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
512            .collect();
513
514        Ok(checkpoints)
515    }
516
517    pub fn multi_get_checkpoint_content(
518        &self,
519        contents_digest: &[CheckpointContentsDigest],
520    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
521        self.tables.checkpoint_content.multi_get(contents_digest)
522    }
523
524    pub fn get_highest_verified_checkpoint(
525        &self,
526    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
527        let highest_verified = if let Some(highest_verified) = self
528            .tables
529            .watermarks
530            .get(&CheckpointWatermark::HighestVerified)?
531        {
532            highest_verified
533        } else {
534            return Ok(None);
535        };
536        self.get_checkpoint_by_digest(&highest_verified.1)
537    }
538
539    pub fn get_highest_synced_checkpoint(
540        &self,
541    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
542        let highest_synced = if let Some(highest_synced) = self
543            .tables
544            .watermarks
545            .get(&CheckpointWatermark::HighestSynced)?
546        {
547            highest_synced
548        } else {
549            return Ok(None);
550        };
551        self.get_checkpoint_by_digest(&highest_synced.1)
552    }
553
554    pub fn get_highest_synced_checkpoint_seq_number(
555        &self,
556    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
557        if let Some(highest_synced) = self
558            .tables
559            .watermarks
560            .get(&CheckpointWatermark::HighestSynced)?
561        {
562            Ok(Some(highest_synced.0))
563        } else {
564            Ok(None)
565        }
566    }
567
568    pub fn get_highest_executed_checkpoint_seq_number(
569        &self,
570    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
571        if let Some(highest_executed) = self
572            .tables
573            .watermarks
574            .get(&CheckpointWatermark::HighestExecuted)?
575        {
576            Ok(Some(highest_executed.0))
577        } else {
578            Ok(None)
579        }
580    }
581
582    pub fn get_highest_executed_checkpoint(
583        &self,
584    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
585        let highest_executed = if let Some(highest_executed) = self
586            .tables
587            .watermarks
588            .get(&CheckpointWatermark::HighestExecuted)?
589        {
590            highest_executed
591        } else {
592            return Ok(None);
593        };
594        self.get_checkpoint_by_digest(&highest_executed.1)
595    }
596
597    pub fn get_highest_pruned_checkpoint_seq_number(
598        &self,
599    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
600        self.tables
601            .watermarks
602            .get(&CheckpointWatermark::HighestPruned)
603            .map(|watermark| watermark.map(|w| w.0))
604    }
605
606    pub fn get_checkpoint_contents(
607        &self,
608        digest: &CheckpointContentsDigest,
609    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
610        self.tables.checkpoint_content.get(digest)
611    }
612
613    pub fn get_full_checkpoint_contents_by_sequence_number(
614        &self,
615        seq: CheckpointSequenceNumber,
616    ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
617        self.tables.full_checkpoint_content_v2.get(&seq)
618    }
619
620    fn prune_local_summaries(&self) -> SuiResult {
621        if let Some((last_local_summary, _)) = self
622            .tables
623            .locally_computed_checkpoints
624            .reversed_safe_iter_with_bounds(None, None)?
625            .next()
626            .transpose()?
627        {
628            let mut batch = self.tables.locally_computed_checkpoints.batch();
629            batch.schedule_delete_range(
630                &self.tables.locally_computed_checkpoints,
631                &0,
632                &last_local_summary,
633            )?;
634            batch.write()?;
635            info!("Pruned local summaries up to {:?}", last_local_summary);
636        }
637        Ok(())
638    }
639
640    pub fn clear_locally_computed_checkpoints_from(
641        &self,
642        from_seq: CheckpointSequenceNumber,
643    ) -> SuiResult {
644        let keys: Vec<_> = self
645            .tables
646            .locally_computed_checkpoints
647            .safe_iter_with_bounds(Some(from_seq), None)
648            .map(|r| r.map(|(k, _)| k))
649            .collect::<Result<_, _>>()?;
650        if let Some(&last_local_summary) = keys.last() {
651            let mut batch = self.tables.locally_computed_checkpoints.batch();
652            batch
653                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
654                .expect("Failed to delete locally computed checkpoints");
655            batch
656                .write()
657                .expect("Failed to delete locally computed checkpoints");
658            warn!(
659                from_seq,
660                last_local_summary,
661                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
662                from_seq,
663                last_local_summary
664            );
665        }
666        Ok(())
667    }
668
669    fn check_for_checkpoint_fork(
670        &self,
671        local_checkpoint: &CheckpointSummary,
672        verified_checkpoint: &VerifiedCheckpoint,
673    ) {
674        if local_checkpoint != verified_checkpoint.data() {
675            let verified_contents = self
676                .get_checkpoint_contents(&verified_checkpoint.content_digest)
677                .map(|opt_contents| {
678                    opt_contents
679                        .map(|contents| format!("{:?}", contents))
680                        .unwrap_or_else(|| {
681                            format!(
682                                "Verified checkpoint contents not found, digest: {:?}",
683                                verified_checkpoint.content_digest,
684                            )
685                        })
686                })
687                .map_err(|e| {
688                    format!(
689                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
690                        verified_checkpoint.content_digest, e
691                    )
692                })
693                .unwrap_or_else(|err_msg| err_msg);
694
695            let local_contents = self
696                .get_checkpoint_contents(&local_checkpoint.content_digest)
697                .map(|opt_contents| {
698                    opt_contents
699                        .map(|contents| format!("{:?}", contents))
700                        .unwrap_or_else(|| {
701                            format!(
702                                "Local checkpoint contents not found, digest: {:?}",
703                                local_checkpoint.content_digest
704                            )
705                        })
706                })
707                .map_err(|e| {
708                    format!(
709                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
710                        local_checkpoint.content_digest, e
711                    )
712                })
713                .unwrap_or_else(|err_msg| err_msg);
714
715            // checkpoint contents may be too large for panic message.
716            error!(
717                verified_checkpoint = ?verified_checkpoint.data(),
718                ?verified_contents,
719                ?local_checkpoint,
720                ?local_contents,
721                "Local checkpoint fork detected!",
722            );
723
724            // Record the fork in the database before crashing
725            if let Err(e) = self.record_checkpoint_fork_detected(
726                *local_checkpoint.sequence_number(),
727                local_checkpoint.digest(),
728            ) {
729                error!("Failed to record checkpoint fork in database: {:?}", e);
730            }
731
732            fail_point_arg!(
733                "kill_checkpoint_fork_node",
734                |checkpoint_overrides: std::sync::Arc<
735                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
736                >| {
737                    #[cfg(msim)]
738                    {
739                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
740                            overrides.insert(
741                                local_checkpoint.sequence_number,
742                                verified_checkpoint.digest().to_string(),
743                            );
744                        }
745                        tracing::error!(
746                            fatal = true,
747                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
748                            local_checkpoint.sequence_number(),
749                            verified_checkpoint.digest()
750                        );
751                        sui_simulator::task::shutdown_current_node();
752                    }
753                }
754            );
755
756            fatal!(
757                "Local checkpoint fork detected for sequence number: {}",
758                local_checkpoint.sequence_number()
759            );
760        }
761    }
762
763    // Called by consensus (ConsensusAggregator).
764    // Different from `insert_verified_checkpoint`, it does not touch
765    // the highest_verified_checkpoint watermark such that state sync
766    // will have a chance to process this checkpoint and perform some
767    // state-sync only things.
768    pub fn insert_certified_checkpoint(
769        &self,
770        checkpoint: &VerifiedCheckpoint,
771    ) -> Result<(), TypedStoreError> {
772        debug!(
773            checkpoint_seq = checkpoint.sequence_number(),
774            "Inserting certified checkpoint",
775        );
776        let mut batch = self.tables.certified_checkpoints.batch();
777        batch
778            .insert_batch(
779                &self.tables.certified_checkpoints,
780                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
781            )?
782            .insert_batch(
783                &self.tables.checkpoint_by_digest,
784                [(checkpoint.digest(), checkpoint.serializable_ref())],
785            )?;
786        if checkpoint.next_epoch_committee().is_some() {
787            batch.insert_batch(
788                &self.tables.epoch_last_checkpoint_map,
789                [(&checkpoint.epoch(), checkpoint.sequence_number())],
790            )?;
791        }
792        batch.write()?;
793
794        if let Some(local_checkpoint) = self
795            .tables
796            .locally_computed_checkpoints
797            .get(checkpoint.sequence_number())?
798        {
799            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
800        }
801
802        Ok(())
803    }
804
805    // Called by state sync, apart from inserting the checkpoint and updating
806    // related tables, it also bumps the highest_verified_checkpoint watermark.
807    #[instrument(level = "debug", skip_all)]
808    pub fn insert_verified_checkpoint(
809        &self,
810        checkpoint: &VerifiedCheckpoint,
811    ) -> Result<(), TypedStoreError> {
812        self.insert_certified_checkpoint(checkpoint)?;
813        self.update_highest_verified_checkpoint(checkpoint)
814    }
815
816    pub fn update_highest_verified_checkpoint(
817        &self,
818        checkpoint: &VerifiedCheckpoint,
819    ) -> Result<(), TypedStoreError> {
820        if Some(*checkpoint.sequence_number())
821            > self
822                .get_highest_verified_checkpoint()?
823                .map(|x| *x.sequence_number())
824        {
825            debug!(
826                checkpoint_seq = checkpoint.sequence_number(),
827                "Updating highest verified checkpoint",
828            );
829            self.tables.watermarks.insert(
830                &CheckpointWatermark::HighestVerified,
831                &(*checkpoint.sequence_number(), *checkpoint.digest()),
832            )?;
833        }
834
835        Ok(())
836    }
837
838    pub fn update_highest_synced_checkpoint(
839        &self,
840        checkpoint: &VerifiedCheckpoint,
841    ) -> Result<(), TypedStoreError> {
842        let seq = *checkpoint.sequence_number();
843        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
844        self.tables.watermarks.insert(
845            &CheckpointWatermark::HighestSynced,
846            &(seq, *checkpoint.digest()),
847        )?;
848        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
849        Ok(())
850    }
851
852    async fn notify_read_checkpoint_watermark<F>(
853        &self,
854        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
855        seq: CheckpointSequenceNumber,
856        get_watermark: F,
857    ) -> VerifiedCheckpoint
858    where
859        F: Fn() -> Option<CheckpointSequenceNumber>,
860    {
861        notify_read
862            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
863                let seq = seqs[0];
864                let Some(highest) = get_watermark() else {
865                    return vec![None];
866                };
867                if highest < seq {
868                    return vec![None];
869                }
870                let checkpoint = self
871                    .get_checkpoint_by_sequence_number(seq)
872                    .expect("db error")
873                    .expect("checkpoint not found");
874                vec![Some(checkpoint)]
875            })
876            .await
877            .into_iter()
878            .next()
879            .unwrap()
880    }
881
882    pub async fn notify_read_synced_checkpoint(
883        &self,
884        seq: CheckpointSequenceNumber,
885    ) -> VerifiedCheckpoint {
886        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
887            self.get_highest_synced_checkpoint_seq_number()
888                .expect("db error")
889        })
890        .await
891    }
892
893    pub async fn notify_read_executed_checkpoint(
894        &self,
895        seq: CheckpointSequenceNumber,
896    ) -> VerifiedCheckpoint {
897        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
898            self.get_highest_executed_checkpoint_seq_number()
899                .expect("db error")
900        })
901        .await
902    }
903
904    pub fn update_highest_executed_checkpoint(
905        &self,
906        checkpoint: &VerifiedCheckpoint,
907    ) -> Result<(), TypedStoreError> {
908        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
909            if seq_number >= *checkpoint.sequence_number() {
910                return Ok(());
911            }
912            assert_eq!(
913                seq_number + 1,
914                *checkpoint.sequence_number(),
915                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
916                checkpoint.sequence_number(),
917                seq_number
918            );
919        }
920        let seq = *checkpoint.sequence_number();
921        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
922        self.tables.watermarks.insert(
923            &CheckpointWatermark::HighestExecuted,
924            &(seq, *checkpoint.digest()),
925        )?;
926        self.executed_checkpoint_notify_read
927            .notify(&seq, checkpoint);
928        Ok(())
929    }
930
931    pub fn update_highest_pruned_checkpoint(
932        &self,
933        checkpoint: &VerifiedCheckpoint,
934    ) -> Result<(), TypedStoreError> {
935        self.tables.watermarks.insert(
936            &CheckpointWatermark::HighestPruned,
937            &(*checkpoint.sequence_number(), *checkpoint.digest()),
938        )
939    }
940
941    /// Sets highest executed checkpoint to any value.
942    ///
943    /// WARNING: This method is very subtle and can corrupt the database if used incorrectly.
944    /// It should only be used in one-off cases or tests after fully understanding the risk.
945    pub fn set_highest_executed_checkpoint_subtle(
946        &self,
947        checkpoint: &VerifiedCheckpoint,
948    ) -> Result<(), TypedStoreError> {
949        self.tables.watermarks.insert(
950            &CheckpointWatermark::HighestExecuted,
951            &(*checkpoint.sequence_number(), *checkpoint.digest()),
952        )
953    }
954
955    pub fn insert_checkpoint_contents(
956        &self,
957        contents: CheckpointContents,
958    ) -> Result<(), TypedStoreError> {
959        debug!(
960            checkpoint_seq = ?contents.digest(),
961            "Inserting checkpoint contents",
962        );
963        self.tables
964            .checkpoint_content
965            .insert(contents.digest(), &contents)
966    }
967
968    pub fn insert_verified_checkpoint_contents(
969        &self,
970        checkpoint: &VerifiedCheckpoint,
971        full_contents: VerifiedCheckpointContents,
972    ) -> Result<(), TypedStoreError> {
973        let mut batch = self.tables.full_checkpoint_content_v2.batch();
974        batch.insert_batch(
975            &self.tables.checkpoint_sequence_by_contents_digest,
976            [(&checkpoint.content_digest, checkpoint.sequence_number())],
977        )?;
978        let full_contents = full_contents.into_inner();
979        batch.insert_batch(
980            &self.tables.full_checkpoint_content_v2,
981            [(checkpoint.sequence_number(), &full_contents)],
982        )?;
983
984        let contents = full_contents.into_checkpoint_contents();
985        assert_eq!(&checkpoint.content_digest, contents.digest());
986
987        batch.insert_batch(
988            &self.tables.checkpoint_content,
989            [(contents.digest(), &contents)],
990        )?;
991
992        batch.write()
993    }
994
995    pub fn delete_full_checkpoint_contents(
996        &self,
997        seq: CheckpointSequenceNumber,
998    ) -> Result<(), TypedStoreError> {
999        self.tables.full_checkpoint_content.remove(&seq)?;
1000        self.tables.full_checkpoint_content_v2.remove(&seq)
1001    }
1002
1003    pub fn get_epoch_last_checkpoint(
1004        &self,
1005        epoch_id: EpochId,
1006    ) -> SuiResult<Option<VerifiedCheckpoint>> {
1007        let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1008        let checkpoint = match seq {
1009            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1010            None => None,
1011        };
1012        Ok(checkpoint)
1013    }
1014
1015    pub fn get_epoch_last_checkpoint_seq_number(
1016        &self,
1017        epoch_id: EpochId,
1018    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1019        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1020        Ok(seq)
1021    }
1022
1023    /// Returns the sequence number of the first checkpoint in the given epoch.
1024    /// For epoch 0 this is always 0; for epoch N > 0 it is last_checkpoint(N-1) + 1.
1025    pub fn get_epoch_first_checkpoint_seq(
1026        &self,
1027        epoch: EpochId,
1028    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1029        if epoch == 0 {
1030            return Ok(Some(0));
1031        }
1032        Ok(self
1033            .tables
1034            .epoch_last_checkpoint_map
1035            .get(&(epoch - 1))?
1036            .map(|s| s + 1))
1037    }
1038
1039    /// Iterate certified checkpoints starting at `start` (inclusive), up to `limit`.
1040    pub fn list_checkpoints_from_seq(
1041        &self,
1042        start: Option<CheckpointSequenceNumber>,
1043        limit: usize,
1044    ) -> Result<Vec<(CheckpointSequenceNumber, VerifiedCheckpoint)>, TypedStoreError> {
1045        self.tables
1046            .certified_checkpoints
1047            .safe_iter_with_bounds(start, None)
1048            .take(limit)
1049            .map(|r| r.map(|(seq, cp)| (seq, cp.into())))
1050            .collect()
1051    }
1052
1053    /// Iterate epoch→last-checkpoint-seq entries starting at `start` epoch, up to `limit`.
1054    pub fn list_epoch_last_checkpoints(
1055        &self,
1056        start: Option<EpochId>,
1057        limit: usize,
1058    ) -> Result<Vec<(EpochId, CheckpointSequenceNumber)>, TypedStoreError> {
1059        self.tables
1060            .epoch_last_checkpoint_map
1061            .safe_iter_with_bounds(start, None)
1062            .take(limit)
1063            .collect()
1064    }
1065
1066    /// Iterate checkpoint digests from `checkpoint_by_digest`, starting at `start`, up to `limit`.
1067    pub fn list_checkpoint_digests(
1068        &self,
1069        start: Option<CheckpointDigest>,
1070        limit: usize,
1071    ) -> Result<Vec<CheckpointDigest>, TypedStoreError> {
1072        self.tables
1073            .checkpoint_by_digest
1074            .safe_iter_with_bounds(start, None)
1075            .take(limit)
1076            .map(|r| r.map(|(d, _)| d))
1077            .collect()
1078    }
1079
1080    /// Iterate checkpoint contents digests, starting at `start`, up to `limit`.
1081    pub fn list_checkpoint_contents_digests(
1082        &self,
1083        start: Option<CheckpointContentsDigest>,
1084        limit: usize,
1085    ) -> Result<Vec<CheckpointContentsDigest>, TypedStoreError> {
1086        self.tables
1087            .checkpoint_content
1088            .safe_iter_with_bounds(start, None)
1089            .take(limit)
1090            .map(|r| r.map(|(d, _)| d))
1091            .collect()
1092    }
1093
1094    /// Iterate certified checkpoints belonging to `epoch`, starting at `start_seq`, up to `limit`.
1095    pub fn list_epoch_checkpoints(
1096        &self,
1097        epoch: EpochId,
1098        start_seq: Option<CheckpointSequenceNumber>,
1099        limit: usize,
1100    ) -> Result<Vec<(CheckpointSequenceNumber, VerifiedCheckpoint)>, TypedStoreError> {
1101        let Some(last_seq) = self.tables.epoch_last_checkpoint_map.get(&epoch)? else {
1102            return Ok(vec![]);
1103        };
1104        // Compute first seq directly to stay within TypedStoreError.
1105        let first_seq = if epoch == 0 {
1106            0
1107        } else {
1108            self.tables
1109                .epoch_last_checkpoint_map
1110                .get(&(epoch - 1))?
1111                .map(|s| s + 1)
1112                .unwrap_or(0)
1113        };
1114        let start = start_seq.map(|s| s.max(first_seq)).unwrap_or(first_seq);
1115        self.tables
1116            .certified_checkpoints
1117            .safe_iter_with_bounds(Some(start), Some(last_seq + 1))
1118            .take(limit)
1119            .map(|r| r.map(|(seq, cp)| (seq, cp.into())))
1120            .collect()
1121    }
1122
1123    pub fn insert_epoch_last_checkpoint(
1124        &self,
1125        epoch_id: EpochId,
1126        checkpoint: &VerifiedCheckpoint,
1127    ) -> SuiResult {
1128        self.tables
1129            .epoch_last_checkpoint_map
1130            .insert(&epoch_id, checkpoint.sequence_number())?;
1131        Ok(())
1132    }
1133
1134    pub fn get_epoch_state_commitments(
1135        &self,
1136        epoch: EpochId,
1137    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1138        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1139            checkpoint
1140                .end_of_epoch_data
1141                .as_ref()
1142                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1143                .epoch_commitments
1144                .clone()
1145        });
1146        Ok(commitments)
1147    }
1148
1149    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few statistics of the epoch.
1150    pub fn get_epoch_stats(
1151        &self,
1152        epoch: EpochId,
1153        last_checkpoint: &CheckpointSummary,
1154    ) -> Option<EpochStats> {
1155        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1156            (0, 0)
1157        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1158            (
1159                checkpoint.sequence_number + 1,
1160                checkpoint.network_total_transactions,
1161            )
1162        } else {
1163            return None;
1164        };
1165        Some(EpochStats {
1166            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1167            transaction_count: last_checkpoint.network_total_transactions
1168                - prev_epoch_network_transactions,
1169            total_gas_reward: last_checkpoint
1170                .epoch_rolling_gas_cost_summary
1171                .computation_cost,
1172        })
1173    }
1174
1175    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1176        // This checkpoints the entire db and not one column family
1177        self.tables
1178            .checkpoint_content
1179            .checkpoint_db(path)
1180            .map_err(Into::into)
1181    }
1182
1183    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1184        let mut wb = self.tables.watermarks.batch();
1185        wb.delete_batch(
1186            &self.tables.watermarks,
1187            std::iter::once(CheckpointWatermark::HighestExecuted),
1188        )?;
1189        wb.write()?;
1190        Ok(())
1191    }
1192
1193    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1194        self.delete_highest_executed_checkpoint_test_only()?;
1195        Ok(())
1196    }
1197
1198    pub fn record_checkpoint_fork_detected(
1199        &self,
1200        checkpoint_seq: CheckpointSequenceNumber,
1201        checkpoint_digest: CheckpointDigest,
1202    ) -> Result<(), TypedStoreError> {
1203        info!(
1204            checkpoint_seq = checkpoint_seq,
1205            checkpoint_digest = ?checkpoint_digest,
1206            "Recording checkpoint fork detection in database"
1207        );
1208        self.tables.watermarks.insert(
1209            &CheckpointWatermark::CheckpointForkDetected,
1210            &(checkpoint_seq, checkpoint_digest),
1211        )
1212    }
1213
1214    pub fn get_checkpoint_fork_detected(
1215        &self,
1216    ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1217        self.tables
1218            .watermarks
1219            .get(&CheckpointWatermark::CheckpointForkDetected)
1220    }
1221
1222    pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1223        self.tables
1224            .watermarks
1225            .remove(&CheckpointWatermark::CheckpointForkDetected)
1226    }
1227
1228    pub fn record_transaction_fork_detected(
1229        &self,
1230        tx_digest: TransactionDigest,
1231        expected_effects_digest: TransactionEffectsDigest,
1232        actual_effects_digest: TransactionEffectsDigest,
1233    ) -> Result<(), TypedStoreError> {
1234        info!(
1235            tx_digest = ?tx_digest,
1236            expected_effects_digest = ?expected_effects_digest,
1237            actual_effects_digest = ?actual_effects_digest,
1238            "Recording transaction fork detection in database"
1239        );
1240        self.tables.transaction_fork_detected.insert(
1241            &TRANSACTION_FORK_DETECTED_KEY,
1242            &(tx_digest, expected_effects_digest, actual_effects_digest),
1243        )
1244    }
1245
1246    pub fn get_transaction_fork_detected(
1247        &self,
1248    ) -> Result<
1249        Option<(
1250            TransactionDigest,
1251            TransactionEffectsDigest,
1252            TransactionEffectsDigest,
1253        )>,
1254        TypedStoreError,
1255    > {
1256        self.tables
1257            .transaction_fork_detected
1258            .get(&TRANSACTION_FORK_DETECTED_KEY)
1259    }
1260
1261    pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1262        self.tables
1263            .transaction_fork_detected
1264            .remove(&TRANSACTION_FORK_DETECTED_KEY)
1265    }
1266}
1267
/// Keys of the checkpoint store's `watermarks` table.
///
/// Each watermark is stored as a `(CheckpointSequenceNumber, CheckpointDigest)` pair.
///
/// NOTE(review): values of this enum are serialized as table keys, so the variant
/// order presumably determines the on-disk encoding. Append new variants at the end
/// and never reorder existing ones — confirm against the key codec.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    /// Highest verified checkpoint; `update_highest_verified_checkpoint` only moves it forward.
    HighestVerified,
    /// Highest synced checkpoint; overwritten on every update.
    HighestSynced,
    /// Highest executed checkpoint; normally advanced one sequence number at a time.
    HighestExecuted,
    /// Highest pruned checkpoint.
    HighestPruned,
    /// Present only while a checkpoint fork is recorded; identifies the forked checkpoint.
    CheckpointForkDetected,
}
1276
/// Task state that folds the effects of built checkpoints into the global state hash.
///
/// It consumes `(sequence number, effects)` pairs from the channel whose sending
/// half is held by `CheckpointBuilder` (`send_to_hasher`).
struct CheckpointStateHasher {
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Held weakly: `run` stops once the hasher can no longer be upgraded.
    hasher: Weak<GlobalStateHasher>,
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
1282
1283impl CheckpointStateHasher {
1284    fn new(
1285        epoch_store: Arc<AuthorityPerEpochStore>,
1286        hasher: Weak<GlobalStateHasher>,
1287        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1288    ) -> Self {
1289        Self {
1290            epoch_store,
1291            hasher,
1292            receive_from_builder,
1293        }
1294    }
1295
1296    async fn run(self) {
1297        let Self {
1298            epoch_store,
1299            hasher,
1300            mut receive_from_builder,
1301        } = self;
1302        while let Some((seq, effects)) = receive_from_builder.recv().await {
1303            let Some(hasher) = hasher.upgrade() else {
1304                info!("Object state hasher was dropped, stopping checkpoint accumulation");
1305                break;
1306            };
1307            hasher
1308                .accumulate_checkpoint(&effects, seq, &epoch_store)
1309                .expect("epoch ended while accumulating checkpoint");
1310        }
1311    }
1312}
1313
/// Errors produced while building checkpoints.
///
/// `CheckpointBuilder::run` treats `ChangeEpochTxAlreadyExecuted` and
/// `SystemPackagesMissing` as terminal (the builder stops). `Retry` is counted in
/// metrics and the build is retried after a short delay.
#[derive(Debug)]
pub enum CheckpointBuilderError {
    ChangeEpochTxAlreadyExecuted,
    SystemPackagesMissing,
    /// Transient failure wrapping the underlying error.
    Retry(anyhow::Error),
}
1320
1321impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1322    for CheckpointBuilderError
1323{
1324    fn from(e: SuiError) -> Self {
1325        Self::Retry(e.into())
1326    }
1327}
1328
1329pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1330
/// Builds checkpoints from the pending checkpoints recorded in the epoch store.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Awaited by `run` between build passes; notifying it triggers another pass.
    notify: Arc<Notify>,
    // NOTE(review): presumably used to wake the CheckpointAggregator — not used in this view.
    notify_aggregator: Arc<Notify>,
    // Highest checkpoint sequence number built so far; only ever moved forward.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    // Source of executed transaction effects for the checkpoint's transactions.
    effects_store: Arc<dyn TransactionCacheRead>,
    // For synchronous accumulation of the end-of-epoch checkpoint.
    global_state_hasher: Weak<GlobalStateHasher>,
    // For asynchronous/concurrent accumulation of regular checkpoints.
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
}
1344
/// Receives checkpoint signature messages and drives their aggregation.
///
/// NOTE(review): the aggregation loop is not visible in this section; field notes
/// below are based on names and types only.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    // Incoming signature messages (unbounded channel).
    receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
    // Presumably signature messages buffered per checkpoint sequence number until
    // that checkpoint is being aggregated.
    pending: BTreeMap<CheckpointSequenceNumber, Vec<CheckpointSignatureMessage>>,
    // Aggregator for the checkpoint currently being certified, if any.
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1356
// This holds information to aggregate signatures for one checkpoint
pub struct CheckpointSignatureAggregator {
    // Summary of the checkpoint being certified (presumably the locally built one).
    summary: CheckpointSummary,
    // Digest of `summary` — NOTE(review): confirm it is always kept in sync with `summary`.
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1367
1368impl CheckpointBuilder {
1369    fn new(
1370        state: Arc<AuthorityState>,
1371        store: Arc<CheckpointStore>,
1372        epoch_store: Arc<AuthorityPerEpochStore>,
1373        notify: Arc<Notify>,
1374        effects_store: Arc<dyn TransactionCacheRead>,
1375        // for synchronous accumulation of end-of-epoch checkpoint
1376        global_state_hasher: Weak<GlobalStateHasher>,
1377        // for asynchronous/concurrent accumulation of regular checkpoints
1378        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1379        output: Box<dyn CheckpointOutput>,
1380        notify_aggregator: Arc<Notify>,
1381        last_built: watch::Sender<CheckpointSequenceNumber>,
1382        metrics: Arc<CheckpointMetrics>,
1383    ) -> Self {
1384        Self {
1385            state,
1386            store,
1387            epoch_store,
1388            notify,
1389            effects_store,
1390            global_state_hasher,
1391            send_to_hasher,
1392            output,
1393            notify_aggregator,
1394            last_built,
1395            metrics,
1396        }
1397    }
1398
1399    /// This function first waits for ConsensusCommitHandler to finish reprocessing
1400    /// commits that have been processed before the last restart, if consensus_replay_waiter
1401    /// is supplied. Then it starts building checkpoints in a loop.
1402    ///
1403    /// It is optional to pass in consensus_replay_waiter, to make it easier to attribute
1404    /// if slow recovery of previously built checkpoints is due to consensus replay or
1405    /// checkpoint building.
1406    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
1407        if let Some(replay_waiter) = consensus_replay_waiter {
1408            info!("Waiting for consensus commits to replay ...");
1409            replay_waiter.wait_for_replay().await;
1410            info!("Consensus commits finished replaying");
1411        }
1412        info!("Starting CheckpointBuilder");
1413        loop {
1414            match self.maybe_build_checkpoints().await {
1415                Ok(()) => {}
1416                err @ Err(
1417                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
1418                    | CheckpointBuilderError::SystemPackagesMissing,
1419                ) => {
1420                    info!("CheckpointBuilder stopping: {:?}", err);
1421                    return;
1422                }
1423                Err(CheckpointBuilderError::Retry(inner)) => {
1424                    let msg = format!("{:?}", inner);
1425                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
1426                    tokio::time::sleep(Duration::from_secs(1)).await;
1427                    self.metrics.checkpoint_errors.inc();
1428                    continue;
1429                }
1430            }
1431
1432            self.notify.notified().await;
1433        }
1434    }
1435
1436    async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1437        let _scope = monitored_scope("BuildCheckpoints");
1438
1439        // Collect info about the most recently built checkpoint.
1440        let last_height = self
1441            .epoch_store
1442            .last_built_checkpoint_builder_summary()
1443            .expect("epoch should not have ended")
1444            .and_then(|s| s.checkpoint_height);
1445
1446        for (height, pending) in self
1447            .epoch_store
1448            .get_pending_checkpoints(last_height)
1449            .expect("unexpected epoch store error")
1450        {
1451            debug!(checkpoint_commit_height = height, "Making checkpoint");
1452
1453            let seq = self.make_checkpoint(pending).await?;
1454
1455            self.last_built.send_if_modified(|cur| {
1456                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1457                if seq > *cur {
1458                    *cur = seq;
1459                    true
1460                } else {
1461                    false
1462                }
1463            });
1464
1465            // ensure that the task can be cancelled at end of epoch, even if no other await yields
1466            // execution.
1467            tokio::task::yield_now().await;
1468        }
1469
1470        Ok(())
1471    }
1472
    /// Builds the checkpoint for a single pending checkpoint height and persists it via
    /// `write_checkpoint`.
    ///
    /// Returns the sequence number of the checkpoint that was made.
    #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
    async fn make_checkpoint(
        &mut self,
        pending: PendingCheckpoint,
    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");

        // `pending` is consumed by `resolve_checkpoint_transactions` below, so keep a
        // copy of its details for building, writing, and logging the checkpoint.
        let details = pending.details.clone();

        // Read the executed watermark *before* resolving transactions: if execution is
        // already ahead of the checkpoint being built, resolution should not have had
        // to wait (sanity-checked below).
        let highest_executed_sequence = self
            .store
            .get_highest_executed_checkpoint_seq_number()
            .expect("db error")
            .unwrap_or(0);

        // NOTE(review): `poll_count` presumably reports how many times the resolution
        // future was polled before completing; more than one poll means it waited.
        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pending)).await;
        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;

        let new_checkpoint = self
            .create_checkpoint(
                sorted_tx_effects_included_in_checkpoint,
                &details,
                &all_roots,
            )
            .await?;
        let sequence = *new_checkpoint.0.sequence_number();
        let digest = new_checkpoint.0.digest();
        if sequence <= highest_executed_sequence && poll_count > 1 {
            debug_fatal!(
                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
            );
        }

        self.write_checkpoint(details.checkpoint_height, new_checkpoint)
            .await?;
        info!(
            seq = sequence,
            %digest,
            height = details.checkpoint_height,
            commit = %details.consensus_commit_ref,
            "Made new checkpoint"
        );

        Ok(sequence)
    }
1518
1519    // Given the root transactions of a pending checkpoint, resolve the transactions should be included in
1520    // the checkpoint, and return them in the order they should be included in the checkpoint.
1521    #[instrument(level = "debug", skip_all)]
1522    async fn resolve_checkpoint_transactions(
1523        &self,
1524        pending: PendingCheckpoint,
1525    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
1526        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1527
1528        debug!(
1529            checkpoint_commit_height = pending.details.checkpoint_height,
1530            "Resolving checkpoint transactions for pending checkpoint.",
1531        );
1532
1533        trace!(
1534            "roots for pending checkpoint {:?}: {:?}",
1535            pending.details.checkpoint_height, pending.roots,
1536        );
1537
1538        assert!(
1539            self.epoch_store
1540                .protocol_config()
1541                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
1542        );
1543
1544        let mut all_effects: Vec<TransactionEffects> = Vec::new();
1545        let mut all_root_digests: Vec<TransactionDigest> = Vec::new();
1546
1547        for checkpoint_roots in &pending.roots {
1548            let tx_roots = &checkpoint_roots.tx_roots;
1549
1550            self.metrics
1551                .checkpoint_roots_count
1552                .inc_by(tx_roots.len() as u64);
1553
1554            let root_digests = self
1555                .epoch_store
1556                .notify_read_tx_key_to_digest(tx_roots)
1557                .in_monitored_scope("CheckpointNotifyDigests")
1558                .await?;
1559
1560            all_root_digests.extend(root_digests.iter().cloned());
1561
1562            let root_effects = self
1563                .effects_store
1564                .notify_read_executed_effects(
1565                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
1566                    &root_digests,
1567                )
1568                .in_monitored_scope("CheckpointNotifyRead")
1569                .await;
1570            let consensus_commit_prologue =
1571                self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
1572
1573            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1574            let ccp_digest = consensus_commit_prologue.map(|(d, _)| d);
1575            let mut sorted = CausalOrder::causal_sort_with_ccp(root_effects, ccp_digest);
1576
1577            if let Some(settlement_key) = &checkpoint_roots.settlement_root {
1578                let checkpoint_seq = pending.details.checkpoint_seq;
1579                let tx_index_offset = all_effects.len() as u64;
1580                let effects = self
1581                    .resolve_settlement_effects(
1582                        *settlement_key,
1583                        &sorted,
1584                        checkpoint_roots.height,
1585                        checkpoint_seq,
1586                        tx_index_offset,
1587                    )
1588                    .await;
1589                sorted.extend(effects);
1590            }
1591
1592            #[cfg(msim)]
1593            {
1594                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1595            }
1596
1597            all_effects.extend(sorted);
1598        }
1599        Ok((all_effects, all_root_digests.into_iter().collect()))
1600    }
1601
    /// Constructs settlement transactions to compute their digests, then reads effects
    /// directly from the cache. If execution is ahead of the checkpoint builder, the
    /// effects are already cached and this returns instantly. Otherwise it waits for
    /// the execution scheduler's queue worker to execute them.
    ///
    /// * `settlement_key` - only used to label the debug log and timeout reports.
    /// * `sorted_root_effects` - effects of the roots for this settlement, fed to
    ///   `AccumulatorSettlementTxBuilder` to rebuild the settlement transactions.
    /// * `checkpoint_height` / `checkpoint_seq` - identify the checkpoint being built.
    /// * `tx_index_offset` - index offset passed to the settlement builder (the caller in
    ///   this file passes the number of effects already collected for the checkpoint).
    ///
    /// Returns the settlement effects followed by the accumulator barrier effects.
    ///
    /// # Panics
    /// Panics if the epoch has no accumulator root object, or if any settlement or
    /// barrier transaction did not succeed.
    async fn resolve_settlement_effects(
        &self,
        settlement_key: TransactionKey,
        sorted_root_effects: &[TransactionEffects],
        checkpoint_height: CheckpointHeight,
        checkpoint_seq: CheckpointSequenceNumber,
        tx_index_offset: u64,
    ) -> Vec<TransactionEffects> {
        let epoch = self.epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .expect("accumulator root object must exist");

        let builder = AccumulatorSettlementTxBuilder::new(
            None,
            sorted_root_effects,
            checkpoint_seq,
            tx_index_offset,
        );

        // Rebuild the settlement transactions locally only to learn their digests; the
        // transactions themselves are executed elsewhere and read back from the cache.
        let settlement_digests: Vec<_> = builder
            .build_tx(
                self.epoch_store.protocol_config(),
                epoch,
                accumulator_root_obj_initial_shared_version,
                checkpoint_height,
                checkpoint_seq,
            )
            .into_iter()
            .map(|tx| *VerifiedTransaction::new_system_transaction(tx).digest())
            .collect();

        debug!(
            ?settlement_digests,
            ?settlement_key,
            "reading settlement effects from cache"
        );

        // Blocks (retrying on timeout) until every settlement transaction has executed.
        let settlement_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::settlement_effects",
            &settlement_digests,
            settlement_key,
        )
        .await;
        let (accounts_created, accounts_deleted) =
            accumulators::count_accumulator_object_changes(&settlement_effects);
        self.metrics
            .report_accumulator_account_changes(accounts_created, accounts_deleted);

        // The barrier transaction is derived from the settlement effects, so its digest
        // can only be computed after the settlement effects above are available.
        let barrier_digest = *VerifiedTransaction::new_system_transaction(
            accumulators::build_accumulator_barrier_tx(
                epoch,
                accumulator_root_obj_initial_shared_version,
                checkpoint_height,
                &settlement_effects,
            ),
        )
        .digest();

        let barrier_effects = wait_for_effects_with_retry(
            self.effects_store.as_ref(),
            "CheckpointBuilder::barrier_effects",
            &[barrier_digest],
            settlement_key,
        )
        .await;

        // Assert success here, in the builder task, before these effects are included in
        // the checkpoint. The settlement scheduler also asserts this, but it runs in a
        // separate task, so its assertion does not order against checkpoint persistence -
        // checking it here is what prevents a checkpoint from being built over the effects
        // of a failed settlement transaction.
        for fx in settlement_effects.iter().chain(barrier_effects.iter()) {
            assert!(
                fx.status().is_ok(),
                "settlement transaction cannot fail (digest: {:?}) {:#?}",
                fx.transaction_digest(),
                fx
            );
        }

        // Settlement effects first, barrier last.
        settlement_effects
            .into_iter()
            .chain(barrier_effects)
            .collect()
    }
1695
1696    // Extracts the consensus commit prologue digest and effects from the root transactions.
1697    // The consensus commit prologue is expected to be the first transaction in the roots.
1698    fn extract_consensus_commit_prologue(
1699        &self,
1700        root_digests: &[TransactionDigest],
1701        root_effects: &[TransactionEffects],
1702    ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
1703        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1704        if root_digests.is_empty() {
1705            return Ok(None);
1706        }
1707
1708        // Reads the first transaction in the roots, and checks whether it is a consensus commit
1709        // prologue transaction. The consensus commit prologue transaction should be the first
1710        // transaction in the roots written by the consensus handler.
1711        let first_tx = self
1712            .state
1713            .get_transaction_cache_reader()
1714            .get_transaction_block(&root_digests[0])
1715            .expect("Transaction block must exist");
1716
1717        Ok(first_tx
1718            .transaction_data()
1719            .is_consensus_commit_prologue()
1720            .then(|| {
1721                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1722                (*first_tx.digest(), root_effects[0].clone())
1723            }))
1724    }
1725
    /// Persists a newly built checkpoint and announces it.
    ///
    /// The contents (keyed by contents digest) and the summary (keyed by sequence number)
    /// are written in a single batch. Afterwards the checkpoint is handed to
    /// `self.output`, and it is compared against any certified checkpoint already stored
    /// for the same sequence number. Finally the aggregator is woken and the epoch store
    /// is told that the checkpoint for commit `height` has been constructed.
    ///
    /// # Errors
    /// Propagates storage errors and errors returned by `self.output`.
    ///
    /// # Panics
    /// Aborts via `fatal!` if a different summary was previously computed locally for the
    /// same sequence number.
    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoint(
        &mut self,
        height: CheckpointHeight,
        new_checkpoint: (CheckpointSummary, CheckpointContents),
    ) -> SuiResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoint");
        let mut batch = self.store.tables.checkpoint_content.batch();

        let (summary, contents) = &new_checkpoint;
        debug!(
            checkpoint_commit_height = height,
            checkpoint_seq = summary.sequence_number,
            contents_digest = ?contents.digest(),
            "writing checkpoint",
        );

        // A checkpoint previously built locally for the same sequence number must have
        // the same digest; a mismatch means building is not reproducible and is fatal.
        if let Some(previously_computed_summary) = self
            .store
            .tables
            .locally_computed_checkpoints
            .get(&summary.sequence_number)?
            && previously_computed_summary.digest() != summary.digest()
        {
            fatal!(
                "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
                summary.sequence_number,
                previously_computed_summary.digest(),
                summary.digest()
            );
        }

        self.metrics
            .transactions_included_in_checkpoint
            .inc_by(contents.size() as u64);
        let sequence_number = summary.sequence_number;
        self.metrics
            .last_constructed_checkpoint
            .set(sequence_number as i64);

        batch.insert_batch(
            &self.store.tables.checkpoint_content,
            [(contents.digest(), contents)],
        )?;

        batch.insert_batch(
            &self.store.tables.locally_computed_checkpoints,
            [(sequence_number, summary)],
        )?;

        // Contents and summary become durable together, before anything is announced.
        batch.write()?;

        // Send checkpoint sigs to consensus.
        self.output
            .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
            .await?;

        // If a certified checkpoint for this sequence number already exists (e.g. it
        // arrived from the network before local building finished), verify that the
        // local summary agrees with it.
        if let Some(certified_checkpoint) = self
            .store
            .tables
            .certified_checkpoints
            .get(summary.sequence_number())?
        {
            self.store
                .check_for_checkpoint_fork(summary, &certified_checkpoint.into());
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_constructed_checkpoint(height, new_checkpoint);
        Ok(())
    }
1798
1799    fn load_last_built_checkpoint_summary(
1800        epoch_store: &AuthorityPerEpochStore,
1801        store: &CheckpointStore,
1802    ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1803        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1804        if last_checkpoint.is_none() {
1805            let epoch = epoch_store.epoch();
1806            if epoch > 0 {
1807                let previous_epoch = epoch - 1;
1808                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1809                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1810                if let Some((ref seq, _)) = last_checkpoint {
1811                    debug!(
1812                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1813                    );
1814                } else {
1815                    // This is some serious bug with when CheckpointBuilder started so surfacing it via panic
1816                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1817                }
1818            }
1819        }
1820        Ok(last_checkpoint)
1821    }
1822
    /// Builds the summary and contents of one checkpoint from its ordered effects.
    ///
    /// Steps, in order:
    /// 1. Loads the last built checkpoint summary (see
    ///    `load_last_built_checkpoint_summary`) to chain sequence, digest and totals.
    /// 2. Loads every transaction for `all_effects`, classifies it by kind, records
    ///    randomness rounds, and waits until every non-root `ProgrammableTransaction`
    ///    has been processed by consensus.
    /// 3. Collects user signatures, optionally clamps the timestamp to be monotonic, and
    ///    computes the epoch rolling gas cost.
    /// 4. On the last checkpoint of the epoch, executes the advance-epoch transaction and
    ///    computes the epoch root state digest for `EndOfEpochData`. Otherwise it sends
    ///    the effects to the state hasher.
    /// 5. Assembles `CheckpointContents` and `CheckpointSummary` and reports metrics.
    ///
    /// `all_roots` holds the root digests of the pending checkpoint. Roots are not waited
    /// on in step 2.
    ///
    /// # Errors
    /// Propagates the errors of every `?`-marked call (storage, consensus notification,
    /// advance-epoch execution, state hashing, artifacts digest).
    ///
    /// # Panics
    /// Panics if a transaction for any effect is missing from the cache, or if the global
    /// state hasher is gone when building the last checkpoint of the epoch. Aborts via
    /// `fatal!` if the effects contain a `ChangeEpoch`, `Genesis` or
    /// `EndOfEpochTransaction`.
    #[instrument(level = "debug", skip_all)]
    async fn create_checkpoint(
        &self,
        all_effects: Vec<TransactionEffects>,
        details: &PendingCheckpointInfo,
        all_roots: &HashSet<TransactionDigest>,
    ) -> CheckpointBuilderResult<(CheckpointSummary, CheckpointContents)> {
        let _scope = monitored_scope("CheckpointBuilder::create_checkpoint");

        let last_checkpoint =
            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
        debug!(
            checkpoint_commit_height = details.checkpoint_height,
            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
            checkpoint_timestamp = details.timestamp_ms,
            "Creating checkpoint for {} transactions",
            all_effects.len(),
        );

        let all_digests: Vec<_> = all_effects
            .iter()
            .map(|effect| *effect.transaction_digest())
            .collect();
        let transaction_blocks = self
            .state
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(&all_digests);
        let mut transactions = Vec::with_capacity(all_effects.len());
        let mut transaction_keys = Vec::with_capacity(all_effects.len());
        // Transaction digest -> randomness round, for RandomnessStateUpdate transactions.
        let mut randomness_rounds = BTreeMap::new();
        {
            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
            debug!(
                ?last_checkpoint_seq,
                "Waiting for {:?} certificates to appear in consensus",
                all_effects.len()
            );

            for (effects, transaction) in all_effects
                .iter()
                .zip_debug_eq(transaction_blocks.into_iter())
            {
                let transaction = transaction
                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
                match transaction.inner().transaction_data().kind() {
                    TransactionKind::ConsensusCommitPrologue(_)
                    | TransactionKind::ConsensusCommitPrologueV2(_)
                    | TransactionKind::ConsensusCommitPrologueV3(_)
                    | TransactionKind::ConsensusCommitPrologueV4(_)
                    | TransactionKind::AuthenticatorStateUpdate(_) => {
                        // ConsensusCommitPrologue and AuthenticatorStateUpdate are guaranteed to be
                        // processed before we reach here.
                    }
                    TransactionKind::ProgrammableSystemTransaction(_) => {
                        // settlement transactions are added by checkpoint builder
                    }
                    TransactionKind::ChangeEpoch(_)
                    | TransactionKind::Genesis(_)
                    | TransactionKind::EndOfEpochTransaction(_) => {
                        fatal!(
                            "unexpected transaction in checkpoint effects: {:?}",
                            transaction
                        );
                    }
                    TransactionKind::RandomnessStateUpdate(rsu) => {
                        randomness_rounds
                            .insert(*effects.transaction_digest(), rsu.randomness_round);
                    }
                    TransactionKind::ProgrammableTransaction(_) => {
                        // Only transactions that are not roots should be included in the call to
                        // `consensus_messages_processed_notify`. roots come directly from the consensus
                        // commit and so are known to be processed already.
                        let digest = *effects.transaction_digest();
                        if !all_roots.contains(&digest) {
                            transaction_keys.push(SequencedConsensusTransactionKey::External(
                                ConsensusTransactionKey::Certificate(digest),
                            ));
                        }
                    }
                }
                transactions.push((*transaction).clone());
            }

            self.epoch_store
                .consensus_messages_processed_notify(transaction_keys)
                .await?;
        }

        let signatures = self
            .epoch_store
            .user_signatures_for_checkpoint(&transactions, &all_digests);
        debug!(
            ?last_checkpoint_seq,
            "Received {} checkpoint user signatures from consensus",
            signatures.len()
        );

        // Only for the last checkpoint of the epoch: one execution-time observation key
        // per command of every ProgrammableTransaction in this checkpoint, handed to the
        // advance-epoch transaction below.
        let end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
            Some(
                transactions
                    .iter()
                    .flat_map(|tx| {
                        if let TransactionKind::ProgrammableTransaction(ptb) =
                            tx.transaction_data().kind()
                        {
                            itertools::Either::Left(
                                ptb.commands
                                    .iter()
                                    .map(ExecutionTimeObservationKey::from_command),
                            )
                        } else {
                            itertools::Either::Right(std::iter::empty())
                        }
                    })
                    .collect(),
            )
        } else {
            None
        };

        let epoch = self.epoch_store.epoch();
        // True when there is no previous checkpoint or it belongs to an earlier epoch.
        let first_checkpoint_of_epoch = last_checkpoint
            .as_ref()
            .map(|(_, c)| c.epoch != epoch)
            .unwrap_or(true);
        if first_checkpoint_of_epoch {
            self.epoch_store
                .record_epoch_first_checkpoint_creation_time_metric();
        }
        let last_checkpoint_of_epoch = details.last_of_epoch;

        let sequence_number = details.checkpoint_seq;
        let mut timestamp_ms = details.timestamp_ms;
        if let Some((_, last_checkpoint)) = &last_checkpoint
            && last_checkpoint.timestamp_ms > timestamp_ms
        {
            // First consensus commit of an epoch can have zero timestamp.
            debug!(
                "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
                sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
            );
            // When the protocol requires it, never let the timestamp go backwards.
            if self
                .epoch_store
                .protocol_config()
                .enforce_checkpoint_timestamp_monotonicity()
            {
                timestamp_ms = last_checkpoint.timestamp_ms;
            }
        }

        let mut effects = all_effects;
        let mut signatures = signatures;
        // Computed before the advance-epoch effects are appended below, so this total
        // excludes that transaction; it is also the gas input for the advance-epoch tx.
        let epoch_rolling_gas_cost_summary =
            self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);

        let end_of_epoch_data = if last_checkpoint_of_epoch {
            let system_state_obj = self
                .augment_epoch_last_checkpoint(
                    &epoch_rolling_gas_cost_summary,
                    timestamp_ms,
                    &mut effects,
                    &mut signatures,
                    sequence_number,
                    end_of_epoch_observation_keys.expect(
                        "end_of_epoch_observation_keys must be populated for the last checkpoint",
                    ),
                    last_checkpoint_seq.unwrap_or_default(),
                )
                .await?;

            let committee = system_state_obj
                .get_current_epoch_committee()
                .committee()
                .clone();

            // This must happen after the call to augment_epoch_last_checkpoint,
            // otherwise we will not capture the change_epoch tx.
            let root_state_digest = {
                let state_acc = self
                    .global_state_hasher
                    .upgrade()
                    .expect("No checkpoints should be getting built after local configuration");
                let acc = state_acc.accumulate_checkpoint(
                    &effects,
                    sequence_number,
                    &self.epoch_store,
                )?;

                state_acc
                    .wait_for_previous_running_root(&self.epoch_store, sequence_number)
                    .await?;

                state_acc.accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))?;
                state_acc
                    .digest_epoch(self.epoch_store.clone(), sequence_number)
                    .await?
            };
            self.metrics.highest_accumulated_epoch.set(epoch as i64);
            info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

            let epoch_commitments = if self
                .epoch_store
                .protocol_config()
                .check_commit_root_state_digest_supported()
            {
                vec![root_state_digest.into()]
            } else {
                vec![]
            };

            Some(EndOfEpochData {
                next_epoch_committee: committee.voting_rights,
                next_epoch_protocol_version: ProtocolVersion::new(
                    system_state_obj.protocol_version(),
                ),
                epoch_commitments,
            })
        } else {
            // Non-final checkpoints are hashed asynchronously by the hasher task.
            self.send_to_hasher
                .send((sequence_number, effects.clone()))
                .await?;

            None
        };
        // With `address_aliases` the signatures are stored together with their optional
        // version; the legacy format keeps only the `GenericSignature` of each pair.
        let contents = if self.epoch_store.protocol_config().address_aliases() {
            CheckpointContents::new_v2(&effects, signatures)
        } else {
            CheckpointContents::new_with_digests_and_signatures(
                effects.iter().map(TransactionEffects::execution_digests),
                signatures
                    .into_iter()
                    .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
                    .collect(),
            )
        };

        let num_txns = contents.size() as u64;

        // Running count of transactions across all checkpoints.
        let network_total_transactions = last_checkpoint
            .as_ref()
            .map(|(_, c)| c.network_total_transactions + num_txns)
            .unwrap_or(num_txns);

        let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());

        // Randomness rounds in the same order as their transactions appear in `effects`.
        let matching_randomness_rounds: Vec<_> = effects
            .iter()
            .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
            .copied()
            .collect();

        let checkpoint_commitments = if self
            .epoch_store
            .protocol_config()
            .include_checkpoint_artifacts_digest_in_summary()
        {
            let artifacts = CheckpointArtifacts::from(&effects[..]);
            let artifacts_digest = artifacts.digest()?;
            vec![artifacts_digest.into()]
        } else {
            Default::default()
        };

        let summary = CheckpointSummary::new(
            self.epoch_store.protocol_config(),
            epoch,
            sequence_number,
            network_total_transactions,
            &contents,
            previous_digest,
            epoch_rolling_gas_cost_summary,
            end_of_epoch_data,
            timestamp_ms,
            matching_randomness_rounds,
            checkpoint_commitments,
        );
        summary.report_checkpoint_age(
            &self.metrics.last_created_checkpoint_age,
            &self.metrics.last_created_checkpoint_age_ms,
        );
        if last_checkpoint_of_epoch {
            info!(
                checkpoint_seq = sequence_number,
                "creating last checkpoint of epoch {}", epoch
            );
            if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
                self.epoch_store
                    .report_epoch_metrics_at_last_checkpoint(stats);
            }
        }

        Ok((summary, contents))
    }
2117
2118    fn get_epoch_total_gas_cost(
2119        &self,
2120        last_checkpoint: Option<&CheckpointSummary>,
2121        cur_checkpoint_effects: &[TransactionEffects],
2122    ) -> GasCostSummary {
2123        let (previous_epoch, previous_gas_costs) = last_checkpoint
2124            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2125            .unwrap_or_default();
2126        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2127        if previous_epoch == self.epoch_store.epoch() {
2128            // sum only when we are within the same epoch
2129            GasCostSummary::new(
2130                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2131                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2132                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2133                previous_gas_costs.non_refundable_storage_fee
2134                    + current_gas_costs.non_refundable_storage_fee,
2135            )
2136        } else {
2137            current_gas_costs
2138        }
2139    }
2140
2141    #[instrument(level = "error", skip_all)]
2142    async fn augment_epoch_last_checkpoint(
2143        &self,
2144        epoch_total_gas_cost: &GasCostSummary,
2145        epoch_start_timestamp_ms: CheckpointTimestamp,
2146        checkpoint_effects: &mut Vec<TransactionEffects>,
2147        signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2148        checkpoint: CheckpointSequenceNumber,
2149        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2150        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
2151        // >1 checkpoint.
2152        last_checkpoint: CheckpointSequenceNumber,
2153    ) -> CheckpointBuilderResult<SuiSystemState> {
2154        let (system_state, effects) = self
2155            .state
2156            .create_and_execute_advance_epoch_tx(
2157                &self.epoch_store,
2158                epoch_total_gas_cost,
2159                checkpoint,
2160                epoch_start_timestamp_ms,
2161                end_of_epoch_observation_keys,
2162                last_checkpoint,
2163            )
2164            .await?;
2165        checkpoint_effects.push(effects);
2166        signatures.push(vec![]);
2167        Ok(system_state)
2168    }
2169
2170    // Checks the invariants of the consensus commit prologue transactions in the checkpoint
2171    // in simtest.
2172    #[cfg(msim)]
2173    fn expensive_consensus_commit_prologue_invariants_check(
2174        &self,
2175        root_digests: &[TransactionDigest],
2176        sorted: &[TransactionEffects],
2177    ) {
2178        // Gets all the consensus commit prologue transactions from the roots.
2179        let root_txs = self
2180            .state
2181            .get_transaction_cache_reader()
2182            .multi_get_transaction_blocks(root_digests);
2183        let ccps = root_txs
2184            .iter()
2185            .filter_map(|tx| {
2186                if let Some(tx) = tx {
2187                    if tx.transaction_data().is_consensus_commit_prologue() {
2188                        Some(tx)
2189                    } else {
2190                        None
2191                    }
2192                } else {
2193                    None
2194                }
2195            })
2196            .collect::<Vec<_>>();
2197
2198        // There should be at most one consensus commit prologue transaction in the roots.
2199        assert!(ccps.len() <= 1);
2200
2201        // Get all the transactions in the checkpoint.
2202        let txs = self
2203            .state
2204            .get_transaction_cache_reader()
2205            .multi_get_transaction_blocks(
2206                &sorted
2207                    .iter()
2208                    .map(|tx| tx.transaction_digest().clone())
2209                    .collect::<Vec<_>>(),
2210            );
2211
2212        if ccps.len() == 0 {
2213            // If there is no consensus commit prologue transaction in the roots, then there should be no
2214            // consensus commit prologue transaction in the checkpoint.
2215            for tx in txs.iter() {
2216                if let Some(tx) = tx {
2217                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2218                }
2219            }
2220        } else {
2221            // If there is one consensus commit prologue, it must be the first one in the checkpoint.
2222            assert!(
2223                txs[0]
2224                    .as_ref()
2225                    .unwrap()
2226                    .transaction_data()
2227                    .is_consensus_commit_prologue()
2228            );
2229
2230            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2231
2232            for tx in txs.iter().skip(1) {
2233                if let Some(tx) = tx {
2234                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2235                }
2236            }
2237        }
2238    }
2239}
2240
2241async fn wait_for_effects_with_retry(
2242    effects_store: &dyn TransactionCacheRead,
2243    task_name: &'static str,
2244    digests: &[TransactionDigest],
2245    tx_key: TransactionKey,
2246) -> Vec<TransactionEffects> {
2247    let delay = if in_antithesis() {
2248        // antithesis has aggressive thread pausing, 5 seconds causes false positives
2249        15
2250    } else {
2251        5
2252    };
2253    loop {
2254        match tokio::time::timeout(Duration::from_secs(delay), async {
2255            effects_store
2256                .notify_read_executed_effects(task_name, digests)
2257                .await
2258        })
2259        .await
2260        {
2261            Ok(effects) => break effects,
2262            Err(_) => {
2263                debug_fatal!(
2264                    "Timeout waiting for transactions to be executed {:?}, retrying...",
2265                    tx_key
2266                );
2267            }
2268        }
2269    }
2270}
2271
2272impl CheckpointAggregator {
2273    fn new(
2274        tables: Arc<CheckpointStore>,
2275        epoch_store: Arc<AuthorityPerEpochStore>,
2276        notify: Arc<Notify>,
2277        receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
2278        output: Box<dyn CertifiedCheckpointOutput>,
2279        state: Arc<AuthorityState>,
2280        metrics: Arc<CheckpointMetrics>,
2281    ) -> Self {
2282        Self {
2283            store: tables,
2284            epoch_store,
2285            notify,
2286            receiver,
2287            pending: BTreeMap::new(),
2288            current: None,
2289            output,
2290            state,
2291            metrics,
2292        }
2293    }
2294
2295    async fn run(mut self) {
2296        info!("Starting CheckpointAggregator");
2297        loop {
2298            // Drain all signatures that arrived since the last iteration into the pending buffer
2299            while let Ok(sig) = self.receiver.try_recv() {
2300                self.pending
2301                    .entry(sig.summary.sequence_number)
2302                    .or_default()
2303                    .push(sig);
2304            }
2305
2306            if let Err(e) = self.run_and_notify().await {
2307                error!(
2308                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
2309                    e
2310                );
2311                self.metrics.checkpoint_errors.inc();
2312                tokio::time::sleep(Duration::from_secs(1)).await;
2313                continue;
2314            }
2315
2316            tokio::select! {
2317                Some(sig) = self.receiver.recv() => {
2318                    self.pending
2319                        .entry(sig.summary.sequence_number)
2320                        .or_default()
2321                        .push(sig);
2322                }
2323                _ = self.notify.notified() => {}
2324                _ = tokio::time::sleep(Duration::from_secs(1)) => {}
2325            }
2326        }
2327    }
2328
2329    async fn run_and_notify(&mut self) -> SuiResult {
2330        let summaries = self.run_inner()?;
2331        for summary in summaries {
2332            self.output.certified_checkpoint_created(&summary).await?;
2333        }
2334        Ok(())
2335    }
2336
2337    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
2338        let _scope = monitored_scope("CheckpointAggregator");
2339        let mut result = vec![];
2340        'outer: loop {
2341            let next_to_certify = self.next_checkpoint_to_certify()?;
2342            // Discard buffered signatures for checkpoints already certified
2343            // (e.g. certified via StateSync before local aggregation completed).
2344            self.pending.retain(|&seq, _| seq >= next_to_certify);
2345            let current = if let Some(current) = &mut self.current {
2346                // It's possible that the checkpoint was already certified by
2347                // the rest of the network and we've already received the
2348                // certified checkpoint via StateSync. In this case, we reset
2349                // the current signature aggregator to the next checkpoint to
2350                // be certified
2351                if current.summary.sequence_number < next_to_certify {
2352                    assert_reachable!("skip checkpoint certification");
2353                    self.current = None;
2354                    continue;
2355                }
2356                current
2357            } else {
2358                let Some(summary) = self
2359                    .epoch_store
2360                    .get_built_checkpoint_summary(next_to_certify)?
2361                else {
2362                    return Ok(result);
2363                };
2364                self.current = Some(CheckpointSignatureAggregator {
2365                    digest: summary.digest(),
2366                    summary,
2367                    signatures_by_digest: MultiStakeAggregator::new(
2368                        self.epoch_store.committee().clone(),
2369                    ),
2370                    store: self.store.clone(),
2371                    state: self.state.clone(),
2372                    metrics: self.metrics.clone(),
2373                });
2374                self.current.as_mut().unwrap()
2375            };
2376
2377            let seq = current.summary.sequence_number;
2378            let sigs = self.pending.remove(&seq).unwrap_or_default();
2379            if sigs.is_empty() {
2380                trace!(
2381                    checkpoint_seq =? seq,
2382                    "Not enough checkpoint signatures",
2383                );
2384                return Ok(result);
2385            }
2386            for data in sigs {
2387                trace!(
2388                    checkpoint_seq = seq,
2389                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
2390                    current.summary.digest(),
2391                    data.summary.auth_sig().authority.concise()
2392                );
2393                self.metrics
2394                    .checkpoint_participation
2395                    .with_label_values(&[&format!(
2396                        "{:?}",
2397                        data.summary.auth_sig().authority.concise()
2398                    )])
2399                    .inc();
2400                if let Ok(auth_signature) = current.try_aggregate(data) {
2401                    debug!(
2402                        checkpoint_seq = seq,
2403                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
2404                        current.summary.digest(),
2405                    );
2406                    let summary = VerifiedCheckpoint::new_unchecked(
2407                        CertifiedCheckpointSummary::new_from_data_and_sig(
2408                            current.summary.clone(),
2409                            auth_signature,
2410                        ),
2411                    );
2412
2413                    self.store.insert_certified_checkpoint(&summary)?;
2414                    self.metrics.last_certified_checkpoint.set(seq as i64);
2415                    current.summary.report_checkpoint_age(
2416                        &self.metrics.last_certified_checkpoint_age,
2417                        &self.metrics.last_certified_checkpoint_age_ms,
2418                    );
2419                    result.push(summary.into_inner());
2420                    self.current = None;
2421                    continue 'outer;
2422                }
2423            }
2424            break;
2425        }
2426        Ok(result)
2427    }
2428
2429    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
2430        Ok(self
2431            .store
2432            .tables
2433            .certified_checkpoints
2434            .reversed_safe_iter_with_bounds(None, None)?
2435            .next()
2436            .transpose()?
2437            .map(|(seq, _)| seq + 1)
2438            .unwrap_or_default())
2439    }
2440}
2441
2442impl CheckpointSignatureAggregator {
2443    #[allow(clippy::result_unit_err)]
2444    pub fn try_aggregate(
2445        &mut self,
2446        data: CheckpointSignatureMessage,
2447    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2448        let their_digest = *data.summary.digest();
2449        let (_, signature) = data.summary.into_data_and_sig();
2450        let author = signature.authority;
2451        let envelope =
2452            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2453        match self.signatures_by_digest.insert(their_digest, envelope) {
2454            // ignore repeated signatures
2455            InsertResult::Failed { error }
2456                if matches!(
2457                    error.as_inner(),
2458                    SuiErrorKind::StakeAggregatorRepeatedSigner {
2459                        conflicting_sig: false,
2460                        ..
2461                    },
2462                ) =>
2463            {
2464                Err(())
2465            }
2466            InsertResult::Failed { error } => {
2467                warn!(
2468                    checkpoint_seq = self.summary.sequence_number,
2469                    "Failed to aggregate new signature from validator {:?}: {:?}",
2470                    author.concise(),
2471                    error
2472                );
2473                self.check_for_split_brain();
2474                Err(())
2475            }
2476            InsertResult::QuorumReached(cert) => {
2477                // It is not guaranteed that signature.authority == narwhal_cert.author, but we do verify
2478                // the signature so we know that the author signed the message at some point.
2479                if their_digest != self.digest {
2480                    self.metrics.remote_checkpoint_forks.inc();
2481                    warn!(
2482                        checkpoint_seq = self.summary.sequence_number,
2483                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2484                        author.concise(),
2485                        their_digest,
2486                        self.digest
2487                    );
2488                    return Err(());
2489                }
2490                Ok(cert)
2491            }
2492            InsertResult::NotEnoughVotes {
2493                bad_votes: _,
2494                bad_authorities: _,
2495            } => {
2496                self.check_for_split_brain();
2497                Err(())
2498            }
2499        }
2500    }
2501
2502    /// Check if there is a split brain condition in checkpoint signature aggregation, defined
2503    /// as any state wherein it is no longer possible to achieve quorum on a checkpoint proposal,
2504    /// irrespective of the outcome of any outstanding votes.
2505    fn check_for_split_brain(&self) {
2506        debug!(
2507            checkpoint_seq = self.summary.sequence_number,
2508            "Checking for split brain condition"
2509        );
2510        if self.signatures_by_digest.quorum_unreachable() {
2511            // TODO: at this point we should immediately halt processing
2512            // of new transaction certificates to avoid building on top of
2513            // forked output
2514            // self.halt_all_execution();
2515
2516            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2517            let digests_by_stake_messages = all_unique_values
2518                .iter()
2519                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2520                .map(|(digest, (_authorities, total_stake))| {
2521                    format!("{:?} (total stake: {})", digest, total_stake)
2522                })
2523                .collect::<Vec<String>>();
2524            fail_point_arg!("kill_split_brain_node", |(
2525                checkpoint_overrides,
2526                forked_authorities,
2527            ): (
2528                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
2529                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
2530            )| {
2531                #[cfg(msim)]
2532                {
2533                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
2534                        (checkpoint_overrides.lock(), forked_authorities.lock())
2535                    {
2536                        // Find the digest produced by non-forked authorities
2537                        let correct_digest = all_unique_values
2538                            .iter()
2539                            .find(|(_, (authorities, _))| {
2540                                // Check if any authority that produced this digest is NOT in the forked set
2541                                authorities
2542                                    .iter()
2543                                    .any(|auth| !forked_authorities_set.contains(auth))
2544                            })
2545                            .map(|(digest, _)| digest.to_string())
2546                            .unwrap_or_else(|| {
2547                                // Fallback: use the digest with the highest stake
2548                                all_unique_values
2549                                    .iter()
2550                                    .max_by_key(|(_, (_, stake))| *stake)
2551                                    .map(|(digest, _)| digest.to_string())
2552                                    .unwrap_or_else(|| self.digest.to_string())
2553                            });
2554
2555                        overrides.insert(self.summary.sequence_number, correct_digest.clone());
2556
2557                        tracing::error!(
2558                            fatal = true,
2559                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
2560                            self.summary.sequence_number,
2561                            correct_digest
2562                        );
2563                    }
2564                }
2565            });
2566
2567            debug_fatal!(
2568                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
2569                self.summary.sequence_number,
2570                self.signatures_by_digest.uncommitted_stake(),
2571                digests_by_stake_messages
2572            );
2573            self.metrics.split_brain_checkpoint_forks.inc();
2574
2575            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2576            let local_summary = self.summary.clone();
2577            let state = self.state.clone();
2578            let tables = self.store.clone();
2579
2580            tokio::spawn(async move {
2581                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2582            });
2583        }
2584    }
2585}
2586
2587/// Create data dump containing relevant data for diagnosing cause of the
2588/// split brain by querying one disagreeing validator for full checkpoint contents.
2589/// To minimize peer chatter, we only query one validator at random from each
2590/// disagreeing faction, as all honest validators that participated in this round may
2591/// inevitably run the same process.
2592async fn diagnose_split_brain(
2593    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2594    local_summary: CheckpointSummary,
2595    state: Arc<AuthorityState>,
2596    tables: Arc<CheckpointStore>,
2597) {
2598    debug!(
2599        checkpoint_seq = local_summary.sequence_number,
2600        "Running split brain diagnostics..."
2601    );
2602    let time = SystemTime::now();
2603    // collect one random disagreeing validator per differing digest
2604    let digest_to_validator = all_unique_values
2605        .iter()
2606        .filter_map(|(digest, (validators, _))| {
2607            if *digest != local_summary.digest() {
2608                let random_validator = validators.choose(&mut get_rng()).unwrap();
2609                Some((*digest, *random_validator))
2610            } else {
2611                None
2612            }
2613        })
2614        .collect::<HashMap<_, _>>();
2615    if digest_to_validator.is_empty() {
2616        panic!(
2617            "Given split brain condition, there should be at \
2618                least one validator that disagrees with local signature"
2619        );
2620    }
2621
2622    let epoch_store = state.load_epoch_store_one_call_per_task();
2623    let committee = epoch_store
2624        .epoch_start_state()
2625        .get_sui_committee_with_network_metadata();
2626    let network_config = default_mysten_network_config();
2627    let network_clients =
2628        make_network_authority_clients_with_network_config(&committee, &network_config);
2629
2630    // Query all disagreeing validators
2631    let response_futures = digest_to_validator
2632        .values()
2633        .cloned()
2634        .map(|validator| {
2635            let client = network_clients
2636                .get(&validator)
2637                .expect("Failed to get network client");
2638            let request = CheckpointRequestV2 {
2639                sequence_number: Some(local_summary.sequence_number),
2640                request_content: true,
2641                certified: false,
2642            };
2643            client.handle_checkpoint_v2(request)
2644        })
2645        .collect::<Vec<_>>();
2646
2647    let digest_name_pair = digest_to_validator.iter();
2648    let response_data = futures::future::join_all(response_futures)
2649        .await
2650        .into_iter()
2651        .zip_debug_eq(digest_name_pair)
2652        .filter_map(|(response, (digest, name))| match response {
2653            Ok(response) => match response {
2654                CheckpointResponseV2 {
2655                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2656                    contents: Some(contents),
2657                } => Some((*name, *digest, summary, contents)),
2658                CheckpointResponseV2 {
2659                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2660                    contents: _,
2661                } => {
2662                    panic!("Expected pending checkpoint, but got certified checkpoint");
2663                }
2664                CheckpointResponseV2 {
2665                    checkpoint: None,
2666                    contents: _,
2667                } => {
2668                    error!(
2669                        "Summary for checkpoint {:?} not found on validator {:?}",
2670                        local_summary.sequence_number, name
2671                    );
2672                    None
2673                }
2674                CheckpointResponseV2 {
2675                    checkpoint: _,
2676                    contents: None,
2677                } => {
2678                    error!(
2679                        "Contents for checkpoint {:?} not found on validator {:?}",
2680                        local_summary.sequence_number, name
2681                    );
2682                    None
2683                }
2684            },
2685            Err(e) => {
2686                error!(
2687                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2688                    e
2689                );
2690                None
2691            }
2692        })
2693        .collect::<Vec<_>>();
2694
2695    let local_checkpoint_contents = tables
2696        .get_checkpoint_contents(&local_summary.content_digest)
2697        .unwrap_or_else(|_| {
2698            panic!(
2699                "Could not find checkpoint contents for digest {:?}",
2700                local_summary.digest()
2701            )
2702        })
2703        .unwrap_or_else(|| {
2704            panic!(
2705                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2706                local_summary.sequence_number,
2707                local_summary.digest()
2708            )
2709        });
2710    let local_contents_text = format!("{local_checkpoint_contents:?}");
2711
2712    let local_summary_text = format!("{local_summary:?}");
2713    let local_validator = state.name.concise();
2714    let diff_patches = response_data
2715        .iter()
2716        .map(|(name, other_digest, other_summary, contents)| {
2717            let other_contents_text = format!("{contents:?}");
2718            let other_summary_text = format!("{other_summary:?}");
2719            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2720                .enumerate_transactions(&local_summary)
2721                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2722                .unzip();
2723            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2724                .enumerate_transactions(other_summary)
2725                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2726                .unzip();
2727            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2728            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2729            let local_transactions_text = format!("{local_transactions:#?}");
2730            let other_transactions_text = format!("{other_transactions:#?}");
2731            let transactions_patch =
2732                create_patch(&local_transactions_text, &other_transactions_text);
2733            let local_effects_text = format!("{local_effects:#?}");
2734            let other_effects_text = format!("{other_effects:#?}");
2735            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2736            let seq_number = local_summary.sequence_number;
2737            let local_digest = local_summary.digest();
2738            let other_validator = name.concise();
2739            format!(
2740                "Checkpoint: {seq_number:?}\n\
2741                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2742                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2743                Summary Diff: \n{summary_patch}\n\n\
2744                Contents Diff: \n{contents_patch}\n\n\
2745                Transactions Diff: \n{transactions_patch}\n\n\
2746                Effects Diff: \n{effects_patch}",
2747            )
2748        })
2749        .collect::<Vec<_>>()
2750        .join("\n\n\n");
2751
2752    let header = format!(
2753        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2754        Datetime: {:?}",
2755        time
2756    );
2757    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2758    let path = tempfile::tempdir()
2759        .expect("Failed to create tempdir")
2760        .keep()
2761        .join(Path::new("checkpoint_fork_dump.txt"));
2762    let mut file = File::create(path).unwrap();
2763    write!(file, "{}", fork_logs_text).unwrap();
2764    debug!("{}", fork_logs_text);
2765}
2766
/// Entry points through which other components hand work to the checkpoint service.
pub trait CheckpointServiceNotify {
    /// Delivers a validator's checkpoint signature for aggregation.
    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult;

    /// Signals that checkpoint-building work may be available (in `CheckpointService`
    /// this wakes the checkpoint builder).
    fn notify_checkpoint(&self) -> SuiResult;
}
2772
/// Holds the checkpoint service's background components between construction and start.
///
/// `Unstarted` owns the builder, aggregator and state hasher; once they have been taken
/// out to run, the state becomes `Started`.
// `Unstarted` is far larger than `Started`; the size lint is silenced because this enum
// is stored once per `CheckpointService` (behind its `state` mutex), not in bulk.
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    Started,
}
2784
2785impl CheckpointServiceState {
2786    fn take_unstarted(
2787        &mut self,
2788    ) -> (
2789        CheckpointBuilder,
2790        CheckpointAggregator,
2791        CheckpointStateHasher,
2792    ) {
2793        let mut state = CheckpointServiceState::Started;
2794        std::mem::swap(self, &mut state);
2795
2796        match state {
2797            CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
2798                (builder, aggregator, hasher)
2799            }
2800            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2801        }
2802    }
2803}
2804
/// Coordinates the checkpoint builder, signature aggregator and state hasher, and exposes
/// the notification entry points used to feed them.
pub struct CheckpointService {
    // Checkpoint store; consulted for the highest verified and executed checkpoints.
    tables: Arc<CheckpointStore>,
    // Wakes the checkpoint builder (used by `notify_checkpoint`).
    notify_builder: Arc<Notify>,
    // Forwards received checkpoint signatures to the aggregator task.
    signature_sender: mpsc::UnboundedSender<CheckpointSignatureMessage>,
    // A notification for the current highest built sequence number.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // The highest sequence number that had already been built at the time CheckpointService
    // was constructed
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    // Holds the not-yet-started components until `spawn` takes them out.
    state: Mutex<CheckpointServiceState>,
}
2817
impl CheckpointService {
    /// Constructs a new CheckpointService in an un-started state.
    ///
    /// Creates the checkpoint builder, aggregator and state hasher together with the
    /// channels and `Notify` handles that connect them, but starts no tasks; call
    /// `spawn` to run them.
    ///
    /// # Panics
    /// Panics if the latest locally computed checkpoint cannot be read from
    /// `checkpoint_store`, or if the last built checkpoint summary cannot be loaded. When
    /// accumulators are enabled, a failure to read the accumulator object count is fatal.
    // The signature channel is unbounded because notify_checkpoint_signature is called from a
    // sync context (consensus_validator.rs implements a sync external trait) and cannot block.
    // The channel is consumed by a single async aggregator task that drains it continuously, so
    // unbounded growth is not a concern in practice.
    #[allow(clippy::disallowed_methods)]
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        global_state_hasher: Weak<GlobalStateHasher>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Arc<Self> {
        info!("Starting checkpoint service");
        Self::initialize_accumulator_account_metrics(&state, &epoch_store, &metrics);
        let notify_builder = Arc::new(Notify::new());
        // Shared by the builder and the aggregator (both receive a clone below).
        let notify_aggregator = Arc::new(Notify::new());

        // We may have built higher checkpoint numbers before restarting.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        // Starting value for the watch channel the builder publishes its progress on;
        // `wait_for_rebuilt_checkpoints` compares it against the value above.
        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let (signature_sender, signature_receiver) = mpsc::unbounded_channel();

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            signature_receiver,
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        // Bounded channel from the builder to the state hasher.
        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);

        let ckpt_state_hasher = CheckpointStateHasher::new(
            epoch_store.clone(),
            global_state_hasher.clone(),
            receive_from_builder,
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            global_state_hasher,
            send_to_hasher,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
        );

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            signature_sender,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted((
                builder,
                aggregator,
                ckpt_state_hasher,
            ))),
        })
    }

    /// Seeds the live accumulator-account metric from the accumulator object count stored
    /// in the object store.
    ///
    /// Does nothing when accumulators are disabled in the protocol config or when no count
    /// is available; a read error is reported through `fatal!`.
    fn initialize_accumulator_account_metrics(
        state: &AuthorityState,
        epoch_store: &AuthorityPerEpochStore,
        metrics: &CheckpointMetrics,
    ) {
        if !epoch_store.protocol_config().enable_accumulators() {
            return;
        }

        let object_store = state.get_object_store();
        match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
            Ok(Some(count)) => metrics.initialize_accumulator_accounts_live(count),
            Ok(None) => {}
            Err(e) => fatal!("failed to initialize accumulator account metrics: {e}"),
        }
    }

    /// Starts the CheckpointService.
    ///
    /// This function blocks until the CheckpointBuilder re-builds all checkpoints that had
    /// been built before the most recent restart. You can think of this as a WAL replay
    /// operation. Upon startup, we may have a number of consensus commits and resulting
    /// checkpoints that were built but not committed to disk. We want to reprocess the
    /// commits and rebuild the checkpoints before starting normal operation.
    ///
    /// The wait is capped at 120 seconds; on timeout it reports via `debug_fatal!` and
    /// returns.
    ///
    /// # Panics
    /// Panics if the service has already been started, or if the persisted builder summary
    /// or the highest executed checkpoint cannot be read.
    pub async fn spawn(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_replay_waiter: Option<ReplayWaiter>,
    ) {
        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

        // Clean up state hashes computed after the last built checkpoint
        // This prevents ECMH divergence after fork recovery restarts

        // Note: there is a rare crash recovery edge case where we write the builder
        // summary, but crash before we can bump the highest executed checkpoint.
        // If we committed the builder summary, it was certified and unforked, so there
        // is no need to clear that state hash. If we do clear it, then checkpoint executor
        // will wait forever for checkpoint builder to produce the state hash, which will
        // never happen.
        let last_persisted_builder_seq = epoch_store
            .last_persisted_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number);

        let last_executed_seq = self
            .tables
            .get_highest_executed_checkpoint()
            .expect("Failed to get highest executed checkpoint")
            .map(|checkpoint| *checkpoint.sequence_number());

        // `Option::max` orders `None` below any `Some`, so this picks the higher of the two
        // sequence numbers, or skips cleanup when neither is known.
        if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
            // Cleanup failure is logged but does not stop startup.
            if let Err(e) = builder
                .epoch_store
                .clear_state_hashes_after_checkpoint(last_committed_seq)
            {
                error!(
                    "Failed to clear state hashes after checkpoint {}: {:?}",
                    last_committed_seq, e
                );
            } else {
                info!(
                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                    last_committed_seq
                );
            }
        }

        // Fires if `builder.run` returns while the epoch is still alive.
        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
        let aggregator_task = spawn_monitored_task!(aggregator.run());

        // Supervisor task: runs the builder for as long as the epoch is alive, then shuts
        // down the state hasher and aggregator in order.
        spawn_monitored_task!(async move {
            epoch_store
                .within_alive_epoch(async move {
                    builder.run(consensus_replay_waiter).await;
                    builder_finished_tx.send(()).ok();
                })
                .await
                .ok();

            // state hasher will terminate as soon as it has finished processing all messages from builder
            state_hasher_task
                .await
                .expect("state hasher should exit normally");

            // builder must shut down before aggregator and state_hasher, since it sends
            // messages to them
            aggregator_task.abort();
            aggregator_task.await.ok();
        });

        // If this times out, the validator may still start up. The worst that can
        // happen is that we will crash later on instead of immediately. The eventual
        // crash would occur because we may be missing transactions that are below the
        // highest_synced_checkpoint watermark, which can cause a crash in
        // `CheckpointExecutor::extract_randomness_rounds`.
        if tokio::time::timeout(Duration::from_secs(120), async move {
            // Stop waiting as soon as either the builder exits or the rebuild catches up.
            tokio::select! {
                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
                _ = self.wait_for_rebuilt_checkpoints() => (),
            }
        })
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }
    }
}
3015
3016impl CheckpointService {
3017    /// Waits until all checkpoints had been built before the node restarted
3018    /// are rebuilt. This is required to preserve the invariant that all checkpoints
3019    /// (and their transactions) below the highest_synced_checkpoint watermark are
3020    /// available. Once the checkpoints are constructed, we can be sure that the
3021    /// transactions have also been executed.
3022    pub async fn wait_for_rebuilt_checkpoints(&self) {
3023        let highest_previously_built_seq = self.highest_previously_built_seq;
3024        let mut rx = self.highest_currently_built_seq_tx.subscribe();
3025        let mut highest_currently_built_seq = *rx.borrow_and_update();
3026        info!(
3027            "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3028        );
3029        loop {
3030            if highest_currently_built_seq >= highest_previously_built_seq {
3031                info!("Checkpoint rebuild complete");
3032                break;
3033            }
3034            rx.changed().await.unwrap();
3035            highest_currently_built_seq = *rx.borrow_and_update();
3036        }
3037    }
3038
3039    #[cfg(test)]
3040    fn write_and_notify_checkpoint_for_testing(
3041        &self,
3042        epoch_store: &AuthorityPerEpochStore,
3043        checkpoint: PendingCheckpoint,
3044    ) -> SuiResult {
3045        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3046
3047        let mut output = ConsensusCommitOutput::new(0);
3048        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3049        output.set_default_commit_stats_for_testing();
3050        epoch_store.push_consensus_output_for_tests(output);
3051        self.notify_checkpoint()?;
3052        Ok(())
3053    }
3054}
3055
3056impl CheckpointServiceNotify for CheckpointService {
3057    fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult {
3058        let sequence = info.summary.sequence_number;
3059        let signer = info.summary.auth_sig().authority.concise();
3060
3061        if let Some(highest_verified_checkpoint) = self
3062            .tables
3063            .get_highest_verified_checkpoint()?
3064            .map(|x| *x.sequence_number())
3065            && sequence <= highest_verified_checkpoint
3066        {
3067            trace!(
3068                checkpoint_seq = sequence,
3069                "Ignore checkpoint signature from {} - already certified", signer,
3070            );
3071            self.metrics
3072                .last_ignored_checkpoint_signature_received
3073                .set(sequence as i64);
3074            return Ok(());
3075        }
3076        trace!(
3077            checkpoint_seq = sequence,
3078            "Received checkpoint signature, digest {} from {}",
3079            info.summary.digest(),
3080            signer,
3081        );
3082        self.metrics
3083            .last_received_checkpoint_signatures
3084            .with_label_values(&[&signer.to_string()])
3085            .set(sequence as i64);
3086        self.signature_sender.send(info.clone()).ok();
3087        Ok(())
3088    }
3089
3090    fn notify_checkpoint(&self) -> SuiResult {
3091        self.notify_builder.notify_one();
3092        Ok(())
3093    }
3094}
3095
3096// test helper
3097pub struct CheckpointServiceNoop {}
3098impl CheckpointServiceNotify for CheckpointServiceNoop {
3099    fn notify_checkpoint_signature(&self, _: &CheckpointSignatureMessage) -> SuiResult {
3100        Ok(())
3101    }
3102
3103    fn notify_checkpoint(&self) -> SuiResult {
3104        Ok(())
3105    }
3106}
3107
3108impl PendingCheckpoint {
3109    pub fn height(&self) -> CheckpointHeight {
3110        self.details.checkpoint_height
3111    }
3112
3113    pub(crate) fn num_roots(&self) -> usize {
3114        self.roots.iter().map(|r| r.tx_roots.len()).sum()
3115    }
3116}
3117
pin_project! {
    /// Future adapter that counts how many times it has been polled and, on
    /// completion, yields `(poll_count, inner_output)`.
    pub struct PollCounter<Fut> {
        // The wrapped future; marked `#[pin]` so it is polled through a pinned projection.
        #[pin]
        future: Fut,
        // Number of `poll` calls made on this wrapper so far.
        count: usize,
    }
}
3125
3126impl<Fut> PollCounter<Fut> {
3127    pub fn new(future: Fut) -> Self {
3128        Self { future, count: 0 }
3129    }
3130
3131    pub fn count(&self) -> usize {
3132        self.count
3133    }
3134}
3135
3136impl<Fut: Future> Future for PollCounter<Fut> {
3137    type Output = (usize, Fut::Output);
3138
3139    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3140        let this = self.project();
3141        *this.count += 1;
3142        match this.future.poll(cx) {
3143            Poll::Ready(output) => Poll::Ready((*this.count, output)),
3144            Poll::Pending => Poll::Pending,
3145        }
3146    }
3147}
3148
3149fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
3150    PollCounter::new(future)
3151}
3152
// Tests for CheckpointStore bookkeeping (locally computed checkpoints, fork markers)
// and for the end-to-end checkpoint build/certify flow driven through CheckpointService.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;
    use futures::FutureExt as _;
    use futures::future::BoxFuture;
    use std::collections::HashMap;
    use std::ops::Deref;
    use sui_macros::sim_test;
    use sui_protocol_config::{Chain, ProtocolConfig};
    use sui_types::accumulator_event::AccumulatorEvent;
    use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
    use sui_types::crypto::Signature;
    use sui_types::effects::{TransactionEffects, TransactionEvents};
    use sui_types::messages_checkpoint::SignedCheckpointSummary;
    use sui_types::transaction::VerifiedTransaction;
    use tokio::sync::mpsc;

    /// Inserts locally computed checkpoints 70..=80, clears from 76, and checks
    /// that the deletion range is inclusive: 70..=75 remain, 76..=80 are gone.
    #[tokio::test]
    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
        let store = CheckpointStore::new_for_tests();
        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
        for seq in 70u64..=80u64 {
            let contents =
                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
                    [sui_types::base_types::ExecutionDigests::new(
                        sui_types::digests::TransactionDigest::random(),
                        sui_types::digests::TransactionEffectsDigest::ZERO,
                    )],
                );
            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
                &protocol,
                0,
                seq,
                0,
                &contents,
                None,
                sui_types::gas::GasCostSummary::default(),
                None,
                0,
                Vec::new(),
                Vec::new(),
            );
            store
                .tables
                .locally_computed_checkpoints
                .insert(&seq, &summary)
                .unwrap();
        }

        store
            .clear_locally_computed_checkpoints_from(76)
            .expect("clear should succeed");

        // Explicit boundary checks: 75 must remain, 76 must be deleted
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&75)
                .unwrap()
                .is_some()
        );
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&76)
                .unwrap()
                .is_none()
        );

        for seq in 70u64..76u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_some()
            );
        }
        for seq in 76u64..=80u64 {
            assert!(
                store
                    .tables
                    .locally_computed_checkpoints
                    .get(&seq)
                    .unwrap()
                    .is_none()
            );
        }
    }

    /// Round-trips the checkpoint-fork and transaction-fork markers through the
    /// store: initially absent, readable after recording, absent again after clearing.
    #[tokio::test]
    async fn test_fork_detection_storage() {
        let store = CheckpointStore::new_for_tests();
        // checkpoint fork
        let seq_num = 42;
        let digest = CheckpointDigest::random();

        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let retrieved = store.get_checkpoint_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
        assert_eq!(retrieved_seq, seq_num);
        assert_eq!(retrieved_digest, digest);

        store.clear_checkpoint_fork_detected().unwrap();
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        // txn fork
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();

        assert!(store.get_transaction_fork_detected().unwrap().is_none());

        store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let retrieved = store.get_transaction_fork_detected().unwrap();
        assert!(retrieved.is_some());
        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
        assert_eq!(retrieved_tx, tx_digest);
        assert_eq!(retrieved_expected, expected_effects);
        assert_eq!(retrieved_actual, actual_effects);

        store.clear_transaction_fork_detected().unwrap();
        assert!(store.get_transaction_fork_detected().unwrap().is_none());
    }

    /// Drives CheckpointService with six pending checkpoints (heights 0..=5) and
    /// checks the built checkpoints' contents, sequence numbers, digest chaining
    /// and rolling gas. It then checks that signatures delivered out of order
    /// (1 before 0) still yield certified checkpoints in sequence order.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.disable_accumulators_for_testing();
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
            0,
            0,
            vec![],
            SequenceNumber::new(),
        );

        // Store the same dummy transaction under each digest d(0)..d(19).
        for i in 0..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }

        // Effects dependency graph: d(1) -> {d(2), d(3)}, d(2) -> {d(3), d(4)};
        // all other transactions have no dependencies.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 52, 51, 1),
            );
        }
        // Register a placeholder (default Ed25519) user signature for every
        // transaction that has recorded effects.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![(signature, None)]);
        }

        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store =
            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
        let epoch_store = state.epoch_store_for_testing();

        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
            state.get_global_state_hash_store().clone(),
        ));

        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&global_state_hasher),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
        );
        checkpoint_service.spawn(epoch_store.clone(), None).await;

        // Pending checkpoints: p(height, root digest bytes, timestamp_ms).
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 42, 41, 1)
        );

        // Causal order places d(3) before d(1), which depends on it.
        assert_eq!(c2t, vec![d(3), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        // Rolling gas accumulates across the epoch: d(4) + d(3) + d(1).
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(83, 86, 83, 3)
        );

        // Each pending checkpoint produces exactly one checkpoint; splitting is
        // done in the consensus handler.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12), d(13)]);

        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c4t, vec![d(15), d(16), d(17)]);

        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c5t, vec![d(5)]);

        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c6t, vec![d(6)]);

        // Sign the first two summaries with this authority's key and deliver them
        // out of order (1 before 0); certified outputs must still arrive as 0 then 1.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c2ss })
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c1ss })
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }

    /// In-memory `TransactionCacheRead` backed by a map of executed effects. Only
    /// the effects-reading methods used by these tests are implemented; looking up
    /// a missing digest in the `notify_read_*` methods panics.
    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn notify_read_executed_effects_may_fail(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, SuiResult<Vec<TransactionEffects>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| self.get(d).expect("effects not found").clone())
                .collect()))
            .boxed()
        }

        fn notify_read_executed_effects_digests(
            &self,
            _: &str,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
            std::future::ready(
                digests
                    .iter()
                    .map(|d| {
                        self.get(d)
                            .map(|fx| fx.digest())
                            .expect("effects not found")
                    })
                    .collect(),
            )
            .boxed()
        }

        fn multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffects>> {
            digests.iter().map(|d| self.get(d).cloned()).collect()
        }

        // Unimplemented methods - it's unfortunate to have this big blob of useless code, but it
        // wasn't worth it to keep EffectsNotifyRead around just for these tests, as it caused a
        // ton of complication in non-test code (e.g. EffectsNotifyRead had to be implemented for
        // all ExecutionCacheRead implementors).

        fn multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
            unimplemented!()
        }

        fn multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> Vec<Option<TransactionEffectsDigest>> {
            unimplemented!()
        }

        fn multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> Vec<Option<TransactionEffects>> {
            unimplemented!()
        }

        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
            unimplemented!()
        }

        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
            unimplemented!()
        }

        fn get_unchanged_loaded_runtime_objects(
            &self,
            _digest: &TransactionDigest,
        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
            unimplemented!()
        }

        fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
            unimplemented!()
        }
    }

    /// Sends each built checkpoint (contents, summary) to the test's channel.
    /// `try_send(...).unwrap()` panics if the channel is full or closed.
    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> SuiResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }

    /// Sends each certified checkpoint summary to the test's channel; panics if the
    /// channel is full or closed.
    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> SuiResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }

    /// Builds a pending checkpoint with a single root group whose roots are
    /// `d(t)` for each byte in `t`. `i` is used as the root height, checkpoint height
    /// and checkpoint sequence number.
    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
        PendingCheckpoint {
            roots: vec![CheckpointRoots {
                tx_roots: t
                    .into_iter()
                    .map(|t| TransactionKey::Digest(d(t)))
                    .collect(),
                settlement_root: None,
                height: i,
            }],
            details: PendingCheckpointInfo {
                timestamp_ms,
                last_of_epoch: false,
                checkpoint_height: i,
                consensus_commit_ref: CommitRef::default(),
                rejected_transactions_digest: Digest::default(),
                checkpoint_seq: i,
            },
        }
    }

    /// Deterministic test digest: first byte is `i`, remaining 31 bytes are zero.
    fn d(i: u8) -> TransactionDigest {
        let mut bytes: [u8; 32] = Default::default();
        bytes[0] = i;
        TransactionDigest::new(bytes)
    }

    /// Default effects with the transaction digest, dependencies and gas summary overridden.
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }

    /// Records synthetic effects for `digest` in `store` and marks the digest as
    /// executed in the authority's current epoch store.
    fn commit_cert_for_test(
        store: &mut HashMap<TransactionDigest, TransactionEffects>,
        state: Arc<AuthorityState>,
        digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) {
        let epoch_store = state.epoch_store_for_testing();
        let effects = e(digest, dependencies, gas_used);
        store.insert(digest, effects.clone());
        epoch_store.insert_executed_in_epoch(&digest);
    }
}