sui_core/checkpoints/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::AccumulatorSettlementTxBuilder;
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16    LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::balance_withdraw_scheduler::BalanceSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use diffy::create_patch;
26use itertools::Itertools;
27use mysten_common::random::get_rng;
28use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
29use mysten_common::{assert_reachable, debug_fatal, fatal};
30use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
31use nonempty::NonEmpty;
32use parking_lot::Mutex;
33use pin_project_lite::pin_project;
34use serde::{Deserialize, Serialize};
35use sui_macros::fail_point_arg;
36use sui_network::default_mysten_network_config;
37use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
38use sui_types::base_types::ConciseableName;
39use sui_types::executable_transaction::VerifiedExecutableTransaction;
40use sui_types::execution::ExecutionTimeObservationKey;
41use sui_types::messages_checkpoint::{CheckpointArtifacts, CheckpointCommitment};
42use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
43use tokio::sync::{mpsc, watch};
44use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
45
46use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
47use crate::authority::authority_store_pruner::PrunerWatermarks;
48use crate::consensus_handler::SequencedConsensusTransactionKey;
49use rand::seq::SliceRandom;
50use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
51use std::fs::File;
52use std::future::Future;
53use std::io::Write;
54use std::path::Path;
55use std::pin::Pin;
56use std::sync::Arc;
57use std::sync::Weak;
58use std::task::{Context, Poll};
59use std::time::{Duration, SystemTime};
60use sui_protocol_config::ProtocolVersion;
61use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
62use sui_types::committee::StakeUnit;
63use sui_types::crypto::AuthorityStrongQuorumSignInfo;
64use sui_types::digests::{CheckpointContentsDigest, CheckpointDigest, TransactionEffectsDigest};
65use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
66use sui_types::error::{SuiErrorKind, SuiResult};
67use sui_types::gas::GasCostSummary;
68use sui_types::message_envelope::Message;
69use sui_types::messages_checkpoint::{
70    CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
71    CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
72    EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
73    VerifiedCheckpointContents,
74};
75use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
76use sui_types::messages_consensus::ConsensusTransactionKey;
77use sui_types::signature::GenericSignature;
78use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
79use sui_types::transaction::{
80    TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
81};
82use tokio::{sync::Notify, time::timeout};
83use tracing::{debug, error, info, instrument, trace, warn};
84use typed_store::DBMapUtils;
85use typed_store::Map;
86use typed_store::{
87    TypedStoreError,
88    rocks::{DBMap, MetricConf},
89};
90
// NOTE(review): presumably the single key under which the
// `transaction_fork_detected` table (keyed by `u8`) stores its one record;
// the code that uses it is outside this part of the file — confirm.
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;

/// Height assigned to a pending checkpoint (see
/// `PendingCheckpointInfo::checkpoint_height`).
pub type CheckpointHeight = u64;
94
/// Plain bundle of three `u64` counters.
// NOTE(review): judging by the name these are per-epoch aggregates; the code
// that fills them in is not visible here — confirm before relying on units.
pub struct EpochStats {
    /// Number of checkpoints counted.
    pub checkpoint_count: u64,
    /// Number of transactions counted.
    pub transaction_count: u64,
    /// Total gas reward (units are not established in this part of the file).
    pub total_gas_reward: u64,
}
100
/// Metadata attached to a [`PendingCheckpoint`].
#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    /// Timestamp of the pending checkpoint, in milliseconds (per the field name).
    pub timestamp_ms: CheckpointTimestamp,
    /// Flag marking the pending checkpoint as the last one of its epoch.
    pub last_of_epoch: bool,
    // Computed in calculate_pending_checkpoint_height() from consensus round,
    // there is no guarantee that this is increasing per checkpoint, because of checkpoint splitting.
    pub checkpoint_height: CheckpointHeight,
}
109
/// A checkpoint that is pending: a list of root transaction keys plus the
/// metadata describing it.
// NOTE(review): the code that produces and consumes pending checkpoints is
// outside this part of the file — confirm the exact meaning of `roots` there.
#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    /// Keys of the root transactions of this pending checkpoint.
    pub roots: Vec<TransactionKey>,
    /// Timestamp, end-of-epoch flag and height of this pending checkpoint.
    pub details: PendingCheckpointInfo,
}
115
/// Serializable record pairing a [`CheckpointSummary`] with the height at
/// which it was built.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    /// The checkpoint summary itself.
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for genesis checkpoint
    pub checkpoint_height: Option<CheckpointHeight>,
    // NOTE(review): presumably the index of this checkpoint among the
    // checkpoints produced from one consensus commit — confirm at the writer.
    pub position_in_commit: usize,
}
123
/// Column-family layout of the checkpoint database.
///
/// Each field is one typed table (`DBMap`); the `DBMapUtils` derive generates
/// the open helpers used by `CheckpointStoreTables::new` and `open_readonly`.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence number, for
    /// efficient reads of full checkpoints. Entries from this table are deleted after state
    /// accumulation has completed.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks and log useful
    /// information. Can be pruned as soon as we verify that we are in agreement with the latest
    /// certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in that epoch.
    ///
    /// Written by `CheckpointStore::insert_certified_checkpoint` when the inserted
    /// checkpoint carries a next-epoch committee.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    ///
    /// The value is the `(sequence number, digest)` pair of the checkpoint at the watermark.
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    /// Stores transaction fork detection information
    // NOTE(review): the tuple is (transaction digest, effects digest, effects digest);
    // presumably the two effects digests are the conflicting ones — confirm at the writer.
    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
}
167
168fn full_checkpoint_content_table_default_config() -> DBOptions {
169    DBOptions {
170        options: default_db_options().options,
171        // We have seen potential data corruption issues in this table after forced shutdowns
172        // so we enable value hash logging to help with debugging.
173        // TODO: remove this once we have a better understanding of the root cause.
174        rw_options: ReadWriteOptions::default().set_log_value_hash(true),
175    }
176}
177
impl CheckpointStoreTables {
    /// Opens all checkpoint tables read-write at `path` (non-tidehunter build).
    ///
    /// `metric_name` labels the DB metrics. The pruner watermarks argument is
    /// accepted only for signature parity with the tidehunter build and is ignored.
    #[cfg(not(tidehunter))]
    pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    /// Opens all checkpoint tables read-write at `path` (tidehunter build).
    ///
    /// Builds one `ThConfig` per table and hands the `(table name, config)`
    /// list to `open_tables_read_write`. `pruner_watermarks` feeds the
    /// relocation filters installed through `apply_relocation_filter`.
    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        // Use four times the default number of mutexes for the shared configs below.
        let mutexes = default_mutex_count() * 4;
        // NOTE(review): 6 * 8 = 48 prefix bits; presumably the leading 6 bytes of an
        // 8-byte sequence-number key — confirm against tidehunter_util.
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(64_000)
            .with_value_cache_size(default_value_cache_size());
        // Shared config for tables keyed by a u64 (sequence number / epoch id).
        // NOTE(review): the leading `8` is presumably the key size in bytes — confirm.
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        // Shared config for tables keyed by a digest.
        // NOTE(review): `32` is presumably the digest length and the byte vector a
        // fixed serialization prefix stripped from keys — confirm.
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        // Small key spaces (watermarks, fork marker): value cache of 10 entries, unloading disabled.
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
        // One entry per field of `CheckpointStoreTables`, keyed by the field name.
        let configs = vec![
            (
                "checkpoint_content",
                digest_config.clone().with_config(
                    lru_config
                        .clone()
                        .with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(
                    4,
                    1,
                    KeyType::uniform(1),
                    apply_relocation_filter(
                        watermarks_config.clone(),
                        pruner_watermarks.checkpoint_id.clone(),
                        |(watermark, _): (CheckpointSequenceNumber, CheckpointDigest)| watermark,
                        false,
                    ),
                ),
            ),
            (
                "transaction_fork_detected",
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
        ];
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }

    /// Opens a read-only handle on the checkpoint tables at `path`, reporting
    /// metrics under the name `checkpoint_readonly`.
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }
}
308
/// Handle on the checkpoint database plus in-memory notification channels for
/// readers waiting on a particular checkpoint sequence number.
pub struct CheckpointStore {
    /// The underlying typed tables.
    pub(crate) tables: CheckpointStoreTables,
    /// Notified by `update_highest_synced_checkpoint` with the newly synced checkpoint.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    // NOTE(review): presumably the executed-watermark counterpart of the field
    // above; its notifier is not in this part of the file — confirm.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
314
315impl CheckpointStore {
316    pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
317        let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
318        Arc::new(Self {
319            tables,
320            synced_checkpoint_notify_read: NotifyRead::new(),
321            executed_checkpoint_notify_read: NotifyRead::new(),
322        })
323    }
324
325    pub fn new_for_tests() -> Arc<Self> {
326        let ckpt_dir = mysten_common::tempdir().unwrap();
327        CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
328    }
329
330    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
331        let tables = CheckpointStoreTables::new(
332            path,
333            "db_checkpoint",
334            Arc::new(PrunerWatermarks::default()),
335        );
336        Arc::new(Self {
337            tables,
338            synced_checkpoint_notify_read: NotifyRead::new(),
339            executed_checkpoint_notify_read: NotifyRead::new(),
340        })
341    }
342
    /// Opens a read-only handle on the checkpoint tables at `path`.
    ///
    /// Thin forwarder to [`CheckpointStoreTables::open_readonly`].
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }
346
347    #[instrument(level = "info", skip_all)]
348    pub fn insert_genesis_checkpoint(
349        &self,
350        checkpoint: VerifiedCheckpoint,
351        contents: CheckpointContents,
352        epoch_store: &AuthorityPerEpochStore,
353    ) {
354        assert_eq!(
355            checkpoint.epoch(),
356            0,
357            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
358        );
359        assert_eq!(
360            *checkpoint.sequence_number(),
361            0,
362            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
363        );
364
365        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
366        if self
367            .get_checkpoint_by_digest(checkpoint.digest())
368            .unwrap()
369            .is_none()
370        {
371            if epoch_store.epoch() == checkpoint.epoch {
372                epoch_store
373                    .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
374                    .unwrap();
375            } else {
376                debug!(
377                    validator_epoch =% epoch_store.epoch(),
378                    genesis_epoch =% checkpoint.epoch(),
379                    "Not inserting checkpoint builder data for genesis checkpoint",
380                );
381            }
382            self.insert_checkpoint_contents(contents).unwrap();
383            self.insert_verified_checkpoint(&checkpoint).unwrap();
384            self.update_highest_synced_checkpoint(&checkpoint).unwrap();
385        }
386    }
387
388    pub fn get_checkpoint_by_digest(
389        &self,
390        digest: &CheckpointDigest,
391    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
392        self.tables
393            .checkpoint_by_digest
394            .get(digest)
395            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
396    }
397
398    pub fn get_checkpoint_by_sequence_number(
399        &self,
400        sequence_number: CheckpointSequenceNumber,
401    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
402        self.tables
403            .certified_checkpoints
404            .get(&sequence_number)
405            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
406    }
407
    /// Returns the locally computed checkpoint summary stored at
    /// `sequence_number`, or `Ok(None)` if there is none.
    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.tables
            .locally_computed_checkpoints
            .get(&sequence_number)
    }
416
417    pub fn multi_get_locally_computed_checkpoints(
418        &self,
419        sequence_numbers: &[CheckpointSequenceNumber],
420    ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
421        let checkpoints = self
422            .tables
423            .locally_computed_checkpoints
424            .multi_get(sequence_numbers)?;
425
426        Ok(checkpoints)
427    }
428
    /// Returns the checkpoint sequence number recorded for the given contents
    /// digest, or `Ok(None)` if no mapping is stored.
    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }
437
    /// Removes the contents-digest → sequence-number mapping for `digest`
    /// from the `checkpoint_sequence_by_contents_digest` table.
    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }
446
447    pub fn get_latest_certified_checkpoint(
448        &self,
449    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
450        Ok(self
451            .tables
452            .certified_checkpoints
453            .reversed_safe_iter_with_bounds(None, None)?
454            .next()
455            .transpose()?
456            .map(|(_, v)| v.into()))
457    }
458
459    pub fn get_latest_locally_computed_checkpoint(
460        &self,
461    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
462        Ok(self
463            .tables
464            .locally_computed_checkpoints
465            .reversed_safe_iter_with_bounds(None, None)?
466            .next()
467            .transpose()?
468            .map(|(_, v)| v))
469    }
470
471    pub fn multi_get_checkpoint_by_sequence_number(
472        &self,
473        sequence_numbers: &[CheckpointSequenceNumber],
474    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
475        let checkpoints = self
476            .tables
477            .certified_checkpoints
478            .multi_get(sequence_numbers)?
479            .into_iter()
480            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
481            .collect();
482
483        Ok(checkpoints)
484    }
485
    /// Batch lookup of checkpoint contents by contents digest.
    ///
    /// Returns one `Option` per requested digest.
    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }
492
493    pub fn get_highest_verified_checkpoint(
494        &self,
495    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
496        let highest_verified = if let Some(highest_verified) = self
497            .tables
498            .watermarks
499            .get(&CheckpointWatermark::HighestVerified)?
500        {
501            highest_verified
502        } else {
503            return Ok(None);
504        };
505        self.get_checkpoint_by_digest(&highest_verified.1)
506    }
507
508    pub fn get_highest_synced_checkpoint(
509        &self,
510    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
511        let highest_synced = if let Some(highest_synced) = self
512            .tables
513            .watermarks
514            .get(&CheckpointWatermark::HighestSynced)?
515        {
516            highest_synced
517        } else {
518            return Ok(None);
519        };
520        self.get_checkpoint_by_digest(&highest_synced.1)
521    }
522
523    pub fn get_highest_synced_checkpoint_seq_number(
524        &self,
525    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
526        if let Some(highest_synced) = self
527            .tables
528            .watermarks
529            .get(&CheckpointWatermark::HighestSynced)?
530        {
531            Ok(Some(highest_synced.0))
532        } else {
533            Ok(None)
534        }
535    }
536
537    pub fn get_highest_executed_checkpoint_seq_number(
538        &self,
539    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
540        if let Some(highest_executed) = self
541            .tables
542            .watermarks
543            .get(&CheckpointWatermark::HighestExecuted)?
544        {
545            Ok(Some(highest_executed.0))
546        } else {
547            Ok(None)
548        }
549    }
550
551    pub fn get_highest_executed_checkpoint(
552        &self,
553    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
554        let highest_executed = if let Some(highest_executed) = self
555            .tables
556            .watermarks
557            .get(&CheckpointWatermark::HighestExecuted)?
558        {
559            highest_executed
560        } else {
561            return Ok(None);
562        };
563        self.get_checkpoint_by_digest(&highest_executed.1)
564    }
565
566    pub fn get_highest_pruned_checkpoint_seq_number(
567        &self,
568    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
569        self.tables
570            .watermarks
571            .get(&CheckpointWatermark::HighestPruned)
572            .map(|watermark| watermark.map(|w| w.0))
573    }
574
    /// Returns the checkpoint contents stored under `digest`, or `Ok(None)`
    /// if there are none.
    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }
