1pub(crate) mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::CertifiedCheckpointOutput;
15pub use crate::checkpoints::checkpoint_output::{
16 CheckpointOutput, LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::global_state_hasher::GlobalStateHasher;
23use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
24use consensus_core::CommitRef;
25use diffy::create_patch;
26use itertools::Itertools;
27use mysten_common::ZipDebugEqIteratorExt;
28use mysten_common::random::get_rng;
29use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
30use mysten_common::{assert_reachable, debug_fatal, fatal, in_antithesis};
31use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
32use parking_lot::Mutex;
33use pin_project_lite::pin_project;
34use serde::{Deserialize, Serialize};
35use sui_macros::fail_point_arg;
36use sui_network::default_mysten_network_config;
37use sui_types::accumulator_metadata;
38use sui_types::base_types::{ConciseableName, SequenceNumber};
39use sui_types::execution::ExecutionTimeObservationKey;
40use sui_types::messages_checkpoint::{
41 CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
42};
43use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
44use tokio::sync::{mpsc, watch};
45use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
46
47use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
48use crate::authority::authority_store_pruner::PrunerWatermarks;
49use crate::consensus_handler::SequencedConsensusTransactionKey;
50use rand::seq::SliceRandom;
51use std::collections::{BTreeMap, HashMap, HashSet};
52use std::fs::File;
53use std::future::Future;
54use std::io::Write;
55use std::path::Path;
56use std::pin::Pin;
57use std::sync::Arc;
58use std::sync::Weak;
59use std::task::{Context, Poll};
60use std::time::{Duration, SystemTime};
61use sui_protocol_config::ProtocolVersion;
62use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
63use sui_types::committee::StakeUnit;
64use sui_types::crypto::AuthorityStrongQuorumSignInfo;
65use sui_types::digests::{
66 CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
67};
68use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
69use sui_types::error::{SuiErrorKind, SuiResult};
70use sui_types::gas::GasCostSummary;
71use sui_types::message_envelope::Message;
72use sui_types::messages_checkpoint::{
73 CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
74 CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
75 EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
76 VerifiedCheckpointContents,
77};
78use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
79use sui_types::messages_consensus::ConsensusTransactionKey;
80use sui_types::signature::GenericSignature;
81use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
82use sui_types::transaction::{
83 TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
84};
85use tokio::sync::Notify;
86use tracing::{debug, error, info, instrument, trace, warn};
87use typed_store::DBMapUtils;
88use typed_store::Map;
89use typed_store::{
90 TypedStoreError,
91 rocks::{DBMap, MetricConf},
92};
93
94const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;
95
96pub type CheckpointHeight = u64;
97
98pub struct EpochStats {
99 pub checkpoint_count: u64,
100 pub transaction_count: u64,
101 pub total_gas_reward: u64,
102}
103
104#[derive(Clone, Debug)]
105pub struct PendingCheckpointInfo {
106 pub timestamp_ms: CheckpointTimestamp,
107 pub last_of_epoch: bool,
108 pub checkpoint_height: CheckpointHeight,
109 pub consensus_commit_ref: CommitRef,
111 pub rejected_transactions_digest: Digest,
112 pub checkpoint_seq: CheckpointSequenceNumber,
114}
115
116#[derive(Clone, Debug, Default)]
117pub struct CheckpointRoots {
118 pub tx_roots: Vec<TransactionKey>,
119 pub settlement_root: Option<TransactionKey>,
120 pub height: CheckpointHeight,
121}
122
123#[derive(Clone, Debug)]
126pub struct PendingCheckpoint {
127 pub roots: Vec<CheckpointRoots>,
128 pub details: PendingCheckpointInfo,
129}
130
131#[derive(Clone, Debug, Serialize, Deserialize)]
132pub struct BuilderCheckpointSummary {
133 pub summary: CheckpointSummary,
134 pub checkpoint_height: Option<CheckpointHeight>,
136 pub position_in_commit: usize,
139}
140
141#[derive(DBMapUtils)]
142#[cfg_attr(tidehunter, tidehunter)]
143pub struct CheckpointStoreTables {
144 pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,
146
147 pub(crate) checkpoint_sequence_by_contents_digest:
149 DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,
150
151 #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
155 full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
158
159 pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
161 pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,
163
164 pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,
168
169 epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,
171
172 pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
175
176 pub(crate) transaction_fork_detected: DBMap<
178 u8,
179 (
180 TransactionDigest,
181 TransactionEffectsDigest,
182 TransactionEffectsDigest,
183 ),
184 >,
185 #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
186 full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
187}
188
189fn full_checkpoint_content_table_default_config() -> DBOptions {
190 DBOptions {
191 options: default_db_options().options,
192 rw_options: ReadWriteOptions::default().set_log_value_hash(true),
196 }
197}
198
199impl CheckpointStoreTables {
200 #[cfg(not(tidehunter))]
201 pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
202 Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
203 }
204
205 #[cfg(tidehunter)]
206 pub fn new(
207 path: &Path,
208 metric_name: &'static str,
209 pruner_watermarks: Arc<PrunerWatermarks>,
210 ) -> Self {
211 tracing::warn!("Checkpoint DB using tidehunter");
212 use crate::authority::authority_store_pruner::apply_relocation_filter;
213 use typed_store::tidehunter_util::{
214 Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
215 default_max_dirty_keys, default_mutex_count, default_value_cache_size,
216 };
217 let mutexes = default_mutex_count();
218 let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
219 let override_dirty_keys_config = KeySpaceConfig::new()
220 .with_max_dirty_keys(16 * default_max_dirty_keys())
221 .with_value_cache_size(default_value_cache_size());
222 let config_u64 = ThConfig::new_with_config(
223 8,
224 mutexes,
225 u64_sequence_key,
226 override_dirty_keys_config.clone(),
227 );
228 let digest_config = ThConfig::new_with_rm_prefix(
229 32,
230 mutexes,
231 KeyType::uniform(default_cells_per_mutex()),
232 KeySpaceConfig::default(),
233 vec![0, 0, 0, 0, 0, 0, 0, 32],
234 );
235 let watermarks_config = KeySpaceConfig::new()
236 .with_value_cache_size(10)
237 .disable_unload();
238 let lru_config = KeySpaceConfig::new().with_value_cache_size(100);
239 let configs = vec![
240 (
241 "checkpoint_content",
242 digest_config.clone().with_config(
243 KeySpaceConfig::new().with_relocation_filter(|_, _| Decision::Remove),
244 ),
245 ),
246 (
247 "checkpoint_sequence_by_contents_digest",
248 digest_config.clone().with_config(apply_relocation_filter(
249 KeySpaceConfig::default(),
250 pruner_watermarks.checkpoint_id.clone(),
251 |sequence_number: CheckpointSequenceNumber| sequence_number,
252 false,
253 )),
254 ),
255 (
256 "full_checkpoint_content",
257 config_u64.clone().with_config(apply_relocation_filter(
258 override_dirty_keys_config.clone(),
259 pruner_watermarks.checkpoint_id.clone(),
260 |sequence_number: CheckpointSequenceNumber| sequence_number,
261 true,
262 )),
263 ),
264 ("certified_checkpoints", config_u64.clone()),
265 (
266 "checkpoint_by_digest",
267 digest_config.clone().with_config(apply_relocation_filter(
268 lru_config,
269 pruner_watermarks.epoch_id.clone(),
270 |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
271 false,
272 )),
273 ),
274 (
275 "locally_computed_checkpoints",
276 config_u64.clone().with_config(apply_relocation_filter(
277 override_dirty_keys_config.clone(),
278 pruner_watermarks.checkpoint_id.clone(),
279 |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
280 true,
281 )),
282 ),
283 ("epoch_last_checkpoint_map", config_u64.clone()),
284 (
285 "watermarks",
286 ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
287 ),
288 (
289 "transaction_fork_detected",
290 ThConfig::new_with_config(
291 1,
292 1,
293 KeyType::uniform(1),
294 watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
295 ),
296 ),
297 (
298 "full_checkpoint_content_v2",
299 config_u64.clone().with_config(apply_relocation_filter(
300 override_dirty_keys_config.clone(),
301 pruner_watermarks.checkpoint_id.clone(),
302 |sequence_number: CheckpointSequenceNumber| sequence_number,
303 true,
304 )),
305 ),
306 ];
307 Self::open_tables_read_write(
308 path.to_path_buf(),
309 MetricConf::new(metric_name),
310 configs
311 .into_iter()
312 .map(|(cf, config)| (cf.to_string(), config))
313 .collect(),
314 )
315 }
316
317 #[cfg(not(tidehunter))]
318 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
319 Self::get_read_only_handle(
320 path.to_path_buf(),
321 None,
322 None,
323 MetricConf::new("checkpoint_readonly"),
324 )
325 }
326
327 #[cfg(tidehunter)]
328 pub fn open_readonly(path: &Path) -> Self {
329 Self::new(path, "checkpoint", Arc::new(PrunerWatermarks::default()))
330 }
331}
332
333pub struct CheckpointStore {
334 pub(crate) tables: CheckpointStoreTables,
335 synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
336 executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
337}
338
339impl CheckpointStore {
340 pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
341 let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
342 Arc::new(Self {
343 tables,
344 synced_checkpoint_notify_read: NotifyRead::new(),
345 executed_checkpoint_notify_read: NotifyRead::new(),
346 })
347 }
348
349 pub fn new_for_tests() -> Arc<Self> {
350 let ckpt_dir = mysten_common::tempdir().unwrap();
351 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
352 }
353
354 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
355 let tables = CheckpointStoreTables::new(
356 path,
357 "db_checkpoint",
358 Arc::new(PrunerWatermarks::default()),
359 );
360 Arc::new(Self {
361 tables,
362 synced_checkpoint_notify_read: NotifyRead::new(),
363 executed_checkpoint_notify_read: NotifyRead::new(),
364 })
365 }
366
367 #[cfg(not(tidehunter))]
368 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
369 CheckpointStoreTables::open_readonly(path)
370 }
371
372 #[cfg(tidehunter)]
373 pub fn open_readonly(path: &Path) -> CheckpointStoreTables {
374 CheckpointStoreTables::open_readonly(path)
375 }
376
377 #[instrument(level = "info", skip_all)]
378 pub fn insert_genesis_checkpoint(
379 &self,
380 checkpoint: VerifiedCheckpoint,
381 contents: CheckpointContents,
382 epoch_store: &AuthorityPerEpochStore,
383 ) {
384 assert_eq!(
385 checkpoint.epoch(),
386 0,
387 "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
388 );
389 assert_eq!(
390 *checkpoint.sequence_number(),
391 0,
392 "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
393 );
394
395 match self.get_checkpoint_by_sequence_number(0).unwrap() {
397 Some(existing_checkpoint) => {
398 assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
399 }
400 None => {
401 if epoch_store.epoch() == checkpoint.epoch {
402 epoch_store
403 .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
404 .unwrap();
405 } else {
406 debug!(
407 validator_epoch =% epoch_store.epoch(),
408 genesis_epoch =% checkpoint.epoch(),
409 "Not inserting checkpoint builder data for genesis checkpoint",
410 );
411 }
412 self.insert_checkpoint_contents(contents).unwrap();
413 self.insert_verified_checkpoint(&checkpoint).unwrap();
414 self.update_highest_synced_checkpoint(&checkpoint).unwrap();
415 }
416 }
417 }
418
419 pub fn get_checkpoint_by_digest(
420 &self,
421 digest: &CheckpointDigest,
422 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
423 self.tables
424 .checkpoint_by_digest
425 .get(digest)
426 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
427 }
428
429 pub fn get_checkpoint_by_sequence_number(
430 &self,
431 sequence_number: CheckpointSequenceNumber,
432 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
433 self.tables
434 .certified_checkpoints
435 .get(&sequence_number)
436 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
437 }
438
439 pub fn get_locally_computed_checkpoint(
440 &self,
441 sequence_number: CheckpointSequenceNumber,
442 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
443 self.tables
444 .locally_computed_checkpoints
445 .get(&sequence_number)
446 }
447
448 pub fn multi_get_locally_computed_checkpoints(
449 &self,
450 sequence_numbers: &[CheckpointSequenceNumber],
451 ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
452 let checkpoints = self
453 .tables
454 .locally_computed_checkpoints
455 .multi_get(sequence_numbers)?;
456
457 Ok(checkpoints)
458 }
459
460 pub fn get_sequence_number_by_contents_digest(
461 &self,
462 digest: &CheckpointContentsDigest,
463 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
464 self.tables
465 .checkpoint_sequence_by_contents_digest
466 .get(digest)
467 }
468
469 pub fn delete_contents_digest_sequence_number_mapping(
470 &self,
471 digest: &CheckpointContentsDigest,
472 ) -> Result<(), TypedStoreError> {
473 self.tables
474 .checkpoint_sequence_by_contents_digest
475 .remove(digest)
476 }
477
478 pub fn get_latest_certified_checkpoint(
479 &self,
480 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
481 Ok(self
482 .tables
483 .certified_checkpoints
484 .reversed_safe_iter_with_bounds(None, None)?
485 .next()
486 .transpose()?
487 .map(|(_, v)| v.into()))
488 }
489
490 pub fn get_latest_locally_computed_checkpoint(
491 &self,
492 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
493 Ok(self
494 .tables
495 .locally_computed_checkpoints
496 .reversed_safe_iter_with_bounds(None, None)?
497 .next()
498 .transpose()?
499 .map(|(_, v)| v))
500 }
501
502 pub fn multi_get_checkpoint_by_sequence_number(
503 &self,
504 sequence_numbers: &[CheckpointSequenceNumber],
505 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
506 let checkpoints = self
507 .tables
508 .certified_checkpoints
509 .multi_get(sequence_numbers)?
510 .into_iter()
511 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
512 .collect();
513
514 Ok(checkpoints)
515 }
516
517 pub fn multi_get_checkpoint_content(
518 &self,
519 contents_digest: &[CheckpointContentsDigest],
520 ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
521 self.tables.checkpoint_content.multi_get(contents_digest)
522 }
523
524 pub fn get_highest_verified_checkpoint(
525 &self,
526 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
527 let highest_verified = if let Some(highest_verified) = self
528 .tables
529 .watermarks
530 .get(&CheckpointWatermark::HighestVerified)?
531 {
532 highest_verified
533 } else {
534 return Ok(None);
535 };
536 self.get_checkpoint_by_digest(&highest_verified.1)
537 }
538
539 pub fn get_highest_synced_checkpoint(
540 &self,
541 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
542 let highest_synced = if let Some(highest_synced) = self
543 .tables
544 .watermarks
545 .get(&CheckpointWatermark::HighestSynced)?
546 {
547 highest_synced
548 } else {
549 return Ok(None);
550 };
551 self.get_checkpoint_by_digest(&highest_synced.1)
552 }
553
554 pub fn get_highest_synced_checkpoint_seq_number(
555 &self,
556 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
557 if let Some(highest_synced) = self
558 .tables
559 .watermarks
560 .get(&CheckpointWatermark::HighestSynced)?
561 {
562 Ok(Some(highest_synced.0))
563 } else {
564 Ok(None)
565 }
566 }
567
568 pub fn get_highest_executed_checkpoint_seq_number(
569 &self,
570 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
571 if let Some(highest_executed) = self
572 .tables
573 .watermarks
574 .get(&CheckpointWatermark::HighestExecuted)?
575 {
576 Ok(Some(highest_executed.0))
577 } else {
578 Ok(None)
579 }
580 }
581
582 pub fn get_highest_executed_checkpoint(
583 &self,
584 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
585 let highest_executed = if let Some(highest_executed) = self
586 .tables
587 .watermarks
588 .get(&CheckpointWatermark::HighestExecuted)?
589 {
590 highest_executed
591 } else {
592 return Ok(None);
593 };
594 self.get_checkpoint_by_digest(&highest_executed.1)
595 }
596
597 pub fn get_highest_pruned_checkpoint_seq_number(
598 &self,
599 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
600 self.tables
601 .watermarks
602 .get(&CheckpointWatermark::HighestPruned)
603 .map(|watermark| watermark.map(|w| w.0))
604 }
605
606 pub fn get_checkpoint_contents(
607 &self,
608 digest: &CheckpointContentsDigest,
609 ) -> Result<Option<CheckpointContents>, TypedStoreError> {
610 self.tables.checkpoint_content.get(digest)
611 }
612
613 pub fn get_full_checkpoint_contents_by_sequence_number(
614 &self,
615 seq: CheckpointSequenceNumber,
616 ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
617 self.tables.full_checkpoint_content_v2.get(&seq)
618 }
619
620 fn prune_local_summaries(&self) -> SuiResult {
621 if let Some((last_local_summary, _)) = self
622 .tables
623 .locally_computed_checkpoints
624 .reversed_safe_iter_with_bounds(None, None)?
625 .next()
626 .transpose()?
627 {
628 let mut batch = self.tables.locally_computed_checkpoints.batch();
629 batch.schedule_delete_range(
630 &self.tables.locally_computed_checkpoints,
631 &0,
632 &last_local_summary,
633 )?;
634 batch.write()?;
635 info!("Pruned local summaries up to {:?}", last_local_summary);
636 }
637 Ok(())
638 }
639
640 pub fn clear_locally_computed_checkpoints_from(
641 &self,
642 from_seq: CheckpointSequenceNumber,
643 ) -> SuiResult {
644 let keys: Vec<_> = self
645 .tables
646 .locally_computed_checkpoints
647 .safe_iter_with_bounds(Some(from_seq), None)
648 .map(|r| r.map(|(k, _)| k))
649 .collect::<Result<_, _>>()?;
650 if let Some(&last_local_summary) = keys.last() {
651 let mut batch = self.tables.locally_computed_checkpoints.batch();
652 batch
653 .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
654 .expect("Failed to delete locally computed checkpoints");
655 batch
656 .write()
657 .expect("Failed to delete locally computed checkpoints");
658 warn!(
659 from_seq,
660 last_local_summary,
661 "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
662 from_seq,
663 last_local_summary
664 );
665 }
666 Ok(())
667 }
668
669 fn check_for_checkpoint_fork(
670 &self,
671 local_checkpoint: &CheckpointSummary,
672 verified_checkpoint: &VerifiedCheckpoint,
673 ) {
674 if local_checkpoint != verified_checkpoint.data() {
675 let verified_contents = self
676 .get_checkpoint_contents(&verified_checkpoint.content_digest)
677 .map(|opt_contents| {
678 opt_contents
679 .map(|contents| format!("{:?}", contents))
680 .unwrap_or_else(|| {
681 format!(
682 "Verified checkpoint contents not found, digest: {:?}",
683 verified_checkpoint.content_digest,
684 )
685 })
686 })
687 .map_err(|e| {
688 format!(
689 "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
690 verified_checkpoint.content_digest, e
691 )
692 })
693 .unwrap_or_else(|err_msg| err_msg);
694
695 let local_contents = self
696 .get_checkpoint_contents(&local_checkpoint.content_digest)
697 .map(|opt_contents| {
698 opt_contents
699 .map(|contents| format!("{:?}", contents))
700 .unwrap_or_else(|| {
701 format!(
702 "Local checkpoint contents not found, digest: {:?}",
703 local_checkpoint.content_digest
704 )
705 })
706 })
707 .map_err(|e| {
708 format!(
709 "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
710 local_checkpoint.content_digest, e
711 )
712 })
713 .unwrap_or_else(|err_msg| err_msg);
714
715 error!(
717 verified_checkpoint = ?verified_checkpoint.data(),
718 ?verified_contents,
719 ?local_checkpoint,
720 ?local_contents,
721 "Local checkpoint fork detected!",
722 );
723
724 if let Err(e) = self.record_checkpoint_fork_detected(
726 *local_checkpoint.sequence_number(),
727 local_checkpoint.digest(),
728 ) {
729 error!("Failed to record checkpoint fork in database: {:?}", e);
730 }
731
732 fail_point_arg!(
733 "kill_checkpoint_fork_node",
734 |checkpoint_overrides: std::sync::Arc<
735 std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
736 >| {
737 #[cfg(msim)]
738 {
739 if let Ok(mut overrides) = checkpoint_overrides.lock() {
740 overrides.insert(
741 local_checkpoint.sequence_number,
742 verified_checkpoint.digest().to_string(),
743 );
744 }
745 tracing::error!(
746 fatal = true,
747 "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
748 local_checkpoint.sequence_number(),
749 verified_checkpoint.digest()
750 );
751 sui_simulator::task::shutdown_current_node();
752 }
753 }
754 );
755
756 fatal!(
757 "Local checkpoint fork detected for sequence number: {}",
758 local_checkpoint.sequence_number()
759 );
760 }
761 }
762
763 pub fn insert_certified_checkpoint(
769 &self,
770 checkpoint: &VerifiedCheckpoint,
771 ) -> Result<(), TypedStoreError> {
772 debug!(
773 checkpoint_seq = checkpoint.sequence_number(),
774 "Inserting certified checkpoint",
775 );
776 let mut batch = self.tables.certified_checkpoints.batch();
777 batch
778 .insert_batch(
779 &self.tables.certified_checkpoints,
780 [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
781 )?
782 .insert_batch(
783 &self.tables.checkpoint_by_digest,
784 [(checkpoint.digest(), checkpoint.serializable_ref())],
785 )?;
786 if checkpoint.next_epoch_committee().is_some() {
787 batch.insert_batch(
788 &self.tables.epoch_last_checkpoint_map,
789 [(&checkpoint.epoch(), checkpoint.sequence_number())],
790 )?;
791 }
792 batch.write()?;
793
794 if let Some(local_checkpoint) = self
795 .tables
796 .locally_computed_checkpoints
797 .get(checkpoint.sequence_number())?
798 {
799 self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
800 }
801
802 Ok(())
803 }
804
805 #[instrument(level = "debug", skip_all)]
808 pub fn insert_verified_checkpoint(
809 &self,
810 checkpoint: &VerifiedCheckpoint,
811 ) -> Result<(), TypedStoreError> {
812 self.insert_certified_checkpoint(checkpoint)?;
813 self.update_highest_verified_checkpoint(checkpoint)
814 }
815
816 pub fn update_highest_verified_checkpoint(
817 &self,
818 checkpoint: &VerifiedCheckpoint,
819 ) -> Result<(), TypedStoreError> {
820 if Some(*checkpoint.sequence_number())
821 > self
822 .get_highest_verified_checkpoint()?
823 .map(|x| *x.sequence_number())
824 {
825 debug!(
826 checkpoint_seq = checkpoint.sequence_number(),
827 "Updating highest verified checkpoint",
828 );
829 self.tables.watermarks.insert(
830 &CheckpointWatermark::HighestVerified,
831 &(*checkpoint.sequence_number(), *checkpoint.digest()),
832 )?;
833 }
834
835 Ok(())
836 }
837
838 pub fn update_highest_synced_checkpoint(
839 &self,
840 checkpoint: &VerifiedCheckpoint,
841 ) -> Result<(), TypedStoreError> {
842 let seq = *checkpoint.sequence_number();
843 debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
844 self.tables.watermarks.insert(
845 &CheckpointWatermark::HighestSynced,
846 &(seq, *checkpoint.digest()),
847 )?;
848 self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
849 Ok(())
850 }
851
852 async fn notify_read_checkpoint_watermark<F>(
853 &self,
854 notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
855 seq: CheckpointSequenceNumber,
856 get_watermark: F,
857 ) -> VerifiedCheckpoint
858 where
859 F: Fn() -> Option<CheckpointSequenceNumber>,
860 {
861 notify_read
862 .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
863 let seq = seqs[0];
864 let Some(highest) = get_watermark() else {
865 return vec![None];
866 };
867 if highest < seq {
868 return vec![None];
869 }
870 let checkpoint = self
871 .get_checkpoint_by_sequence_number(seq)
872 .expect("db error")
873 .expect("checkpoint not found");
874 vec![Some(checkpoint)]
875 })
876 .await
877 .into_iter()
878 .next()
879 .unwrap()
880 }
881
882 pub async fn notify_read_synced_checkpoint(
883 &self,
884 seq: CheckpointSequenceNumber,
885 ) -> VerifiedCheckpoint {
886 self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
887 self.get_highest_synced_checkpoint_seq_number()
888 .expect("db error")
889 })
890 .await
891 }
892
893 pub async fn notify_read_executed_checkpoint(
894 &self,
895 seq: CheckpointSequenceNumber,
896 ) -> VerifiedCheckpoint {
897 self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
898 self.get_highest_executed_checkpoint_seq_number()
899 .expect("db error")
900 })
901 .await
902 }
903
904 pub fn update_highest_executed_checkpoint(
905 &self,
906 checkpoint: &VerifiedCheckpoint,
907 ) -> Result<(), TypedStoreError> {
908 if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
909 if seq_number >= *checkpoint.sequence_number() {
910 return Ok(());
911 }
912 assert_eq!(
913 seq_number + 1,
914 *checkpoint.sequence_number(),
915 "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
916 checkpoint.sequence_number(),
917 seq_number
918 );
919 }
920 let seq = *checkpoint.sequence_number();
921 debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
922 self.tables.watermarks.insert(
923 &CheckpointWatermark::HighestExecuted,
924 &(seq, *checkpoint.digest()),
925 )?;
926 self.executed_checkpoint_notify_read
927 .notify(&seq, checkpoint);
928 Ok(())
929 }
930
931 pub fn update_highest_pruned_checkpoint(
932 &self,
933 checkpoint: &VerifiedCheckpoint,
934 ) -> Result<(), TypedStoreError> {
935 self.tables.watermarks.insert(
936 &CheckpointWatermark::HighestPruned,
937 &(*checkpoint.sequence_number(), *checkpoint.digest()),
938 )
939 }
940
941 pub fn set_highest_executed_checkpoint_subtle(
946 &self,
947 checkpoint: &VerifiedCheckpoint,
948 ) -> Result<(), TypedStoreError> {
949 self.tables.watermarks.insert(
950 &CheckpointWatermark::HighestExecuted,
951 &(*checkpoint.sequence_number(), *checkpoint.digest()),
952 )
953 }
954
955 pub fn insert_checkpoint_contents(
956 &self,
957 contents: CheckpointContents,
958 ) -> Result<(), TypedStoreError> {
959 debug!(
960 checkpoint_seq = ?contents.digest(),
961 "Inserting checkpoint contents",
962 );
963 self.tables
964 .checkpoint_content
965 .insert(contents.digest(), &contents)
966 }
967
968 pub fn insert_verified_checkpoint_contents(
969 &self,
970 checkpoint: &VerifiedCheckpoint,
971 full_contents: VerifiedCheckpointContents,
972 ) -> Result<(), TypedStoreError> {
973 let mut batch = self.tables.full_checkpoint_content_v2.batch();
974 batch.insert_batch(
975 &self.tables.checkpoint_sequence_by_contents_digest,
976 [(&checkpoint.content_digest, checkpoint.sequence_number())],
977 )?;
978 let full_contents = full_contents.into_inner();
979 batch.insert_batch(
980 &self.tables.full_checkpoint_content_v2,
981 [(checkpoint.sequence_number(), &full_contents)],
982 )?;
983
984 let contents = full_contents.into_checkpoint_contents();
985 assert_eq!(&checkpoint.content_digest, contents.digest());
986
987 batch.insert_batch(
988 &self.tables.checkpoint_content,
989 [(contents.digest(), &contents)],
990 )?;
991
992 batch.write()
993 }
994
995 pub fn delete_full_checkpoint_contents(
996 &self,
997 seq: CheckpointSequenceNumber,
998 ) -> Result<(), TypedStoreError> {
999 self.tables.full_checkpoint_content.remove(&seq)?;
1000 self.tables.full_checkpoint_content_v2.remove(&seq)
1001 }
1002
1003 pub fn get_epoch_last_checkpoint(
1004 &self,
1005 epoch_id: EpochId,
1006 ) -> SuiResult<Option<VerifiedCheckpoint>> {
1007 let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
1008 let checkpoint = match seq {
1009 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
1010 None => None,
1011 };
1012 Ok(checkpoint)
1013 }
1014
1015 pub fn get_epoch_last_checkpoint_seq_number(
1016 &self,
1017 epoch_id: EpochId,
1018 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1019 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
1020 Ok(seq)
1021 }
1022
1023 pub fn get_epoch_first_checkpoint_seq(
1026 &self,
1027 epoch: EpochId,
1028 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
1029 if epoch == 0 {
1030 return Ok(Some(0));
1031 }
1032 Ok(self
1033 .tables
1034 .epoch_last_checkpoint_map
1035 .get(&(epoch - 1))?
1036 .map(|s| s + 1))
1037 }
1038
1039 pub fn list_checkpoints_from_seq(
1041 &self,
1042 start: Option<CheckpointSequenceNumber>,
1043 limit: usize,
1044 ) -> Result<Vec<(CheckpointSequenceNumber, VerifiedCheckpoint)>, TypedStoreError> {
1045 self.tables
1046 .certified_checkpoints
1047 .safe_iter_with_bounds(start, None)
1048 .take(limit)
1049 .map(|r| r.map(|(seq, cp)| (seq, cp.into())))
1050 .collect()
1051 }
1052
1053 pub fn list_epoch_last_checkpoints(
1055 &self,
1056 start: Option<EpochId>,
1057 limit: usize,
1058 ) -> Result<Vec<(EpochId, CheckpointSequenceNumber)>, TypedStoreError> {
1059 self.tables
1060 .epoch_last_checkpoint_map
1061 .safe_iter_with_bounds(start, None)
1062 .take(limit)
1063 .collect()
1064 }
1065
1066 pub fn list_checkpoint_digests(
1068 &self,
1069 start: Option<CheckpointDigest>,
1070 limit: usize,
1071 ) -> Result<Vec<CheckpointDigest>, TypedStoreError> {
1072 self.tables
1073 .checkpoint_by_digest
1074 .safe_iter_with_bounds(start, None)
1075 .take(limit)
1076 .map(|r| r.map(|(d, _)| d))
1077 .collect()
1078 }
1079
1080 pub fn list_checkpoint_contents_digests(
1082 &self,
1083 start: Option<CheckpointContentsDigest>,
1084 limit: usize,
1085 ) -> Result<Vec<CheckpointContentsDigest>, TypedStoreError> {
1086 self.tables
1087 .checkpoint_content
1088 .safe_iter_with_bounds(start, None)
1089 .take(limit)
1090 .map(|r| r.map(|(d, _)| d))
1091 .collect()
1092 }
1093
1094 pub fn list_epoch_checkpoints(
1096 &self,
1097 epoch: EpochId,
1098 start_seq: Option<CheckpointSequenceNumber>,
1099 limit: usize,
1100 ) -> Result<Vec<(CheckpointSequenceNumber, VerifiedCheckpoint)>, TypedStoreError> {
1101 let Some(last_seq) = self.tables.epoch_last_checkpoint_map.get(&epoch)? else {
1102 return Ok(vec![]);
1103 };
1104 let first_seq = if epoch == 0 {
1106 0
1107 } else {
1108 self.tables
1109 .epoch_last_checkpoint_map
1110 .get(&(epoch - 1))?
1111 .map(|s| s + 1)
1112 .unwrap_or(0)
1113 };
1114 let start = start_seq.map(|s| s.max(first_seq)).unwrap_or(first_seq);
1115 self.tables
1116 .certified_checkpoints
1117 .safe_iter_with_bounds(Some(start), Some(last_seq + 1))
1118 .take(limit)
1119 .map(|r| r.map(|(seq, cp)| (seq, cp.into())))
1120 .collect()
1121 }
1122
1123 pub fn insert_epoch_last_checkpoint(
1124 &self,
1125 epoch_id: EpochId,
1126 checkpoint: &VerifiedCheckpoint,
1127 ) -> SuiResult {
1128 self.tables
1129 .epoch_last_checkpoint_map
1130 .insert(&epoch_id, checkpoint.sequence_number())?;
1131 Ok(())
1132 }
1133
1134 pub fn get_epoch_state_commitments(
1135 &self,
1136 epoch: EpochId,
1137 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1138 let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1139 checkpoint
1140 .end_of_epoch_data
1141 .as_ref()
1142 .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1143 .epoch_commitments
1144 .clone()
1145 });
1146 Ok(commitments)
1147 }
1148
1149 pub fn get_epoch_stats(
1151 &self,
1152 epoch: EpochId,
1153 last_checkpoint: &CheckpointSummary,
1154 ) -> Option<EpochStats> {
1155 let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1156 (0, 0)
1157 } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1158 (
1159 checkpoint.sequence_number + 1,
1160 checkpoint.network_total_transactions,
1161 )
1162 } else {
1163 return None;
1164 };
1165 Some(EpochStats {
1166 checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1167 transaction_count: last_checkpoint.network_total_transactions
1168 - prev_epoch_network_transactions,
1169 total_gas_reward: last_checkpoint
1170 .epoch_rolling_gas_cost_summary
1171 .computation_cost,
1172 })
1173 }
1174
1175 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1176 self.tables
1178 .checkpoint_content
1179 .checkpoint_db(path)
1180 .map_err(Into::into)
1181 }
1182
1183 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1184 let mut wb = self.tables.watermarks.batch();
1185 wb.delete_batch(
1186 &self.tables.watermarks,
1187 std::iter::once(CheckpointWatermark::HighestExecuted),
1188 )?;
1189 wb.write()?;
1190 Ok(())
1191 }
1192
1193 pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1194 self.delete_highest_executed_checkpoint_test_only()?;
1195 Ok(())
1196 }
1197
1198 pub fn record_checkpoint_fork_detected(
1199 &self,
1200 checkpoint_seq: CheckpointSequenceNumber,
1201 checkpoint_digest: CheckpointDigest,
1202 ) -> Result<(), TypedStoreError> {
1203 info!(
1204 checkpoint_seq = checkpoint_seq,
1205 checkpoint_digest = ?checkpoint_digest,
1206 "Recording checkpoint fork detection in database"
1207 );
1208 self.tables.watermarks.insert(
1209 &CheckpointWatermark::CheckpointForkDetected,
1210 &(checkpoint_seq, checkpoint_digest),
1211 )
1212 }
1213
1214 pub fn get_checkpoint_fork_detected(
1215 &self,
1216 ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1217 self.tables
1218 .watermarks
1219 .get(&CheckpointWatermark::CheckpointForkDetected)
1220 }
1221
1222 pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1223 self.tables
1224 .watermarks
1225 .remove(&CheckpointWatermark::CheckpointForkDetected)
1226 }
1227
1228 pub fn record_transaction_fork_detected(
1229 &self,
1230 tx_digest: TransactionDigest,
1231 expected_effects_digest: TransactionEffectsDigest,
1232 actual_effects_digest: TransactionEffectsDigest,
1233 ) -> Result<(), TypedStoreError> {
1234 info!(
1235 tx_digest = ?tx_digest,
1236 expected_effects_digest = ?expected_effects_digest,
1237 actual_effects_digest = ?actual_effects_digest,
1238 "Recording transaction fork detection in database"
1239 );
1240 self.tables.transaction_fork_detected.insert(
1241 &TRANSACTION_FORK_DETECTED_KEY,
1242 &(tx_digest, expected_effects_digest, actual_effects_digest),
1243 )
1244 }
1245
1246 pub fn get_transaction_fork_detected(
1247 &self,
1248 ) -> Result<
1249 Option<(
1250 TransactionDigest,
1251 TransactionEffectsDigest,
1252 TransactionEffectsDigest,
1253 )>,
1254 TypedStoreError,
1255 > {
1256 self.tables
1257 .transaction_fork_detected
1258 .get(&TRANSACTION_FORK_DETECTED_KEY)
1259 }
1260
1261 pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1262 self.tables
1263 .transaction_fork_detected
1264 .remove(&TRANSACTION_FORK_DETECTED_KEY)
1265 }
1266}
1267
1268#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
1269pub enum CheckpointWatermark {
1270 HighestVerified,
1271 HighestSynced,
1272 HighestExecuted,
1273 HighestPruned,
1274 CheckpointForkDetected,
1275}
1276
1277struct CheckpointStateHasher {
1278 epoch_store: Arc<AuthorityPerEpochStore>,
1279 hasher: Weak<GlobalStateHasher>,
1280 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1281}
1282
1283impl CheckpointStateHasher {
1284 fn new(
1285 epoch_store: Arc<AuthorityPerEpochStore>,
1286 hasher: Weak<GlobalStateHasher>,
1287 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1288 ) -> Self {
1289 Self {
1290 epoch_store,
1291 hasher,
1292 receive_from_builder,
1293 }
1294 }
1295
1296 async fn run(self) {
1297 let Self {
1298 epoch_store,
1299 hasher,
1300 mut receive_from_builder,
1301 } = self;
1302 while let Some((seq, effects)) = receive_from_builder.recv().await {
1303 let Some(hasher) = hasher.upgrade() else {
1304 info!("Object state hasher was dropped, stopping checkpoint accumulation");
1305 break;
1306 };
1307 hasher
1308 .accumulate_checkpoint(&effects, seq, &epoch_store)
1309 .expect("epoch ended while accumulating checkpoint");
1310 }
1311 }
1312}
1313
1314#[derive(Debug)]
1315pub enum CheckpointBuilderError {
1316 ChangeEpochTxAlreadyExecuted,
1317 SystemPackagesMissing,
1318 Retry(anyhow::Error),
1319}
1320
1321impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1322 for CheckpointBuilderError
1323{
1324 fn from(e: SuiError) -> Self {
1325 Self::Retry(e.into())
1326 }
1327}
1328
1329pub type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1330
1331pub struct CheckpointBuilder {
1332 state: Arc<AuthorityState>,
1333 store: Arc<CheckpointStore>,
1334 epoch_store: Arc<AuthorityPerEpochStore>,
1335 notify: Arc<Notify>,
1336 notify_aggregator: Arc<Notify>,
1337 last_built: watch::Sender<CheckpointSequenceNumber>,
1338 effects_store: Arc<dyn TransactionCacheRead>,
1339 global_state_hasher: Weak<GlobalStateHasher>,
1340 send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1341 output: Box<dyn CheckpointOutput>,
1342 metrics: Arc<CheckpointMetrics>,
1343}
1344
1345pub struct CheckpointAggregator {
1346 store: Arc<CheckpointStore>,
1347 epoch_store: Arc<AuthorityPerEpochStore>,
1348 notify: Arc<Notify>,
1349 receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
1350 pending: BTreeMap<CheckpointSequenceNumber, Vec<CheckpointSignatureMessage>>,
1351 current: Option<CheckpointSignatureAggregator>,
1352 output: Box<dyn CertifiedCheckpointOutput>,
1353 state: Arc<AuthorityState>,
1354 metrics: Arc<CheckpointMetrics>,
1355}
1356
1357pub struct CheckpointSignatureAggregator {
1359 summary: CheckpointSummary,
1360 digest: CheckpointDigest,
1361 signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
1363 store: Arc<CheckpointStore>,
1364 state: Arc<AuthorityState>,
1365 metrics: Arc<CheckpointMetrics>,
1366}
1367
1368impl CheckpointBuilder {
1369 fn new(
1370 state: Arc<AuthorityState>,
1371 store: Arc<CheckpointStore>,
1372 epoch_store: Arc<AuthorityPerEpochStore>,
1373 notify: Arc<Notify>,
1374 effects_store: Arc<dyn TransactionCacheRead>,
1375 global_state_hasher: Weak<GlobalStateHasher>,
1377 send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1379 output: Box<dyn CheckpointOutput>,
1380 notify_aggregator: Arc<Notify>,
1381 last_built: watch::Sender<CheckpointSequenceNumber>,
1382 metrics: Arc<CheckpointMetrics>,
1383 ) -> Self {
1384 Self {
1385 state,
1386 store,
1387 epoch_store,
1388 notify,
1389 effects_store,
1390 global_state_hasher,
1391 send_to_hasher,
1392 output,
1393 notify_aggregator,
1394 last_built,
1395 metrics,
1396 }
1397 }
1398
1399 async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
1407 if let Some(replay_waiter) = consensus_replay_waiter {
1408 info!("Waiting for consensus commits to replay ...");
1409 replay_waiter.wait_for_replay().await;
1410 info!("Consensus commits finished replaying");
1411 }
1412 info!("Starting CheckpointBuilder");
1413 loop {
1414 match self.maybe_build_checkpoints().await {
1415 Ok(()) => {}
1416 err @ Err(
1417 CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
1418 | CheckpointBuilderError::SystemPackagesMissing,
1419 ) => {
1420 info!("CheckpointBuilder stopping: {:?}", err);
1421 return;
1422 }
1423 Err(CheckpointBuilderError::Retry(inner)) => {
1424 let msg = format!("{:?}", inner);
1425 debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
1426 tokio::time::sleep(Duration::from_secs(1)).await;
1427 self.metrics.checkpoint_errors.inc();
1428 continue;
1429 }
1430 }
1431
1432 self.notify.notified().await;
1433 }
1434 }
1435
1436 async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1437 let _scope = monitored_scope("BuildCheckpoints");
1438
1439 let last_height = self
1441 .epoch_store
1442 .last_built_checkpoint_builder_summary()
1443 .expect("epoch should not have ended")
1444 .and_then(|s| s.checkpoint_height);
1445
1446 for (height, pending) in self
1447 .epoch_store
1448 .get_pending_checkpoints(last_height)
1449 .expect("unexpected epoch store error")
1450 {
1451 debug!(checkpoint_commit_height = height, "Making checkpoint");
1452
1453 let seq = self.make_checkpoint(pending).await?;
1454
1455 self.last_built.send_if_modified(|cur| {
1456 if seq > *cur {
1458 *cur = seq;
1459 true
1460 } else {
1461 false
1462 }
1463 });
1464
1465 tokio::task::yield_now().await;
1468 }
1469
1470 Ok(())
1471 }
1472
1473 #[instrument(level = "debug", skip_all, fields(height = pending.details.checkpoint_height))]
1474 async fn make_checkpoint(
1475 &mut self,
1476 pending: PendingCheckpoint,
1477 ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1478 let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1479
1480 let details = pending.details.clone();
1481
1482 let highest_executed_sequence = self
1483 .store
1484 .get_highest_executed_checkpoint_seq_number()
1485 .expect("db error")
1486 .unwrap_or(0);
1487
1488 let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pending)).await;
1489 let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1490
1491 let new_checkpoint = self
1492 .create_checkpoint(
1493 sorted_tx_effects_included_in_checkpoint,
1494 &details,
1495 &all_roots,
1496 )
1497 .await?;
1498 let sequence = *new_checkpoint.0.sequence_number();
1499 let digest = new_checkpoint.0.digest();
1500 if sequence <= highest_executed_sequence && poll_count > 1 {
1501 debug_fatal!(
1502 "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1503 );
1504 }
1505
1506 self.write_checkpoint(details.checkpoint_height, new_checkpoint)
1507 .await?;
1508 info!(
1509 seq = sequence,
1510 %digest,
1511 height = details.checkpoint_height,
1512 commit = %details.consensus_commit_ref,
1513 "Made new checkpoint"
1514 );
1515
1516 Ok(sequence)
1517 }
1518
1519 #[instrument(level = "debug", skip_all)]
1522 async fn resolve_checkpoint_transactions(
1523 &self,
1524 pending: PendingCheckpoint,
1525 ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
1526 let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1527
1528 debug!(
1529 checkpoint_commit_height = pending.details.checkpoint_height,
1530 "Resolving checkpoint transactions for pending checkpoint.",
1531 );
1532
1533 trace!(
1534 "roots for pending checkpoint {:?}: {:?}",
1535 pending.details.checkpoint_height, pending.roots,
1536 );
1537
1538 assert!(
1539 self.epoch_store
1540 .protocol_config()
1541 .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
1542 );
1543
1544 let mut all_effects: Vec<TransactionEffects> = Vec::new();
1545 let mut all_root_digests: Vec<TransactionDigest> = Vec::new();
1546
1547 for checkpoint_roots in &pending.roots {
1548 let tx_roots = &checkpoint_roots.tx_roots;
1549
1550 self.metrics
1551 .checkpoint_roots_count
1552 .inc_by(tx_roots.len() as u64);
1553
1554 let root_digests = self
1555 .epoch_store
1556 .notify_read_tx_key_to_digest(tx_roots)
1557 .in_monitored_scope("CheckpointNotifyDigests")
1558 .await?;
1559
1560 all_root_digests.extend(root_digests.iter().cloned());
1561
1562 let root_effects = self
1563 .effects_store
1564 .notify_read_executed_effects(
1565 CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
1566 &root_digests,
1567 )
1568 .in_monitored_scope("CheckpointNotifyRead")
1569 .await;
1570 let consensus_commit_prologue =
1571 self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
1572
1573 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1574 let ccp_digest = consensus_commit_prologue.map(|(d, _)| d);
1575 let mut sorted = CausalOrder::causal_sort_with_ccp(root_effects, ccp_digest);
1576
1577 if let Some(settlement_key) = &checkpoint_roots.settlement_root {
1578 let checkpoint_seq = pending.details.checkpoint_seq;
1579 let tx_index_offset = all_effects.len() as u64;
1580 let effects = self
1581 .resolve_settlement_effects(
1582 *settlement_key,
1583 &sorted,
1584 checkpoint_roots.height,
1585 checkpoint_seq,
1586 tx_index_offset,
1587 )
1588 .await;
1589 sorted.extend(effects);
1590 }
1591
1592 #[cfg(msim)]
1593 {
1594 self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1595 }
1596
1597 all_effects.extend(sorted);
1598 }
1599 Ok((all_effects, all_root_digests.into_iter().collect()))
1600 }
1601
1602 async fn resolve_settlement_effects(
1607 &self,
1608 settlement_key: TransactionKey,
1609 sorted_root_effects: &[TransactionEffects],
1610 checkpoint_height: CheckpointHeight,
1611 checkpoint_seq: CheckpointSequenceNumber,
1612 tx_index_offset: u64,
1613 ) -> Vec<TransactionEffects> {
1614 let epoch = self.epoch_store.epoch();
1615 let accumulator_root_obj_initial_shared_version = self
1616 .epoch_store
1617 .epoch_start_config()
1618 .accumulator_root_obj_initial_shared_version()
1619 .expect("accumulator root object must exist");
1620
1621 let builder = AccumulatorSettlementTxBuilder::new(
1622 None,
1623 sorted_root_effects,
1624 checkpoint_seq,
1625 tx_index_offset,
1626 );
1627
1628 let settlement_digests: Vec<_> = builder
1629 .build_tx(
1630 self.epoch_store.protocol_config(),
1631 epoch,
1632 accumulator_root_obj_initial_shared_version,
1633 checkpoint_height,
1634 checkpoint_seq,
1635 )
1636 .into_iter()
1637 .map(|tx| *VerifiedTransaction::new_system_transaction(tx).digest())
1638 .collect();
1639
1640 debug!(
1641 ?settlement_digests,
1642 ?settlement_key,
1643 "reading settlement effects from cache"
1644 );
1645
1646 let settlement_effects = wait_for_effects_with_retry(
1647 self.effects_store.as_ref(),
1648 "CheckpointBuilder::settlement_effects",
1649 &settlement_digests,
1650 settlement_key,
1651 )
1652 .await;
1653 let (accounts_created, accounts_deleted) =
1654 accumulators::count_accumulator_object_changes(&settlement_effects);
1655 self.metrics
1656 .report_accumulator_account_changes(accounts_created, accounts_deleted);
1657
1658 let barrier_digest = *VerifiedTransaction::new_system_transaction(
1659 accumulators::build_accumulator_barrier_tx(
1660 epoch,
1661 accumulator_root_obj_initial_shared_version,
1662 checkpoint_height,
1663 &settlement_effects,
1664 ),
1665 )
1666 .digest();
1667
1668 let barrier_effects = wait_for_effects_with_retry(
1669 self.effects_store.as_ref(),
1670 "CheckpointBuilder::barrier_effects",
1671 &[barrier_digest],
1672 settlement_key,
1673 )
1674 .await;
1675
1676 for fx in settlement_effects.iter().chain(barrier_effects.iter()) {
1682 assert!(
1683 fx.status().is_ok(),
1684 "settlement transaction cannot fail (digest: {:?}) {:#?}",
1685 fx.transaction_digest(),
1686 fx
1687 );
1688 }
1689
1690 settlement_effects
1691 .into_iter()
1692 .chain(barrier_effects)
1693 .collect()
1694 }
1695
1696 fn extract_consensus_commit_prologue(
1699 &self,
1700 root_digests: &[TransactionDigest],
1701 root_effects: &[TransactionEffects],
1702 ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
1703 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1704 if root_digests.is_empty() {
1705 return Ok(None);
1706 }
1707
1708 let first_tx = self
1712 .state
1713 .get_transaction_cache_reader()
1714 .get_transaction_block(&root_digests[0])
1715 .expect("Transaction block must exist");
1716
1717 Ok(first_tx
1718 .transaction_data()
1719 .is_consensus_commit_prologue()
1720 .then(|| {
1721 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1722 (*first_tx.digest(), root_effects[0].clone())
1723 }))
1724 }
1725
1726 #[instrument(level = "debug", skip_all)]
1727 async fn write_checkpoint(
1728 &mut self,
1729 height: CheckpointHeight,
1730 new_checkpoint: (CheckpointSummary, CheckpointContents),
1731 ) -> SuiResult {
1732 let _scope = monitored_scope("CheckpointBuilder::write_checkpoint");
1733 let mut batch = self.store.tables.checkpoint_content.batch();
1734
1735 let (summary, contents) = &new_checkpoint;
1736 debug!(
1737 checkpoint_commit_height = height,
1738 checkpoint_seq = summary.sequence_number,
1739 contents_digest = ?contents.digest(),
1740 "writing checkpoint",
1741 );
1742
1743 if let Some(previously_computed_summary) = self
1744 .store
1745 .tables
1746 .locally_computed_checkpoints
1747 .get(&summary.sequence_number)?
1748 && previously_computed_summary.digest() != summary.digest()
1749 {
1750 fatal!(
1751 "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
1752 summary.sequence_number,
1753 previously_computed_summary.digest(),
1754 summary.digest()
1755 );
1756 }
1757
1758 self.metrics
1759 .transactions_included_in_checkpoint
1760 .inc_by(contents.size() as u64);
1761 let sequence_number = summary.sequence_number;
1762 self.metrics
1763 .last_constructed_checkpoint
1764 .set(sequence_number as i64);
1765
1766 batch.insert_batch(
1767 &self.store.tables.checkpoint_content,
1768 [(contents.digest(), contents)],
1769 )?;
1770
1771 batch.insert_batch(
1772 &self.store.tables.locally_computed_checkpoints,
1773 [(sequence_number, summary)],
1774 )?;
1775
1776 batch.write()?;
1777
1778 self.output
1780 .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1781 .await?;
1782
1783 if let Some(certified_checkpoint) = self
1784 .store
1785 .tables
1786 .certified_checkpoints
1787 .get(summary.sequence_number())?
1788 {
1789 self.store
1790 .check_for_checkpoint_fork(summary, &certified_checkpoint.into());
1791 }
1792
1793 self.notify_aggregator.notify_one();
1794 self.epoch_store
1795 .process_constructed_checkpoint(height, new_checkpoint);
1796 Ok(())
1797 }
1798
1799 fn load_last_built_checkpoint_summary(
1800 epoch_store: &AuthorityPerEpochStore,
1801 store: &CheckpointStore,
1802 ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1803 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1804 if last_checkpoint.is_none() {
1805 let epoch = epoch_store.epoch();
1806 if epoch > 0 {
1807 let previous_epoch = epoch - 1;
1808 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1809 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1810 if let Some((ref seq, _)) = last_checkpoint {
1811 debug!(
1812 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1813 );
1814 } else {
1815 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1817 }
1818 }
1819 }
1820 Ok(last_checkpoint)
1821 }
1822
1823 #[instrument(level = "debug", skip_all)]
1824 async fn create_checkpoint(
1825 &self,
1826 all_effects: Vec<TransactionEffects>,
1827 details: &PendingCheckpointInfo,
1828 all_roots: &HashSet<TransactionDigest>,
1829 ) -> CheckpointBuilderResult<(CheckpointSummary, CheckpointContents)> {
1830 let _scope = monitored_scope("CheckpointBuilder::create_checkpoint");
1831
1832 let last_checkpoint =
1833 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1834 let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1835 debug!(
1836 checkpoint_commit_height = details.checkpoint_height,
1837 next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1838 checkpoint_timestamp = details.timestamp_ms,
1839 "Creating checkpoint for {} transactions",
1840 all_effects.len(),
1841 );
1842
1843 let all_digests: Vec<_> = all_effects
1844 .iter()
1845 .map(|effect| *effect.transaction_digest())
1846 .collect();
1847 let transaction_blocks = self
1848 .state
1849 .get_transaction_cache_reader()
1850 .multi_get_transaction_blocks(&all_digests);
1851 let mut transactions = Vec::with_capacity(all_effects.len());
1852 let mut transaction_keys = Vec::with_capacity(all_effects.len());
1853 let mut randomness_rounds = BTreeMap::new();
1854 {
1855 let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1856 debug!(
1857 ?last_checkpoint_seq,
1858 "Waiting for {:?} certificates to appear in consensus",
1859 all_effects.len()
1860 );
1861
1862 for (effects, transaction) in all_effects
1863 .iter()
1864 .zip_debug_eq(transaction_blocks.into_iter())
1865 {
1866 let transaction = transaction
1867 .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
1868 match transaction.inner().transaction_data().kind() {
1869 TransactionKind::ConsensusCommitPrologue(_)
1870 | TransactionKind::ConsensusCommitPrologueV2(_)
1871 | TransactionKind::ConsensusCommitPrologueV3(_)
1872 | TransactionKind::ConsensusCommitPrologueV4(_)
1873 | TransactionKind::AuthenticatorStateUpdate(_) => {
1874 }
1877 TransactionKind::ProgrammableSystemTransaction(_) => {
1878 }
1880 TransactionKind::ChangeEpoch(_)
1881 | TransactionKind::Genesis(_)
1882 | TransactionKind::EndOfEpochTransaction(_) => {
1883 fatal!(
1884 "unexpected transaction in checkpoint effects: {:?}",
1885 transaction
1886 );
1887 }
1888 TransactionKind::RandomnessStateUpdate(rsu) => {
1889 randomness_rounds
1890 .insert(*effects.transaction_digest(), rsu.randomness_round);
1891 }
1892 TransactionKind::ProgrammableTransaction(_) => {
1893 let digest = *effects.transaction_digest();
1897 if !all_roots.contains(&digest) {
1898 transaction_keys.push(SequencedConsensusTransactionKey::External(
1899 ConsensusTransactionKey::Certificate(digest),
1900 ));
1901 }
1902 }
1903 }
1904 transactions.push((*transaction).clone());
1905 }
1906
1907 self.epoch_store
1908 .consensus_messages_processed_notify(transaction_keys)
1909 .await?;
1910 }
1911
1912 let signatures = self
1913 .epoch_store
1914 .user_signatures_for_checkpoint(&transactions, &all_digests);
1915 debug!(
1916 ?last_checkpoint_seq,
1917 "Received {} checkpoint user signatures from consensus",
1918 signatures.len()
1919 );
1920
1921 let end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
1922 Some(
1923 transactions
1924 .iter()
1925 .flat_map(|tx| {
1926 if let TransactionKind::ProgrammableTransaction(ptb) =
1927 tx.transaction_data().kind()
1928 {
1929 itertools::Either::Left(
1930 ptb.commands
1931 .iter()
1932 .map(ExecutionTimeObservationKey::from_command),
1933 )
1934 } else {
1935 itertools::Either::Right(std::iter::empty())
1936 }
1937 })
1938 .collect(),
1939 )
1940 } else {
1941 None
1942 };
1943
1944 let epoch = self.epoch_store.epoch();
1945 let first_checkpoint_of_epoch = last_checkpoint
1946 .as_ref()
1947 .map(|(_, c)| c.epoch != epoch)
1948 .unwrap_or(true);
1949 if first_checkpoint_of_epoch {
1950 self.epoch_store
1951 .record_epoch_first_checkpoint_creation_time_metric();
1952 }
1953 let last_checkpoint_of_epoch = details.last_of_epoch;
1954
1955 let sequence_number = details.checkpoint_seq;
1956 let mut timestamp_ms = details.timestamp_ms;
1957 if let Some((_, last_checkpoint)) = &last_checkpoint
1958 && last_checkpoint.timestamp_ms > timestamp_ms
1959 {
1960 debug!(
1962 "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
1963 sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
1964 );
1965 if self
1966 .epoch_store
1967 .protocol_config()
1968 .enforce_checkpoint_timestamp_monotonicity()
1969 {
1970 timestamp_ms = last_checkpoint.timestamp_ms;
1971 }
1972 }
1973
1974 let mut effects = all_effects;
1975 let mut signatures = signatures;
1976 let epoch_rolling_gas_cost_summary =
1977 self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
1978
1979 let end_of_epoch_data = if last_checkpoint_of_epoch {
1980 let system_state_obj = self
1981 .augment_epoch_last_checkpoint(
1982 &epoch_rolling_gas_cost_summary,
1983 timestamp_ms,
1984 &mut effects,
1985 &mut signatures,
1986 sequence_number,
1987 end_of_epoch_observation_keys.expect(
1988 "end_of_epoch_observation_keys must be populated for the last checkpoint",
1989 ),
1990 last_checkpoint_seq.unwrap_or_default(),
1991 )
1992 .await?;
1993
1994 let committee = system_state_obj
1995 .get_current_epoch_committee()
1996 .committee()
1997 .clone();
1998
1999 let root_state_digest = {
2002 let state_acc = self
2003 .global_state_hasher
2004 .upgrade()
2005 .expect("No checkpoints should be getting built after local configuration");
2006 let acc = state_acc.accumulate_checkpoint(
2007 &effects,
2008 sequence_number,
2009 &self.epoch_store,
2010 )?;
2011
2012 state_acc
2013 .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2014 .await?;
2015
2016 state_acc.accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))?;
2017 state_acc
2018 .digest_epoch(self.epoch_store.clone(), sequence_number)
2019 .await?
2020 };
2021 self.metrics.highest_accumulated_epoch.set(epoch as i64);
2022 info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2023
2024 let epoch_commitments = if self
2025 .epoch_store
2026 .protocol_config()
2027 .check_commit_root_state_digest_supported()
2028 {
2029 vec![root_state_digest.into()]
2030 } else {
2031 vec![]
2032 };
2033
2034 Some(EndOfEpochData {
2035 next_epoch_committee: committee.voting_rights,
2036 next_epoch_protocol_version: ProtocolVersion::new(
2037 system_state_obj.protocol_version(),
2038 ),
2039 epoch_commitments,
2040 })
2041 } else {
2042 self.send_to_hasher
2043 .send((sequence_number, effects.clone()))
2044 .await?;
2045
2046 None
2047 };
2048 let contents = if self.epoch_store.protocol_config().address_aliases() {
2049 CheckpointContents::new_v2(&effects, signatures)
2050 } else {
2051 CheckpointContents::new_with_digests_and_signatures(
2052 effects.iter().map(TransactionEffects::execution_digests),
2053 signatures
2054 .into_iter()
2055 .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
2056 .collect(),
2057 )
2058 };
2059
2060 let num_txns = contents.size() as u64;
2061
2062 let network_total_transactions = last_checkpoint
2063 .as_ref()
2064 .map(|(_, c)| c.network_total_transactions + num_txns)
2065 .unwrap_or(num_txns);
2066
2067 let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2068
2069 let matching_randomness_rounds: Vec<_> = effects
2070 .iter()
2071 .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2072 .copied()
2073 .collect();
2074
2075 let checkpoint_commitments = if self
2076 .epoch_store
2077 .protocol_config()
2078 .include_checkpoint_artifacts_digest_in_summary()
2079 {
2080 let artifacts = CheckpointArtifacts::from(&effects[..]);
2081 let artifacts_digest = artifacts.digest()?;
2082 vec![artifacts_digest.into()]
2083 } else {
2084 Default::default()
2085 };
2086
2087 let summary = CheckpointSummary::new(
2088 self.epoch_store.protocol_config(),
2089 epoch,
2090 sequence_number,
2091 network_total_transactions,
2092 &contents,
2093 previous_digest,
2094 epoch_rolling_gas_cost_summary,
2095 end_of_epoch_data,
2096 timestamp_ms,
2097 matching_randomness_rounds,
2098 checkpoint_commitments,
2099 );
2100 summary.report_checkpoint_age(
2101 &self.metrics.last_created_checkpoint_age,
2102 &self.metrics.last_created_checkpoint_age_ms,
2103 );
2104 if last_checkpoint_of_epoch {
2105 info!(
2106 checkpoint_seq = sequence_number,
2107 "creating last checkpoint of epoch {}", epoch
2108 );
2109 if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2110 self.epoch_store
2111 .report_epoch_metrics_at_last_checkpoint(stats);
2112 }
2113 }
2114
2115 Ok((summary, contents))
2116 }
2117
2118 fn get_epoch_total_gas_cost(
2119 &self,
2120 last_checkpoint: Option<&CheckpointSummary>,
2121 cur_checkpoint_effects: &[TransactionEffects],
2122 ) -> GasCostSummary {
2123 let (previous_epoch, previous_gas_costs) = last_checkpoint
2124 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2125 .unwrap_or_default();
2126 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2127 if previous_epoch == self.epoch_store.epoch() {
2128 GasCostSummary::new(
2130 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2131 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2132 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2133 previous_gas_costs.non_refundable_storage_fee
2134 + current_gas_costs.non_refundable_storage_fee,
2135 )
2136 } else {
2137 current_gas_costs
2138 }
2139 }
2140
2141 #[instrument(level = "error", skip_all)]
2142 async fn augment_epoch_last_checkpoint(
2143 &self,
2144 epoch_total_gas_cost: &GasCostSummary,
2145 epoch_start_timestamp_ms: CheckpointTimestamp,
2146 checkpoint_effects: &mut Vec<TransactionEffects>,
2147 signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2148 checkpoint: CheckpointSequenceNumber,
2149 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2150 last_checkpoint: CheckpointSequenceNumber,
2153 ) -> CheckpointBuilderResult<SuiSystemState> {
2154 let (system_state, effects) = self
2155 .state
2156 .create_and_execute_advance_epoch_tx(
2157 &self.epoch_store,
2158 epoch_total_gas_cost,
2159 checkpoint,
2160 epoch_start_timestamp_ms,
2161 end_of_epoch_observation_keys,
2162 last_checkpoint,
2163 )
2164 .await?;
2165 checkpoint_effects.push(effects);
2166 signatures.push(vec![]);
2167 Ok(system_state)
2168 }
2169
2170 #[cfg(msim)]
2173 fn expensive_consensus_commit_prologue_invariants_check(
2174 &self,
2175 root_digests: &[TransactionDigest],
2176 sorted: &[TransactionEffects],
2177 ) {
2178 let root_txs = self
2180 .state
2181 .get_transaction_cache_reader()
2182 .multi_get_transaction_blocks(root_digests);
2183 let ccps = root_txs
2184 .iter()
2185 .filter_map(|tx| {
2186 if let Some(tx) = tx {
2187 if tx.transaction_data().is_consensus_commit_prologue() {
2188 Some(tx)
2189 } else {
2190 None
2191 }
2192 } else {
2193 None
2194 }
2195 })
2196 .collect::<Vec<_>>();
2197
2198 assert!(ccps.len() <= 1);
2200
2201 let txs = self
2203 .state
2204 .get_transaction_cache_reader()
2205 .multi_get_transaction_blocks(
2206 &sorted
2207 .iter()
2208 .map(|tx| tx.transaction_digest().clone())
2209 .collect::<Vec<_>>(),
2210 );
2211
2212 if ccps.len() == 0 {
2213 for tx in txs.iter() {
2216 if let Some(tx) = tx {
2217 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2218 }
2219 }
2220 } else {
2221 assert!(
2223 txs[0]
2224 .as_ref()
2225 .unwrap()
2226 .transaction_data()
2227 .is_consensus_commit_prologue()
2228 );
2229
2230 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2231
2232 for tx in txs.iter().skip(1) {
2233 if let Some(tx) = tx {
2234 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2235 }
2236 }
2237 }
2238 }
2239}
2240
2241async fn wait_for_effects_with_retry(
2242 effects_store: &dyn TransactionCacheRead,
2243 task_name: &'static str,
2244 digests: &[TransactionDigest],
2245 tx_key: TransactionKey,
2246) -> Vec<TransactionEffects> {
2247 let delay = if in_antithesis() {
2248 15
2250 } else {
2251 5
2252 };
2253 loop {
2254 match tokio::time::timeout(Duration::from_secs(delay), async {
2255 effects_store
2256 .notify_read_executed_effects(task_name, digests)
2257 .await
2258 })
2259 .await
2260 {
2261 Ok(effects) => break effects,
2262 Err(_) => {
2263 debug_fatal!(
2264 "Timeout waiting for transactions to be executed {:?}, retrying...",
2265 tx_key
2266 );
2267 }
2268 }
2269 }
2270}
2271
2272impl CheckpointAggregator {
2273 fn new(
2274 tables: Arc<CheckpointStore>,
2275 epoch_store: Arc<AuthorityPerEpochStore>,
2276 notify: Arc<Notify>,
2277 receiver: mpsc::UnboundedReceiver<CheckpointSignatureMessage>,
2278 output: Box<dyn CertifiedCheckpointOutput>,
2279 state: Arc<AuthorityState>,
2280 metrics: Arc<CheckpointMetrics>,
2281 ) -> Self {
2282 Self {
2283 store: tables,
2284 epoch_store,
2285 notify,
2286 receiver,
2287 pending: BTreeMap::new(),
2288 current: None,
2289 output,
2290 state,
2291 metrics,
2292 }
2293 }
2294
2295 async fn run(mut self) {
2296 info!("Starting CheckpointAggregator");
2297 loop {
2298 while let Ok(sig) = self.receiver.try_recv() {
2300 self.pending
2301 .entry(sig.summary.sequence_number)
2302 .or_default()
2303 .push(sig);
2304 }
2305
2306 if let Err(e) = self.run_and_notify().await {
2307 error!(
2308 "Error while aggregating checkpoint, will retry in 1s: {:?}",
2309 e
2310 );
2311 self.metrics.checkpoint_errors.inc();
2312 tokio::time::sleep(Duration::from_secs(1)).await;
2313 continue;
2314 }
2315
2316 tokio::select! {
2317 Some(sig) = self.receiver.recv() => {
2318 self.pending
2319 .entry(sig.summary.sequence_number)
2320 .or_default()
2321 .push(sig);
2322 }
2323 _ = self.notify.notified() => {}
2324 _ = tokio::time::sleep(Duration::from_secs(1)) => {}
2325 }
2326 }
2327 }
2328
2329 async fn run_and_notify(&mut self) -> SuiResult {
2330 let summaries = self.run_inner()?;
2331 for summary in summaries {
2332 self.output.certified_checkpoint_created(&summary).await?;
2333 }
2334 Ok(())
2335 }
2336
2337 fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
2338 let _scope = monitored_scope("CheckpointAggregator");
2339 let mut result = vec![];
2340 'outer: loop {
2341 let next_to_certify = self.next_checkpoint_to_certify()?;
2342 self.pending.retain(|&seq, _| seq >= next_to_certify);
2345 let current = if let Some(current) = &mut self.current {
2346 if current.summary.sequence_number < next_to_certify {
2352 assert_reachable!("skip checkpoint certification");
2353 self.current = None;
2354 continue;
2355 }
2356 current
2357 } else {
2358 let Some(summary) = self
2359 .epoch_store
2360 .get_built_checkpoint_summary(next_to_certify)?
2361 else {
2362 return Ok(result);
2363 };
2364 self.current = Some(CheckpointSignatureAggregator {
2365 digest: summary.digest(),
2366 summary,
2367 signatures_by_digest: MultiStakeAggregator::new(
2368 self.epoch_store.committee().clone(),
2369 ),
2370 store: self.store.clone(),
2371 state: self.state.clone(),
2372 metrics: self.metrics.clone(),
2373 });
2374 self.current.as_mut().unwrap()
2375 };
2376
2377 let seq = current.summary.sequence_number;
2378 let sigs = self.pending.remove(&seq).unwrap_or_default();
2379 if sigs.is_empty() {
2380 trace!(
2381 checkpoint_seq =? seq,
2382 "Not enough checkpoint signatures",
2383 );
2384 return Ok(result);
2385 }
2386 for data in sigs {
2387 trace!(
2388 checkpoint_seq = seq,
2389 "Processing signature for checkpoint (digest: {:?}) from {:?}",
2390 current.summary.digest(),
2391 data.summary.auth_sig().authority.concise()
2392 );
2393 self.metrics
2394 .checkpoint_participation
2395 .with_label_values(&[&format!(
2396 "{:?}",
2397 data.summary.auth_sig().authority.concise()
2398 )])
2399 .inc();
2400 if let Ok(auth_signature) = current.try_aggregate(data) {
2401 debug!(
2402 checkpoint_seq = seq,
2403 "Successfully aggregated signatures for checkpoint (digest: {:?})",
2404 current.summary.digest(),
2405 );
2406 let summary = VerifiedCheckpoint::new_unchecked(
2407 CertifiedCheckpointSummary::new_from_data_and_sig(
2408 current.summary.clone(),
2409 auth_signature,
2410 ),
2411 );
2412
2413 self.store.insert_certified_checkpoint(&summary)?;
2414 self.metrics.last_certified_checkpoint.set(seq as i64);
2415 current.summary.report_checkpoint_age(
2416 &self.metrics.last_certified_checkpoint_age,
2417 &self.metrics.last_certified_checkpoint_age_ms,
2418 );
2419 result.push(summary.into_inner());
2420 self.current = None;
2421 continue 'outer;
2422 }
2423 }
2424 break;
2425 }
2426 Ok(result)
2427 }
2428
2429 fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
2430 Ok(self
2431 .store
2432 .tables
2433 .certified_checkpoints
2434 .reversed_safe_iter_with_bounds(None, None)?
2435 .next()
2436 .transpose()?
2437 .map(|(seq, _)| seq + 1)
2438 .unwrap_or_default())
2439 }
2440}
2441
2442impl CheckpointSignatureAggregator {
2443 #[allow(clippy::result_unit_err)]
2444 pub fn try_aggregate(
2445 &mut self,
2446 data: CheckpointSignatureMessage,
2447 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2448 let their_digest = *data.summary.digest();
2449 let (_, signature) = data.summary.into_data_and_sig();
2450 let author = signature.authority;
2451 let envelope =
2452 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2453 match self.signatures_by_digest.insert(their_digest, envelope) {
2454 InsertResult::Failed { error }
2456 if matches!(
2457 error.as_inner(),
2458 SuiErrorKind::StakeAggregatorRepeatedSigner {
2459 conflicting_sig: false,
2460 ..
2461 },
2462 ) =>
2463 {
2464 Err(())
2465 }
2466 InsertResult::Failed { error } => {
2467 warn!(
2468 checkpoint_seq = self.summary.sequence_number,
2469 "Failed to aggregate new signature from validator {:?}: {:?}",
2470 author.concise(),
2471 error
2472 );
2473 self.check_for_split_brain();
2474 Err(())
2475 }
2476 InsertResult::QuorumReached(cert) => {
2477 if their_digest != self.digest {
2480 self.metrics.remote_checkpoint_forks.inc();
2481 warn!(
2482 checkpoint_seq = self.summary.sequence_number,
2483 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2484 author.concise(),
2485 their_digest,
2486 self.digest
2487 );
2488 return Err(());
2489 }
2490 Ok(cert)
2491 }
2492 InsertResult::NotEnoughVotes {
2493 bad_votes: _,
2494 bad_authorities: _,
2495 } => {
2496 self.check_for_split_brain();
2497 Err(())
2498 }
2499 }
2500 }
2501
2502 fn check_for_split_brain(&self) {
2506 debug!(
2507 checkpoint_seq = self.summary.sequence_number,
2508 "Checking for split brain condition"
2509 );
2510 if self.signatures_by_digest.quorum_unreachable() {
2511 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2517 let digests_by_stake_messages = all_unique_values
2518 .iter()
2519 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2520 .map(|(digest, (_authorities, total_stake))| {
2521 format!("{:?} (total stake: {})", digest, total_stake)
2522 })
2523 .collect::<Vec<String>>();
2524 fail_point_arg!("kill_split_brain_node", |(
2525 checkpoint_overrides,
2526 forked_authorities,
2527 ): (
2528 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
2529 std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
2530 )| {
2531 #[cfg(msim)]
2532 {
2533 if let (Ok(mut overrides), Ok(forked_authorities_set)) =
2534 (checkpoint_overrides.lock(), forked_authorities.lock())
2535 {
2536 let correct_digest = all_unique_values
2538 .iter()
2539 .find(|(_, (authorities, _))| {
2540 authorities
2542 .iter()
2543 .any(|auth| !forked_authorities_set.contains(auth))
2544 })
2545 .map(|(digest, _)| digest.to_string())
2546 .unwrap_or_else(|| {
2547 all_unique_values
2549 .iter()
2550 .max_by_key(|(_, (_, stake))| *stake)
2551 .map(|(digest, _)| digest.to_string())
2552 .unwrap_or_else(|| self.digest.to_string())
2553 });
2554
2555 overrides.insert(self.summary.sequence_number, correct_digest.clone());
2556
2557 tracing::error!(
2558 fatal = true,
2559 "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
2560 self.summary.sequence_number,
2561 correct_digest
2562 );
2563 }
2564 }
2565 });
2566
2567 debug_fatal!(
2568 "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
2569 self.summary.sequence_number,
2570 self.signatures_by_digest.uncommitted_stake(),
2571 digests_by_stake_messages
2572 );
2573 self.metrics.split_brain_checkpoint_forks.inc();
2574
2575 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2576 let local_summary = self.summary.clone();
2577 let state = self.state.clone();
2578 let tables = self.store.clone();
2579
2580 tokio::spawn(async move {
2581 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2582 });
2583 }
2584 }
2585}
2586
2587async fn diagnose_split_brain(
2593 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2594 local_summary: CheckpointSummary,
2595 state: Arc<AuthorityState>,
2596 tables: Arc<CheckpointStore>,
2597) {
2598 debug!(
2599 checkpoint_seq = local_summary.sequence_number,
2600 "Running split brain diagnostics..."
2601 );
2602 let time = SystemTime::now();
2603 let digest_to_validator = all_unique_values
2605 .iter()
2606 .filter_map(|(digest, (validators, _))| {
2607 if *digest != local_summary.digest() {
2608 let random_validator = validators.choose(&mut get_rng()).unwrap();
2609 Some((*digest, *random_validator))
2610 } else {
2611 None
2612 }
2613 })
2614 .collect::<HashMap<_, _>>();
2615 if digest_to_validator.is_empty() {
2616 panic!(
2617 "Given split brain condition, there should be at \
2618 least one validator that disagrees with local signature"
2619 );
2620 }
2621
2622 let epoch_store = state.load_epoch_store_one_call_per_task();
2623 let committee = epoch_store
2624 .epoch_start_state()
2625 .get_sui_committee_with_network_metadata();
2626 let network_config = default_mysten_network_config();
2627 let network_clients =
2628 make_network_authority_clients_with_network_config(&committee, &network_config);
2629
2630 let response_futures = digest_to_validator
2632 .values()
2633 .cloned()
2634 .map(|validator| {
2635 let client = network_clients
2636 .get(&validator)
2637 .expect("Failed to get network client");
2638 let request = CheckpointRequestV2 {
2639 sequence_number: Some(local_summary.sequence_number),
2640 request_content: true,
2641 certified: false,
2642 };
2643 client.handle_checkpoint_v2(request)
2644 })
2645 .collect::<Vec<_>>();
2646
2647 let digest_name_pair = digest_to_validator.iter();
2648 let response_data = futures::future::join_all(response_futures)
2649 .await
2650 .into_iter()
2651 .zip_debug_eq(digest_name_pair)
2652 .filter_map(|(response, (digest, name))| match response {
2653 Ok(response) => match response {
2654 CheckpointResponseV2 {
2655 checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2656 contents: Some(contents),
2657 } => Some((*name, *digest, summary, contents)),
2658 CheckpointResponseV2 {
2659 checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2660 contents: _,
2661 } => {
2662 panic!("Expected pending checkpoint, but got certified checkpoint");
2663 }
2664 CheckpointResponseV2 {
2665 checkpoint: None,
2666 contents: _,
2667 } => {
2668 error!(
2669 "Summary for checkpoint {:?} not found on validator {:?}",
2670 local_summary.sequence_number, name
2671 );
2672 None
2673 }
2674 CheckpointResponseV2 {
2675 checkpoint: _,
2676 contents: None,
2677 } => {
2678 error!(
2679 "Contents for checkpoint {:?} not found on validator {:?}",
2680 local_summary.sequence_number, name
2681 );
2682 None
2683 }
2684 },
2685 Err(e) => {
2686 error!(
2687 "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2688 e
2689 );
2690 None
2691 }
2692 })
2693 .collect::<Vec<_>>();
2694
2695 let local_checkpoint_contents = tables
2696 .get_checkpoint_contents(&local_summary.content_digest)
2697 .unwrap_or_else(|_| {
2698 panic!(
2699 "Could not find checkpoint contents for digest {:?}",
2700 local_summary.digest()
2701 )
2702 })
2703 .unwrap_or_else(|| {
2704 panic!(
2705 "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2706 local_summary.sequence_number,
2707 local_summary.digest()
2708 )
2709 });
2710 let local_contents_text = format!("{local_checkpoint_contents:?}");
2711
2712 let local_summary_text = format!("{local_summary:?}");
2713 let local_validator = state.name.concise();
2714 let diff_patches = response_data
2715 .iter()
2716 .map(|(name, other_digest, other_summary, contents)| {
2717 let other_contents_text = format!("{contents:?}");
2718 let other_summary_text = format!("{other_summary:?}");
2719 let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2720 .enumerate_transactions(&local_summary)
2721 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2722 .unzip();
2723 let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2724 .enumerate_transactions(other_summary)
2725 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2726 .unzip();
2727 let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2728 let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2729 let local_transactions_text = format!("{local_transactions:#?}");
2730 let other_transactions_text = format!("{other_transactions:#?}");
2731 let transactions_patch =
2732 create_patch(&local_transactions_text, &other_transactions_text);
2733 let local_effects_text = format!("{local_effects:#?}");
2734 let other_effects_text = format!("{other_effects:#?}");
2735 let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2736 let seq_number = local_summary.sequence_number;
2737 let local_digest = local_summary.digest();
2738 let other_validator = name.concise();
2739 format!(
2740 "Checkpoint: {seq_number:?}\n\
2741 Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2742 Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2743 Summary Diff: \n{summary_patch}\n\n\
2744 Contents Diff: \n{contents_patch}\n\n\
2745 Transactions Diff: \n{transactions_patch}\n\n\
2746 Effects Diff: \n{effects_patch}",
2747 )
2748 })
2749 .collect::<Vec<_>>()
2750 .join("\n\n\n");
2751
2752 let header = format!(
2753 "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2754 Datetime: {:?}",
2755 time
2756 );
2757 let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2758 let path = tempfile::tempdir()
2759 .expect("Failed to create tempdir")
2760 .keep()
2761 .join(Path::new("checkpoint_fork_dump.txt"));
2762 let mut file = File::create(path).unwrap();
2763 write!(file, "{}", fork_logs_text).unwrap();
2764 debug!("{}", fork_logs_text);
2765}
2766
2767pub trait CheckpointServiceNotify {
2768 fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult;
2769
2770 fn notify_checkpoint(&self) -> SuiResult;
2771}
2772
2773#[allow(clippy::large_enum_variant)]
2774enum CheckpointServiceState {
2775 Unstarted(
2776 (
2777 CheckpointBuilder,
2778 CheckpointAggregator,
2779 CheckpointStateHasher,
2780 ),
2781 ),
2782 Started,
2783}
2784
2785impl CheckpointServiceState {
2786 fn take_unstarted(
2787 &mut self,
2788 ) -> (
2789 CheckpointBuilder,
2790 CheckpointAggregator,
2791 CheckpointStateHasher,
2792 ) {
2793 let mut state = CheckpointServiceState::Started;
2794 std::mem::swap(self, &mut state);
2795
2796 match state {
2797 CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
2798 (builder, aggregator, hasher)
2799 }
2800 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2801 }
2802 }
2803}
2804
2805pub struct CheckpointService {
2806 tables: Arc<CheckpointStore>,
2807 notify_builder: Arc<Notify>,
2808 signature_sender: mpsc::UnboundedSender<CheckpointSignatureMessage>,
2809 highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
2811 highest_previously_built_seq: CheckpointSequenceNumber,
2814 metrics: Arc<CheckpointMetrics>,
2815 state: Mutex<CheckpointServiceState>,
2816}
2817
2818impl CheckpointService {
2819 #[allow(clippy::disallowed_methods)]
2825 pub fn build(
2826 state: Arc<AuthorityState>,
2827 checkpoint_store: Arc<CheckpointStore>,
2828 epoch_store: Arc<AuthorityPerEpochStore>,
2829 effects_store: Arc<dyn TransactionCacheRead>,
2830 global_state_hasher: Weak<GlobalStateHasher>,
2831 checkpoint_output: Box<dyn CheckpointOutput>,
2832 certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2833 metrics: Arc<CheckpointMetrics>,
2834 ) -> Arc<Self> {
2835 info!("Starting checkpoint service");
2836 Self::initialize_accumulator_account_metrics(&state, &epoch_store, &metrics);
2837 let notify_builder = Arc::new(Notify::new());
2838 let notify_aggregator = Arc::new(Notify::new());
2839
2840 let highest_previously_built_seq = checkpoint_store
2842 .get_latest_locally_computed_checkpoint()
2843 .expect("failed to get latest locally computed checkpoint")
2844 .map(|s| s.sequence_number)
2845 .unwrap_or(0);
2846
2847 let highest_currently_built_seq =
2848 CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
2849 .expect("epoch should not have ended")
2850 .map(|(seq, _)| seq)
2851 .unwrap_or(0);
2852
2853 let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
2854
2855 let (signature_sender, signature_receiver) = mpsc::unbounded_channel();
2856
2857 let aggregator = CheckpointAggregator::new(
2858 checkpoint_store.clone(),
2859 epoch_store.clone(),
2860 notify_aggregator.clone(),
2861 signature_receiver,
2862 certified_checkpoint_output,
2863 state.clone(),
2864 metrics.clone(),
2865 );
2866
2867 let (send_to_hasher, receive_from_builder) = mpsc::channel(16);
2868
2869 let ckpt_state_hasher = CheckpointStateHasher::new(
2870 epoch_store.clone(),
2871 global_state_hasher.clone(),
2872 receive_from_builder,
2873 );
2874
2875 let builder = CheckpointBuilder::new(
2876 state.clone(),
2877 checkpoint_store.clone(),
2878 epoch_store.clone(),
2879 notify_builder.clone(),
2880 effects_store,
2881 global_state_hasher,
2882 send_to_hasher,
2883 checkpoint_output,
2884 notify_aggregator.clone(),
2885 highest_currently_built_seq_tx.clone(),
2886 metrics.clone(),
2887 );
2888
2889 Arc::new(Self {
2890 tables: checkpoint_store,
2891 notify_builder,
2892 signature_sender,
2893 highest_currently_built_seq_tx,
2894 highest_previously_built_seq,
2895 metrics,
2896 state: Mutex::new(CheckpointServiceState::Unstarted((
2897 builder,
2898 aggregator,
2899 ckpt_state_hasher,
2900 ))),
2901 })
2902 }
2903
2904 fn initialize_accumulator_account_metrics(
2905 state: &AuthorityState,
2906 epoch_store: &AuthorityPerEpochStore,
2907 metrics: &CheckpointMetrics,
2908 ) {
2909 if !epoch_store.protocol_config().enable_accumulators() {
2910 return;
2911 }
2912
2913 let object_store = state.get_object_store();
2914 match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
2915 Ok(Some(count)) => metrics.initialize_accumulator_accounts_live(count),
2916 Ok(None) => {}
2917 Err(e) => fatal!("failed to initialize accumulator account metrics: {e}"),
2918 }
2919 }
2920
2921 pub async fn spawn(
2929 &self,
2930 epoch_store: Arc<AuthorityPerEpochStore>,
2931 consensus_replay_waiter: Option<ReplayWaiter>,
2932 ) {
2933 let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();
2934
2935 let last_persisted_builder_seq = epoch_store
2945 .last_persisted_checkpoint_builder_summary()
2946 .expect("epoch should not have ended")
2947 .map(|s| s.summary.sequence_number);
2948
2949 let last_executed_seq = self
2950 .tables
2951 .get_highest_executed_checkpoint()
2952 .expect("Failed to get highest executed checkpoint")
2953 .map(|checkpoint| *checkpoint.sequence_number());
2954
2955 if let Some(last_committed_seq) = last_persisted_builder_seq.max(last_executed_seq) {
2956 if let Err(e) = builder
2957 .epoch_store
2958 .clear_state_hashes_after_checkpoint(last_committed_seq)
2959 {
2960 error!(
2961 "Failed to clear state hashes after checkpoint {}: {:?}",
2962 last_committed_seq, e
2963 );
2964 } else {
2965 info!(
2966 "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
2967 last_committed_seq
2968 );
2969 }
2970 }
2971
2972 let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();
2973
2974 let state_hasher_task = spawn_monitored_task!(state_hasher.run());
2975 let aggregator_task = spawn_monitored_task!(aggregator.run());
2976
2977 spawn_monitored_task!(async move {
2978 epoch_store
2979 .within_alive_epoch(async move {
2980 builder.run(consensus_replay_waiter).await;
2981 builder_finished_tx.send(()).ok();
2982 })
2983 .await
2984 .ok();
2985
2986 state_hasher_task
2988 .await
2989 .expect("state hasher should exit normally");
2990
2991 aggregator_task.abort();
2994 aggregator_task.await.ok();
2995 });
2996
2997 if tokio::time::timeout(Duration::from_secs(120), async move {
3003 tokio::select! {
3004 _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
3005 _ = self.wait_for_rebuilt_checkpoints() => (),
3006 }
3007 })
3008 .await
3009 .is_err()
3010 {
3011 debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
3012 }
3013 }
3014}
3015
3016impl CheckpointService {
3017 pub async fn wait_for_rebuilt_checkpoints(&self) {
3023 let highest_previously_built_seq = self.highest_previously_built_seq;
3024 let mut rx = self.highest_currently_built_seq_tx.subscribe();
3025 let mut highest_currently_built_seq = *rx.borrow_and_update();
3026 info!(
3027 "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3028 );
3029 loop {
3030 if highest_currently_built_seq >= highest_previously_built_seq {
3031 info!("Checkpoint rebuild complete");
3032 break;
3033 }
3034 rx.changed().await.unwrap();
3035 highest_currently_built_seq = *rx.borrow_and_update();
3036 }
3037 }
3038
3039 #[cfg(test)]
3040 fn write_and_notify_checkpoint_for_testing(
3041 &self,
3042 epoch_store: &AuthorityPerEpochStore,
3043 checkpoint: PendingCheckpoint,
3044 ) -> SuiResult {
3045 use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3046
3047 let mut output = ConsensusCommitOutput::new(0);
3048 epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3049 output.set_default_commit_stats_for_testing();
3050 epoch_store.push_consensus_output_for_tests(output);
3051 self.notify_checkpoint()?;
3052 Ok(())
3053 }
3054}
3055
3056impl CheckpointServiceNotify for CheckpointService {
3057 fn notify_checkpoint_signature(&self, info: &CheckpointSignatureMessage) -> SuiResult {
3058 let sequence = info.summary.sequence_number;
3059 let signer = info.summary.auth_sig().authority.concise();
3060
3061 if let Some(highest_verified_checkpoint) = self
3062 .tables
3063 .get_highest_verified_checkpoint()?
3064 .map(|x| *x.sequence_number())
3065 && sequence <= highest_verified_checkpoint
3066 {
3067 trace!(
3068 checkpoint_seq = sequence,
3069 "Ignore checkpoint signature from {} - already certified", signer,
3070 );
3071 self.metrics
3072 .last_ignored_checkpoint_signature_received
3073 .set(sequence as i64);
3074 return Ok(());
3075 }
3076 trace!(
3077 checkpoint_seq = sequence,
3078 "Received checkpoint signature, digest {} from {}",
3079 info.summary.digest(),
3080 signer,
3081 );
3082 self.metrics
3083 .last_received_checkpoint_signatures
3084 .with_label_values(&[&signer.to_string()])
3085 .set(sequence as i64);
3086 self.signature_sender.send(info.clone()).ok();
3087 Ok(())
3088 }
3089
3090 fn notify_checkpoint(&self) -> SuiResult {
3091 self.notify_builder.notify_one();
3092 Ok(())
3093 }
3094}
3095
3096pub struct CheckpointServiceNoop {}
3098impl CheckpointServiceNotify for CheckpointServiceNoop {
3099 fn notify_checkpoint_signature(&self, _: &CheckpointSignatureMessage) -> SuiResult {
3100 Ok(())
3101 }
3102
3103 fn notify_checkpoint(&self) -> SuiResult {
3104 Ok(())
3105 }
3106}
3107
3108impl PendingCheckpoint {
3109 pub fn height(&self) -> CheckpointHeight {
3110 self.details.checkpoint_height
3111 }
3112
3113 pub(crate) fn num_roots(&self) -> usize {
3114 self.roots.iter().map(|r| r.tx_roots.len()).sum()
3115 }
3116}
3117
3118pin_project! {
3119 pub struct PollCounter<Fut> {
3120 #[pin]
3121 future: Fut,
3122 count: usize,
3123 }
3124}
3125
3126impl<Fut> PollCounter<Fut> {
3127 pub fn new(future: Fut) -> Self {
3128 Self { future, count: 0 }
3129 }
3130
3131 pub fn count(&self) -> usize {
3132 self.count
3133 }
3134}
3135
3136impl<Fut: Future> Future for PollCounter<Fut> {
3137 type Output = (usize, Fut::Output);
3138
3139 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3140 let this = self.project();
3141 *this.count += 1;
3142 match this.future.poll(cx) {
3143 Poll::Ready(output) => Poll::Ready((*this.count, output)),
3144 Poll::Pending => Poll::Pending,
3145 }
3146 }
3147}
3148
3149fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
3150 PollCounter::new(future)
3151}
3152
3153#[cfg(test)]
3154mod tests {
3155 use super::*;
3156 use crate::authority::test_authority_builder::TestAuthorityBuilder;
3157 use futures::FutureExt as _;
3158 use futures::future::BoxFuture;
3159 use std::collections::HashMap;
3160 use std::ops::Deref;
3161 use sui_macros::sim_test;
3162 use sui_protocol_config::{Chain, ProtocolConfig};
3163 use sui_types::accumulator_event::AccumulatorEvent;
3164 use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3165 use sui_types::crypto::Signature;
3166 use sui_types::effects::{TransactionEffects, TransactionEvents};
3167 use sui_types::messages_checkpoint::SignedCheckpointSummary;
3168 use sui_types::transaction::VerifiedTransaction;
3169 use tokio::sync::mpsc;
3170
3171 #[tokio::test]
3172 async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
3173 let store = CheckpointStore::new_for_tests();
3174 let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
3175 for seq in 70u64..=80u64 {
3176 let contents =
3177 sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
3178 [sui_types::base_types::ExecutionDigests::new(
3179 sui_types::digests::TransactionDigest::random(),
3180 sui_types::digests::TransactionEffectsDigest::ZERO,
3181 )],
3182 );
3183 let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
3184 &protocol,
3185 0,
3186 seq,
3187 0,
3188 &contents,
3189 None,
3190 sui_types::gas::GasCostSummary::default(),
3191 None,
3192 0,
3193 Vec::new(),
3194 Vec::new(),
3195 );
3196 store
3197 .tables
3198 .locally_computed_checkpoints
3199 .insert(&seq, &summary)
3200 .unwrap();
3201 }
3202
3203 store
3204 .clear_locally_computed_checkpoints_from(76)
3205 .expect("clear should succeed");
3206
3207 assert!(
3209 store
3210 .tables
3211 .locally_computed_checkpoints
3212 .get(&75)
3213 .unwrap()
3214 .is_some()
3215 );
3216 assert!(
3217 store
3218 .tables
3219 .locally_computed_checkpoints
3220 .get(&76)
3221 .unwrap()
3222 .is_none()
3223 );
3224
3225 for seq in 70u64..76u64 {
3226 assert!(
3227 store
3228 .tables
3229 .locally_computed_checkpoints
3230 .get(&seq)
3231 .unwrap()
3232 .is_some()
3233 );
3234 }
3235 for seq in 76u64..=80u64 {
3236 assert!(
3237 store
3238 .tables
3239 .locally_computed_checkpoints
3240 .get(&seq)
3241 .unwrap()
3242 .is_none()
3243 );
3244 }
3245 }
3246
3247 #[tokio::test]
3248 async fn test_fork_detection_storage() {
3249 let store = CheckpointStore::new_for_tests();
3250 let seq_num = 42;
3252 let digest = CheckpointDigest::random();
3253
3254 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3255
3256 store
3257 .record_checkpoint_fork_detected(seq_num, digest)
3258 .unwrap();
3259
3260 let retrieved = store.get_checkpoint_fork_detected().unwrap();
3261 assert!(retrieved.is_some());
3262 let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3263 assert_eq!(retrieved_seq, seq_num);
3264 assert_eq!(retrieved_digest, digest);
3265
3266 store.clear_checkpoint_fork_detected().unwrap();
3267 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3268
3269 let tx_digest = TransactionDigest::random();
3271 let expected_effects = TransactionEffectsDigest::random();
3272 let actual_effects = TransactionEffectsDigest::random();
3273
3274 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3275
3276 store
3277 .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3278 .unwrap();
3279
3280 let retrieved = store.get_transaction_fork_detected().unwrap();
3281 assert!(retrieved.is_some());
3282 let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3283 assert_eq!(retrieved_tx, tx_digest);
3284 assert_eq!(retrieved_expected, expected_effects);
3285 assert_eq!(retrieved_actual, actual_effects);
3286
3287 store.clear_transaction_fork_detected().unwrap();
3288 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3289 }
3290
3291 #[sim_test]
3292 pub async fn checkpoint_builder_test() {
3293 telemetry_subscribers::init_for_testing();
3294
3295 let mut protocol_config =
3296 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
3297 protocol_config.disable_accumulators_for_testing();
3298 let state = TestAuthorityBuilder::new()
3299 .with_protocol_config(protocol_config)
3300 .build()
3301 .await;
3302
3303 let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
3304 0,
3305 0,
3306 vec![],
3307 SequenceNumber::new(),
3308 );
3309
3310 for i in 0..20 {
3311 state
3312 .database_for_testing()
3313 .perpetual_tables
3314 .transactions
3315 .insert(&d(i), dummy_tx.serializable_ref())
3316 .unwrap();
3317 }
3318
3319 let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
3320 commit_cert_for_test(
3321 &mut store,
3322 state.clone(),
3323 d(1),
3324 vec![d(2), d(3)],
3325 GasCostSummary::new(11, 12, 11, 1),
3326 );
3327 commit_cert_for_test(
3328 &mut store,
3329 state.clone(),
3330 d(2),
3331 vec![d(3), d(4)],
3332 GasCostSummary::new(21, 22, 21, 1),
3333 );
3334 commit_cert_for_test(
3335 &mut store,
3336 state.clone(),
3337 d(3),
3338 vec![],
3339 GasCostSummary::new(31, 32, 31, 1),
3340 );
3341 commit_cert_for_test(
3342 &mut store,
3343 state.clone(),
3344 d(4),
3345 vec![],
3346 GasCostSummary::new(41, 42, 41, 1),
3347 );
3348 for i in [5, 6, 7, 10, 11, 12, 13] {
3349 commit_cert_for_test(
3350 &mut store,
3351 state.clone(),
3352 d(i),
3353 vec![],
3354 GasCostSummary::new(41, 42, 41, 1),
3355 );
3356 }
3357 for i in [15, 16, 17] {
3358 commit_cert_for_test(
3359 &mut store,
3360 state.clone(),
3361 d(i),
3362 vec![],
3363 GasCostSummary::new(51, 52, 51, 1),
3364 );
3365 }
3366 let all_digests: Vec<_> = store.keys().copied().collect();
3367 for digest in all_digests {
3368 let signature = Signature::Ed25519SuiSignature(Default::default()).into();
3369 state
3370 .epoch_store_for_testing()
3371 .test_insert_user_signature(digest, vec![(signature, None)]);
3372 }
3373
3374 let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
3375 let (certified_output, mut certified_result) =
3376 mpsc::channel::<CertifiedCheckpointSummary>(10);
3377 let store = Arc::new(store);
3378
3379 let ckpt_dir = tempfile::tempdir().unwrap();
3380 let checkpoint_store =
3381 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
3382 let epoch_store = state.epoch_store_for_testing();
3383
3384 let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
3385 state.get_global_state_hash_store().clone(),
3386 ));
3387
3388 let checkpoint_service = CheckpointService::build(
3389 state.clone(),
3390 checkpoint_store,
3391 epoch_store.clone(),
3392 store,
3393 Arc::downgrade(&global_state_hasher),
3394 Box::new(output),
3395 Box::new(certified_output),
3396 CheckpointMetrics::new_for_tests(),
3397 );
3398 checkpoint_service.spawn(epoch_store.clone(), None).await;
3399
3400 checkpoint_service
3401 .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
3402 .unwrap();
3403 checkpoint_service
3404 .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
3405 .unwrap();
3406 checkpoint_service
3407 .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
3408 .unwrap();
3409 checkpoint_service
3410 .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
3411 .unwrap();
3412 checkpoint_service
3413 .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
3414 .unwrap();
3415 checkpoint_service
3416 .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
3417 .unwrap();
3418
3419 let (c1c, c1s) = result.recv().await.unwrap();
3420 let (c2c, c2s) = result.recv().await.unwrap();
3421
3422 let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3423 let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3424 assert_eq!(c1t, vec![d(4)]);
3425 assert_eq!(c1s.previous_digest, None);
3426 assert_eq!(c1s.sequence_number, 0);
3427 assert_eq!(
3428 c1s.epoch_rolling_gas_cost_summary,
3429 GasCostSummary::new(41, 42, 41, 1)
3430 );
3431
3432 assert_eq!(c2t, vec![d(3), d(1)]);
3434 assert_eq!(c2s.previous_digest, Some(c1s.digest()));
3435 assert_eq!(c2s.sequence_number, 1);
3436 assert_eq!(
3437 c2s.epoch_rolling_gas_cost_summary,
3438 GasCostSummary::new(83, 86, 83, 3)
3439 );
3440
3441 let (c3c, c3s) = result.recv().await.unwrap();
3444 let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3445 assert_eq!(c3s.sequence_number, 2);
3446 assert_eq!(c3s.previous_digest, Some(c2s.digest()));
3447 assert_eq!(c3t, vec![d(10), d(11), d(12), d(13)]);
3448
3449 let (c4c, c4s) = result.recv().await.unwrap();
3450 let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3451 assert_eq!(c4s.sequence_number, 3);
3452 assert_eq!(c4s.previous_digest, Some(c3s.digest()));
3453 assert_eq!(c4t, vec![d(15), d(16), d(17)]);
3454
3455 let (c5c, c5s) = result.recv().await.unwrap();
3456 let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3457 assert_eq!(c5s.sequence_number, 4);
3458 assert_eq!(c5s.previous_digest, Some(c4s.digest()));
3459 assert_eq!(c5t, vec![d(5)]);
3460
3461 let (c6c, c6s) = result.recv().await.unwrap();
3462 let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3463 assert_eq!(c6s.sequence_number, 5);
3464 assert_eq!(c6s.previous_digest, Some(c5s.digest()));
3465 assert_eq!(c6t, vec![d(6)]);
3466
3467 let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
3468 let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);
3469
3470 checkpoint_service
3471 .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c2ss })
3472 .unwrap();
3473 checkpoint_service
3474 .notify_checkpoint_signature(&CheckpointSignatureMessage { summary: c1ss })
3475 .unwrap();
3476
3477 let c1sc = certified_result.recv().await.unwrap();
3478 let c2sc = certified_result.recv().await.unwrap();
3479 assert_eq!(c1sc.sequence_number, 0);
3480 assert_eq!(c2sc.sequence_number, 1);
3481 }
3482
3483 impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
3484 fn notify_read_executed_effects_may_fail(
3485 &self,
3486 _: &str,
3487 digests: &[TransactionDigest],
3488 ) -> BoxFuture<'_, SuiResult<Vec<TransactionEffects>>> {
3489 std::future::ready(Ok(digests
3490 .iter()
3491 .map(|d| self.get(d).expect("effects not found").clone())
3492 .collect()))
3493 .boxed()
3494 }
3495
3496 fn notify_read_executed_effects_digests(
3497 &self,
3498 _: &str,
3499 digests: &[TransactionDigest],
3500 ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
3501 std::future::ready(
3502 digests
3503 .iter()
3504 .map(|d| {
3505 self.get(d)
3506 .map(|fx| fx.digest())
3507 .expect("effects not found")
3508 })
3509 .collect(),
3510 )
3511 .boxed()
3512 }
3513
3514 fn multi_get_executed_effects(
3515 &self,
3516 digests: &[TransactionDigest],
3517 ) -> Vec<Option<TransactionEffects>> {
3518 digests.iter().map(|d| self.get(d).cloned()).collect()
3519 }
3520
3521 fn multi_get_transaction_blocks(
3527 &self,
3528 _: &[TransactionDigest],
3529 ) -> Vec<Option<Arc<VerifiedTransaction>>> {
3530 unimplemented!()
3531 }
3532
3533 fn multi_get_executed_effects_digests(
3534 &self,
3535 _: &[TransactionDigest],
3536 ) -> Vec<Option<TransactionEffectsDigest>> {
3537 unimplemented!()
3538 }
3539
3540 fn multi_get_effects(
3541 &self,
3542 _: &[TransactionEffectsDigest],
3543 ) -> Vec<Option<TransactionEffects>> {
3544 unimplemented!()
3545 }
3546
3547 fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
3548 unimplemented!()
3549 }
3550
3551 fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
3552 unimplemented!()
3553 }
3554
3555 fn get_unchanged_loaded_runtime_objects(
3556 &self,
3557 _digest: &TransactionDigest,
3558 ) -> Option<Vec<sui_types::storage::ObjectKey>> {
3559 unimplemented!()
3560 }
3561
3562 fn transaction_executed_in_last_epoch(&self, _: &TransactionDigest, _: EpochId) -> bool {
3563 unimplemented!()
3564 }
3565 }
3566
3567 #[async_trait::async_trait]
3568 impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
3569 async fn checkpoint_created(
3570 &self,
3571 summary: &CheckpointSummary,
3572 contents: &CheckpointContents,
3573 _epoch_store: &Arc<AuthorityPerEpochStore>,
3574 _checkpoint_store: &Arc<CheckpointStore>,
3575 ) -> SuiResult {
3576 self.try_send((contents.clone(), summary.clone())).unwrap();
3577 Ok(())
3578 }
3579 }
3580
3581 #[async_trait::async_trait]
3582 impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
3583 async fn certified_checkpoint_created(
3584 &self,
3585 summary: &CertifiedCheckpointSummary,
3586 ) -> SuiResult {
3587 self.try_send(summary.clone()).unwrap();
3588 Ok(())
3589 }
3590 }
3591
3592 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
3593 PendingCheckpoint {
3594 roots: vec![CheckpointRoots {
3595 tx_roots: t
3596 .into_iter()
3597 .map(|t| TransactionKey::Digest(d(t)))
3598 .collect(),
3599 settlement_root: None,
3600 height: i,
3601 }],
3602 details: PendingCheckpointInfo {
3603 timestamp_ms,
3604 last_of_epoch: false,
3605 checkpoint_height: i,
3606 consensus_commit_ref: CommitRef::default(),
3607 rejected_transactions_digest: Digest::default(),
3608 checkpoint_seq: i,
3609 },
3610 }
3611 }
3612
3613 fn d(i: u8) -> TransactionDigest {
3614 let mut bytes: [u8; 32] = Default::default();
3615 bytes[0] = i;
3616 TransactionDigest::new(bytes)
3617 }
3618
3619 fn e(
3620 transaction_digest: TransactionDigest,
3621 dependencies: Vec<TransactionDigest>,
3622 gas_used: GasCostSummary,
3623 ) -> TransactionEffects {
3624 let mut effects = TransactionEffects::default();
3625 *effects.transaction_digest_mut_for_testing() = transaction_digest;
3626 *effects.dependencies_mut_for_testing() = dependencies;
3627 *effects.gas_cost_summary_mut_for_testing() = gas_used;
3628 effects
3629 }
3630
3631 fn commit_cert_for_test(
3632 store: &mut HashMap<TransactionDigest, TransactionEffects>,
3633 state: Arc<AuthorityState>,
3634 digest: TransactionDigest,
3635 dependencies: Vec<TransactionDigest>,
3636 gas_used: GasCostSummary,
3637 ) {
3638 let epoch_store = state.epoch_store_for_testing();
3639 let effects = e(digest, dependencies, gas_used);
3640 store.insert(digest, effects.clone());
3641 epoch_store.insert_executed_in_epoch(&digest);
3642 }
3643}