581
    /// Returns the full checkpoint contents stored at sequence number `seq`,
    /// or `Ok(None)` if the `full_checkpoint_content` table has no entry for it.
    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content.get(&seq)
    }
588
589    fn prune_local_summaries(&self) -> SuiResult {
590        if let Some((last_local_summary, _)) = self
591            .tables
592            .locally_computed_checkpoints
593            .reversed_safe_iter_with_bounds(None, None)?
594            .next()
595            .transpose()?
596        {
597            let mut batch = self.tables.locally_computed_checkpoints.batch();
598            batch.schedule_delete_range(
599                &self.tables.locally_computed_checkpoints,
600                &0,
601                &last_local_summary,
602            )?;
603            batch.write()?;
604            info!("Pruned local summaries up to {:?}", last_local_summary);
605        }
606        Ok(())
607    }
608
609    pub fn clear_locally_computed_checkpoints_from(
610        &self,
611        from_seq: CheckpointSequenceNumber,
612    ) -> SuiResult {
613        let keys: Vec<_> = self
614            .tables
615            .locally_computed_checkpoints
616            .safe_iter_with_bounds(Some(from_seq), None)
617            .map(|r| r.map(|(k, _)| k))
618            .collect::<Result<_, _>>()?;
619        if let Some(&last_local_summary) = keys.last() {
620            let mut batch = self.tables.locally_computed_checkpoints.batch();
621            batch
622                .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
623                .expect("Failed to delete locally computed checkpoints");
624            batch
625                .write()
626                .expect("Failed to delete locally computed checkpoints");
627            warn!(
628                from_seq,
629                last_local_summary,
630                "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
631                from_seq,
632                last_local_summary
633            );
634        }
635        Ok(())
636    }
637
    /// Compares a locally computed checkpoint summary against the certified
    /// checkpoint for the same sequence number and treats any difference as a fork.
    ///
    /// On mismatch this logs both summaries and (best effort) both contents,
    /// records the fork via `record_checkpoint_fork_detected`, runs the
    /// `kill_checkpoint_fork_node` fail point, and finally invokes `fatal!`.
    /// When the summaries are equal the function does nothing.
    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            // Render the certified contents for logging; lookup misses and DB errors
            // become descriptive strings instead of aborting the report.
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // Same rendering for the locally computed contents.
            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{:?}", contents))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // checkpoint contents may be too large for panic message.
            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );

            // Record the fork in the database before crashing
            // A failure to record is only logged so that it cannot mask the fork itself.
            if let Err(e) = self.record_checkpoint_fork_detected(
                *local_checkpoint.sequence_number(),
                local_checkpoint.digest(),
            ) {
                error!("Failed to record checkpoint fork in database: {:?}", e);
            }

            // Test hook: under `msim` the closure records the verified digest for this
            // sequence number in the shared override map and shuts the simulated node down.
            fail_point_arg!(
                "kill_checkpoint_fork_node",
                |checkpoint_overrides: std::sync::Arc<
                    std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
                >| {
                    #[cfg(msim)]
                    {
                        if let Ok(mut overrides) = checkpoint_overrides.lock() {
                            overrides.insert(
                                local_checkpoint.sequence_number,
                                verified_checkpoint.digest().to_string(),
                            );
                        }
                        tracing::error!(
                            fatal = true,
                            "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
                            local_checkpoint.sequence_number(),
                            verified_checkpoint.digest()
                        );
                        sui_simulator::task::shutdown_current_node();
                    }
                }
            );

            // NOTE(review): `fatal!` comes from `mysten_common`; presumably it does not
            // return (panics/aborts) — confirm against its definition.
            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }
731
732    // Called by consensus (ConsensusAggregator).
733    // Different from `insert_verified_checkpoint`, it does not touch
734    // the highest_verified_checkpoint watermark such that state sync
735    // will have a chance to process this checkpoint and perform some
736    // state-sync only things.
737    pub fn insert_certified_checkpoint(
738        &self,
739        checkpoint: &VerifiedCheckpoint,
740    ) -> Result<(), TypedStoreError> {
741        debug!(
742            checkpoint_seq = checkpoint.sequence_number(),
743            "Inserting certified checkpoint",
744        );
745        let mut batch = self.tables.certified_checkpoints.batch();
746        batch
747            .insert_batch(
748                &self.tables.certified_checkpoints,
749                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
750            )?
751            .insert_batch(
752                &self.tables.checkpoint_by_digest,
753                [(checkpoint.digest(), checkpoint.serializable_ref())],
754            )?;
755        if checkpoint.next_epoch_committee().is_some() {
756            batch.insert_batch(
757                &self.tables.epoch_last_checkpoint_map,
758                [(&checkpoint.epoch(), checkpoint.sequence_number())],
759            )?;
760        }
761        batch.write()?;
762
763        if let Some(local_checkpoint) = self
764            .tables
765            .locally_computed_checkpoints
766            .get(checkpoint.sequence_number())?
767        {
768            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
769        }
770
771        Ok(())
772    }
773
    // Called by state sync, apart from inserting the checkpoint and updating
    // related tables, it also bumps the highest_verified_checkpoint watermark.
    //
    // Implemented as `insert_certified_checkpoint` followed by
    // `update_highest_verified_checkpoint`; the watermark is only touched if
    // the insert succeeded.
    #[instrument(level = "debug", skip_all)]
    pub fn insert_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.insert_certified_checkpoint(checkpoint)?;
        self.update_highest_verified_checkpoint(checkpoint)
    }
784
785    pub fn update_highest_verified_checkpoint(
786        &self,
787        checkpoint: &VerifiedCheckpoint,
788    ) -> Result<(), TypedStoreError> {
789        if Some(*checkpoint.sequence_number())
790            > self
791                .get_highest_verified_checkpoint()?
792                .map(|x| *x.sequence_number())
793        {
794            debug!(
795                checkpoint_seq = checkpoint.sequence_number(),
796                "Updating highest verified checkpoint",
797            );
798            self.tables.watermarks.insert(
799                &CheckpointWatermark::HighestVerified,
800                &(*checkpoint.sequence_number(), *checkpoint.digest()),
801            )?;
802        }
803
804        Ok(())
805    }
806
807    pub fn update_highest_synced_checkpoint(
808        &self,
809        checkpoint: &VerifiedCheckpoint,
810    ) -> Result<(), TypedStoreError> {
811        let seq = *checkpoint.sequence_number();
812        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
813        self.tables.watermarks.insert(
814            &CheckpointWatermark::HighestSynced,
815            &(seq, *checkpoint.digest()),
816        )?;
817        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
818        Ok(())
819    }
820
821    async fn notify_read_checkpoint_watermark<F>(
822        &self,
823        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
824        seq: CheckpointSequenceNumber,
825        get_watermark: F,
826    ) -> VerifiedCheckpoint
827    where
828        F: Fn() -> Option<CheckpointSequenceNumber>,
829    {
830        notify_read
831            .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
832                let seq = seqs[0];
833                let Some(highest) = get_watermark() else {
834                    return vec![None];
835                };
836                if highest < seq {
837                    return vec![None];
838                }
839                let checkpoint = self
840                    .get_checkpoint_by_sequence_number(seq)
841                    .expect("db error")
842                    .expect("checkpoint not found");
843                vec![Some(checkpoint)]
844            })
845            .await
846            .into_iter()
847            .next()
848            .unwrap()
849    }
850
851    pub async fn notify_read_synced_checkpoint(
852        &self,
853        seq: CheckpointSequenceNumber,
854    ) -> VerifiedCheckpoint {
855        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
856            self.get_highest_synced_checkpoint_seq_number()
857                .expect("db error")
858        })
859        .await
860    }
861
862    pub async fn notify_read_executed_checkpoint(
863        &self,
864        seq: CheckpointSequenceNumber,
865    ) -> VerifiedCheckpoint {
866        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
867            self.get_highest_executed_checkpoint_seq_number()
868                .expect("db error")
869        })
870        .await
871    }
872
873    pub fn update_highest_executed_checkpoint(
874        &self,
875        checkpoint: &VerifiedCheckpoint,
876    ) -> Result<(), TypedStoreError> {
877        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
878            if seq_number >= *checkpoint.sequence_number() {
879                return Ok(());
880            }
881            assert_eq!(
882                seq_number + 1,
883                *checkpoint.sequence_number(),
884                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
885                checkpoint.sequence_number(),
886                seq_number
887            );
888        }
889        let seq = *checkpoint.sequence_number();
890        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
891        self.tables.watermarks.insert(
892            &CheckpointWatermark::HighestExecuted,
893            &(seq, *checkpoint.digest()),
894        )?;
895        self.executed_checkpoint_notify_read
896            .notify(&seq, checkpoint);
897        Ok(())
898    }
899
900    pub fn update_highest_pruned_checkpoint(
901        &self,
902        checkpoint: &VerifiedCheckpoint,
903    ) -> Result<(), TypedStoreError> {
904        self.tables.watermarks.insert(
905            &CheckpointWatermark::HighestPruned,
906            &(*checkpoint.sequence_number(), *checkpoint.digest()),
907        )
908    }
909
910    /// Sets highest executed checkpoint to any value.
911    ///
912    /// WARNING: This method is very subtle and can corrupt the database if used incorrectly.
913    /// It should only be used in one-off cases or tests after fully understanding the risk.
914    pub fn set_highest_executed_checkpoint_subtle(
915        &self,
916        checkpoint: &VerifiedCheckpoint,
917    ) -> Result<(), TypedStoreError> {
918        self.tables.watermarks.insert(
919            &CheckpointWatermark::HighestExecuted,
920            &(*checkpoint.sequence_number(), *checkpoint.digest()),
921        )
922    }
923
924    pub fn insert_checkpoint_contents(
925        &self,
926        contents: CheckpointContents,
927    ) -> Result<(), TypedStoreError> {
928        debug!(
929            checkpoint_seq = ?contents.digest(),
930            "Inserting checkpoint contents",
931        );
932        self.tables
933            .checkpoint_content
934            .insert(contents.digest(), &contents)
935    }
936
937    pub fn insert_verified_checkpoint_contents(
938        &self,
939        checkpoint: &VerifiedCheckpoint,
940        full_contents: VerifiedCheckpointContents,
941    ) -> Result<(), TypedStoreError> {
942        let mut batch = self.tables.full_checkpoint_content.batch();
943        batch.insert_batch(
944            &self.tables.checkpoint_sequence_by_contents_digest,
945            [(&checkpoint.content_digest, checkpoint.sequence_number())],
946        )?;
947        let full_contents = full_contents.into_inner();
948        batch.insert_batch(
949            &self.tables.full_checkpoint_content,
950            [(checkpoint.sequence_number(), &full_contents)],
951        )?;
952
953        let contents = full_contents.into_checkpoint_contents();
954        assert_eq!(&checkpoint.content_digest, contents.digest());
955
956        batch.insert_batch(
957            &self.tables.checkpoint_content,
958            [(contents.digest(), &contents)],
959        )?;
960
961        batch.write()
962    }
963
964    pub fn delete_full_checkpoint_contents(
965        &self,
966        seq: CheckpointSequenceNumber,
967    ) -> Result<(), TypedStoreError> {
968        self.tables.full_checkpoint_content.remove(&seq)
969    }
970
971    pub fn get_epoch_last_checkpoint(
972        &self,
973        epoch_id: EpochId,
974    ) -> SuiResult<Option<VerifiedCheckpoint>> {
975        let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
976        let checkpoint = match seq {
977            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
978            None => None,
979        };
980        Ok(checkpoint)
981    }
982
983    pub fn get_epoch_last_checkpoint_seq_number(
984        &self,
985        epoch_id: EpochId,
986    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
987        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
988        Ok(seq)
989    }
990
991    pub fn insert_epoch_last_checkpoint(
992        &self,
993        epoch_id: EpochId,
994        checkpoint: &VerifiedCheckpoint,
995    ) -> SuiResult {
996        self.tables
997            .epoch_last_checkpoint_map
998            .insert(&epoch_id, checkpoint.sequence_number())?;
999        Ok(())
1000    }
1001
1002    pub fn get_epoch_state_commitments(
1003        &self,
1004        epoch: EpochId,
1005    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1006        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1007            checkpoint
1008                .end_of_epoch_data
1009                .as_ref()
1010                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1011                .epoch_commitments
1012                .clone()
1013        });
1014        Ok(commitments)
1015    }
1016
1017    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few statistics of the epoch.
1018    pub fn get_epoch_stats(
1019        &self,
1020        epoch: EpochId,
1021        last_checkpoint: &CheckpointSummary,
1022    ) -> Option<EpochStats> {
1023        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1024            (0, 0)
1025        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1026            (
1027                checkpoint.sequence_number + 1,
1028                checkpoint.network_total_transactions,
1029            )
1030        } else {
1031            return None;
1032        };
1033        Some(EpochStats {
1034            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1035            transaction_count: last_checkpoint.network_total_transactions
1036                - prev_epoch_network_transactions,
1037            total_gas_reward: last_checkpoint
1038                .epoch_rolling_gas_cost_summary
1039                .computation_cost,
1040        })
1041    }
1042
1043    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1044        // This checkpoints the entire db and not one column family
1045        self.tables
1046            .checkpoint_content
1047            .checkpoint_db(path)
1048            .map_err(Into::into)
1049    }
1050
1051    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1052        let mut wb = self.tables.watermarks.batch();
1053        wb.delete_batch(
1054            &self.tables.watermarks,
1055            std::iter::once(CheckpointWatermark::HighestExecuted),
1056        )?;
1057        wb.write()?;
1058        Ok(())
1059    }
1060
1061    pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1062        self.delete_highest_executed_checkpoint_test_only()?;
1063        Ok(())
1064    }
1065
1066    pub fn record_checkpoint_fork_detected(
1067        &self,
1068        checkpoint_seq: CheckpointSequenceNumber,
1069        checkpoint_digest: CheckpointDigest,
1070    ) -> Result<(), TypedStoreError> {
1071        info!(
1072            checkpoint_seq = checkpoint_seq,
1073            checkpoint_digest = ?checkpoint_digest,
1074            "Recording checkpoint fork detection in database"
1075        );
1076        self.tables.watermarks.insert(
1077            &CheckpointWatermark::CheckpointForkDetected,
1078            &(checkpoint_seq, checkpoint_digest),
1079        )
1080    }
1081
1082    pub fn get_checkpoint_fork_detected(
1083        &self,
1084    ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1085        self.tables
1086            .watermarks
1087            .get(&CheckpointWatermark::CheckpointForkDetected)
1088    }
1089
1090    pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1091        self.tables
1092            .watermarks
1093            .remove(&CheckpointWatermark::CheckpointForkDetected)
1094    }
1095
1096    pub fn record_transaction_fork_detected(
1097        &self,
1098        tx_digest: TransactionDigest,
1099        expected_effects_digest: TransactionEffectsDigest,
1100        actual_effects_digest: TransactionEffectsDigest,
1101    ) -> Result<(), TypedStoreError> {
1102        info!(
1103            tx_digest = ?tx_digest,
1104            expected_effects_digest = ?expected_effects_digest,
1105            actual_effects_digest = ?actual_effects_digest,
1106            "Recording transaction fork detection in database"
1107        );
1108        self.tables.transaction_fork_detected.insert(
1109            &TRANSACTION_FORK_DETECTED_KEY,
1110            &(tx_digest, expected_effects_digest, actual_effects_digest),
1111        )
1112    }
1113
1114    pub fn get_transaction_fork_detected(
1115        &self,
1116    ) -> Result<
1117        Option<(
1118            TransactionDigest,
1119            TransactionEffectsDigest,
1120            TransactionEffectsDigest,
1121        )>,
1122        TypedStoreError,
1123    > {
1124        self.tables
1125            .transaction_fork_detected
1126            .get(&TRANSACTION_FORK_DETECTED_KEY)
1127    }
1128
1129    pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1130        self.tables
1131            .transaction_fork_detected
1132            .remove(&TRANSACTION_FORK_DETECTED_KEY)
1133    }
1134}
1135
/// Keys of the `watermarks` table. Each key maps to a
/// `(CheckpointSequenceNumber, CheckpointDigest)` pair.
///
/// NOTE(review): this enum derives `Serialize`/`Deserialize` and is used as a
/// table key, so reordering or removing variants presumably changes the
/// on-disk encoding; confirm before touching the variant list.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    /// Written by `update_highest_verified_checkpoint`; only ever moves forward there.
    HighestVerified,
    /// Written by `update_highest_synced_checkpoint`.
    HighestSynced,
    /// Written by `update_highest_executed_checkpoint` (and forcibly by
    /// `set_highest_executed_checkpoint_subtle`).
    HighestExecuted,
    /// Written by `update_highest_pruned_checkpoint`.
    HighestPruned,
    /// Marker recorded by `record_checkpoint_fork_detected`; holds the sequence
    /// number and digest of the checkpoint at which a fork was detected.
    CheckpointForkDetected,
}
1144
/// Background worker that receives `(sequence number, effects)` pairs from the
/// checkpoint builder and feeds them to the `GlobalStateHasher`.
struct CheckpointStateHasher {
    /// Epoch store passed along to `accumulate_checkpoint`.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Weak handle to the hasher; the worker stops once it can no longer be upgraded.
    hasher: Weak<GlobalStateHasher>,
    /// Receiving end of the channel fed by the checkpoint builder.
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
1150
1151impl CheckpointStateHasher {
1152    fn new(
1153        epoch_store: Arc<AuthorityPerEpochStore>,
1154        hasher: Weak<GlobalStateHasher>,
1155        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1156    ) -> Self {
1157        Self {
1158            epoch_store,
1159            hasher,
1160            receive_from_builder,
1161        }
1162    }
1163
1164    async fn run(self) {
1165        let Self {
1166            epoch_store,
1167            hasher,
1168            mut receive_from_builder,
1169        } = self;
1170        while let Some((seq, effects)) = receive_from_builder.recv().await {
1171            let Some(hasher) = hasher.upgrade() else {
1172                info!("Object state hasher was dropped, stopping checkpoint accumulation");
1173                break;
1174            };
1175            hasher
1176                .accumulate_checkpoint(&effects, seq, &epoch_store)
1177                .expect("epoch ended while accumulating checkpoint");
1178        }
1179    }
1180}
1181
/// Outcome classes for a failed checkpoint-building attempt.
///
/// `CheckpointBuilder::run` stops the builder on the first two variants and
/// retries (after a one second pause) on `Retry`.
#[derive(Debug)]
pub(crate) enum CheckpointBuilderError {
    /// Terminal: causes `CheckpointBuilder::run` to stop.
    ChangeEpochTxAlreadyExecuted,
    /// Terminal: causes `CheckpointBuilder::run` to stop.
    SystemPackagesMissing,
    /// Any other error; the build is attempted again.
    Retry(anyhow::Error),
}
1188
1189impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1190    for CheckpointBuilderError
1191{
1192    fn from(e: SuiError) -> Self {
1193        Self::Retry(e.into())
1194    }
1195}
1196
/// `Result` alias for checkpoint-builder operations; the success type defaults to `()`.
pub(crate) type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1198
/// Builds checkpoints from the pending checkpoints recorded in the epoch store.
/// See `CheckpointBuilder::run` for the driving loop.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Awaited by `run` between build attempts; signalling it triggers another attempt.
    notify: Arc<Notify>,
    // NOTE(review): presumably used to wake the `CheckpointAggregator` after a
    // checkpoint is written; confirm against `write_checkpoints`.
    notify_aggregator: Arc<Notify>,
    /// Highest checkpoint sequence number built so far; only ever raised.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    /// Source of executed transaction effects.
    effects_store: Arc<dyn TransactionCacheRead>,
    /// For synchronous accumulation of the end-of-epoch checkpoint.
    global_state_hasher: Weak<GlobalStateHasher>,
    /// For asynchronous/concurrent accumulation of regular checkpoints.
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    // NOTE(review): the two limits below presumably bound how a built
    // checkpoint is split; confirm against `create_checkpoints`.
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
1214
/// State for aggregating checkpoint signatures into certified checkpoints.
// NOTE(review): the methods are outside this view; field roles below are
// inferred from their types only.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    /// Aggregation state of the checkpoint currently being certified, if any.
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1224
/// Holds the information needed to aggregate signatures for one checkpoint.
pub struct CheckpointSignatureAggregator {
    // NOTE(review): presumably the index of the next pending signature to
    // process; confirm against the aggregator's run loop.
    next_index: u64,
    /// Summary of the checkpoint being certified.
    summary: CheckpointSummary,
    /// Digest of the checkpoint being certified.
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1236
1237impl CheckpointBuilder {
1238    fn new(
1239        state: Arc<AuthorityState>,
1240        store: Arc<CheckpointStore>,
1241        epoch_store: Arc<AuthorityPerEpochStore>,
1242        notify: Arc<Notify>,
1243        effects_store: Arc<dyn TransactionCacheRead>,
1244        // for synchronous accumulation of end-of-epoch checkpoint
1245        global_state_hasher: Weak<GlobalStateHasher>,
1246        // for asynchronous/concurrent accumulation of regular checkpoints
1247        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1248        output: Box<dyn CheckpointOutput>,
1249        notify_aggregator: Arc<Notify>,
1250        last_built: watch::Sender<CheckpointSequenceNumber>,
1251        metrics: Arc<CheckpointMetrics>,
1252        max_transactions_per_checkpoint: usize,
1253        max_checkpoint_size_bytes: usize,
1254    ) -> Self {
1255        Self {
1256            state,
1257            store,
1258            epoch_store,
1259            notify,
1260            effects_store,
1261            global_state_hasher,
1262            send_to_hasher,
1263            output,
1264            notify_aggregator,
1265            last_built,
1266            metrics,
1267            max_transactions_per_checkpoint,
1268            max_checkpoint_size_bytes,
1269        }
1270    }
1271
1272    /// This function first waits for ConsensusCommitHandler to finish reprocessing
1273    /// commits that have been processed before the last restart, if consensus_replay_waiter
1274    /// is supplied. Then it starts building checkpoints in a loop.
1275    ///
1276    /// It is optional to pass in consensus_replay_waiter, to make it easier to attribute
1277    /// if slow recovery of previously built checkpoints is due to consensus replay or
1278    /// checkpoint building.
1279    async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
1280        if let Some(replay_waiter) = consensus_replay_waiter {
1281            info!("Waiting for consensus commits to replay ...");
1282            replay_waiter.wait_for_replay().await;
1283            info!("Consensus commits finished replaying");
1284        }
1285        info!("Starting CheckpointBuilder");
1286        loop {
1287            match self.maybe_build_checkpoints().await {
1288                Ok(()) => {}
1289                err @ Err(
1290                    CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
1291                    | CheckpointBuilderError::SystemPackagesMissing,
1292                ) => {
1293                    info!("CheckpointBuilder stopping: {:?}", err);
1294                    return;
1295                }
1296                Err(CheckpointBuilderError::Retry(inner)) => {
1297                    let msg = format!("{:?}", inner);
1298                    debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
1299                    tokio::time::sleep(Duration::from_secs(1)).await;
1300                    self.metrics.checkpoint_errors.inc();
1301                    continue;
1302                }
1303            }
1304
1305            self.notify.notified().await;
1306        }
1307    }
1308
1309    async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1310        let _scope = monitored_scope("BuildCheckpoints");
1311
1312        // Collect info about the most recently built checkpoint.
1313        let summary = self
1314            .epoch_store
1315            .last_built_checkpoint_builder_summary()
1316            .expect("epoch should not have ended");
1317        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
1318        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
1319
1320        let min_checkpoint_interval_ms = self
1321            .epoch_store
1322            .protocol_config()
1323            .min_checkpoint_interval_ms_as_option()
1324            .unwrap_or_default();
1325        let mut grouped_pending_checkpoints = Vec::new();
1326        let mut checkpoints_iter = self
1327            .epoch_store
1328            .get_pending_checkpoints(last_height)
1329            .expect("unexpected epoch store error")
1330            .into_iter()
1331            .peekable();
1332        while let Some((height, pending)) = checkpoints_iter.next() {
1333            // Group PendingCheckpoints until:
1334            // - minimum interval has elapsed ...
1335            let current_timestamp = pending.details().timestamp_ms;
1336            let can_build = match last_timestamp {
1337                    Some(last_timestamp) => {
1338                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
1339                    }
1340                    None => true,
1341                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
1342                //   should be written separately) ...
1343                } || checkpoints_iter
1344                    .peek()
1345                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
1346                // - or, we have reached end of epoch.
1347                    || pending.details().last_of_epoch;
1348            grouped_pending_checkpoints.push(pending);
1349            if !can_build {
1350                debug!(
1351                    checkpoint_commit_height = height,
1352                    ?last_timestamp,
1353                    ?current_timestamp,
1354                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
1355                );
1356                continue;
1357            }
1358
1359            // Min interval has elapsed, we can now coalesce and build a checkpoint.
1360            last_height = Some(height);
1361            last_timestamp = Some(current_timestamp);
1362            debug!(
1363                checkpoint_commit_height_from = grouped_pending_checkpoints
1364                    .first()
1365                    .unwrap()
1366                    .details()
1367                    .checkpoint_height,
1368                checkpoint_commit_height_to = last_height,
1369                "Making checkpoint with commit height range"
1370            );
1371
1372            let seq = self
1373                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
1374                .await?;
1375
1376            self.last_built.send_if_modified(|cur| {
1377                // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1378                if seq > *cur {
1379                    *cur = seq;
1380                    true
1381                } else {
1382                    false
1383                }
1384            });
1385
1386            // ensure that the task can be cancelled at end of epoch, even if no other await yields
1387            // execution.
1388            tokio::task::yield_now().await;
1389        }
1390        debug!(
1391            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1392            grouped_pending_checkpoints.len(),
1393        );
1394
1395        Ok(())
1396    }
1397
1398    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1399    async fn make_checkpoint(
1400        &mut self,
1401        pendings: Vec<PendingCheckpoint>,
1402    ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1403        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1404        let last_details = pendings.last().unwrap().details().clone();
1405
1406        // Stores the transactions that should be included in the checkpoint. Transactions will be recorded in the checkpoint
1407        // in this order.
1408        let highest_executed_sequence = self
1409            .store
1410            .get_highest_executed_checkpoint_seq_number()
1411            .expect("db error")
1412            .unwrap_or(0);
1413
1414        let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
1415        let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1416
1417        let new_checkpoints = self
1418            .create_checkpoints(
1419                sorted_tx_effects_included_in_checkpoint,
1420                &last_details,
1421                &all_roots,
1422            )
1423            .await?;
1424        let highest_sequence = *new_checkpoints.last().0.sequence_number();
1425        if highest_sequence <= highest_executed_sequence && poll_count > 1 {
1426            debug_fatal!(
1427                "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1428            );
1429        }
1430
1431        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1432            .await?;
1433        Ok(highest_sequence)
1434    }
1435
1436    async fn construct_and_execute_settlement_transactions(
1437        &self,
1438        sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
1439        checkpoint_height: CheckpointHeight,
1440        tx_index_offset: u64,
1441    ) -> (TransactionKey, Vec<TransactionEffects>) {
1442        let _scope =
1443            monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");
1444
1445        let tx_key =
1446            TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);
1447
1448        let epoch = self.epoch_store.epoch();
1449        let accumulator_root_obj_initial_shared_version = self
1450            .epoch_store
1451            .epoch_start_config()
1452            .accumulator_root_obj_initial_shared_version()
1453            .expect("accumulator root object must exist");
1454
1455        let builder = AccumulatorSettlementTxBuilder::new(
1456            Some(self.effects_store.as_ref()),
1457            sorted_tx_effects_included_in_checkpoint,
1458            tx_index_offset,
1459        );
1460
1461        let accumulator_changes = builder.collect_accumulator_changes();
1462        let num_updates = builder.num_updates();
1463        let (settlement_txns, barrier_tx) = builder.build_tx(
1464            self.epoch_store.protocol_config(),
1465            epoch,
1466            accumulator_root_obj_initial_shared_version,
1467            checkpoint_height,
1468        );
1469
1470        let settlement_txns: Vec<_> = settlement_txns
1471            .into_iter()
1472            .chain(std::iter::once(barrier_tx))
1473            .map(|tx| {
1474                VerifiedExecutableTransaction::new_system(
1475                    VerifiedTransaction::new_system_transaction(tx),
1476                    self.epoch_store.epoch(),
1477                )
1478            })
1479            .collect();
1480
1481        let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();
1482
1483        debug!(
1484            ?settlement_digests,
1485            ?tx_key,
1486            "created settlement transactions with {num_updates} updates"
1487        );
1488
1489        self.epoch_store
1490            .notify_settlement_transactions_ready(tx_key, settlement_txns);
1491
1492        let settlement_effects = loop {
1493            match tokio::time::timeout(Duration::from_secs(5), async {
1494                self.effects_store
1495                    .notify_read_executed_effects(
1496                        "CheckpointBuilder::notify_read_settlement_effects",
1497                        &settlement_digests,
1498                    )
1499                    .await
1500            })
1501            .await
1502            {
1503                Ok(effects) => break effects,
1504                Err(_) => {
1505                    debug_fatal!(
1506                        "Timeout waiting for settlement transactions to be executed {:?}, retrying...",
1507                        tx_key
1508                    );
1509                }
1510            }
1511        };
1512
1513        let mut next_accumulator_version = None;
1514        for fx in settlement_effects.iter() {
1515            assert!(
1516                fx.status().is_ok(),
1517                "settlement transaction cannot fail (digest: {:?}) {:#?}",
1518                fx.transaction_digest(),
1519                fx
1520            );
1521            if let Some(version) = fx
1522                .mutated()
1523                .iter()
1524                .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
1525            {
1526                assert!(
1527                    next_accumulator_version.is_none(),
1528                    "Only one settlement transaction should mutate the accumulator root object"
1529                );
1530                next_accumulator_version = Some(version);
1531            }
1532        }
1533        let settlements = BalanceSettlement {
1534            next_accumulator_version: next_accumulator_version
1535                .expect("Accumulator root object should be mutated in the settlement transactions"),
1536            balance_changes: accumulator_changes,
1537        };
1538
1539        self.state
1540            .execution_scheduler()
1541            .settle_balances(settlements);
1542
1543        (tx_key, settlement_effects)
1544    }
1545
    // Given the root transactions of the pending checkpoints, resolve the transactions that should
    // be included in the checkpoint, and return them in the order they should be included in it.
    // `effects_in_current_checkpoint` tracks the transactions that already exist in the current
    // checkpoint.
1550    #[instrument(level = "debug", skip_all)]
1551    async fn resolve_checkpoint_transactions(
1552        &self,
1553        pending_checkpoints: Vec<PendingCheckpoint>,
1554    ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
1555        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1556
        // Keeps track of the effects that are already included in the current checkpoint.
        // This is used when multiple pending checkpoints are combined into a single checkpoint,
        // because in such scenarios the dependencies of a transaction may be in earlier created
        // checkpoints, or in earlier pending checkpoints.
1561        let mut effects_in_current_checkpoint = BTreeSet::new();
1562
1563        let mut tx_effects = Vec::new();
1564        let mut tx_roots = HashSet::new();
1565
1566        for pending_checkpoint in pending_checkpoints.into_iter() {
1567            let mut pending = pending_checkpoint;
1568            debug!(
1569                checkpoint_commit_height = pending.details.checkpoint_height,
1570                "Resolving checkpoint transactions for pending checkpoint.",
1571            );
1572
1573            trace!(
1574                "roots for pending checkpoint {:?}: {:?}",
1575                pending.details.checkpoint_height, pending.roots,
1576            );
1577
1578            let settlement_root = if self.epoch_store.accumulators_enabled() {
1579                let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
1580                    pending.roots.pop()
1581                else {
1582                    fatal!("No settlement root found");
1583                };
1584                Some(settlement_root)
1585            } else {
1586                None
1587            };
1588
1589            let roots = &pending.roots;
1590
1591            self.metrics
1592                .checkpoint_roots_count
1593                .inc_by(roots.len() as u64);
1594
1595            let root_digests = self
1596                .epoch_store
1597                .notify_read_tx_key_to_digest(roots)
1598                .in_monitored_scope("CheckpointNotifyDigests")
1599                .await?;
1600            let root_effects = self
1601                .effects_store
1602                .notify_read_executed_effects(
1603                    CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
1604                    &root_digests,
1605                )
1606                .in_monitored_scope("CheckpointNotifyRead")
1607                .await;
1608
1609            let consensus_commit_prologue = if self
1610                .epoch_store
1611                .protocol_config()
1612                .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
1613            {
1614                // If the roots contains consensus commit prologue transaction, we want to extract it,
1615                // and put it to the front of the checkpoint.
1616
1617                let consensus_commit_prologue =
1618                    self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
1619
1620                // Get the unincluded dependencies of the consensus commit prologue. We should expect no
1621                // other dependencies that haven't been included in any previous checkpoints.
1622                if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1623                    let unsorted_ccp = self.complete_checkpoint_effects(
1624                        vec![ccp_effects.clone()],
1625                        &mut effects_in_current_checkpoint,
1626                    )?;
1627
1628                    // No other dependencies of this consensus commit prologue that haven't been included
1629                    // in any previous checkpoint.
1630                    if unsorted_ccp.len() != 1 {
1631                        fatal!(
1632                            "Expected 1 consensus commit prologue, got {:?}",
1633                            unsorted_ccp
1634                                .iter()
1635                                .map(|e| e.transaction_digest())
1636                                .collect::<Vec<_>>()
1637                        );
1638                    }
1639                    assert_eq!(unsorted_ccp.len(), 1);
1640                    assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1641                }
1642                consensus_commit_prologue
1643            } else {
1644                None
1645            };
1646
1647            let unsorted =
1648                self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;
1649
1650            let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1651            let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1652            if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
1653                if cfg!(debug_assertions) {
1654                    // When consensus_commit_prologue is extracted, it should not be included in the `unsorted`.
1655                    for tx in unsorted.iter() {
1656                        assert!(tx.transaction_digest() != &ccp_digest);
1657                    }
1658                }
1659                sorted.push(ccp_effects);
1660            }
1661            sorted.extend(CausalOrder::causal_sort(unsorted));
1662
1663            if let Some(settlement_root) = settlement_root {
1664                // tx_effects.len() gives us the offset for this pending checkpoint's transactions
1665                // in the final concatenated checkpoint
1666                let tx_index_offset = tx_effects.len() as u64;
1667                let (tx_key, settlement_effects) = self
1668                    .construct_and_execute_settlement_transactions(
1669                        &sorted,
1670                        pending.details.checkpoint_height,
1671                        tx_index_offset,
1672                    )
1673                    .await;
1674                debug!(?tx_key, "executed settlement transactions");
1675
1676                assert_eq!(settlement_root, tx_key);
1677
1678                // Note: we do not need to add the settlement digests to `tx_roots` - `tx_roots`
1679                // should only include the digests of transactions that were originally roots in
1680                // the pending checkpoint. It is later used to identify transactions which were
1681                // added as dependencies, so that those transactions can be waited on using
1682                // `consensus_messages_processed_notify()`. System transactions (such as
1683                // settlements) are exempt from this already.
1684                sorted.extend(settlement_effects);
1685            }
1686
1687            #[cfg(msim)]
1688            {
1689                // Check consensus commit prologue invariants in sim test.
1690                self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1691            }
1692
1693            tx_effects.extend(sorted);
1694            tx_roots.extend(root_digests);
1695        }
1696
1697        Ok((tx_effects, tx_roots))
1698    }
1699
1700    // This function is used to extract the consensus commit prologue digest and effects from the root
1701    // transactions.
1702    // This function can only be used when prepend_prologue_tx_in_consensus_commit_in_checkpoints is enabled.
1703    // The consensus commit prologue is expected to be the first transaction in the roots.
1704    fn extract_consensus_commit_prologue(
1705        &self,
1706        root_digests: &[TransactionDigest],
1707        root_effects: &[TransactionEffects],
1708    ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
1709        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1710        if root_digests.is_empty() {
1711            return Ok(None);
1712        }
1713
1714        // Reads the first transaction in the roots, and checks whether it is a consensus commit prologue
1715        // transaction.
1716        // When prepend_prologue_tx_in_consensus_commit_in_checkpoints is enabled, the consensus commit prologue
1717        // transaction should be the first transaction in the roots written by the consensus handler.
1718        let first_tx = self
1719            .state
1720            .get_transaction_cache_reader()
1721            .get_transaction_block(&root_digests[0])
1722            .expect("Transaction block must exist");
1723
1724        Ok(first_tx
1725            .transaction_data()
1726            .is_consensus_commit_prologue()
1727            .then(|| {
1728                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1729                (*first_tx.digest(), root_effects[0].clone())
1730            }))
1731    }
1732
1733    #[instrument(level = "debug", skip_all)]
1734    async fn write_checkpoints(
1735        &mut self,
1736        height: CheckpointHeight,
1737        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1738    ) -> SuiResult {
1739        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1740        let mut batch = self.store.tables.checkpoint_content.batch();
1741        let mut all_tx_digests =
1742            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1743
1744        for (summary, contents) in &new_checkpoints {
1745            debug!(
1746                checkpoint_commit_height = height,
1747                checkpoint_seq = summary.sequence_number,
1748                contents_digest = ?contents.digest(),
1749                "writing checkpoint",
1750            );
1751
1752            if let Some(previously_computed_summary) = self
1753                .store
1754                .tables
1755                .locally_computed_checkpoints
1756                .get(&summary.sequence_number)?
1757                && previously_computed_summary.digest() != summary.digest()
1758            {
1759                fatal!(
1760                    "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
1761                    summary.sequence_number,
1762                    previously_computed_summary.digest(),
1763                    summary.digest()
1764                );
1765            }
1766
1767            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1768
1769            self.metrics
1770                .transactions_included_in_checkpoint
1771                .inc_by(contents.size() as u64);
1772            let sequence_number = summary.sequence_number;
1773            self.metrics
1774                .last_constructed_checkpoint
1775                .set(sequence_number as i64);
1776
1777            batch.insert_batch(
1778                &self.store.tables.checkpoint_content,
1779                [(contents.digest(), contents)],
1780            )?;
1781
1782            batch.insert_batch(
1783                &self.store.tables.locally_computed_checkpoints,
1784                [(sequence_number, summary)],
1785            )?;
1786        }
1787
1788        batch.write()?;
1789
1790        // Send all checkpoint sigs to consensus.
1791        for (summary, contents) in &new_checkpoints {
1792            self.output
1793                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1794                .await?;
1795        }
1796
1797        for (local_checkpoint, _) in &new_checkpoints {
1798            if let Some(certified_checkpoint) = self
1799                .store
1800                .tables
1801                .certified_checkpoints
1802                .get(local_checkpoint.sequence_number())?
1803            {
1804                self.store
1805                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1806            }
1807        }
1808
1809        self.notify_aggregator.notify_one();
1810        self.epoch_store
1811            .process_constructed_checkpoint(height, new_checkpoints);
1812        Ok(())
1813    }
1814
1815    #[allow(clippy::type_complexity)]
1816    fn split_checkpoint_chunks(
1817        &self,
1818        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1819        signatures: Vec<Vec<GenericSignature>>,
1820    ) -> CheckpointBuilderResult<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1821        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1822        let mut chunks = Vec::new();
1823        let mut chunk = Vec::new();
1824        let mut chunk_size: usize = 0;
1825        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1826            .into_iter()
1827            .zip(signatures.into_iter())
1828        {
1829            // Roll over to a new chunk after either max count or max size is reached.
1830            // The size calculation here is intended to estimate the size of the
1831            // FullCheckpointContents struct. If this code is modified, that struct
1832            // should also be updated accordingly.
1833            let size = transaction_size
1834                + bcs::serialized_size(&effects)?
1835                + bcs::serialized_size(&signatures)?;
1836            if chunk.len() == self.max_transactions_per_checkpoint
1837                || (chunk_size + size) > self.max_checkpoint_size_bytes
1838            {
1839                if chunk.is_empty() {
1840                    // Always allow at least one tx in a checkpoint.
1841                    warn!(
1842                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1843                        self.max_checkpoint_size_bytes
1844                    );
1845                } else {
1846                    chunks.push(chunk);
1847                    chunk = Vec::new();
1848                    chunk_size = 0;
1849                }
1850            }
1851
1852            chunk.push((effects, signatures));
1853            chunk_size += size;
1854        }
1855
1856        if !chunk.is_empty() || chunks.is_empty() {
1857            // We intentionally create an empty checkpoint if there is no content provided
1858            // to make a 'heartbeat' checkpoint.
1859            // Important: if some conditions are added here later, we need to make sure we always
1860            // have at least one chunk if last_pending_of_epoch is set
1861            chunks.push(chunk);
1862            // Note: empty checkpoints are ok - they shouldn't happen at all on a network with even
1863            // modest load. Even if they do happen, it is still useful as it allows fullnodes to
1864            // distinguish between "no transactions have happened" and "i am not receiving new
1865            // checkpoints".
1866        }
1867        Ok(chunks)
1868    }
1869
1870    fn load_last_built_checkpoint_summary(
1871        epoch_store: &AuthorityPerEpochStore,
1872        store: &CheckpointStore,
1873    ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1874        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1875        if last_checkpoint.is_none() {
1876            let epoch = epoch_store.epoch();
1877            if epoch > 0 {
1878                let previous_epoch = epoch - 1;
1879                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1880                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1881                if let Some((ref seq, _)) = last_checkpoint {
1882                    debug!(
1883                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1884                    );
1885                } else {
1886                    // This is some serious bug with when CheckpointBuilder started so surfacing it via panic
1887                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1888                }
1889            }
1890        }
1891        Ok(last_checkpoint)
1892    }
1893
1894    #[instrument(level = "debug", skip_all)]
1895    async fn create_checkpoints(
1896        &self,
1897        all_effects: Vec<TransactionEffects>,
1898        details: &PendingCheckpointInfo,
1899        all_roots: &HashSet<TransactionDigest>,
1900    ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1901        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1902
1903        let total = all_effects.len();
1904        let mut last_checkpoint =
1905            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1906        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1907        info!(
1908            checkpoint_commit_height = details.checkpoint_height,
1909            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1910            checkpoint_timestamp = details.timestamp_ms,
1911            "Creating checkpoint(s) for {} transactions",
1912            all_effects.len(),
1913        );
1914
1915        let all_digests: Vec<_> = all_effects
1916            .iter()
1917            .map(|effect| *effect.transaction_digest())
1918            .collect();
1919        let transactions_and_sizes = self
1920            .state
1921            .get_transaction_cache_reader()
1922            .get_transactions_and_serialized_sizes(&all_digests)?;
1923        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1924        let mut transactions = Vec::with_capacity(all_effects.len());
1925        let mut transaction_keys = Vec::with_capacity(all_effects.len());
1926        let mut randomness_rounds = BTreeMap::new();
1927        {
1928            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1929            debug!(
1930                ?last_checkpoint_seq,
1931                "Waiting for {:?} certificates to appear in consensus",
1932                all_effects.len()
1933            );
1934
1935            for (effects, transaction_and_size) in all_effects
1936                .into_iter()
1937                .zip(transactions_and_sizes.into_iter())
1938            {
1939                let (transaction, size) = transaction_and_size
1940                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
1941                match transaction.inner().transaction_data().kind() {
1942                    TransactionKind::ConsensusCommitPrologue(_)
1943                    | TransactionKind::ConsensusCommitPrologueV2(_)
1944                    | TransactionKind::ConsensusCommitPrologueV3(_)
1945                    | TransactionKind::ConsensusCommitPrologueV4(_)
1946                    | TransactionKind::AuthenticatorStateUpdate(_) => {
1947                        // ConsensusCommitPrologue and AuthenticatorStateUpdate are guaranteed to be
1948                        // processed before we reach here.
1949                    }
1950                    TransactionKind::ProgrammableSystemTransaction(_) => {
1951                        // settlement transactions are added by checkpoint builder
1952                    }
1953                    TransactionKind::ChangeEpoch(_)
1954                    | TransactionKind::Genesis(_)
1955                    | TransactionKind::EndOfEpochTransaction(_) => {
1956                        fatal!(
1957                            "unexpected transaction in checkpoint effects: {:?}",
1958                            transaction
1959                        );
1960                    }
1961                    TransactionKind::RandomnessStateUpdate(rsu) => {
1962                        randomness_rounds
1963                            .insert(*effects.transaction_digest(), rsu.randomness_round);
1964                    }
1965                    TransactionKind::ProgrammableTransaction(_) => {
1966                        // Only transactions that are not roots should be included in the call to
1967                        // `consensus_messages_processed_notify`. roots come directly from the consensus
1968                        // commit and so are known to be processed already.
1969                        let digest = *effects.transaction_digest();
1970                        if !all_roots.contains(&digest) {
1971                            transaction_keys.push(SequencedConsensusTransactionKey::External(
1972                                ConsensusTransactionKey::Certificate(digest),
1973                            ));
1974                        }
1975                    }
1976                }
1977                transactions.push(transaction);
1978                all_effects_and_transaction_sizes.push((effects, size));
1979            }
1980
1981            self.epoch_store
1982                .consensus_messages_processed_notify(transaction_keys)
1983                .await?;
1984        }
1985
1986        let signatures = self
1987            .epoch_store
1988            .user_signatures_for_checkpoint(&transactions, &all_digests);
1989        debug!(
1990            ?last_checkpoint_seq,
1991            "Received {} checkpoint user signatures from consensus",
1992            signatures.len()
1993        );
1994
1995        let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
1996            Some(
1997                transactions
1998                    .iter()
1999                    .flat_map(|tx| {
2000                        if let TransactionKind::ProgrammableTransaction(ptb) =
2001                            tx.transaction_data().kind()
2002                        {
2003                            itertools::Either::Left(
2004                                ptb.commands
2005                                    .iter()
2006                                    .map(ExecutionTimeObservationKey::from_command),
2007                            )
2008                        } else {
2009                            itertools::Either::Right(std::iter::empty())
2010                        }
2011                    })
2012                    .collect(),
2013            )
2014        } else {
2015            None
2016        };
2017
2018        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2019        let chunks_count = chunks.len();
2020
2021        let mut checkpoints = Vec::with_capacity(chunks_count);
2022        debug!(
2023            ?last_checkpoint_seq,
2024            "Creating {} checkpoints with {} transactions", chunks_count, total,
2025        );
2026
2027        let epoch = self.epoch_store.epoch();
2028        for (index, transactions) in chunks.into_iter().enumerate() {
2029            let first_checkpoint_of_epoch = index == 0
2030                && last_checkpoint
2031                    .as_ref()
2032                    .map(|(_, c)| c.epoch != epoch)
2033                    .unwrap_or(true);
2034            if first_checkpoint_of_epoch {
2035                self.epoch_store
2036                    .record_epoch_first_checkpoint_creation_time_metric();
2037            }
2038            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2039
2040            let sequence_number = last_checkpoint
2041                .as_ref()
2042                .map(|(_, c)| c.sequence_number + 1)
2043                .unwrap_or_default();
2044            let mut timestamp_ms = details.timestamp_ms;
2045            if let Some((_, last_checkpoint)) = &last_checkpoint
2046                && last_checkpoint.timestamp_ms > timestamp_ms
2047            {
2048                // First consensus commit of an epoch can have zero timestamp.
2049                debug!(
2050                    "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2051                    sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2052                );
2053                if self
2054                    .epoch_store
2055                    .protocol_config()
2056                    .enforce_checkpoint_timestamp_monotonicity()
2057                {
2058                    timestamp_ms = last_checkpoint.timestamp_ms;
2059                }
2060            }
2061
2062            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
2063            let epoch_rolling_gas_cost_summary =
2064                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2065
2066            let end_of_epoch_data = if last_checkpoint_of_epoch {
2067                let system_state_obj = self
2068                    .augment_epoch_last_checkpoint(
2069                        &epoch_rolling_gas_cost_summary,
2070                        timestamp_ms,
2071                        &mut effects,
2072                        &mut signatures,
2073                        sequence_number,
2074                        std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2075                        last_checkpoint_seq.unwrap_or_default(),
2076                    )
2077                    .await?;
2078
2079                let committee = system_state_obj
2080                    .get_current_epoch_committee()
2081                    .committee()
2082                    .clone();
2083
2084                // This must happen after the call to augment_epoch_last_checkpoint,
2085                // otherwise we will not capture the change_epoch tx.
2086                let root_state_digest = {
2087                    let state_acc = self
2088                        .global_state_hasher
2089                        .upgrade()
2090                        .expect("No checkpoints should be getting built after local configuration");
2091                    let acc = state_acc.accumulate_checkpoint(
2092                        &effects,
2093                        sequence_number,
2094                        &self.epoch_store,
2095                    )?;
2096
2097                    state_acc
2098                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2099                        .await?;
2100
2101                    state_acc.accumulate_running_root(
2102                        &self.epoch_store,
2103                        sequence_number,
2104                        Some(acc),
2105                    )?;
2106                    state_acc
2107                        .digest_epoch(self.epoch_store.clone(), sequence_number)
2108                        .await?
2109                };
2110                self.metrics.highest_accumulated_epoch.set(epoch as i64);
2111                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2112
2113                let epoch_commitments = if self
2114                    .epoch_store
2115                    .protocol_config()
2116                    .check_commit_root_state_digest_supported()
2117                {
2118                    vec![root_state_digest.into()]
2119                } else {
2120                    vec![]
2121                };
2122
2123                Some(EndOfEpochData {
2124                    next_epoch_committee: committee.voting_rights,
2125                    next_epoch_protocol_version: ProtocolVersion::new(
2126                        system_state_obj.protocol_version(),
2127                    ),
2128                    epoch_commitments,
2129                })
2130            } else {
2131                self.send_to_hasher
2132                    .send((sequence_number, effects.clone()))
2133                    .await?;
2134
2135                None
2136            };
2137            let contents = CheckpointContents::new_with_digests_and_signatures(
2138                effects.iter().map(TransactionEffects::execution_digests),
2139                signatures,
2140            );
2141
2142            let num_txns = contents.size() as u64;
2143
2144            let network_total_transactions = last_checkpoint
2145                .as_ref()
2146                .map(|(_, c)| c.network_total_transactions + num_txns)
2147                .unwrap_or(num_txns);
2148
2149            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2150
2151            let matching_randomness_rounds: Vec<_> = effects
2152                .iter()
2153                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2154                .copied()
2155                .collect();
2156
2157            let checkpoint_commitments = if self
2158                .epoch_store
2159                .protocol_config()
2160                .include_checkpoint_artifacts_digest_in_summary()
2161            {
2162                let artifacts = CheckpointArtifacts::from(&effects[..]);
2163                let artifacts_digest = artifacts.digest()?;
2164                vec![artifacts_digest.into()]
2165            } else {
2166                Default::default()
2167            };
2168
2169            let summary = CheckpointSummary::new(
2170                self.epoch_store.protocol_config(),
2171                epoch,
2172                sequence_number,
2173                network_total_transactions,
2174                &contents,
2175                previous_digest,
2176                epoch_rolling_gas_cost_summary,
2177                end_of_epoch_data,
2178                timestamp_ms,
2179                matching_randomness_rounds,
2180                checkpoint_commitments,
2181            );
2182            summary.report_checkpoint_age(
2183                &self.metrics.last_created_checkpoint_age,
2184                &self.metrics.last_created_checkpoint_age_ms,
2185            );
2186            if last_checkpoint_of_epoch {
2187                info!(
2188                    checkpoint_seq = sequence_number,
2189                    "creating last checkpoint of epoch {}", epoch
2190                );
2191                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2192                    self.epoch_store
2193                        .report_epoch_metrics_at_last_checkpoint(stats);
2194                }
2195            }
2196            last_checkpoint = Some((sequence_number, summary.clone()));
2197            checkpoints.push((summary, contents));
2198        }
2199
2200        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2201    }
2202
2203    fn get_epoch_total_gas_cost(
2204        &self,
2205        last_checkpoint: Option<&CheckpointSummary>,
2206        cur_checkpoint_effects: &[TransactionEffects],
2207    ) -> GasCostSummary {
2208        let (previous_epoch, previous_gas_costs) = last_checkpoint
2209            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2210            .unwrap_or_default();
2211        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2212        if previous_epoch == self.epoch_store.epoch() {
2213            // sum only when we are within the same epoch
2214            GasCostSummary::new(
2215                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2216                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2217                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2218                previous_gas_costs.non_refundable_storage_fee
2219                    + current_gas_costs.non_refundable_storage_fee,
2220            )
2221        } else {
2222            current_gas_costs
2223        }
2224    }
2225
2226    #[instrument(level = "error", skip_all)]
2227    async fn augment_epoch_last_checkpoint(
2228        &self,
2229        epoch_total_gas_cost: &GasCostSummary,
2230        epoch_start_timestamp_ms: CheckpointTimestamp,
2231        checkpoint_effects: &mut Vec<TransactionEffects>,
2232        signatures: &mut Vec<Vec<GenericSignature>>,
2233        checkpoint: CheckpointSequenceNumber,
2234        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2235        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
2236        // >1 checkpoint.
2237        last_checkpoint: CheckpointSequenceNumber,
2238    ) -> CheckpointBuilderResult<SuiSystemState> {
2239        let (system_state, effects) = self
2240            .state
2241            .create_and_execute_advance_epoch_tx(
2242                &self.epoch_store,
2243                epoch_total_gas_cost,
2244                checkpoint,
2245                epoch_start_timestamp_ms,
2246                end_of_epoch_observation_keys,
2247                last_checkpoint,
2248            )
2249            .await?;
2250        checkpoint_effects.push(effects);
2251        signatures.push(vec![]);
2252        Ok(system_state)
2253    }
2254
2255    /// For the given roots return complete list of effects to include in checkpoint
2256    /// This list includes the roots and all their dependencies, which are not part of checkpoint already.
2257    /// Note that this function may be called multiple times to construct the checkpoint.
2258    /// `existing_tx_digests_in_checkpoint` is used to track the transactions that are already included in the checkpoint.
2259    /// Txs in `roots` that need to be included in the checkpoint will be added to `existing_tx_digests_in_checkpoint`
2260    /// after the call of this function.
2261    #[instrument(level = "debug", skip_all)]
2262    fn complete_checkpoint_effects(
2263        &self,
2264        mut roots: Vec<TransactionEffects>,
2265        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
2266    ) -> SuiResult<Vec<TransactionEffects>> {
2267        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
2268        let mut results = vec![];
2269        let mut seen = HashSet::new();
2270        loop {
2271            let mut pending = HashSet::new();
2272
2273            let transactions_included = self
2274                .epoch_store
2275                .builder_included_transactions_in_checkpoint(
2276                    roots.iter().map(|e| e.transaction_digest()),
2277                )?;
2278
2279            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
2280                let digest = effect.transaction_digest();
2281                // Unnecessary to read effects of a dependency if the effect is already processed.
2282                seen.insert(*digest);
2283
2284                // Skip roots that are already included in the checkpoint.
2285                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
2286                    continue;
2287                }
2288
2289                // Skip roots already included in checkpoints or roots from previous epochs
2290                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
2291                    continue;
2292                }
2293
2294                let existing_effects = self
2295                    .epoch_store
2296                    .transactions_executed_in_cur_epoch(effect.dependencies())?;
2297
2298                for (dependency, effects_signature_exists) in
2299                    effect.dependencies().iter().zip(existing_effects.iter())
2300                {
2301                    // Skip here if dependency not executed in the current epoch.
2302                    // Note that the existence of an effects signature in the
2303                    // epoch store for the given digest indicates that the transaction
2304                    // was locally executed in the current epoch
2305                    if !effects_signature_exists {
2306                        continue;
2307                    }
2308                    if seen.insert(*dependency) {
2309                        pending.insert(*dependency);
2310                    }
2311                }
2312                results.push(effect);
2313            }
2314            if pending.is_empty() {
2315                break;
2316            }
2317            let pending = pending.into_iter().collect::<Vec<_>>();
2318            let effects = self.effects_store.multi_get_executed_effects(&pending);
2319            let effects = effects
2320                .into_iter()
2321                .zip(pending)
2322                .map(|(opt, digest)| match opt {
2323                    Some(x) => x,
2324                    None => panic!(
2325                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
2326                        digest
2327                    ),
2328                })
2329                .collect::<Vec<_>>();
2330            roots = effects;
2331        }
2332
2333        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
2334        Ok(results)
2335    }
2336
2337    // This function is used to check the invariants of the consensus commit prologue transactions in the checkpoint
2338    // in simtest.
2339    #[cfg(msim)]
2340    fn expensive_consensus_commit_prologue_invariants_check(
2341        &self,
2342        root_digests: &[TransactionDigest],
2343        sorted: &[TransactionEffects],
2344    ) {
2345        if !self
2346            .epoch_store
2347            .protocol_config()
2348            .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
2349        {
2350            return;
2351        }
2352
2353        // Gets all the consensus commit prologue transactions from the roots.
2354        let root_txs = self
2355            .state
2356            .get_transaction_cache_reader()
2357            .multi_get_transaction_blocks(root_digests);
2358        let ccps = root_txs
2359            .iter()
2360            .filter_map(|tx| {
2361                if let Some(tx) = tx {
2362                    if tx.transaction_data().is_consensus_commit_prologue() {
2363                        Some(tx)
2364                    } else {
2365                        None
2366                    }
2367                } else {
2368                    None
2369                }
2370            })
2371            .collect::<Vec<_>>();
2372
2373        // There should be at most one consensus commit prologue transaction in the roots.
2374        assert!(ccps.len() <= 1);
2375
2376        // Get all the transactions in the checkpoint.
2377        let txs = self
2378            .state
2379            .get_transaction_cache_reader()
2380            .multi_get_transaction_blocks(
2381                &sorted
2382                    .iter()
2383                    .map(|tx| tx.transaction_digest().clone())
2384                    .collect::<Vec<_>>(),
2385            );
2386
2387        if ccps.len() == 0 {
2388            // If there is no consensus commit prologue transaction in the roots, then there should be no
2389            // consensus commit prologue transaction in the checkpoint.
2390            for tx in txs.iter() {
2391                if let Some(tx) = tx {
2392                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2393                }
2394            }
2395        } else {
2396            // If there is one consensus commit prologue, it must be the first one in the checkpoint.
2397            assert!(
2398                txs[0]
2399                    .as_ref()
2400                    .unwrap()
2401                    .transaction_data()
2402                    .is_consensus_commit_prologue()
2403            );
2404
2405            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2406
2407            for tx in txs.iter().skip(1) {
2408                if let Some(tx) = tx {
2409                    assert!(!tx.transaction_data().is_consensus_commit_prologue());
2410                }
2411            }
2412        }
2413    }
2414}
2415
2416impl CheckpointAggregator {
2417    fn new(
2418        tables: Arc<CheckpointStore>,
2419        epoch_store: Arc<AuthorityPerEpochStore>,
2420        notify: Arc<Notify>,
2421        output: Box<dyn CertifiedCheckpointOutput>,
2422        state: Arc<AuthorityState>,
2423        metrics: Arc<CheckpointMetrics>,
2424    ) -> Self {
2425        let current = None;
2426        Self {
2427            store: tables,
2428            epoch_store,
2429            notify,
2430            current,
2431            output,
2432            state,
2433            metrics,
2434        }
2435    }
2436
2437    async fn run(mut self) {
2438        info!("Starting CheckpointAggregator");
2439        loop {
2440            if let Err(e) = self.run_and_notify().await {
2441                error!(
2442                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
2443                    e
2444                );
2445                self.metrics.checkpoint_errors.inc();
2446                tokio::time::sleep(Duration::from_secs(1)).await;
2447                continue;
2448            }
2449
2450            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
2451        }
2452    }
2453
2454    async fn run_and_notify(&mut self) -> SuiResult {
2455        let summaries = self.run_inner()?;
2456        for summary in summaries {
2457            self.output.certified_checkpoint_created(&summary).await?;
2458        }
2459        Ok(())
2460    }
2461
2462    fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
2463        let _scope = monitored_scope("CheckpointAggregator");
2464        let mut result = vec![];
2465        'outer: loop {
2466            let next_to_certify = self.next_checkpoint_to_certify()?;
2467            let current = if let Some(current) = &mut self.current {
2468                // It's possible that the checkpoint was already certified by
2469                // the rest of the network and we've already received the
2470                // certified checkpoint via StateSync. In this case, we reset
2471                // the current signature aggregator to the next checkpoint to
2472                // be certified
2473                if current.summary.sequence_number < next_to_certify {
2474                    assert_reachable!("skip checkpoint certification");
2475                    self.current = None;
2476                    continue;
2477                }
2478                current
2479            } else {
2480                let Some(summary) = self
2481                    .epoch_store
2482                    .get_built_checkpoint_summary(next_to_certify)?
2483                else {
2484                    return Ok(result);
2485                };
2486                self.current = Some(CheckpointSignatureAggregator {
2487                    next_index: 0,
2488                    digest: summary.digest(),
2489                    summary,
2490                    signatures_by_digest: MultiStakeAggregator::new(
2491                        self.epoch_store.committee().clone(),
2492                    ),
2493                    store: self.store.clone(),
2494                    state: self.state.clone(),
2495                    metrics: self.metrics.clone(),
2496                });
2497                self.current.as_mut().unwrap()
2498            };
2499
2500            let epoch_tables = self
2501                .epoch_store
2502                .tables()
2503                .expect("should not run past end of epoch");
2504            let iter = epoch_tables
2505                .pending_checkpoint_signatures
2506                .safe_iter_with_bounds(
2507                    Some((current.summary.sequence_number, current.next_index)),
2508                    None,
2509                );
2510            for item in iter {
2511                let ((seq, index), data) = item?;
2512                if seq != current.summary.sequence_number {
2513                    trace!(
2514                        checkpoint_seq =? current.summary.sequence_number,
2515                        "Not enough checkpoint signatures",
2516                    );
2517                    // No more signatures (yet) for this checkpoint
2518                    return Ok(result);
2519                }
2520                trace!(
2521                    checkpoint_seq = current.summary.sequence_number,
2522                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
2523                    current.summary.digest(),
2524                    data.summary.auth_sig().authority.concise()
2525                );
2526                self.metrics
2527                    .checkpoint_participation
2528                    .with_label_values(&[&format!(
2529                        "{:?}",
2530                        data.summary.auth_sig().authority.concise()
2531                    )])
2532                    .inc();
2533                if let Ok(auth_signature) = current.try_aggregate(data) {
2534                    debug!(
2535                        checkpoint_seq = current.summary.sequence_number,
2536                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
2537                        current.summary.digest(),
2538                    );
2539                    let summary = VerifiedCheckpoint::new_unchecked(
2540                        CertifiedCheckpointSummary::new_from_data_and_sig(
2541                            current.summary.clone(),
2542                            auth_signature,
2543                        ),
2544                    );
2545
2546                    self.store.insert_certified_checkpoint(&summary)?;
2547                    self.metrics
2548                        .last_certified_checkpoint
2549                        .set(current.summary.sequence_number as i64);
2550                    current.summary.report_checkpoint_age(
2551                        &self.metrics.last_certified_checkpoint_age,
2552                        &self.metrics.last_certified_checkpoint_age_ms,
2553                    );
2554                    result.push(summary.into_inner());
2555                    self.current = None;
2556                    continue 'outer;
2557                } else {
2558                    current.next_index = index + 1;
2559                }
2560            }
2561            break;
2562        }
2563        Ok(result)
2564    }
2565
2566    fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
2567        Ok(self
2568            .store
2569            .tables
2570            .certified_checkpoints
2571            .reversed_safe_iter_with_bounds(None, None)?
2572            .next()
2573            .transpose()?
2574            .map(|(seq, _)| seq + 1)
2575            .unwrap_or_default())
2576    }
2577}
2578
2579impl CheckpointSignatureAggregator {
2580    #[allow(clippy::result_unit_err)]
2581    pub fn try_aggregate(
2582        &mut self,
2583        data: CheckpointSignatureMessage,
2584    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2585        let their_digest = *data.summary.digest();
2586        let (_, signature) = data.summary.into_data_and_sig();
2587        let author = signature.authority;
2588        let envelope =
2589            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2590        match self.signatures_by_digest.insert(their_digest, envelope) {
2591            // ignore repeated signatures
2592            InsertResult::Failed { error }
2593                if matches!(
2594                    error.as_inner(),
2595                    SuiErrorKind::StakeAggregatorRepeatedSigner {
2596                        conflicting_sig: false,
2597                        ..
2598                    },
2599                ) =>
2600            {
2601                Err(())
2602            }
2603            InsertResult::Failed { error } => {
2604                warn!(
2605                    checkpoint_seq = self.summary.sequence_number,
2606                    "Failed to aggregate new signature from validator {:?}: {:?}",
2607                    author.concise(),
2608                    error
2609                );
2610                self.check_for_split_brain();
2611                Err(())
2612            }
2613            InsertResult::QuorumReached(cert) => {
2614                // It is not guaranteed that signature.authority == narwhal_cert.author, but we do verify
2615                // the signature so we know that the author signed the message at some point.
2616                if their_digest != self.digest {
2617                    self.metrics.remote_checkpoint_forks.inc();
2618                    warn!(
2619                        checkpoint_seq = self.summary.sequence_number,
2620                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2621                        author.concise(),
2622                        their_digest,
2623                        self.digest
2624                    );
2625                    return Err(());
2626                }
2627                Ok(cert)
2628            }
2629            InsertResult::NotEnoughVotes {
2630                bad_votes: _,
2631                bad_authorities: _,
2632            } => {
2633                self.check_for_split_brain();
2634                Err(())
2635            }
2636        }
2637    }
2638
2639    /// Check if there is a split brain condition in checkpoint signature aggregation, defined
2640    /// as any state wherein it is no longer possible to achieve quorum on a checkpoint proposal,
2641    /// irrespective of the outcome of any outstanding votes.
2642    fn check_for_split_brain(&self) {
2643        debug!(
2644            checkpoint_seq = self.summary.sequence_number,
2645            "Checking for split brain condition"
2646        );
2647        if self.signatures_by_digest.quorum_unreachable() {
2648            // TODO: at this point we should immediately halt processing
2649            // of new transaction certificates to avoid building on top of
2650            // forked output
2651            // self.halt_all_execution();
2652
2653            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2654            let digests_by_stake_messages = all_unique_values
2655                .iter()
2656                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2657                .map(|(digest, (_authorities, total_stake))| {
2658                    format!("{:?} (total stake: {})", digest, total_stake)
2659                })
2660                .collect::<Vec<String>>();
2661            fail_point_arg!("kill_split_brain_node", |(
2662                checkpoint_overrides,
2663                forked_authorities,
2664            ): (
2665                std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
2666                std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
2667            )| {
2668                #[cfg(msim)]
2669                {
2670                    if let (Ok(mut overrides), Ok(forked_authorities_set)) =
2671                        (checkpoint_overrides.lock(), forked_authorities.lock())
2672                    {
2673                        // Find the digest produced by non-forked authorities
2674                        let correct_digest = all_unique_values
2675                            .iter()
2676                            .find(|(_, (authorities, _))| {
2677                                // Check if any authority that produced this digest is NOT in the forked set
2678                                authorities
2679                                    .iter()
2680                                    .any(|auth| !forked_authorities_set.contains(auth))
2681                            })
2682                            .map(|(digest, _)| digest.to_string())
2683                            .unwrap_or_else(|| {
2684                                // Fallback: use the digest with the highest stake
2685                                all_unique_values
2686                                    .iter()
2687                                    .max_by_key(|(_, (_, stake))| *stake)
2688                                    .map(|(digest, _)| digest.to_string())
2689                                    .unwrap_or_else(|| self.digest.to_string())
2690                            });
2691
2692                        overrides.insert(self.summary.sequence_number, correct_digest.clone());
2693
2694                        tracing::error!(
2695                            fatal = true,
2696                            "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
2697                            self.summary.sequence_number,
2698                            correct_digest
2699                        );
2700                    }
2701                }
2702            });
2703
2704            debug_fatal!(
2705                "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
2706                self.summary.sequence_number,
2707                self.signatures_by_digest.uncommitted_stake(),
2708                digests_by_stake_messages
2709            );
2710            self.metrics.split_brain_checkpoint_forks.inc();
2711
2712            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2713            let local_summary = self.summary.clone();
2714            let state = self.state.clone();
2715            let tables = self.store.clone();
2716
2717            tokio::spawn(async move {
2718                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2719            });
2720        }
2721    }
2722}
2723
2724/// Create data dump containing relevant data for diagnosing cause of the
2725/// split brain by querying one disagreeing validator for full checkpoint contents.
2726/// To minimize peer chatter, we only query one validator at random from each
2727/// disagreeing faction, as all honest validators that participated in this round may
2728/// inevitably run the same process.
2729async fn diagnose_split_brain(
2730    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2731    local_summary: CheckpointSummary,
2732    state: Arc<AuthorityState>,
2733    tables: Arc<CheckpointStore>,
2734) {
2735    debug!(
2736        checkpoint_seq = local_summary.sequence_number,
2737        "Running split brain diagnostics..."
2738    );
2739    let time = SystemTime::now();
2740    // collect one random disagreeing validator per differing digest
2741    let digest_to_validator = all_unique_values
2742        .iter()
2743        .filter_map(|(digest, (validators, _))| {
2744            if *digest != local_summary.digest() {
2745                let random_validator = validators.choose(&mut get_rng()).unwrap();
2746                Some((*digest, *random_validator))
2747            } else {
2748                None
2749            }
2750        })
2751        .collect::<HashMap<_, _>>();
2752    if digest_to_validator.is_empty() {
2753        panic!(
2754            "Given split brain condition, there should be at \
2755                least one validator that disagrees with local signature"
2756        );
2757    }
2758
2759    let epoch_store = state.load_epoch_store_one_call_per_task();
2760    let committee = epoch_store
2761        .epoch_start_state()
2762        .get_sui_committee_with_network_metadata();
2763    let network_config = default_mysten_network_config();
2764    let network_clients =
2765        make_network_authority_clients_with_network_config(&committee, &network_config);
2766
2767    // Query all disagreeing validators
2768    let response_futures = digest_to_validator
2769        .values()
2770        .cloned()
2771        .map(|validator| {
2772            let client = network_clients
2773                .get(&validator)
2774                .expect("Failed to get network client");
2775            let request = CheckpointRequestV2 {
2776                sequence_number: Some(local_summary.sequence_number),
2777                request_content: true,
2778                certified: false,
2779            };
2780            client.handle_checkpoint_v2(request)
2781        })
2782        .collect::<Vec<_>>();
2783
2784    let digest_name_pair = digest_to_validator.iter();
2785    let response_data = futures::future::join_all(response_futures)
2786        .await
2787        .into_iter()
2788        .zip(digest_name_pair)
2789        .filter_map(|(response, (digest, name))| match response {
2790            Ok(response) => match response {
2791                CheckpointResponseV2 {
2792                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2793                    contents: Some(contents),
2794                } => Some((*name, *digest, summary, contents)),
2795                CheckpointResponseV2 {
2796                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2797                    contents: _,
2798                } => {
2799                    panic!("Expected pending checkpoint, but got certified checkpoint");
2800                }
2801                CheckpointResponseV2 {
2802                    checkpoint: None,
2803                    contents: _,
2804                } => {
2805                    error!(
2806                        "Summary for checkpoint {:?} not found on validator {:?}",
2807                        local_summary.sequence_number, name
2808                    );
2809                    None
2810                }
2811                CheckpointResponseV2 {
2812                    checkpoint: _,
2813                    contents: None,
2814                } => {
2815                    error!(
2816                        "Contents for checkpoint {:?} not found on validator {:?}",
2817                        local_summary.sequence_number, name
2818                    );
2819                    None
2820                }
2821            },
2822            Err(e) => {
2823                error!(
2824                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2825                    e
2826                );
2827                None
2828            }
2829        })
2830        .collect::<Vec<_>>();
2831
2832    let local_checkpoint_contents = tables
2833        .get_checkpoint_contents(&local_summary.content_digest)
2834        .unwrap_or_else(|_| {
2835            panic!(
2836                "Could not find checkpoint contents for digest {:?}",
2837                local_summary.digest()
2838            )
2839        })
2840        .unwrap_or_else(|| {
2841            panic!(
2842                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2843                local_summary.sequence_number,
2844                local_summary.digest()
2845            )
2846        });
2847    let local_contents_text = format!("{local_checkpoint_contents:?}");
2848
2849    let local_summary_text = format!("{local_summary:?}");
2850    let local_validator = state.name.concise();
2851    let diff_patches = response_data
2852        .iter()
2853        .map(|(name, other_digest, other_summary, contents)| {
2854            let other_contents_text = format!("{contents:?}");
2855            let other_summary_text = format!("{other_summary:?}");
2856            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2857                .enumerate_transactions(&local_summary)
2858                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2859                .unzip();
2860            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2861                .enumerate_transactions(other_summary)
2862                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2863                .unzip();
2864            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2865            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2866            let local_transactions_text = format!("{local_transactions:#?}");
2867            let other_transactions_text = format!("{other_transactions:#?}");
2868            let transactions_patch =
2869                create_patch(&local_transactions_text, &other_transactions_text);
2870            let local_effects_text = format!("{local_effects:#?}");
2871            let other_effects_text = format!("{other_effects:#?}");
2872            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2873            let seq_number = local_summary.sequence_number;
2874            let local_digest = local_summary.digest();
2875            let other_validator = name.concise();
2876            format!(
2877                "Checkpoint: {seq_number:?}\n\
2878                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2879                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2880                Summary Diff: \n{summary_patch}\n\n\
2881                Contents Diff: \n{contents_patch}\n\n\
2882                Transactions Diff: \n{transactions_patch}\n\n\
2883                Effects Diff: \n{effects_patch}",
2884            )
2885        })
2886        .collect::<Vec<_>>()
2887        .join("\n\n\n");
2888
2889    let header = format!(
2890        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2891        Datetime: {:?}",
2892        time
2893    );
2894    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2895    let path = tempfile::tempdir()
2896        .expect("Failed to create tempdir")
2897        .keep()
2898        .join(Path::new("checkpoint_fork_dump.txt"));
2899    let mut file = File::create(path).unwrap();
2900    write!(file, "{}", fork_logs_text).unwrap();
2901    debug!("{}", fork_logs_text);
2902}
2903
2904pub trait CheckpointServiceNotify {
2905    fn notify_checkpoint_signature(
2906        &self,
2907        epoch_store: &AuthorityPerEpochStore,
2908        info: &CheckpointSignatureMessage,
2909    ) -> SuiResult;
2910
2911    fn notify_checkpoint(&self) -> SuiResult;
2912}
2913
2914#[allow(clippy::large_enum_variant)]
2915enum CheckpointServiceState {
2916    Unstarted(
2917        (
2918            CheckpointBuilder,
2919            CheckpointAggregator,
2920            CheckpointStateHasher,
2921        ),
2922    ),
2923    Started,
2924}
2925
2926impl CheckpointServiceState {
2927    fn take_unstarted(
2928        &mut self,
2929    ) -> (
2930        CheckpointBuilder,
2931        CheckpointAggregator,
2932        CheckpointStateHasher,
2933    ) {
2934        let mut state = CheckpointServiceState::Started;
2935        std::mem::swap(self, &mut state);
2936
2937        match state {
2938            CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
2939                (builder, aggregator, hasher)
2940            }
2941            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2942        }
2943    }
2944}
2945
2946pub struct CheckpointService {
2947    tables: Arc<CheckpointStore>,
2948    notify_builder: Arc<Notify>,
2949    notify_aggregator: Arc<Notify>,
2950    last_signature_index: Mutex<u64>,
2951    // A notification for the current highest built sequence number.
2952    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
2953    // The highest sequence number that had already been built at the time CheckpointService
2954    // was constructed
2955    highest_previously_built_seq: CheckpointSequenceNumber,
2956    metrics: Arc<CheckpointMetrics>,
2957    state: Mutex<CheckpointServiceState>,
2958}
2959
2960impl CheckpointService {
2961    /// Constructs a new CheckpointService in an un-started state.
2962    pub fn build(
2963        state: Arc<AuthorityState>,
2964        checkpoint_store: Arc<CheckpointStore>,
2965        epoch_store: Arc<AuthorityPerEpochStore>,
2966        effects_store: Arc<dyn TransactionCacheRead>,
2967        global_state_hasher: Weak<GlobalStateHasher>,
2968        checkpoint_output: Box<dyn CheckpointOutput>,
2969        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2970        metrics: Arc<CheckpointMetrics>,
2971        max_transactions_per_checkpoint: usize,
2972        max_checkpoint_size_bytes: usize,
2973    ) -> Arc<Self> {
2974        info!(
2975            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
2976        );
2977        let notify_builder = Arc::new(Notify::new());
2978        let notify_aggregator = Arc::new(Notify::new());
2979
2980        // We may have built higher checkpoint numbers before restarting.
2981        let highest_previously_built_seq = checkpoint_store
2982            .get_latest_locally_computed_checkpoint()
2983            .expect("failed to get latest locally computed checkpoint")
2984            .map(|s| s.sequence_number)
2985            .unwrap_or(0);
2986
2987        let highest_currently_built_seq =
2988            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
2989                .expect("epoch should not have ended")
2990                .map(|(seq, _)| seq)
2991                .unwrap_or(0);
2992
2993        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
2994
2995        let aggregator = CheckpointAggregator::new(
2996            checkpoint_store.clone(),
2997            epoch_store.clone(),
2998            notify_aggregator.clone(),
2999            certified_checkpoint_output,
3000            state.clone(),
3001            metrics.clone(),
3002        );
3003
3004        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);
3005
3006        let ckpt_state_hasher = CheckpointStateHasher::new(
3007            epoch_store.clone(),
3008            global_state_hasher.clone(),
3009            receive_from_builder,
3010        );
3011
3012        let builder = CheckpointBuilder::new(
3013            state.clone(),
3014            checkpoint_store.clone(),
3015            epoch_store.clone(),
3016            notify_builder.clone(),
3017            effects_store,
3018            global_state_hasher,
3019            send_to_hasher,
3020            checkpoint_output,
3021            notify_aggregator.clone(),
3022            highest_currently_built_seq_tx.clone(),
3023            metrics.clone(),
3024            max_transactions_per_checkpoint,
3025            max_checkpoint_size_bytes,
3026        );
3027
3028        let last_signature_index = epoch_store
3029            .get_last_checkpoint_signature_index()
3030            .expect("should not cross end of epoch");
3031        let last_signature_index = Mutex::new(last_signature_index);
3032
3033        Arc::new(Self {
3034            tables: checkpoint_store,
3035            notify_builder,
3036            notify_aggregator,
3037            last_signature_index,
3038            highest_currently_built_seq_tx,
3039            highest_previously_built_seq,
3040            metrics,
3041            state: Mutex::new(CheckpointServiceState::Unstarted((
3042                builder,
3043                aggregator,
3044                ckpt_state_hasher,
3045            ))),
3046        })
3047    }
3048
3049    /// Starts the CheckpointService.
3050    ///
3051    /// This function blocks until the CheckpointBuilder re-builds all checkpoints that had
3052    /// been built before the most recent restart. You can think of this as a WAL replay
3053    /// operation. Upon startup, we may have a number of consensus commits and resulting
3054    /// checkpoints that were built but not committed to disk. We want to reprocess the
3055    /// commits and rebuild the checkpoints before starting normal operation.
3056    pub async fn spawn(
3057        &self,
3058        epoch_store: Arc<AuthorityPerEpochStore>,
3059        consensus_replay_waiter: Option<ReplayWaiter>,
3060    ) {
3061        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();
3062
3063        // Clean up state hashes computed after the last committed checkpoint
3064        // This prevents ECMH divergence after fork recovery restarts
3065        if let Some(last_committed_seq) = self
3066            .tables
3067            .get_highest_executed_checkpoint()
3068            .expect("Failed to get highest executed checkpoint")
3069            .map(|checkpoint| *checkpoint.sequence_number())
3070        {
3071            if let Err(e) = builder
3072                .epoch_store
3073                .clear_state_hashes_after_checkpoint(last_committed_seq)
3074            {
3075                error!(
3076                    "Failed to clear state hashes after checkpoint {}: {:?}",
3077                    last_committed_seq, e
3078                );
3079            } else {
3080                info!(
3081                    "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
3082                    last_committed_seq
3083                );
3084            }
3085        }
3086
3087        let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();
3088
3089        let state_hasher_task = spawn_monitored_task!(state_hasher.run());
3090        let aggregator_task = spawn_monitored_task!(aggregator.run());
3091
3092        spawn_monitored_task!(async move {
3093            epoch_store
3094                .within_alive_epoch(async move {
3095                    builder.run(consensus_replay_waiter).await;
3096                    builder_finished_tx.send(()).ok();
3097                })
3098                .await
3099                .ok();
3100
3101            // state hasher will terminate as soon as it has finished processing all messages from builder
3102            state_hasher_task
3103                .await
3104                .expect("state hasher should exit normally");
3105
3106            // builder must shut down before aggregator and state_hasher, since it sends
3107            // messages to them
3108            aggregator_task.abort();
3109            aggregator_task.await.ok();
3110        });
3111
3112        // If this times out, the validator may still start up. The worst that can
3113        // happen is that we will crash later on instead of immediately. The eventual
3114        // crash would occur because we may be missing transactions that are below the
3115        // highest_synced_checkpoint watermark, which can cause a crash in
3116        // `CheckpointExecutor::extract_randomness_rounds`.
3117        if tokio::time::timeout(Duration::from_secs(120), async move {
3118            tokio::select! {
3119                _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
3120                _ = self.wait_for_rebuilt_checkpoints() => (),
3121            }
3122        })
3123        .await
3124        .is_err()
3125        {
3126            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
3127        }
3128    }
3129}
3130
3131impl CheckpointService {
3132    /// Waits until all checkpoints had been built before the node restarted
3133    /// are rebuilt. This is required to preserve the invariant that all checkpoints
3134    /// (and their transactions) below the highest_synced_checkpoint watermark are
3135    /// available. Once the checkpoints are constructed, we can be sure that the
3136    /// transactions have also been executed.
3137    pub async fn wait_for_rebuilt_checkpoints(&self) {
3138        let highest_previously_built_seq = self.highest_previously_built_seq;
3139        let mut rx = self.highest_currently_built_seq_tx.subscribe();
3140        let mut highest_currently_built_seq = *rx.borrow_and_update();
3141        info!(
3142            "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3143        );
3144        loop {
3145            if highest_currently_built_seq >= highest_previously_built_seq {
3146                info!("Checkpoint rebuild complete");
3147                break;
3148            }
3149            rx.changed().await.unwrap();
3150            highest_currently_built_seq = *rx.borrow_and_update();
3151        }
3152    }
3153
3154    #[cfg(test)]
3155    fn write_and_notify_checkpoint_for_testing(
3156        &self,
3157        epoch_store: &AuthorityPerEpochStore,
3158        checkpoint: PendingCheckpoint,
3159    ) -> SuiResult {
3160        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3161
3162        let mut output = ConsensusCommitOutput::new(0);
3163        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3164        output.set_default_commit_stats_for_testing();
3165        epoch_store.push_consensus_output_for_tests(output);
3166        self.notify_checkpoint()?;
3167        Ok(())
3168    }
3169}
3170
3171impl CheckpointServiceNotify for CheckpointService {
3172    fn notify_checkpoint_signature(
3173        &self,
3174        epoch_store: &AuthorityPerEpochStore,
3175        info: &CheckpointSignatureMessage,
3176    ) -> SuiResult {
3177        let sequence = info.summary.sequence_number;
3178        let signer = info.summary.auth_sig().authority.concise();
3179
3180        if let Some(highest_verified_checkpoint) = self
3181            .tables
3182            .get_highest_verified_checkpoint()?
3183            .map(|x| *x.sequence_number())
3184            && sequence <= highest_verified_checkpoint
3185        {
3186            trace!(
3187                checkpoint_seq = sequence,
3188                "Ignore checkpoint signature from {} - already certified", signer,
3189            );
3190            self.metrics
3191                .last_ignored_checkpoint_signature_received
3192                .set(sequence as i64);
3193            return Ok(());
3194        }
3195        trace!(
3196            checkpoint_seq = sequence,
3197            "Received checkpoint signature, digest {} from {}",
3198            info.summary.digest(),
3199            signer,
3200        );
3201        self.metrics
3202            .last_received_checkpoint_signatures
3203            .with_label_values(&[&signer.to_string()])
3204            .set(sequence as i64);
3205        // While it can be tempting to make last_signature_index into AtomicU64, this won't work
3206        // We need to make sure we write to `pending_signatures` and trigger `notify_aggregator` without race conditions
3207        let mut index = self.last_signature_index.lock();
3208        *index += 1;
3209        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
3210        self.notify_aggregator.notify_one();
3211        Ok(())
3212    }
3213
3214    fn notify_checkpoint(&self) -> SuiResult {
3215        self.notify_builder.notify_one();
3216        Ok(())
3217    }
3218}
3219
3220// test helper
3221pub struct CheckpointServiceNoop {}
3222impl CheckpointServiceNotify for CheckpointServiceNoop {
3223    fn notify_checkpoint_signature(
3224        &self,
3225        _: &AuthorityPerEpochStore,
3226        _: &CheckpointSignatureMessage,
3227    ) -> SuiResult {
3228        Ok(())
3229    }
3230
3231    fn notify_checkpoint(&self) -> SuiResult {
3232        Ok(())
3233    }
3234}
3235
3236impl PendingCheckpoint {
3237    pub fn height(&self) -> CheckpointHeight {
3238        self.details.checkpoint_height
3239    }
3240
3241    pub fn roots(&self) -> &Vec<TransactionKey> {
3242        &self.roots
3243    }
3244
3245    pub fn details(&self) -> &PendingCheckpointInfo {
3246        &self.details
3247    }
3248}
3249
3250pin_project! {
3251    pub struct PollCounter<Fut> {
3252        #[pin]
3253        future: Fut,
3254        count: usize,
3255    }
3256}
3257
3258impl<Fut> PollCounter<Fut> {
3259    pub fn new(future: Fut) -> Self {
3260        Self { future, count: 0 }
3261    }
3262
3263    pub fn count(&self) -> usize {
3264        self.count
3265    }
3266}
3267
3268impl<Fut: Future> Future for PollCounter<Fut> {
3269    type Output = (usize, Fut::Output);
3270
3271    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3272        let this = self.project();
3273        *this.count += 1;
3274        match this.future.poll(cx) {
3275            Poll::Ready(output) => Poll::Ready((*this.count, output)),
3276            Poll::Pending => Poll::Pending,
3277        }
3278    }
3279}
3280
3281fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
3282    PollCounter::new(future)
3283}
3284
3285#[cfg(test)]
3286mod tests {
3287    use super::*;
3288    use crate::authority::test_authority_builder::TestAuthorityBuilder;
3289    use crate::transaction_outputs::TransactionOutputs;
3290    use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3291    use futures::FutureExt as _;
3292    use futures::future::BoxFuture;
3293    use std::collections::HashMap;
3294    use std::ops::Deref;
3295    use sui_macros::sim_test;
3296    use sui_protocol_config::{Chain, ProtocolConfig};
3297    use sui_types::accumulator_event::AccumulatorEvent;
3298    use sui_types::authenticator_state::ActiveJwk;
3299    use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3300    use sui_types::crypto::Signature;
3301    use sui_types::effects::{TransactionEffects, TransactionEvents};
3302    use sui_types::messages_checkpoint::SignedCheckpointSummary;
3303    use sui_types::transaction::VerifiedTransaction;
3304    use tokio::sync::mpsc;
3305
3306    #[tokio::test]
3307    async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
3308        let store = CheckpointStore::new_for_tests();
3309        let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
3310        for seq in 70u64..=80u64 {
3311            let contents =
3312                sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
3313                    [sui_types::base_types::ExecutionDigests::new(
3314                        sui_types::digests::TransactionDigest::random(),
3315                        sui_types::digests::TransactionEffectsDigest::ZERO,
3316                    )],
3317                );
3318            let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
3319                &protocol,
3320                0,
3321                seq,
3322                0,
3323                &contents,
3324                None,
3325                sui_types::gas::GasCostSummary::default(),
3326                None,
3327                0,
3328                Vec::new(),
3329                Vec::new(),
3330            );
3331            store
3332                .tables
3333                .locally_computed_checkpoints
3334                .insert(&seq, &summary)
3335                .unwrap();
3336        }
3337
3338        store
3339            .clear_locally_computed_checkpoints_from(76)
3340            .expect("clear should succeed");
3341
3342        // Explicit boundary checks: 75 must remain, 76 must be deleted
3343        assert!(
3344            store
3345                .tables
3346                .locally_computed_checkpoints
3347                .get(&75)
3348                .unwrap()
3349                .is_some()
3350        );
3351        assert!(
3352            store
3353                .tables
3354                .locally_computed_checkpoints
3355                .get(&76)
3356                .unwrap()
3357                .is_none()
3358        );
3359
3360        for seq in 70u64..76u64 {
3361            assert!(
3362                store
3363                    .tables
3364                    .locally_computed_checkpoints
3365                    .get(&seq)
3366                    .unwrap()
3367                    .is_some()
3368            );
3369        }
3370        for seq in 76u64..=80u64 {
3371            assert!(
3372                store
3373                    .tables
3374                    .locally_computed_checkpoints
3375                    .get(&seq)
3376                    .unwrap()
3377                    .is_none()
3378            );
3379        }
3380    }
3381
3382    #[tokio::test]
3383    async fn test_fork_detection_storage() {
3384        let store = CheckpointStore::new_for_tests();
3385        // checkpoint fork
3386        let seq_num = 42;
3387        let digest = CheckpointDigest::random();
3388
3389        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3390
3391        store
3392            .record_checkpoint_fork_detected(seq_num, digest)
3393            .unwrap();
3394
3395        let retrieved = store.get_checkpoint_fork_detected().unwrap();
3396        assert!(retrieved.is_some());
3397        let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3398        assert_eq!(retrieved_seq, seq_num);
3399        assert_eq!(retrieved_digest, digest);
3400
3401        store.clear_checkpoint_fork_detected().unwrap();
3402        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3403
3404        // txn fork
3405        let tx_digest = TransactionDigest::random();
3406        let expected_effects = TransactionEffectsDigest::random();
3407        let actual_effects = TransactionEffectsDigest::random();
3408
3409        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3410
3411        store
3412            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3413            .unwrap();
3414
3415        let retrieved = store.get_transaction_fork_detected().unwrap();
3416        assert!(retrieved.is_some());
3417        let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3418        assert_eq!(retrieved_tx, tx_digest);
3419        assert_eq!(retrieved_expected, expected_effects);
3420        assert_eq!(retrieved_actual, actual_effects);
3421
3422        store.clear_transaction_fork_detected().unwrap();
3423        assert!(store.get_transaction_fork_detected().unwrap().is_none());
3424    }
3425
3426    #[sim_test]
3427    pub async fn checkpoint_builder_test() {
3428        telemetry_subscribers::init_for_testing();
3429
3430        let mut protocol_config =
3431            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
3432        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
3433        let state = TestAuthorityBuilder::new()
3434            .with_protocol_config(protocol_config)
3435            .build()
3436            .await;
3437
3438        let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
3439            0,
3440            0,
3441            vec![],
3442            SequenceNumber::new(),
3443        );
3444
3445        let jwks = {
3446            let mut jwks = Vec::new();
3447            while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
3448                jwks.push(ActiveJwk {
3449                    jwk_id: JwkId::new(
3450                        "https://accounts.google.com".to_string(),
3451                        "1234567890".to_string(),
3452                    ),
3453                    jwk: JWK {
3454                        kty: "RSA".to_string(),
3455                        e: "AQAB".to_string(),
3456                        n: "1234567890".to_string(),
3457                        alg: "RS256".to_string(),
3458                    },
3459                    epoch: 0,
3460                });
3461            }
3462            jwks
3463        };
3464
3465        let dummy_tx_with_data =
3466            VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());
3467
3468        for i in 0..15 {
3469            state
3470                .database_for_testing()
3471                .perpetual_tables
3472                .transactions
3473                .insert(&d(i), dummy_tx.serializable_ref())
3474                .unwrap();
3475        }
3476        for i in 15..20 {
3477            state
3478                .database_for_testing()
3479                .perpetual_tables
3480                .transactions
3481                .insert(&d(i), dummy_tx_with_data.serializable_ref())
3482                .unwrap();
3483        }
3484
3485        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
3486        commit_cert_for_test(
3487            &mut store,
3488            state.clone(),
3489            d(1),
3490            vec![d(2), d(3)],
3491            GasCostSummary::new(11, 12, 11, 1),
3492        );
3493        commit_cert_for_test(
3494            &mut store,
3495            state.clone(),
3496            d(2),
3497            vec![d(3), d(4)],
3498            GasCostSummary::new(21, 22, 21, 1),
3499        );
3500        commit_cert_for_test(
3501            &mut store,
3502            state.clone(),
3503            d(3),
3504            vec![],
3505            GasCostSummary::new(31, 32, 31, 1),
3506        );
3507        commit_cert_for_test(
3508            &mut store,
3509            state.clone(),
3510            d(4),
3511            vec![],
3512            GasCostSummary::new(41, 42, 41, 1),
3513        );
3514        for i in [5, 6, 7, 10, 11, 12, 13] {
3515            commit_cert_for_test(
3516                &mut store,
3517                state.clone(),
3518                d(i),
3519                vec![],
3520                GasCostSummary::new(41, 42, 41, 1),
3521            );
3522        }
3523        for i in [15, 16, 17] {
3524            commit_cert_for_test(
3525                &mut store,
3526                state.clone(),
3527                d(i),
3528                vec![],
3529                GasCostSummary::new(51, 52, 51, 1),
3530            );
3531        }
3532        let all_digests: Vec<_> = store.keys().copied().collect();
3533        for digest in all_digests {
3534            let signature = Signature::Ed25519SuiSignature(Default::default()).into();
3535            state
3536                .epoch_store_for_testing()
3537                .test_insert_user_signature(digest, vec![signature]);
3538        }
3539
3540        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
3541        let (certified_output, mut certified_result) =
3542            mpsc::channel::<CertifiedCheckpointSummary>(10);
3543        let store = Arc::new(store);
3544
3545        let ckpt_dir = tempfile::tempdir().unwrap();
3546        let checkpoint_store =
3547            CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
3548        let epoch_store = state.epoch_store_for_testing();
3549
3550        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
3551            state.get_global_state_hash_store().clone(),
3552        ));
3553
3554        let checkpoint_service = CheckpointService::build(
3555            state.clone(),
3556            checkpoint_store,
3557            epoch_store.clone(),
3558            store,
3559            Arc::downgrade(&global_state_hasher),
3560            Box::new(output),
3561            Box::new(certified_output),
3562            CheckpointMetrics::new_for_tests(),
3563            3,
3564            100_000,
3565        );
3566        checkpoint_service.spawn(epoch_store.clone(), None).await;
3567
3568        checkpoint_service
3569            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
3570            .unwrap();
3571        checkpoint_service
3572            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
3573            .unwrap();
3574        checkpoint_service
3575            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
3576            .unwrap();
3577        checkpoint_service
3578            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
3579            .unwrap();
3580        checkpoint_service
3581            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
3582            .unwrap();
3583        checkpoint_service
3584            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
3585            .unwrap();
3586
3587        let (c1c, c1s) = result.recv().await.unwrap();
3588        let (c2c, c2s) = result.recv().await.unwrap();
3589
3590        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3591        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3592        assert_eq!(c1t, vec![d(4)]);
3593        assert_eq!(c1s.previous_digest, None);
3594        assert_eq!(c1s.sequence_number, 0);
3595        assert_eq!(
3596            c1s.epoch_rolling_gas_cost_summary,
3597            GasCostSummary::new(41, 42, 41, 1)
3598        );
3599
3600        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
3601        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
3602        assert_eq!(c2s.sequence_number, 1);
3603        assert_eq!(
3604            c2s.epoch_rolling_gas_cost_summary,
3605            GasCostSummary::new(104, 108, 104, 4)
3606        );
3607
3608        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
3609        // Verify that we split into 2 checkpoints.
3610        let (c3c, c3s) = result.recv().await.unwrap();
3611        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3612        let (c4c, c4s) = result.recv().await.unwrap();
3613        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3614        assert_eq!(c3s.sequence_number, 2);
3615        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
3616        assert_eq!(c4s.sequence_number, 3);
3617        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
3618        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
3619        assert_eq!(c4t, vec![d(13)]);
3620
3621        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K max.
3622        // Verify that we split into 2 checkpoints.
3623        let (c5c, c5s) = result.recv().await.unwrap();
3624        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3625        let (c6c, c6s) = result.recv().await.unwrap();
3626        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3627        assert_eq!(c5s.sequence_number, 4);
3628        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
3629        assert_eq!(c6s.sequence_number, 5);
3630        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
3631        assert_eq!(c5t, vec![d(15), d(16)]);
3632        assert_eq!(c6t, vec![d(17)]);
3633
3634        // Pending at index 4 was too soon after the prior one and should be coalesced into
3635        // the next one.
3636        let (c7c, c7s) = result.recv().await.unwrap();
3637        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3638        assert_eq!(c7t, vec![d(5), d(6)]);
3639        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
3640        assert_eq!(c7s.sequence_number, 6);
3641
3642        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
3643        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);
3644
3645        checkpoint_service
3646            .notify_checkpoint_signature(
3647                &epoch_store,
3648                &CheckpointSignatureMessage { summary: c2ss },
3649            )
3650            .unwrap();
3651        checkpoint_service
3652            .notify_checkpoint_signature(
3653                &epoch_store,
3654                &CheckpointSignatureMessage { summary: c1ss },
3655            )
3656            .unwrap();
3657
3658        let c1sc = certified_result.recv().await.unwrap();
3659        let c2sc = certified_result.recv().await.unwrap();
3660        assert_eq!(c1sc.sequence_number, 0);
3661        assert_eq!(c2sc.sequence_number, 1);
3662    }
3663
3664    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
3665        fn notify_read_executed_effects(
3666            &self,
3667            _: &str,
3668            digests: &[TransactionDigest],
3669        ) -> BoxFuture<'_, Vec<TransactionEffects>> {
3670            std::future::ready(
3671                digests
3672                    .iter()
3673                    .map(|d| self.get(d).expect("effects not found").clone())
3674                    .collect(),
3675            )
3676            .boxed()
3677        }
3678
3679        fn notify_read_executed_effects_digests(
3680            &self,
3681            _: &str,
3682            digests: &[TransactionDigest],
3683        ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
3684            std::future::ready(
3685                digests
3686                    .iter()
3687                    .map(|d| {
3688                        self.get(d)
3689                            .map(|fx| fx.digest())
3690                            .expect("effects not found")
3691                    })
3692                    .collect(),
3693            )
3694            .boxed()
3695        }
3696
3697        fn multi_get_executed_effects(
3698            &self,
3699            digests: &[TransactionDigest],
3700        ) -> Vec<Option<TransactionEffects>> {
3701            digests.iter().map(|d| self.get(d).cloned()).collect()
3702        }
3703
3704        // Unimplemented methods - its unfortunate to have this big blob of useless code, but it wasn't
3705        // worth it to keep EffectsNotifyRead around just for these tests, as it caused a ton of
3706        // complication in non-test code. (e.g. had to implement EFfectsNotifyRead for all
3707        // ExecutionCacheRead implementors).
3708
3709        fn multi_get_transaction_blocks(
3710            &self,
3711            _: &[TransactionDigest],
3712        ) -> Vec<Option<Arc<VerifiedTransaction>>> {
3713            unimplemented!()
3714        }
3715
3716        fn multi_get_executed_effects_digests(
3717            &self,
3718            _: &[TransactionDigest],
3719        ) -> Vec<Option<TransactionEffectsDigest>> {
3720            unimplemented!()
3721        }
3722
3723        fn multi_get_effects(
3724            &self,
3725            _: &[TransactionEffectsDigest],
3726        ) -> Vec<Option<TransactionEffects>> {
3727            unimplemented!()
3728        }
3729
3730        fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
3731            unimplemented!()
3732        }
3733
3734        fn get_mysticeti_fastpath_outputs(
3735            &self,
3736            _: &TransactionDigest,
3737        ) -> Option<Arc<TransactionOutputs>> {
3738            unimplemented!()
3739        }
3740
3741        fn notify_read_fastpath_transaction_outputs<'a>(
3742            &'a self,
3743            _: &'a [TransactionDigest],
3744        ) -> BoxFuture<'a, Vec<Arc<crate::transaction_outputs::TransactionOutputs>>> {
3745            unimplemented!()
3746        }
3747
3748        fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
3749            unimplemented!()
3750        }
3751
3752        fn get_unchanged_loaded_runtime_objects(
3753            &self,
3754            _digest: &TransactionDigest,
3755        ) -> Option<Vec<sui_types::storage::ObjectKey>> {
3756            unimplemented!()
3757        }
3758    }
3759
3760    #[async_trait::async_trait]
3761    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
3762        async fn checkpoint_created(
3763            &self,
3764            summary: &CheckpointSummary,
3765            contents: &CheckpointContents,
3766            _epoch_store: &Arc<AuthorityPerEpochStore>,
3767            _checkpoint_store: &Arc<CheckpointStore>,
3768        ) -> SuiResult {
3769            self.try_send((contents.clone(), summary.clone())).unwrap();
3770            Ok(())
3771        }
3772    }
3773
3774    #[async_trait::async_trait]
3775    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
3776        async fn certified_checkpoint_created(
3777            &self,
3778            summary: &CertifiedCheckpointSummary,
3779        ) -> SuiResult {
3780            self.try_send(summary.clone()).unwrap();
3781            Ok(())
3782        }
3783    }
3784
3785    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
3786        PendingCheckpoint {
3787            roots: t
3788                .into_iter()
3789                .map(|t| TransactionKey::Digest(d(t)))
3790                .collect(),
3791            details: PendingCheckpointInfo {
3792                timestamp_ms,
3793                last_of_epoch: false,
3794                checkpoint_height: i,
3795            },
3796        }
3797    }
3798
3799    fn d(i: u8) -> TransactionDigest {
3800        let mut bytes: [u8; 32] = Default::default();
3801        bytes[0] = i;
3802        TransactionDigest::new(bytes)
3803    }
3804
3805    fn e(
3806        transaction_digest: TransactionDigest,
3807        dependencies: Vec<TransactionDigest>,
3808        gas_used: GasCostSummary,
3809    ) -> TransactionEffects {
3810        let mut effects = TransactionEffects::default();
3811        *effects.transaction_digest_mut_for_testing() = transaction_digest;
3812        *effects.dependencies_mut_for_testing() = dependencies;
3813        *effects.gas_cost_summary_mut_for_testing() = gas_used;
3814        effects
3815    }
3816
3817    fn commit_cert_for_test(
3818        store: &mut HashMap<TransactionDigest, TransactionEffects>,
3819        state: Arc<AuthorityState>,
3820        digest: TransactionDigest,
3821        dependencies: Vec<TransactionDigest>,
3822        gas_used: GasCostSummary,
3823    ) {
3824        let epoch_store = state.epoch_store_for_testing();
3825        let effects = e(digest, dependencies, gas_used);
3826        store.insert(digest, effects.clone());
3827        epoch_store.insert_executed_in_epoch(&digest);
3828    }
3829}