1mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::AccumulatorSettlementTxBuilder;
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16 LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::balance_withdraw_scheduler::BalanceSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use consensus_core::CommitRef;
26use diffy::create_patch;
27use itertools::Itertools;
28use mysten_common::random::get_rng;
29use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
30use mysten_common::{assert_reachable, debug_fatal, fatal};
31use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
32use nonempty::NonEmpty;
33use parking_lot::Mutex;
34use pin_project_lite::pin_project;
35use serde::{Deserialize, Serialize};
36use sui_macros::fail_point_arg;
37use sui_network::default_mysten_network_config;
38use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
39use sui_types::base_types::{ConciseableName, SequenceNumber};
40use sui_types::executable_transaction::VerifiedExecutableTransaction;
41use sui_types::execution::ExecutionTimeObservationKey;
42use sui_types::messages_checkpoint::{
43 CheckpointArtifacts, CheckpointCommitment, VersionedFullCheckpointContents,
44};
45use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
46use tokio::sync::{mpsc, watch};
47use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
48
49use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
50use crate::authority::authority_store_pruner::PrunerWatermarks;
51use crate::consensus_handler::SequencedConsensusTransactionKey;
52use rand::seq::SliceRandom;
53use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
54use std::fs::File;
55use std::future::Future;
56use std::io::Write;
57use std::path::Path;
58use std::pin::Pin;
59use std::sync::Arc;
60use std::sync::Weak;
61use std::task::{Context, Poll};
62use std::time::{Duration, SystemTime};
63use sui_protocol_config::ProtocolVersion;
64use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
65use sui_types::committee::StakeUnit;
66use sui_types::crypto::AuthorityStrongQuorumSignInfo;
67use sui_types::digests::{
68 CheckpointContentsDigest, CheckpointDigest, Digest, TransactionEffectsDigest,
69};
70use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
71use sui_types::error::{SuiErrorKind, SuiResult};
72use sui_types::gas::GasCostSummary;
73use sui_types::message_envelope::Message;
74use sui_types::messages_checkpoint::{
75 CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
76 CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
77 EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
78 VerifiedCheckpointContents,
79};
80use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
81use sui_types::messages_consensus::ConsensusTransactionKey;
82use sui_types::signature::GenericSignature;
83use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
84use sui_types::transaction::{
85 TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
86};
87use tokio::{sync::Notify, time::timeout};
88use tracing::{debug, error, info, instrument, trace, warn};
89use typed_store::DBMapUtils;
90use typed_store::Map;
91use typed_store::{
92 TypedStoreError,
93 rocks::{DBMap, MetricConf},
94};
95
96const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;
97
98pub type CheckpointHeight = u64;
99
100pub struct EpochStats {
101 pub checkpoint_count: u64,
102 pub transaction_count: u64,
103 pub total_gas_reward: u64,
104}
105
106#[derive(Clone, Debug)]
107pub struct PendingCheckpointInfo {
108 pub timestamp_ms: CheckpointTimestamp,
109 pub last_of_epoch: bool,
110 pub checkpoint_height: CheckpointHeight,
113 pub consensus_commit_ref: CommitRef,
115 pub rejected_transactions_digest: Digest,
116}
117
118#[derive(Clone, Debug)]
119pub struct PendingCheckpoint {
120 pub roots: Vec<TransactionKey>,
121 pub details: PendingCheckpointInfo,
122}
123
124#[derive(Clone, Debug, Serialize, Deserialize)]
125pub struct BuilderCheckpointSummary {
126 pub summary: CheckpointSummary,
127 pub checkpoint_height: Option<CheckpointHeight>,
129 pub position_in_commit: usize,
130}
131
132#[derive(DBMapUtils)]
133#[cfg_attr(tidehunter, tidehunter)]
134pub struct CheckpointStoreTables {
135 pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,
137
138 pub(crate) checkpoint_sequence_by_contents_digest:
140 DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,
141
142 #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
146 full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
149 #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
150 full_checkpoint_content_v2: DBMap<CheckpointSequenceNumber, VersionedFullCheckpointContents>,
151
152 pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
154 pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,
156
157 pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,
161
162 epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,
164
165 pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
168
169 pub(crate) transaction_fork_detected: DBMap<
171 u8,
172 (
173 TransactionDigest,
174 TransactionEffectsDigest,
175 TransactionEffectsDigest,
176 ),
177 >,
178}
179
180fn full_checkpoint_content_table_default_config() -> DBOptions {
181 DBOptions {
182 options: default_db_options().options,
183 rw_options: ReadWriteOptions::default().set_log_value_hash(true),
187 }
188}
189
190impl CheckpointStoreTables {
191 #[cfg(not(tidehunter))]
192 pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
193 Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
194 }
195
196 #[cfg(tidehunter)]
197 pub fn new(
198 path: &Path,
199 metric_name: &'static str,
200 pruner_watermarks: Arc<PrunerWatermarks>,
201 ) -> Self {
202 tracing::warn!("Checkpoint DB using tidehunter");
203 use crate::authority::authority_store_pruner::apply_relocation_filter;
204 use typed_store::tidehunter_util::{
205 Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
206 default_mutex_count, default_value_cache_size,
207 };
208 let mutexes = default_mutex_count() * 4;
209 let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
210 let override_dirty_keys_config = KeySpaceConfig::new()
211 .with_max_dirty_keys(64_000)
212 .with_value_cache_size(default_value_cache_size());
213 let config_u64 = ThConfig::new_with_config(
214 8,
215 mutexes,
216 u64_sequence_key,
217 override_dirty_keys_config.clone(),
218 );
219 let digest_config = ThConfig::new_with_rm_prefix(
220 32,
221 mutexes,
222 KeyType::uniform(default_cells_per_mutex()),
223 KeySpaceConfig::default(),
224 vec![0, 0, 0, 0, 0, 0, 0, 32],
225 );
226 let watermarks_config = KeySpaceConfig::new()
227 .with_value_cache_size(10)
228 .disable_unload();
229 let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
230 let configs = vec![
231 (
232 "checkpoint_content",
233 digest_config.clone().with_config(
234 lru_config
235 .clone()
236 .with_relocation_filter(|_, _| Decision::Remove),
237 ),
238 ),
239 (
240 "checkpoint_sequence_by_contents_digest",
241 digest_config.clone().with_config(apply_relocation_filter(
242 KeySpaceConfig::default(),
243 pruner_watermarks.checkpoint_id.clone(),
244 |sequence_number: CheckpointSequenceNumber| sequence_number,
245 false,
246 )),
247 ),
248 (
249 "full_checkpoint_content",
250 config_u64.clone().with_config(apply_relocation_filter(
251 override_dirty_keys_config.clone(),
252 pruner_watermarks.checkpoint_id.clone(),
253 |sequence_number: CheckpointSequenceNumber| sequence_number,
254 true,
255 )),
256 ),
257 ("certified_checkpoints", config_u64.clone()),
258 (
259 "checkpoint_by_digest",
260 digest_config.clone().with_config(apply_relocation_filter(
261 lru_config,
262 pruner_watermarks.epoch_id.clone(),
263 |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
264 false,
265 )),
266 ),
267 (
268 "locally_computed_checkpoints",
269 config_u64.clone().with_config(apply_relocation_filter(
270 override_dirty_keys_config.clone(),
271 pruner_watermarks.checkpoint_id.clone(),
272 |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
273 true,
274 )),
275 ),
276 ("epoch_last_checkpoint_map", config_u64.clone()),
277 (
278 "watermarks",
279 ThConfig::new_with_config(4, 1, KeyType::uniform(1), watermarks_config.clone()),
280 ),
281 (
282 "transaction_fork_detected",
283 ThConfig::new_with_config(
284 1,
285 1,
286 KeyType::uniform(1),
287 watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
288 ),
289 ),
290 ];
291 Self::open_tables_read_write(
292 path.to_path_buf(),
293 MetricConf::new(metric_name),
294 configs
295 .into_iter()
296 .map(|(cf, config)| (cf.to_string(), config))
297 .collect(),
298 )
299 }
300
301 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
302 Self::get_read_only_handle(
303 path.to_path_buf(),
304 None,
305 None,
306 MetricConf::new("checkpoint_readonly"),
307 )
308 }
309}
310
311pub struct CheckpointStore {
312 pub(crate) tables: CheckpointStoreTables,
313 synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
314 executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
315}
316
317impl CheckpointStore {
318 pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
319 let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
320 Arc::new(Self {
321 tables,
322 synced_checkpoint_notify_read: NotifyRead::new(),
323 executed_checkpoint_notify_read: NotifyRead::new(),
324 })
325 }
326
327 pub fn new_for_tests() -> Arc<Self> {
328 let ckpt_dir = mysten_common::tempdir().unwrap();
329 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
330 }
331
332 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
333 let tables = CheckpointStoreTables::new(
334 path,
335 "db_checkpoint",
336 Arc::new(PrunerWatermarks::default()),
337 );
338 Arc::new(Self {
339 tables,
340 synced_checkpoint_notify_read: NotifyRead::new(),
341 executed_checkpoint_notify_read: NotifyRead::new(),
342 })
343 }
344
345 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
346 CheckpointStoreTables::open_readonly(path)
347 }
348
349 #[instrument(level = "info", skip_all)]
350 pub fn insert_genesis_checkpoint(
351 &self,
352 checkpoint: VerifiedCheckpoint,
353 contents: CheckpointContents,
354 epoch_store: &AuthorityPerEpochStore,
355 ) {
356 assert_eq!(
357 checkpoint.epoch(),
358 0,
359 "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
360 );
361 assert_eq!(
362 *checkpoint.sequence_number(),
363 0,
364 "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
365 );
366
367 match self.get_checkpoint_by_sequence_number(0).unwrap() {
369 Some(existing_checkpoint) => {
370 assert_eq!(existing_checkpoint.digest(), checkpoint.digest())
371 }
372 None => {
373 if epoch_store.epoch() == checkpoint.epoch {
374 epoch_store
375 .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
376 .unwrap();
377 } else {
378 debug!(
379 validator_epoch =% epoch_store.epoch(),
380 genesis_epoch =% checkpoint.epoch(),
381 "Not inserting checkpoint builder data for genesis checkpoint",
382 );
383 }
384 self.insert_checkpoint_contents(contents).unwrap();
385 self.insert_verified_checkpoint(&checkpoint).unwrap();
386 self.update_highest_synced_checkpoint(&checkpoint).unwrap();
387 }
388 }
389 }
390
391 pub fn get_checkpoint_by_digest(
392 &self,
393 digest: &CheckpointDigest,
394 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
395 self.tables
396 .checkpoint_by_digest
397 .get(digest)
398 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
399 }
400
401 pub fn get_checkpoint_by_sequence_number(
402 &self,
403 sequence_number: CheckpointSequenceNumber,
404 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
405 self.tables
406 .certified_checkpoints
407 .get(&sequence_number)
408 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
409 }
410
411 pub fn get_locally_computed_checkpoint(
412 &self,
413 sequence_number: CheckpointSequenceNumber,
414 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
415 self.tables
416 .locally_computed_checkpoints
417 .get(&sequence_number)
418 }
419
420 pub fn multi_get_locally_computed_checkpoints(
421 &self,
422 sequence_numbers: &[CheckpointSequenceNumber],
423 ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
424 let checkpoints = self
425 .tables
426 .locally_computed_checkpoints
427 .multi_get(sequence_numbers)?;
428
429 Ok(checkpoints)
430 }
431
432 pub fn get_sequence_number_by_contents_digest(
433 &self,
434 digest: &CheckpointContentsDigest,
435 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
436 self.tables
437 .checkpoint_sequence_by_contents_digest
438 .get(digest)
439 }
440
441 pub fn delete_contents_digest_sequence_number_mapping(
442 &self,
443 digest: &CheckpointContentsDigest,
444 ) -> Result<(), TypedStoreError> {
445 self.tables
446 .checkpoint_sequence_by_contents_digest
447 .remove(digest)
448 }
449
450 pub fn get_latest_certified_checkpoint(
451 &self,
452 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
453 Ok(self
454 .tables
455 .certified_checkpoints
456 .reversed_safe_iter_with_bounds(None, None)?
457 .next()
458 .transpose()?
459 .map(|(_, v)| v.into()))
460 }
461
462 pub fn get_latest_locally_computed_checkpoint(
463 &self,
464 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
465 Ok(self
466 .tables
467 .locally_computed_checkpoints
468 .reversed_safe_iter_with_bounds(None, None)?
469 .next()
470 .transpose()?
471 .map(|(_, v)| v))
472 }
473
474 pub fn multi_get_checkpoint_by_sequence_number(
475 &self,
476 sequence_numbers: &[CheckpointSequenceNumber],
477 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
478 let checkpoints = self
479 .tables
480 .certified_checkpoints
481 .multi_get(sequence_numbers)?
482 .into_iter()
483 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
484 .collect();
485
486 Ok(checkpoints)
487 }
488
489 pub fn multi_get_checkpoint_content(
490 &self,
491 contents_digest: &[CheckpointContentsDigest],
492 ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
493 self.tables.checkpoint_content.multi_get(contents_digest)
494 }
495
496 pub fn get_highest_verified_checkpoint(
497 &self,
498 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
499 let highest_verified = if let Some(highest_verified) = self
500 .tables
501 .watermarks
502 .get(&CheckpointWatermark::HighestVerified)?
503 {
504 highest_verified
505 } else {
506 return Ok(None);
507 };
508 self.get_checkpoint_by_digest(&highest_verified.1)
509 }
510
511 pub fn get_highest_synced_checkpoint(
512 &self,
513 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
514 let highest_synced = if let Some(highest_synced) = self
515 .tables
516 .watermarks
517 .get(&CheckpointWatermark::HighestSynced)?
518 {
519 highest_synced
520 } else {
521 return Ok(None);
522 };
523 self.get_checkpoint_by_digest(&highest_synced.1)
524 }
525
526 pub fn get_highest_synced_checkpoint_seq_number(
527 &self,
528 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
529 if let Some(highest_synced) = self
530 .tables
531 .watermarks
532 .get(&CheckpointWatermark::HighestSynced)?
533 {
534 Ok(Some(highest_synced.0))
535 } else {
536 Ok(None)
537 }
538 }
539
540 pub fn get_highest_executed_checkpoint_seq_number(
541 &self,
542 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
543 if let Some(highest_executed) = self
544 .tables
545 .watermarks
546 .get(&CheckpointWatermark::HighestExecuted)?
547 {
548 Ok(Some(highest_executed.0))
549 } else {
550 Ok(None)
551 }
552 }
553
554 pub fn get_highest_executed_checkpoint(
555 &self,
556 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
557 let highest_executed = if let Some(highest_executed) = self
558 .tables
559 .watermarks
560 .get(&CheckpointWatermark::HighestExecuted)?
561 {
562 highest_executed
563 } else {
564 return Ok(None);
565 };
566 self.get_checkpoint_by_digest(&highest_executed.1)
567 }
568
569 pub fn get_highest_pruned_checkpoint_seq_number(
570 &self,
571 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
572 self.tables
573 .watermarks
574 .get(&CheckpointWatermark::HighestPruned)
575 .map(|watermark| watermark.map(|w| w.0))
576 }
577
578 pub fn get_checkpoint_contents(
579 &self,
580 digest: &CheckpointContentsDigest,
581 ) -> Result<Option<CheckpointContents>, TypedStoreError> {
582 self.tables.checkpoint_content.get(digest)
583 }
584
585 pub fn get_full_checkpoint_contents_by_sequence_number(
586 &self,
587 seq: CheckpointSequenceNumber,
588 ) -> Result<Option<VersionedFullCheckpointContents>, TypedStoreError> {
589 self.tables.full_checkpoint_content_v2.get(&seq)
590 }
591
592 fn prune_local_summaries(&self) -> SuiResult {
593 if let Some((last_local_summary, _)) = self
594 .tables
595 .locally_computed_checkpoints
596 .reversed_safe_iter_with_bounds(None, None)?
597 .next()
598 .transpose()?
599 {
600 let mut batch = self.tables.locally_computed_checkpoints.batch();
601 batch.schedule_delete_range(
602 &self.tables.locally_computed_checkpoints,
603 &0,
604 &last_local_summary,
605 )?;
606 batch.write()?;
607 info!("Pruned local summaries up to {:?}", last_local_summary);
608 }
609 Ok(())
610 }
611
612 pub fn clear_locally_computed_checkpoints_from(
613 &self,
614 from_seq: CheckpointSequenceNumber,
615 ) -> SuiResult {
616 let keys: Vec<_> = self
617 .tables
618 .locally_computed_checkpoints
619 .safe_iter_with_bounds(Some(from_seq), None)
620 .map(|r| r.map(|(k, _)| k))
621 .collect::<Result<_, _>>()?;
622 if let Some(&last_local_summary) = keys.last() {
623 let mut batch = self.tables.locally_computed_checkpoints.batch();
624 batch
625 .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
626 .expect("Failed to delete locally computed checkpoints");
627 batch
628 .write()
629 .expect("Failed to delete locally computed checkpoints");
630 warn!(
631 from_seq,
632 last_local_summary,
633 "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
634 from_seq,
635 last_local_summary
636 );
637 }
638 Ok(())
639 }
640
641 fn check_for_checkpoint_fork(
642 &self,
643 local_checkpoint: &CheckpointSummary,
644 verified_checkpoint: &VerifiedCheckpoint,
645 ) {
646 if local_checkpoint != verified_checkpoint.data() {
647 let verified_contents = self
648 .get_checkpoint_contents(&verified_checkpoint.content_digest)
649 .map(|opt_contents| {
650 opt_contents
651 .map(|contents| format!("{:?}", contents))
652 .unwrap_or_else(|| {
653 format!(
654 "Verified checkpoint contents not found, digest: {:?}",
655 verified_checkpoint.content_digest,
656 )
657 })
658 })
659 .map_err(|e| {
660 format!(
661 "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
662 verified_checkpoint.content_digest, e
663 )
664 })
665 .unwrap_or_else(|err_msg| err_msg);
666
667 let local_contents = self
668 .get_checkpoint_contents(&local_checkpoint.content_digest)
669 .map(|opt_contents| {
670 opt_contents
671 .map(|contents| format!("{:?}", contents))
672 .unwrap_or_else(|| {
673 format!(
674 "Local checkpoint contents not found, digest: {:?}",
675 local_checkpoint.content_digest
676 )
677 })
678 })
679 .map_err(|e| {
680 format!(
681 "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
682 local_checkpoint.content_digest, e
683 )
684 })
685 .unwrap_or_else(|err_msg| err_msg);
686
687 error!(
689 verified_checkpoint = ?verified_checkpoint.data(),
690 ?verified_contents,
691 ?local_checkpoint,
692 ?local_contents,
693 "Local checkpoint fork detected!",
694 );
695
696 if let Err(e) = self.record_checkpoint_fork_detected(
698 *local_checkpoint.sequence_number(),
699 local_checkpoint.digest(),
700 ) {
701 error!("Failed to record checkpoint fork in database: {:?}", e);
702 }
703
704 fail_point_arg!(
705 "kill_checkpoint_fork_node",
706 |checkpoint_overrides: std::sync::Arc<
707 std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
708 >| {
709 #[cfg(msim)]
710 {
711 if let Ok(mut overrides) = checkpoint_overrides.lock() {
712 overrides.insert(
713 local_checkpoint.sequence_number,
714 verified_checkpoint.digest().to_string(),
715 );
716 }
717 tracing::error!(
718 fatal = true,
719 "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
720 local_checkpoint.sequence_number(),
721 verified_checkpoint.digest()
722 );
723 sui_simulator::task::shutdown_current_node();
724 }
725 }
726 );
727
728 fatal!(
729 "Local checkpoint fork detected for sequence number: {}",
730 local_checkpoint.sequence_number()
731 );
732 }
733 }
734
735 pub fn insert_certified_checkpoint(
741 &self,
742 checkpoint: &VerifiedCheckpoint,
743 ) -> Result<(), TypedStoreError> {
744 debug!(
745 checkpoint_seq = checkpoint.sequence_number(),
746 "Inserting certified checkpoint",
747 );
748 let mut batch = self.tables.certified_checkpoints.batch();
749 batch
750 .insert_batch(
751 &self.tables.certified_checkpoints,
752 [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
753 )?
754 .insert_batch(
755 &self.tables.checkpoint_by_digest,
756 [(checkpoint.digest(), checkpoint.serializable_ref())],
757 )?;
758 if checkpoint.next_epoch_committee().is_some() {
759 batch.insert_batch(
760 &self.tables.epoch_last_checkpoint_map,
761 [(&checkpoint.epoch(), checkpoint.sequence_number())],
762 )?;
763 }
764 batch.write()?;
765
766 if let Some(local_checkpoint) = self
767 .tables
768 .locally_computed_checkpoints
769 .get(checkpoint.sequence_number())?
770 {
771 self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
772 }
773
774 Ok(())
775 }
776
777 #[instrument(level = "debug", skip_all)]
780 pub fn insert_verified_checkpoint(
781 &self,
782 checkpoint: &VerifiedCheckpoint,
783 ) -> Result<(), TypedStoreError> {
784 self.insert_certified_checkpoint(checkpoint)?;
785 self.update_highest_verified_checkpoint(checkpoint)
786 }
787
788 pub fn update_highest_verified_checkpoint(
789 &self,
790 checkpoint: &VerifiedCheckpoint,
791 ) -> Result<(), TypedStoreError> {
792 if Some(*checkpoint.sequence_number())
793 > self
794 .get_highest_verified_checkpoint()?
795 .map(|x| *x.sequence_number())
796 {
797 debug!(
798 checkpoint_seq = checkpoint.sequence_number(),
799 "Updating highest verified checkpoint",
800 );
801 self.tables.watermarks.insert(
802 &CheckpointWatermark::HighestVerified,
803 &(*checkpoint.sequence_number(), *checkpoint.digest()),
804 )?;
805 }
806
807 Ok(())
808 }
809
810 pub fn update_highest_synced_checkpoint(
811 &self,
812 checkpoint: &VerifiedCheckpoint,
813 ) -> Result<(), TypedStoreError> {
814 let seq = *checkpoint.sequence_number();
815 debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
816 self.tables.watermarks.insert(
817 &CheckpointWatermark::HighestSynced,
818 &(seq, *checkpoint.digest()),
819 )?;
820 self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
821 Ok(())
822 }
823
824 async fn notify_read_checkpoint_watermark<F>(
825 &self,
826 notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
827 seq: CheckpointSequenceNumber,
828 get_watermark: F,
829 ) -> VerifiedCheckpoint
830 where
831 F: Fn() -> Option<CheckpointSequenceNumber>,
832 {
833 notify_read
834 .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
835 let seq = seqs[0];
836 let Some(highest) = get_watermark() else {
837 return vec![None];
838 };
839 if highest < seq {
840 return vec![None];
841 }
842 let checkpoint = self
843 .get_checkpoint_by_sequence_number(seq)
844 .expect("db error")
845 .expect("checkpoint not found");
846 vec![Some(checkpoint)]
847 })
848 .await
849 .into_iter()
850 .next()
851 .unwrap()
852 }
853
854 pub async fn notify_read_synced_checkpoint(
855 &self,
856 seq: CheckpointSequenceNumber,
857 ) -> VerifiedCheckpoint {
858 self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
859 self.get_highest_synced_checkpoint_seq_number()
860 .expect("db error")
861 })
862 .await
863 }
864
865 pub async fn notify_read_executed_checkpoint(
866 &self,
867 seq: CheckpointSequenceNumber,
868 ) -> VerifiedCheckpoint {
869 self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
870 self.get_highest_executed_checkpoint_seq_number()
871 .expect("db error")
872 })
873 .await
874 }
875
876 pub fn update_highest_executed_checkpoint(
877 &self,
878 checkpoint: &VerifiedCheckpoint,
879 ) -> Result<(), TypedStoreError> {
880 if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
881 if seq_number >= *checkpoint.sequence_number() {
882 return Ok(());
883 }
884 assert_eq!(
885 seq_number + 1,
886 *checkpoint.sequence_number(),
887 "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
888 checkpoint.sequence_number(),
889 seq_number
890 );
891 }
892 let seq = *checkpoint.sequence_number();
893 debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
894 self.tables.watermarks.insert(
895 &CheckpointWatermark::HighestExecuted,
896 &(seq, *checkpoint.digest()),
897 )?;
898 self.executed_checkpoint_notify_read
899 .notify(&seq, checkpoint);
900 Ok(())
901 }
902
903 pub fn update_highest_pruned_checkpoint(
904 &self,
905 checkpoint: &VerifiedCheckpoint,
906 ) -> Result<(), TypedStoreError> {
907 self.tables.watermarks.insert(
908 &CheckpointWatermark::HighestPruned,
909 &(*checkpoint.sequence_number(), *checkpoint.digest()),
910 )
911 }
912
913 pub fn set_highest_executed_checkpoint_subtle(
918 &self,
919 checkpoint: &VerifiedCheckpoint,
920 ) -> Result<(), TypedStoreError> {
921 self.tables.watermarks.insert(
922 &CheckpointWatermark::HighestExecuted,
923 &(*checkpoint.sequence_number(), *checkpoint.digest()),
924 )
925 }
926
927 pub fn insert_checkpoint_contents(
928 &self,
929 contents: CheckpointContents,
930 ) -> Result<(), TypedStoreError> {
931 debug!(
932 checkpoint_seq = ?contents.digest(),
933 "Inserting checkpoint contents",
934 );
935 self.tables
936 .checkpoint_content
937 .insert(contents.digest(), &contents)
938 }
939
940 pub fn insert_verified_checkpoint_contents(
941 &self,
942 checkpoint: &VerifiedCheckpoint,
943 full_contents: VerifiedCheckpointContents,
944 ) -> Result<(), TypedStoreError> {
945 let mut batch = self.tables.full_checkpoint_content_v2.batch();
946 batch.insert_batch(
947 &self.tables.checkpoint_sequence_by_contents_digest,
948 [(&checkpoint.content_digest, checkpoint.sequence_number())],
949 )?;
950 let full_contents = full_contents.into_inner();
951 batch.insert_batch(
952 &self.tables.full_checkpoint_content_v2,
953 [(checkpoint.sequence_number(), &full_contents)],
954 )?;
955
956 let contents = full_contents.into_checkpoint_contents();
957 assert_eq!(&checkpoint.content_digest, contents.digest());
958
959 batch.insert_batch(
960 &self.tables.checkpoint_content,
961 [(contents.digest(), &contents)],
962 )?;
963
964 batch.write()
965 }
966
967 pub fn delete_full_checkpoint_contents(
968 &self,
969 seq: CheckpointSequenceNumber,
970 ) -> Result<(), TypedStoreError> {
971 self.tables.full_checkpoint_content.remove(&seq)?;
972 self.tables.full_checkpoint_content_v2.remove(&seq)
973 }
974
975 pub fn get_epoch_last_checkpoint(
976 &self,
977 epoch_id: EpochId,
978 ) -> SuiResult<Option<VerifiedCheckpoint>> {
979 let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
980 let checkpoint = match seq {
981 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
982 None => None,
983 };
984 Ok(checkpoint)
985 }
986
987 pub fn get_epoch_last_checkpoint_seq_number(
988 &self,
989 epoch_id: EpochId,
990 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
991 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
992 Ok(seq)
993 }
994
995 pub fn insert_epoch_last_checkpoint(
996 &self,
997 epoch_id: EpochId,
998 checkpoint: &VerifiedCheckpoint,
999 ) -> SuiResult {
1000 self.tables
1001 .epoch_last_checkpoint_map
1002 .insert(&epoch_id, checkpoint.sequence_number())?;
1003 Ok(())
1004 }
1005
1006 pub fn get_epoch_state_commitments(
1007 &self,
1008 epoch: EpochId,
1009 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1010 let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1011 checkpoint
1012 .end_of_epoch_data
1013 .as_ref()
1014 .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1015 .epoch_commitments
1016 .clone()
1017 });
1018 Ok(commitments)
1019 }
1020
1021 pub fn get_epoch_stats(
1023 &self,
1024 epoch: EpochId,
1025 last_checkpoint: &CheckpointSummary,
1026 ) -> Option<EpochStats> {
1027 let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1028 (0, 0)
1029 } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1030 (
1031 checkpoint.sequence_number + 1,
1032 checkpoint.network_total_transactions,
1033 )
1034 } else {
1035 return None;
1036 };
1037 Some(EpochStats {
1038 checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1039 transaction_count: last_checkpoint.network_total_transactions
1040 - prev_epoch_network_transactions,
1041 total_gas_reward: last_checkpoint
1042 .epoch_rolling_gas_cost_summary
1043 .computation_cost,
1044 })
1045 }
1046
1047 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1048 self.tables
1050 .checkpoint_content
1051 .checkpoint_db(path)
1052 .map_err(Into::into)
1053 }
1054
1055 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1056 let mut wb = self.tables.watermarks.batch();
1057 wb.delete_batch(
1058 &self.tables.watermarks,
1059 std::iter::once(CheckpointWatermark::HighestExecuted),
1060 )?;
1061 wb.write()?;
1062 Ok(())
1063 }
1064
1065 pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1066 self.delete_highest_executed_checkpoint_test_only()?;
1067 Ok(())
1068 }
1069
1070 pub fn record_checkpoint_fork_detected(
1071 &self,
1072 checkpoint_seq: CheckpointSequenceNumber,
1073 checkpoint_digest: CheckpointDigest,
1074 ) -> Result<(), TypedStoreError> {
1075 info!(
1076 checkpoint_seq = checkpoint_seq,
1077 checkpoint_digest = ?checkpoint_digest,
1078 "Recording checkpoint fork detection in database"
1079 );
1080 self.tables.watermarks.insert(
1081 &CheckpointWatermark::CheckpointForkDetected,
1082 &(checkpoint_seq, checkpoint_digest),
1083 )
1084 }
1085
1086 pub fn get_checkpoint_fork_detected(
1087 &self,
1088 ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1089 self.tables
1090 .watermarks
1091 .get(&CheckpointWatermark::CheckpointForkDetected)
1092 }
1093
1094 pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1095 self.tables
1096 .watermarks
1097 .remove(&CheckpointWatermark::CheckpointForkDetected)
1098 }
1099
1100 pub fn record_transaction_fork_detected(
1101 &self,
1102 tx_digest: TransactionDigest,
1103 expected_effects_digest: TransactionEffectsDigest,
1104 actual_effects_digest: TransactionEffectsDigest,
1105 ) -> Result<(), TypedStoreError> {
1106 info!(
1107 tx_digest = ?tx_digest,
1108 expected_effects_digest = ?expected_effects_digest,
1109 actual_effects_digest = ?actual_effects_digest,
1110 "Recording transaction fork detection in database"
1111 );
1112 self.tables.transaction_fork_detected.insert(
1113 &TRANSACTION_FORK_DETECTED_KEY,
1114 &(tx_digest, expected_effects_digest, actual_effects_digest),
1115 )
1116 }
1117
1118 pub fn get_transaction_fork_detected(
1119 &self,
1120 ) -> Result<
1121 Option<(
1122 TransactionDigest,
1123 TransactionEffectsDigest,
1124 TransactionEffectsDigest,
1125 )>,
1126 TypedStoreError,
1127 > {
1128 self.tables
1129 .transaction_fork_detected
1130 .get(&TRANSACTION_FORK_DETECTED_KEY)
1131 }
1132
1133 pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1134 self.tables
1135 .transaction_fork_detected
1136 .remove(&TRANSACTION_FORK_DETECTED_KEY)
1137 }
1138}
1139
1140#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
1141pub enum CheckpointWatermark {
1142 HighestVerified,
1143 HighestSynced,
1144 HighestExecuted,
1145 HighestPruned,
1146 CheckpointForkDetected,
1147}
1148
1149struct CheckpointStateHasher {
1150 epoch_store: Arc<AuthorityPerEpochStore>,
1151 hasher: Weak<GlobalStateHasher>,
1152 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1153}
1154
1155impl CheckpointStateHasher {
1156 fn new(
1157 epoch_store: Arc<AuthorityPerEpochStore>,
1158 hasher: Weak<GlobalStateHasher>,
1159 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1160 ) -> Self {
1161 Self {
1162 epoch_store,
1163 hasher,
1164 receive_from_builder,
1165 }
1166 }
1167
1168 async fn run(self) {
1169 let Self {
1170 epoch_store,
1171 hasher,
1172 mut receive_from_builder,
1173 } = self;
1174 while let Some((seq, effects)) = receive_from_builder.recv().await {
1175 let Some(hasher) = hasher.upgrade() else {
1176 info!("Object state hasher was dropped, stopping checkpoint accumulation");
1177 break;
1178 };
1179 hasher
1180 .accumulate_checkpoint(&effects, seq, &epoch_store)
1181 .expect("epoch ended while accumulating checkpoint");
1182 }
1183 }
1184}
1185
1186#[derive(Debug)]
1187pub(crate) enum CheckpointBuilderError {
1188 ChangeEpochTxAlreadyExecuted,
1189 SystemPackagesMissing,
1190 Retry(anyhow::Error),
1191}
1192
1193impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1194 for CheckpointBuilderError
1195{
1196 fn from(e: SuiError) -> Self {
1197 Self::Retry(e.into())
1198 }
1199}
1200
1201pub(crate) type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1202
1203pub struct CheckpointBuilder {
1204 state: Arc<AuthorityState>,
1205 store: Arc<CheckpointStore>,
1206 epoch_store: Arc<AuthorityPerEpochStore>,
1207 notify: Arc<Notify>,
1208 notify_aggregator: Arc<Notify>,
1209 last_built: watch::Sender<CheckpointSequenceNumber>,
1210 effects_store: Arc<dyn TransactionCacheRead>,
1211 global_state_hasher: Weak<GlobalStateHasher>,
1212 send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1213 output: Box<dyn CheckpointOutput>,
1214 metrics: Arc<CheckpointMetrics>,
1215 max_transactions_per_checkpoint: usize,
1216 max_checkpoint_size_bytes: usize,
1217}
1218
1219pub struct CheckpointAggregator {
1220 store: Arc<CheckpointStore>,
1221 epoch_store: Arc<AuthorityPerEpochStore>,
1222 notify: Arc<Notify>,
1223 current: Option<CheckpointSignatureAggregator>,
1224 output: Box<dyn CertifiedCheckpointOutput>,
1225 state: Arc<AuthorityState>,
1226 metrics: Arc<CheckpointMetrics>,
1227}
1228
1229pub struct CheckpointSignatureAggregator {
1231 next_index: u64,
1232 summary: CheckpointSummary,
1233 digest: CheckpointDigest,
1234 signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
1236 store: Arc<CheckpointStore>,
1237 state: Arc<AuthorityState>,
1238 metrics: Arc<CheckpointMetrics>,
1239}
1240
1241impl CheckpointBuilder {
1242 fn new(
1243 state: Arc<AuthorityState>,
1244 store: Arc<CheckpointStore>,
1245 epoch_store: Arc<AuthorityPerEpochStore>,
1246 notify: Arc<Notify>,
1247 effects_store: Arc<dyn TransactionCacheRead>,
1248 global_state_hasher: Weak<GlobalStateHasher>,
1250 send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1252 output: Box<dyn CheckpointOutput>,
1253 notify_aggregator: Arc<Notify>,
1254 last_built: watch::Sender<CheckpointSequenceNumber>,
1255 metrics: Arc<CheckpointMetrics>,
1256 max_transactions_per_checkpoint: usize,
1257 max_checkpoint_size_bytes: usize,
1258 ) -> Self {
1259 Self {
1260 state,
1261 store,
1262 epoch_store,
1263 notify,
1264 effects_store,
1265 global_state_hasher,
1266 send_to_hasher,
1267 output,
1268 notify_aggregator,
1269 last_built,
1270 metrics,
1271 max_transactions_per_checkpoint,
1272 max_checkpoint_size_bytes,
1273 }
1274 }
1275
1276 async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
1284 if let Some(replay_waiter) = consensus_replay_waiter {
1285 info!("Waiting for consensus commits to replay ...");
1286 replay_waiter.wait_for_replay().await;
1287 info!("Consensus commits finished replaying");
1288 }
1289 info!("Starting CheckpointBuilder");
1290 loop {
1291 match self.maybe_build_checkpoints().await {
1292 Ok(()) => {}
1293 err @ Err(
1294 CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
1295 | CheckpointBuilderError::SystemPackagesMissing,
1296 ) => {
1297 info!("CheckpointBuilder stopping: {:?}", err);
1298 return;
1299 }
1300 Err(CheckpointBuilderError::Retry(inner)) => {
1301 let msg = format!("{:?}", inner);
1302 debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
1303 tokio::time::sleep(Duration::from_secs(1)).await;
1304 self.metrics.checkpoint_errors.inc();
1305 continue;
1306 }
1307 }
1308
1309 self.notify.notified().await;
1310 }
1311 }
1312
1313 async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1314 let _scope = monitored_scope("BuildCheckpoints");
1315
1316 let summary = self
1318 .epoch_store
1319 .last_built_checkpoint_builder_summary()
1320 .expect("epoch should not have ended");
1321 let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
1322 let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
1323
1324 let min_checkpoint_interval_ms = self
1325 .epoch_store
1326 .protocol_config()
1327 .min_checkpoint_interval_ms_as_option()
1328 .unwrap_or_default();
1329 let mut grouped_pending_checkpoints = Vec::new();
1330 let mut checkpoints_iter = self
1331 .epoch_store
1332 .get_pending_checkpoints(last_height)
1333 .expect("unexpected epoch store error")
1334 .into_iter()
1335 .peekable();
1336 while let Some((height, pending)) = checkpoints_iter.next() {
1337 let current_timestamp = pending.details().timestamp_ms;
1340 let can_build = match last_timestamp {
1341 Some(last_timestamp) => {
1342 current_timestamp >= last_timestamp + min_checkpoint_interval_ms
1343 }
1344 None => true,
1345 } || checkpoints_iter
1348 .peek()
1349 .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
1350 || pending.details().last_of_epoch;
1352 grouped_pending_checkpoints.push(pending);
1353 if !can_build {
1354 debug!(
1355 checkpoint_commit_height = height,
1356 ?last_timestamp,
1357 ?current_timestamp,
1358 "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
1359 );
1360 continue;
1361 }
1362
1363 last_height = Some(height);
1365 last_timestamp = Some(current_timestamp);
1366 debug!(
1367 checkpoint_commit_height_from = grouped_pending_checkpoints
1368 .first()
1369 .unwrap()
1370 .details()
1371 .checkpoint_height,
1372 checkpoint_commit_height_to = last_height,
1373 "Making checkpoint with commit height range"
1374 );
1375
1376 let seq = self
1377 .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
1378 .await?;
1379
1380 self.last_built.send_if_modified(|cur| {
1381 if seq > *cur {
1383 *cur = seq;
1384 true
1385 } else {
1386 false
1387 }
1388 });
1389
1390 tokio::task::yield_now().await;
1393 }
1394 debug!(
1395 "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1396 grouped_pending_checkpoints.len(),
1397 );
1398
1399 Ok(())
1400 }
1401
1402 #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1403 async fn make_checkpoint(
1404 &mut self,
1405 pendings: Vec<PendingCheckpoint>,
1406 ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1407 let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1408
1409 let pending_ckpt_str = pendings
1410 .iter()
1411 .map(|p| {
1412 format!(
1413 "height={}, commit={}",
1414 p.details().checkpoint_height,
1415 p.details().consensus_commit_ref
1416 )
1417 })
1418 .join("; ");
1419
1420 let last_details = pendings.last().unwrap().details().clone();
1421
1422 let highest_executed_sequence = self
1425 .store
1426 .get_highest_executed_checkpoint_seq_number()
1427 .expect("db error")
1428 .unwrap_or(0);
1429
1430 let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
1431 let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1432
1433 let new_checkpoints = self
1434 .create_checkpoints(
1435 sorted_tx_effects_included_in_checkpoint,
1436 &last_details,
1437 &all_roots,
1438 )
1439 .await?;
1440 let highest_sequence = *new_checkpoints.last().0.sequence_number();
1441 if highest_sequence <= highest_executed_sequence && poll_count > 1 {
1442 debug_fatal!(
1443 "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1444 );
1445 }
1446
1447 let new_ckpt_str = new_checkpoints
1448 .iter()
1449 .map(|(ckpt, _)| format!("seq={}, digest={}", ckpt.sequence_number(), ckpt.digest()))
1450 .join("; ");
1451
1452 self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1453 .await?;
1454 info!(
1455 "Made new checkpoint {} from pending checkpoint {}",
1456 new_ckpt_str, pending_ckpt_str
1457 );
1458
1459 Ok(highest_sequence)
1460 }
1461
1462 async fn construct_and_execute_settlement_transactions(
1463 &self,
1464 sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
1465 checkpoint_height: CheckpointHeight,
1466 checkpoint_seq: CheckpointSequenceNumber,
1467 tx_index_offset: u64,
1468 ) -> (TransactionKey, Vec<TransactionEffects>) {
1469 let _scope =
1470 monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");
1471
1472 let tx_key =
1473 TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);
1474
1475 let epoch = self.epoch_store.epoch();
1476 let accumulator_root_obj_initial_shared_version = self
1477 .epoch_store
1478 .epoch_start_config()
1479 .accumulator_root_obj_initial_shared_version()
1480 .expect("accumulator root object must exist");
1481
1482 let builder = AccumulatorSettlementTxBuilder::new(
1483 Some(self.effects_store.as_ref()),
1484 sorted_tx_effects_included_in_checkpoint,
1485 checkpoint_seq,
1486 tx_index_offset,
1487 );
1488
1489 let accumulator_changes = builder.collect_accumulator_changes();
1490 let num_updates = builder.num_updates();
1491 let (settlement_txns, barrier_tx) = builder.build_tx(
1492 self.epoch_store.protocol_config(),
1493 epoch,
1494 accumulator_root_obj_initial_shared_version,
1495 checkpoint_height,
1496 checkpoint_seq,
1497 );
1498
1499 let settlement_txns: Vec<_> = settlement_txns
1500 .into_iter()
1501 .chain(std::iter::once(barrier_tx))
1502 .map(|tx| {
1503 VerifiedExecutableTransaction::new_system(
1504 VerifiedTransaction::new_system_transaction(tx),
1505 self.epoch_store.epoch(),
1506 )
1507 })
1508 .collect();
1509
1510 let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();
1511
1512 debug!(
1513 ?settlement_digests,
1514 ?tx_key,
1515 "created settlement transactions with {num_updates} updates"
1516 );
1517
1518 self.epoch_store
1519 .notify_settlement_transactions_ready(tx_key, settlement_txns);
1520
1521 let settlement_effects = loop {
1522 match tokio::time::timeout(Duration::from_secs(5), async {
1523 self.effects_store
1524 .notify_read_executed_effects(
1525 "CheckpointBuilder::notify_read_settlement_effects",
1526 &settlement_digests,
1527 )
1528 .await
1529 })
1530 .await
1531 {
1532 Ok(effects) => break effects,
1533 Err(_) => {
1534 debug_fatal!(
1535 "Timeout waiting for settlement transactions to be executed {:?}, retrying...",
1536 tx_key
1537 );
1538 }
1539 }
1540 };
1541
1542 let mut next_accumulator_version = None;
1543 for fx in settlement_effects.iter() {
1544 assert!(
1545 fx.status().is_ok(),
1546 "settlement transaction cannot fail (digest: {:?}) {:#?}",
1547 fx.transaction_digest(),
1548 fx
1549 );
1550 if let Some(version) = fx
1551 .mutated()
1552 .iter()
1553 .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
1554 {
1555 assert!(
1556 next_accumulator_version.is_none(),
1557 "Only one settlement transaction should mutate the accumulator root object"
1558 );
1559 next_accumulator_version = Some(version);
1560 }
1561 }
1562 let settlements = BalanceSettlement {
1563 next_accumulator_version: next_accumulator_version
1564 .expect("Accumulator root object should be mutated in the settlement transactions"),
1565 balance_changes: accumulator_changes,
1566 };
1567
1568 self.state
1569 .execution_scheduler()
1570 .settle_balances(settlements);
1571
1572 (tx_key, settlement_effects)
1573 }
1574
1575 #[instrument(level = "debug", skip_all)]
1580 async fn resolve_checkpoint_transactions(
1581 &self,
1582 pending_checkpoints: Vec<PendingCheckpoint>,
1583 ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
1584 let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1585
1586 let mut effects_in_current_checkpoint = BTreeSet::new();
1591
1592 let mut tx_effects = Vec::new();
1593 let mut tx_roots = HashSet::new();
1594
1595 for pending_checkpoint in pending_checkpoints.into_iter() {
1596 let mut pending = pending_checkpoint;
1597 debug!(
1598 checkpoint_commit_height = pending.details.checkpoint_height,
1599 "Resolving checkpoint transactions for pending checkpoint.",
1600 );
1601
1602 trace!(
1603 "roots for pending checkpoint {:?}: {:?}",
1604 pending.details.checkpoint_height, pending.roots,
1605 );
1606
1607 let settlement_root = if self.epoch_store.accumulators_enabled() {
1608 let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
1609 pending.roots.pop()
1610 else {
1611 fatal!("No settlement root found");
1612 };
1613 Some(settlement_root)
1614 } else {
1615 None
1616 };
1617
1618 let roots = &pending.roots;
1619
1620 self.metrics
1621 .checkpoint_roots_count
1622 .inc_by(roots.len() as u64);
1623
1624 let root_digests = self
1625 .epoch_store
1626 .notify_read_tx_key_to_digest(roots)
1627 .in_monitored_scope("CheckpointNotifyDigests")
1628 .await?;
1629 let root_effects = self
1630 .effects_store
1631 .notify_read_executed_effects(
1632 CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
1633 &root_digests,
1634 )
1635 .in_monitored_scope("CheckpointNotifyRead")
1636 .await;
1637
1638 let consensus_commit_prologue = if self
1639 .epoch_store
1640 .protocol_config()
1641 .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
1642 {
1643 let consensus_commit_prologue =
1647 self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
1648
1649 if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1652 let unsorted_ccp = self.complete_checkpoint_effects(
1653 vec![ccp_effects.clone()],
1654 &mut effects_in_current_checkpoint,
1655 )?;
1656
1657 if unsorted_ccp.len() != 1 {
1660 fatal!(
1661 "Expected 1 consensus commit prologue, got {:?}",
1662 unsorted_ccp
1663 .iter()
1664 .map(|e| e.transaction_digest())
1665 .collect::<Vec<_>>()
1666 );
1667 }
1668 assert_eq!(unsorted_ccp.len(), 1);
1669 assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1670 }
1671 consensus_commit_prologue
1672 } else {
1673 None
1674 };
1675
1676 let unsorted =
1677 self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;
1678
1679 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1680 let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1681 if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
1682 if cfg!(debug_assertions) {
1683 for tx in unsorted.iter() {
1685 assert!(tx.transaction_digest() != &ccp_digest);
1686 }
1687 }
1688 sorted.push(ccp_effects);
1689 }
1690 sorted.extend(CausalOrder::causal_sort(unsorted));
1691
1692 if let Some(settlement_root) = settlement_root {
1693 let last_checkpoint =
1696 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1697 let next_checkpoint_seq = last_checkpoint
1698 .as_ref()
1699 .map(|(seq, _)| *seq)
1700 .unwrap_or_default()
1701 + 1;
1702 let tx_index_offset = tx_effects.len() as u64;
1703
1704 let (tx_key, settlement_effects) = self
1705 .construct_and_execute_settlement_transactions(
1706 &sorted,
1707 pending.details.checkpoint_height,
1708 next_checkpoint_seq,
1709 tx_index_offset,
1710 )
1711 .await;
1712 debug!(?tx_key, "executed settlement transactions");
1713
1714 assert_eq!(settlement_root, tx_key);
1715
1716 sorted.extend(settlement_effects);
1723 }
1724
1725 #[cfg(msim)]
1726 {
1727 self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1729 }
1730
1731 tx_effects.extend(sorted);
1732 tx_roots.extend(root_digests);
1733 }
1734
1735 Ok((tx_effects, tx_roots))
1736 }
1737
1738 fn extract_consensus_commit_prologue(
1743 &self,
1744 root_digests: &[TransactionDigest],
1745 root_effects: &[TransactionEffects],
1746 ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
1747 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1748 if root_digests.is_empty() {
1749 return Ok(None);
1750 }
1751
1752 let first_tx = self
1757 .state
1758 .get_transaction_cache_reader()
1759 .get_transaction_block(&root_digests[0])
1760 .expect("Transaction block must exist");
1761
1762 Ok(first_tx
1763 .transaction_data()
1764 .is_consensus_commit_prologue()
1765 .then(|| {
1766 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1767 (*first_tx.digest(), root_effects[0].clone())
1768 }))
1769 }
1770
1771 #[instrument(level = "debug", skip_all)]
1772 async fn write_checkpoints(
1773 &mut self,
1774 height: CheckpointHeight,
1775 new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1776 ) -> SuiResult {
1777 let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1778 let mut batch = self.store.tables.checkpoint_content.batch();
1779 let mut all_tx_digests =
1780 Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1781
1782 for (summary, contents) in &new_checkpoints {
1783 debug!(
1784 checkpoint_commit_height = height,
1785 checkpoint_seq = summary.sequence_number,
1786 contents_digest = ?contents.digest(),
1787 "writing checkpoint",
1788 );
1789
1790 if let Some(previously_computed_summary) = self
1791 .store
1792 .tables
1793 .locally_computed_checkpoints
1794 .get(&summary.sequence_number)?
1795 && previously_computed_summary.digest() != summary.digest()
1796 {
1797 fatal!(
1798 "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
1799 summary.sequence_number,
1800 previously_computed_summary.digest(),
1801 summary.digest()
1802 );
1803 }
1804
1805 all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1806
1807 self.metrics
1808 .transactions_included_in_checkpoint
1809 .inc_by(contents.size() as u64);
1810 let sequence_number = summary.sequence_number;
1811 self.metrics
1812 .last_constructed_checkpoint
1813 .set(sequence_number as i64);
1814
1815 batch.insert_batch(
1816 &self.store.tables.checkpoint_content,
1817 [(contents.digest(), contents)],
1818 )?;
1819
1820 batch.insert_batch(
1821 &self.store.tables.locally_computed_checkpoints,
1822 [(sequence_number, summary)],
1823 )?;
1824 }
1825
1826 batch.write()?;
1827
1828 for (summary, contents) in &new_checkpoints {
1830 self.output
1831 .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1832 .await?;
1833 }
1834
1835 for (local_checkpoint, _) in &new_checkpoints {
1836 if let Some(certified_checkpoint) = self
1837 .store
1838 .tables
1839 .certified_checkpoints
1840 .get(local_checkpoint.sequence_number())?
1841 {
1842 self.store
1843 .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1844 }
1845 }
1846
1847 self.notify_aggregator.notify_one();
1848 self.epoch_store
1849 .process_constructed_checkpoint(height, new_checkpoints);
1850 Ok(())
1851 }
1852
1853 #[allow(clippy::type_complexity)]
1854 fn split_checkpoint_chunks(
1855 &self,
1856 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1857 signatures: Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
1858 ) -> CheckpointBuilderResult<
1859 Vec<
1860 Vec<(
1861 TransactionEffects,
1862 Vec<(GenericSignature, Option<SequenceNumber>)>,
1863 )>,
1864 >,
1865 > {
1866 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1867 let mut chunks = Vec::new();
1868 let mut chunk = Vec::new();
1869 let mut chunk_size: usize = 0;
1870 for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1871 .into_iter()
1872 .zip(signatures.into_iter())
1873 {
1874 let signatures_size = if self.epoch_store.protocol_config().address_aliases() {
1879 bcs::serialized_size(&signatures)?
1880 } else {
1881 let signatures: Vec<&GenericSignature> =
1882 signatures.iter().map(|(s, _)| s).collect();
1883 bcs::serialized_size(&signatures)?
1884 };
1885 let size = transaction_size + bcs::serialized_size(&effects)? + signatures_size;
1886 if chunk.len() == self.max_transactions_per_checkpoint
1887 || (chunk_size + size) > self.max_checkpoint_size_bytes
1888 {
1889 if chunk.is_empty() {
1890 warn!(
1892 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1893 self.max_checkpoint_size_bytes
1894 );
1895 } else {
1896 chunks.push(chunk);
1897 chunk = Vec::new();
1898 chunk_size = 0;
1899 }
1900 }
1901
1902 chunk.push((effects, signatures));
1903 chunk_size += size;
1904 }
1905
1906 if !chunk.is_empty() || chunks.is_empty() {
1907 chunks.push(chunk);
1912 }
1917 Ok(chunks)
1918 }
1919
1920 fn load_last_built_checkpoint_summary(
1921 epoch_store: &AuthorityPerEpochStore,
1922 store: &CheckpointStore,
1923 ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1924 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1925 if last_checkpoint.is_none() {
1926 let epoch = epoch_store.epoch();
1927 if epoch > 0 {
1928 let previous_epoch = epoch - 1;
1929 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1930 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1931 if let Some((ref seq, _)) = last_checkpoint {
1932 debug!(
1933 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1934 );
1935 } else {
1936 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1938 }
1939 }
1940 }
1941 Ok(last_checkpoint)
1942 }
1943
1944 #[instrument(level = "debug", skip_all)]
1945 async fn create_checkpoints(
1946 &self,
1947 all_effects: Vec<TransactionEffects>,
1948 details: &PendingCheckpointInfo,
1949 all_roots: &HashSet<TransactionDigest>,
1950 ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1951 let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1952
1953 let total = all_effects.len();
1954 let mut last_checkpoint =
1955 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1956 let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1957 info!(
1958 checkpoint_commit_height = details.checkpoint_height,
1959 next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1960 checkpoint_timestamp = details.timestamp_ms,
1961 "Creating checkpoint(s) for {} transactions",
1962 all_effects.len(),
1963 );
1964
1965 let all_digests: Vec<_> = all_effects
1966 .iter()
1967 .map(|effect| *effect.transaction_digest())
1968 .collect();
1969 let transactions_and_sizes = self
1970 .state
1971 .get_transaction_cache_reader()
1972 .get_transactions_and_serialized_sizes(&all_digests)?;
1973 let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1974 let mut transactions = Vec::with_capacity(all_effects.len());
1975 let mut transaction_keys = Vec::with_capacity(all_effects.len());
1976 let mut randomness_rounds = BTreeMap::new();
1977 {
1978 let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1979 debug!(
1980 ?last_checkpoint_seq,
1981 "Waiting for {:?} certificates to appear in consensus",
1982 all_effects.len()
1983 );
1984
1985 for (effects, transaction_and_size) in all_effects
1986 .into_iter()
1987 .zip(transactions_and_sizes.into_iter())
1988 {
1989 let (transaction, size) = transaction_and_size
1990 .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
1991 match transaction.inner().transaction_data().kind() {
1992 TransactionKind::ConsensusCommitPrologue(_)
1993 | TransactionKind::ConsensusCommitPrologueV2(_)
1994 | TransactionKind::ConsensusCommitPrologueV3(_)
1995 | TransactionKind::ConsensusCommitPrologueV4(_)
1996 | TransactionKind::AuthenticatorStateUpdate(_) => {
1997 }
2000 TransactionKind::ProgrammableSystemTransaction(_) => {
2001 }
2003 TransactionKind::ChangeEpoch(_)
2004 | TransactionKind::Genesis(_)
2005 | TransactionKind::EndOfEpochTransaction(_) => {
2006 fatal!(
2007 "unexpected transaction in checkpoint effects: {:?}",
2008 transaction
2009 );
2010 }
2011 TransactionKind::RandomnessStateUpdate(rsu) => {
2012 randomness_rounds
2013 .insert(*effects.transaction_digest(), rsu.randomness_round);
2014 }
2015 TransactionKind::ProgrammableTransaction(_) => {
2016 let digest = *effects.transaction_digest();
2020 if !all_roots.contains(&digest) {
2021 transaction_keys.push(SequencedConsensusTransactionKey::External(
2022 ConsensusTransactionKey::Certificate(digest),
2023 ));
2024 }
2025 }
2026 }
2027 transactions.push(transaction);
2028 all_effects_and_transaction_sizes.push((effects, size));
2029 }
2030
2031 self.epoch_store
2032 .consensus_messages_processed_notify(transaction_keys)
2033 .await?;
2034 }
2035
2036 let signatures = self
2037 .epoch_store
2038 .user_signatures_for_checkpoint(&transactions, &all_digests);
2039 debug!(
2040 ?last_checkpoint_seq,
2041 "Received {} checkpoint user signatures from consensus",
2042 signatures.len()
2043 );
2044
2045 let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
2046 Some(
2047 transactions
2048 .iter()
2049 .flat_map(|tx| {
2050 if let TransactionKind::ProgrammableTransaction(ptb) =
2051 tx.transaction_data().kind()
2052 {
2053 itertools::Either::Left(
2054 ptb.commands
2055 .iter()
2056 .map(ExecutionTimeObservationKey::from_command),
2057 )
2058 } else {
2059 itertools::Either::Right(std::iter::empty())
2060 }
2061 })
2062 .collect(),
2063 )
2064 } else {
2065 None
2066 };
2067
2068 let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2069 let chunks_count = chunks.len();
2070
2071 let mut checkpoints = Vec::with_capacity(chunks_count);
2072 debug!(
2073 ?last_checkpoint_seq,
2074 "Creating {} checkpoints with {} transactions", chunks_count, total,
2075 );
2076
2077 let epoch = self.epoch_store.epoch();
2078 for (index, transactions) in chunks.into_iter().enumerate() {
2079 let first_checkpoint_of_epoch = index == 0
2080 && last_checkpoint
2081 .as_ref()
2082 .map(|(_, c)| c.epoch != epoch)
2083 .unwrap_or(true);
2084 if first_checkpoint_of_epoch {
2085 self.epoch_store
2086 .record_epoch_first_checkpoint_creation_time_metric();
2087 }
2088 let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2089
2090 let sequence_number = last_checkpoint
2091 .as_ref()
2092 .map(|(_, c)| c.sequence_number + 1)
2093 .unwrap_or_default();
2094 let mut timestamp_ms = details.timestamp_ms;
2095 if let Some((_, last_checkpoint)) = &last_checkpoint
2096 && last_checkpoint.timestamp_ms > timestamp_ms
2097 {
2098 debug!(
2100 "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2101 sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2102 );
2103 if self
2104 .epoch_store
2105 .protocol_config()
2106 .enforce_checkpoint_timestamp_monotonicity()
2107 {
2108 timestamp_ms = last_checkpoint.timestamp_ms;
2109 }
2110 }
2111
2112 let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
2113 let epoch_rolling_gas_cost_summary =
2114 self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2115
2116 let end_of_epoch_data = if last_checkpoint_of_epoch {
2117 let system_state_obj = self
2118 .augment_epoch_last_checkpoint(
2119 &epoch_rolling_gas_cost_summary,
2120 timestamp_ms,
2121 &mut effects,
2122 &mut signatures,
2123 sequence_number,
2124 std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2125 last_checkpoint_seq.unwrap_or_default(),
2126 )
2127 .await?;
2128
2129 let committee = system_state_obj
2130 .get_current_epoch_committee()
2131 .committee()
2132 .clone();
2133
2134 let root_state_digest = {
2137 let state_acc = self
2138 .global_state_hasher
2139 .upgrade()
                        .expect("No checkpoints should be getting built after local reconfiguration");
2141 let acc = state_acc.accumulate_checkpoint(
2142 &effects,
2143 sequence_number,
2144 &self.epoch_store,
2145 )?;
2146
2147 state_acc
2148 .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2149 .await?;
2150
2151 state_acc.accumulate_running_root(
2152 &self.epoch_store,
2153 sequence_number,
2154 Some(acc),
2155 )?;
2156 state_acc
2157 .digest_epoch(self.epoch_store.clone(), sequence_number)
2158 .await?
2159 };
2160 self.metrics.highest_accumulated_epoch.set(epoch as i64);
2161 info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2162
2163 let epoch_commitments = if self
2164 .epoch_store
2165 .protocol_config()
2166 .check_commit_root_state_digest_supported()
2167 {
2168 vec![root_state_digest.into()]
2169 } else {
2170 vec![]
2171 };
2172
2173 Some(EndOfEpochData {
2174 next_epoch_committee: committee.voting_rights,
2175 next_epoch_protocol_version: ProtocolVersion::new(
2176 system_state_obj.protocol_version(),
2177 ),
2178 epoch_commitments,
2179 })
2180 } else {
2181 self.send_to_hasher
2182 .send((sequence_number, effects.clone()))
2183 .await?;
2184
2185 None
2186 };
2187 let contents = if self.epoch_store.protocol_config().address_aliases() {
2188 CheckpointContents::new_v2(&effects, signatures)
2189 } else {
2190 CheckpointContents::new_with_digests_and_signatures(
2191 effects.iter().map(TransactionEffects::execution_digests),
2192 signatures
2193 .into_iter()
2194 .map(|sigs| sigs.into_iter().map(|(s, _)| s).collect())
2195 .collect(),
2196 )
2197 };
2198
2199 let num_txns = contents.size() as u64;
2200
2201 let network_total_transactions = last_checkpoint
2202 .as_ref()
2203 .map(|(_, c)| c.network_total_transactions + num_txns)
2204 .unwrap_or(num_txns);
2205
2206 let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2207
2208 let matching_randomness_rounds: Vec<_> = effects
2209 .iter()
2210 .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2211 .copied()
2212 .collect();
2213
2214 let checkpoint_commitments = if self
2215 .epoch_store
2216 .protocol_config()
2217 .include_checkpoint_artifacts_digest_in_summary()
2218 {
2219 let artifacts = CheckpointArtifacts::from(&effects[..]);
2220 let artifacts_digest = artifacts.digest()?;
2221 vec![artifacts_digest.into()]
2222 } else {
2223 Default::default()
2224 };
2225
2226 let summary = CheckpointSummary::new(
2227 self.epoch_store.protocol_config(),
2228 epoch,
2229 sequence_number,
2230 network_total_transactions,
2231 &contents,
2232 previous_digest,
2233 epoch_rolling_gas_cost_summary,
2234 end_of_epoch_data,
2235 timestamp_ms,
2236 matching_randomness_rounds,
2237 checkpoint_commitments,
2238 );
2239 summary.report_checkpoint_age(
2240 &self.metrics.last_created_checkpoint_age,
2241 &self.metrics.last_created_checkpoint_age_ms,
2242 );
2243 if last_checkpoint_of_epoch {
2244 info!(
2245 checkpoint_seq = sequence_number,
2246 "creating last checkpoint of epoch {}", epoch
2247 );
2248 if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2249 self.epoch_store
2250 .report_epoch_metrics_at_last_checkpoint(stats);
2251 }
2252 }
2253 last_checkpoint = Some((sequence_number, summary.clone()));
2254 checkpoints.push((summary, contents));
2255 }
2256
2257 Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2258 }
2259
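    /// Computes the epoch-rolling gas cost summary: the previous checkpoint's rolling
    /// totals plus the gas used by the current chunk, restarting from zero when the
    /// previous checkpoint belongs to an earlier epoch.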
2260 fn get_epoch_total_gas_cost(
2261 &self,
2262 last_checkpoint: Option<&CheckpointSummary>,
2263 cur_checkpoint_effects: &[TransactionEffects],
2264 ) -> GasCostSummary {
2265 let (previous_epoch, previous_gas_costs) = last_checkpoint
2266 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2267 .unwrap_or_default();
2268 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2269 if previous_epoch == self.epoch_store.epoch() {
2270 GasCostSummary::new(
2272 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2273 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2274 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2275 previous_gas_costs.non_refundable_storage_fee
2276 + current_gas_costs.non_refundable_storage_fee,
2277 )
2278 } else {
2279 current_gas_costs
2280 }
2281 }
2282
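    /// Creates and executes the advance-epoch transaction, then appends its effects
    /// (with an empty signature set) to the last checkpoint of the epoch. Returns the
    /// resulting system state, which carries the next epoch's committee and protocol
    /// version.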
2283 #[instrument(level = "error", skip_all)]
2284 async fn augment_epoch_last_checkpoint(
2285 &self,
2286 epoch_total_gas_cost: &GasCostSummary,
2287 epoch_start_timestamp_ms: CheckpointTimestamp,
2288 checkpoint_effects: &mut Vec<TransactionEffects>,
2289 signatures: &mut Vec<Vec<(GenericSignature, Option<SequenceNumber>)>>,
2290 checkpoint: CheckpointSequenceNumber,
2291 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2292 last_checkpoint: CheckpointSequenceNumber,
2295 ) -> CheckpointBuilderResult<SuiSystemState> {
2296 let (system_state, effects) = self
2297 .state
2298 .create_and_execute_advance_epoch_tx(
2299 &self.epoch_store,
2300 epoch_total_gas_cost,
2301 checkpoint,
2302 epoch_start_timestamp_ms,
2303 end_of_epoch_observation_keys,
2304 last_checkpoint,
2305 )
2306 .await?;
2307 checkpoint_effects.push(effects);
2308 signatures.push(vec![]);
2309 Ok(system_state)
2310 }
2311
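    /// Expands the root effects into their full causal closure within the current epoch:
    /// dependencies that were executed this epoch but are not yet part of any checkpoint
    /// are pulled in transitively. Digests already included in a checkpoint are skipped,
    /// and the set of included digests is extended with the returned effects.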
2312 #[instrument(level = "debug", skip_all)]
2319 fn complete_checkpoint_effects(
2320 &self,
2321 mut roots: Vec<TransactionEffects>,
2322 existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
2323 ) -> SuiResult<Vec<TransactionEffects>> {
2324 let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
2325 let mut results = vec![];
2326 let mut seen = HashSet::new();
2327 loop {
2328 let mut pending = HashSet::new();
2329
2330 let transactions_included = self
2331 .epoch_store
2332 .builder_included_transactions_in_checkpoint(
2333 roots.iter().map(|e| e.transaction_digest()),
2334 )?;
2335
2336 for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
2337 let digest = effect.transaction_digest();
2338 seen.insert(*digest);
2340
2341 if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
2343 continue;
2344 }
2345
2346 if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
2348 continue;
2349 }
2350
2351 let existing_effects = self
2352 .epoch_store
2353 .transactions_executed_in_cur_epoch(effect.dependencies())?;
2354
2355 for (dependency, effects_signature_exists) in
2356 effect.dependencies().iter().zip(existing_effects.iter())
2357 {
2358 if !effects_signature_exists {
2363 continue;
2364 }
2365 if seen.insert(*dependency) {
2366 pending.insert(*dependency);
2367 }
2368 }
2369 results.push(effect);
2370 }
2371 if pending.is_empty() {
2372 break;
2373 }
2374 let pending = pending.into_iter().collect::<Vec<_>>();
2375 let effects = self.effects_store.multi_get_executed_effects(&pending);
2376 let effects = effects
2377 .into_iter()
2378 .zip(pending)
2379 .map(|(opt, digest)| match opt {
2380 Some(x) => x,
                    None => panic!(
                        "Cannot find effects for transaction {:?}, but a transaction that depends on it was already executed",
                        digest
                    ),
2385 })
2386 .collect::<Vec<_>>();
2387 roots = effects;
2388 }
2389
2390 existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
2391 Ok(results)
2392 }
2393
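    /// Simulation-only invariant check: a checkpoint's roots contain at most one
    /// consensus commit prologue, and if one is present it must be the first
    /// transaction in the causally sorted list.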
2394 #[cfg(msim)]
2397 fn expensive_consensus_commit_prologue_invariants_check(
2398 &self,
2399 root_digests: &[TransactionDigest],
2400 sorted: &[TransactionEffects],
2401 ) {
2402 if !self
2403 .epoch_store
2404 .protocol_config()
2405 .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
2406 {
2407 return;
2408 }
2409
2410 let root_txs = self
2412 .state
2413 .get_transaction_cache_reader()
2414 .multi_get_transaction_blocks(root_digests);
2415 let ccps = root_txs
2416 .iter()
2417 .filter_map(|tx| {
2418 if let Some(tx) = tx {
2419 if tx.transaction_data().is_consensus_commit_prologue() {
2420 Some(tx)
2421 } else {
2422 None
2423 }
2424 } else {
2425 None
2426 }
2427 })
2428 .collect::<Vec<_>>();
2429
2430 assert!(ccps.len() <= 1);
2432
2433 let txs = self
2435 .state
2436 .get_transaction_cache_reader()
2437 .multi_get_transaction_blocks(
2438 &sorted
2439 .iter()
                    .map(|tx| *tx.transaction_digest())
2441 .collect::<Vec<_>>(),
2442 );
2443
        if ccps.is_empty() {
2445 for tx in txs.iter() {
2448 if let Some(tx) = tx {
2449 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2450 }
2451 }
2452 } else {
2453 assert!(
2455 txs[0]
2456 .as_ref()
2457 .unwrap()
2458 .transaction_data()
2459 .is_consensus_commit_prologue()
2460 );
2461
2462 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
2463
2464 for tx in txs.iter().skip(1) {
2465 if let Some(tx) = tx {
2466 assert!(!tx.transaction_data().is_consensus_commit_prologue());
2467 }
2468 }
2469 }
2470 }
2471}
2472
2473impl CheckpointAggregator {
2474 fn new(
2475 tables: Arc<CheckpointStore>,
2476 epoch_store: Arc<AuthorityPerEpochStore>,
2477 notify: Arc<Notify>,
2478 output: Box<dyn CertifiedCheckpointOutput>,
2479 state: Arc<AuthorityState>,
2480 metrics: Arc<CheckpointMetrics>,
2481 ) -> Self {
2482 let current = None;
2483 Self {
2484 store: tables,
2485 epoch_store,
2486 notify,
2487 current,
2488 output,
2489 state,
2490 metrics,
2491 }
2492 }
2493
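    /// Aggregation loop: repeatedly certifies checkpoints from pending signatures,
    /// retrying after one second on error and otherwise waking on notification or a
    /// one-second timeout.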
2494 async fn run(mut self) {
2495 info!("Starting CheckpointAggregator");
2496 loop {
2497 if let Err(e) = self.run_and_notify().await {
2498 error!(
2499 "Error while aggregating checkpoint, will retry in 1s: {:?}",
2500 e
2501 );
2502 self.metrics.checkpoint_errors.inc();
2503 tokio::time::sleep(Duration::from_secs(1)).await;
2504 continue;
2505 }
2506
2507 let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
2508 }
2509 }
2510
2511 async fn run_and_notify(&mut self) -> SuiResult {
2512 let summaries = self.run_inner()?;
2513 for summary in summaries {
2514 self.output.certified_checkpoint_created(&summary).await?;
2515 }
2516 Ok(())
2517 }
2518
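    /// Certifies as many consecutive checkpoints as possible, starting from the next
    /// uncertified sequence number. For each locally built summary, drains pending
    /// signature messages until a quorum is reached, persists the certified checkpoint,
    /// and returns the newly certified summaries.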
2519 fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
2520 let _scope = monitored_scope("CheckpointAggregator");
2521 let mut result = vec![];
2522 'outer: loop {
2523 let next_to_certify = self.next_checkpoint_to_certify()?;
2524 let current = if let Some(current) = &mut self.current {
2525 if current.summary.sequence_number < next_to_certify {
2531 assert_reachable!("skip checkpoint certification");
2532 self.current = None;
2533 continue;
2534 }
2535 current
2536 } else {
2537 let Some(summary) = self
2538 .epoch_store
2539 .get_built_checkpoint_summary(next_to_certify)?
2540 else {
2541 return Ok(result);
2542 };
2543 self.current = Some(CheckpointSignatureAggregator {
2544 next_index: 0,
2545 digest: summary.digest(),
2546 summary,
2547 signatures_by_digest: MultiStakeAggregator::new(
2548 self.epoch_store.committee().clone(),
2549 ),
2550 store: self.store.clone(),
2551 state: self.state.clone(),
2552 metrics: self.metrics.clone(),
2553 });
2554 self.current.as_mut().unwrap()
2555 };
2556
2557 let epoch_tables = self
2558 .epoch_store
2559 .tables()
2560 .expect("should not run past end of epoch");
2561 let iter = epoch_tables
2562 .pending_checkpoint_signatures
2563 .safe_iter_with_bounds(
2564 Some((current.summary.sequence_number, current.next_index)),
2565 None,
2566 );
2567 for item in iter {
2568 let ((seq, index), data) = item?;
2569 if seq != current.summary.sequence_number {
2570 trace!(
2571 checkpoint_seq =? current.summary.sequence_number,
2572 "Not enough checkpoint signatures",
2573 );
2574 return Ok(result);
2576 }
2577 trace!(
2578 checkpoint_seq = current.summary.sequence_number,
2579 "Processing signature for checkpoint (digest: {:?}) from {:?}",
2580 current.summary.digest(),
2581 data.summary.auth_sig().authority.concise()
2582 );
2583 self.metrics
2584 .checkpoint_participation
2585 .with_label_values(&[&format!(
2586 "{:?}",
2587 data.summary.auth_sig().authority.concise()
2588 )])
2589 .inc();
2590 if let Ok(auth_signature) = current.try_aggregate(data) {
2591 debug!(
2592 checkpoint_seq = current.summary.sequence_number,
2593 "Successfully aggregated signatures for checkpoint (digest: {:?})",
2594 current.summary.digest(),
2595 );
2596 let summary = VerifiedCheckpoint::new_unchecked(
2597 CertifiedCheckpointSummary::new_from_data_and_sig(
2598 current.summary.clone(),
2599 auth_signature,
2600 ),
2601 );
2602
2603 self.store.insert_certified_checkpoint(&summary)?;
2604 self.metrics
2605 .last_certified_checkpoint
2606 .set(current.summary.sequence_number as i64);
2607 current.summary.report_checkpoint_age(
2608 &self.metrics.last_certified_checkpoint_age,
2609 &self.metrics.last_certified_checkpoint_age_ms,
2610 );
2611 result.push(summary.into_inner());
2612 self.current = None;
2613 continue 'outer;
2614 } else {
2615 current.next_index = index + 1;
2616 }
2617 }
2618 break;
2619 }
2620 Ok(result)
2621 }
2622
2623 fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
2624 Ok(self
2625 .store
2626 .tables
2627 .certified_checkpoints
2628 .reversed_safe_iter_with_bounds(None, None)?
2629 .next()
2630 .transpose()?
2631 .map(|(seq, _)| seq + 1)
2632 .unwrap_or_default())
2633 }
2634}
2635
2636impl CheckpointSignatureAggregator {
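    /// Adds one validator's signature to the running aggregation for this checkpoint.
    /// Returns the aggregated quorum signature once enough stake has signed the locally
    /// built digest; a quorum on a different digest is recorded as a remote fork.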
2637 #[allow(clippy::result_unit_err)]
2638 pub fn try_aggregate(
2639 &mut self,
2640 data: CheckpointSignatureMessage,
2641 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2642 let their_digest = *data.summary.digest();
2643 let (_, signature) = data.summary.into_data_and_sig();
2644 let author = signature.authority;
2645 let envelope =
2646 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2647 match self.signatures_by_digest.insert(their_digest, envelope) {
2648 InsertResult::Failed { error }
2650 if matches!(
2651 error.as_inner(),
2652 SuiErrorKind::StakeAggregatorRepeatedSigner {
2653 conflicting_sig: false,
2654 ..
2655 },
2656 ) =>
2657 {
2658 Err(())
2659 }
2660 InsertResult::Failed { error } => {
2661 warn!(
2662 checkpoint_seq = self.summary.sequence_number,
2663 "Failed to aggregate new signature from validator {:?}: {:?}",
2664 author.concise(),
2665 error
2666 );
2667 self.check_for_split_brain();
2668 Err(())
2669 }
2670 InsertResult::QuorumReached(cert) => {
2671 if their_digest != self.digest {
2674 self.metrics.remote_checkpoint_forks.inc();
2675 warn!(
2676 checkpoint_seq = self.summary.sequence_number,
2677 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2678 author.concise(),
2679 their_digest,
2680 self.digest
2681 );
2682 return Err(());
2683 }
2684 Ok(cert)
2685 }
2686 InsertResult::NotEnoughVotes {
2687 bad_votes: _,
2688 bad_authorities: _,
2689 } => {
2690 self.check_for_split_brain();
2691 Err(())
2692 }
2693 }
2694 }
2695
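    /// Inspects the signatures collected so far and, if no single digest can still reach
    /// a quorum, reports a split-brain: logs the competing digests ordered by stake,
    /// bumps the fork metrics, and spawns `diagnose_split_brain` in the background.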
2696 fn check_for_split_brain(&self) {
2700 debug!(
2701 checkpoint_seq = self.summary.sequence_number,
2702 "Checking for split brain condition"
2703 );
2704 if self.signatures_by_digest.quorum_unreachable() {
2705 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2711 let digests_by_stake_messages = all_unique_values
2712 .iter()
2713 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2714 .map(|(digest, (_authorities, total_stake))| {
2715 format!("{:?} (total stake: {})", digest, total_stake)
2716 })
2717 .collect::<Vec<String>>();
2718 fail_point_arg!("kill_split_brain_node", |(
2719 checkpoint_overrides,
2720 forked_authorities,
2721 ): (
2722 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
2723 std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
2724 )| {
2725 #[cfg(msim)]
2726 {
2727 if let (Ok(mut overrides), Ok(forked_authorities_set)) =
2728 (checkpoint_overrides.lock(), forked_authorities.lock())
2729 {
2730 let correct_digest = all_unique_values
2732 .iter()
2733 .find(|(_, (authorities, _))| {
2734 authorities
2736 .iter()
2737 .any(|auth| !forked_authorities_set.contains(auth))
2738 })
2739 .map(|(digest, _)| digest.to_string())
2740 .unwrap_or_else(|| {
2741 all_unique_values
2743 .iter()
2744 .max_by_key(|(_, (_, stake))| *stake)
2745 .map(|(digest, _)| digest.to_string())
2746 .unwrap_or_else(|| self.digest.to_string())
2747 });
2748
2749 overrides.insert(self.summary.sequence_number, correct_digest.clone());
2750
2751 tracing::error!(
2752 fatal = true,
2753 "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
2754 self.summary.sequence_number,
2755 correct_digest
2756 );
2757 }
2758 }
2759 });
2760
2761 debug_fatal!(
2762 "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
2763 self.summary.sequence_number,
2764 self.signatures_by_digest.uncommitted_stake(),
2765 digests_by_stake_messages
2766 );
2767 self.metrics.split_brain_checkpoint_forks.inc();
2768
2769 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2770 let local_summary = self.summary.clone();
2771 let state = self.state.clone();
2772 let tables = self.store.clone();
2773
2774 tokio::spawn(async move {
2775 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2776 });
2777 }
2778 }
2779}
2780
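/// Fetches the pending checkpoint from one validator per divergent digest and writes a
/// diff of the summaries, contents, transactions, and effects against the local
/// checkpoint to a temporary file for post-mortem analysis.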
2781async fn diagnose_split_brain(
2787 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2788 local_summary: CheckpointSummary,
2789 state: Arc<AuthorityState>,
2790 tables: Arc<CheckpointStore>,
2791) {
2792 debug!(
2793 checkpoint_seq = local_summary.sequence_number,
2794 "Running split brain diagnostics..."
2795 );
2796 let time = SystemTime::now();
2797 let digest_to_validator = all_unique_values
2799 .iter()
2800 .filter_map(|(digest, (validators, _))| {
2801 if *digest != local_summary.digest() {
2802 let random_validator = validators.choose(&mut get_rng()).unwrap();
2803 Some((*digest, *random_validator))
2804 } else {
2805 None
2806 }
2807 })
2808 .collect::<HashMap<_, _>>();
2809 if digest_to_validator.is_empty() {
        panic!(
            "Given a split-brain condition, there should be at \
            least one validator that disagrees with the local signature"
        );
2814 }
2815
2816 let epoch_store = state.load_epoch_store_one_call_per_task();
2817 let committee = epoch_store
2818 .epoch_start_state()
2819 .get_sui_committee_with_network_metadata();
2820 let network_config = default_mysten_network_config();
2821 let network_clients =
2822 make_network_authority_clients_with_network_config(&committee, &network_config);
2823
2824 let response_futures = digest_to_validator
2826 .values()
2827 .cloned()
2828 .map(|validator| {
2829 let client = network_clients
2830 .get(&validator)
2831 .expect("Failed to get network client");
2832 let request = CheckpointRequestV2 {
2833 sequence_number: Some(local_summary.sequence_number),
2834 request_content: true,
2835 certified: false,
2836 };
2837 client.handle_checkpoint_v2(request)
2838 })
2839 .collect::<Vec<_>>();
2840
2841 let digest_name_pair = digest_to_validator.iter();
2842 let response_data = futures::future::join_all(response_futures)
2843 .await
2844 .into_iter()
2845 .zip(digest_name_pair)
2846 .filter_map(|(response, (digest, name))| match response {
2847 Ok(response) => match response {
2848 CheckpointResponseV2 {
2849 checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2850 contents: Some(contents),
2851 } => Some((*name, *digest, summary, contents)),
2852 CheckpointResponseV2 {
2853 checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2854 contents: _,
2855 } => {
2856 panic!("Expected pending checkpoint, but got certified checkpoint");
2857 }
2858 CheckpointResponseV2 {
2859 checkpoint: None,
2860 contents: _,
2861 } => {
2862 error!(
2863 "Summary for checkpoint {:?} not found on validator {:?}",
2864 local_summary.sequence_number, name
2865 );
2866 None
2867 }
2868 CheckpointResponseV2 {
2869 checkpoint: _,
2870 contents: None,
2871 } => {
2872 error!(
2873 "Contents for checkpoint {:?} not found on validator {:?}",
2874 local_summary.sequence_number, name
2875 );
2876 None
2877 }
2878 },
2879 Err(e) => {
2880 error!(
2881 "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2882 e
2883 );
2884 None
2885 }
2886 })
2887 .collect::<Vec<_>>();
2888
2889 let local_checkpoint_contents = tables
2890 .get_checkpoint_contents(&local_summary.content_digest)
2891 .unwrap_or_else(|_| {
2892 panic!(
2893 "Could not find checkpoint contents for digest {:?}",
2894 local_summary.digest()
2895 )
2896 })
2897 .unwrap_or_else(|| {
2898 panic!(
2899 "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2900 local_summary.sequence_number,
2901 local_summary.digest()
2902 )
2903 });
2904 let local_contents_text = format!("{local_checkpoint_contents:?}");
2905
2906 let local_summary_text = format!("{local_summary:?}");
2907 let local_validator = state.name.concise();
2908 let diff_patches = response_data
2909 .iter()
2910 .map(|(name, other_digest, other_summary, contents)| {
2911 let other_contents_text = format!("{contents:?}");
2912 let other_summary_text = format!("{other_summary:?}");
2913 let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2914 .enumerate_transactions(&local_summary)
2915 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2916 .unzip();
2917 let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2918 .enumerate_transactions(other_summary)
2919 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2920 .unzip();
2921 let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2922 let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2923 let local_transactions_text = format!("{local_transactions:#?}");
2924 let other_transactions_text = format!("{other_transactions:#?}");
2925 let transactions_patch =
2926 create_patch(&local_transactions_text, &other_transactions_text);
2927 let local_effects_text = format!("{local_effects:#?}");
2928 let other_effects_text = format!("{other_effects:#?}");
2929 let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2930 let seq_number = local_summary.sequence_number;
2931 let local_digest = local_summary.digest();
2932 let other_validator = name.concise();
2933 format!(
2934 "Checkpoint: {seq_number:?}\n\
2935 Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2936 Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2937 Summary Diff: \n{summary_patch}\n\n\
2938 Contents Diff: \n{contents_patch}\n\n\
2939 Transactions Diff: \n{transactions_patch}\n\n\
2940 Effects Diff: \n{effects_patch}",
2941 )
2942 })
2943 .collect::<Vec<_>>()
2944 .join("\n\n\n");
2945
2946 let header = format!(
2947 "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2948 Datetime: {:?}",
2949 time
2950 );
2951 let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2952 let path = tempfile::tempdir()
2953 .expect("Failed to create tempdir")
2954 .keep()
2955 .join(Path::new("checkpoint_fork_dump.txt"));
2956 let mut file = File::create(path).unwrap();
2957 write!(file, "{}", fork_logs_text).unwrap();
2958 debug!("{}", fork_logs_text);
2959}
2960
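/// Notification interface for the checkpoint service: used to hand checkpoint signature
/// messages to the aggregator and to wake the checkpoint builder.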
2961pub trait CheckpointServiceNotify {
2962 fn notify_checkpoint_signature(
2963 &self,
2964 epoch_store: &AuthorityPerEpochStore,
2965 info: &CheckpointSignatureMessage,
2966 ) -> SuiResult;
2967
2968 fn notify_checkpoint(&self) -> SuiResult;
2969}
2970
2971#[allow(clippy::large_enum_variant)]
2972enum CheckpointServiceState {
2973 Unstarted(
2974 (
2975 CheckpointBuilder,
2976 CheckpointAggregator,
2977 CheckpointStateHasher,
2978 ),
2979 ),
2980 Started,
2981}
2982
2983impl CheckpointServiceState {
2984 fn take_unstarted(
2985 &mut self,
2986 ) -> (
2987 CheckpointBuilder,
2988 CheckpointAggregator,
2989 CheckpointStateHasher,
2990 ) {
2991 let mut state = CheckpointServiceState::Started;
2992 std::mem::swap(self, &mut state);
2993
2994 match state {
2995 CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
2996 (builder, aggregator, hasher)
2997 }
2998 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2999 }
3000 }
3001}
3002
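/// Owns the checkpoint builder, aggregator, and state hasher. Construct with
/// [`CheckpointService::build`]; the background tasks are not started until
/// [`CheckpointService::spawn`] is called.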
3003pub struct CheckpointService {
3004 tables: Arc<CheckpointStore>,
3005 notify_builder: Arc<Notify>,
3006 notify_aggregator: Arc<Notify>,
3007 last_signature_index: Mutex<u64>,
3008 highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
3010 highest_previously_built_seq: CheckpointSequenceNumber,
3013 metrics: Arc<CheckpointMetrics>,
3014 state: Mutex<CheckpointServiceState>,
3015}
3016
3017impl CheckpointService {
3018 pub fn build(
3020 state: Arc<AuthorityState>,
3021 checkpoint_store: Arc<CheckpointStore>,
3022 epoch_store: Arc<AuthorityPerEpochStore>,
3023 effects_store: Arc<dyn TransactionCacheRead>,
3024 global_state_hasher: Weak<GlobalStateHasher>,
3025 checkpoint_output: Box<dyn CheckpointOutput>,
3026 certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
3027 metrics: Arc<CheckpointMetrics>,
3028 max_transactions_per_checkpoint: usize,
3029 max_checkpoint_size_bytes: usize,
3030 ) -> Arc<Self> {
3031 info!(
3032 "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
3033 );
3034 let notify_builder = Arc::new(Notify::new());
3035 let notify_aggregator = Arc::new(Notify::new());
3036
3037 let highest_previously_built_seq = checkpoint_store
3039 .get_latest_locally_computed_checkpoint()
3040 .expect("failed to get latest locally computed checkpoint")
3041 .map(|s| s.sequence_number)
3042 .unwrap_or(0);
3043
3044 let highest_currently_built_seq =
3045 CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
3046 .expect("epoch should not have ended")
3047 .map(|(seq, _)| seq)
3048 .unwrap_or(0);
3049
3050 let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
3051
3052 let aggregator = CheckpointAggregator::new(
3053 checkpoint_store.clone(),
3054 epoch_store.clone(),
3055 notify_aggregator.clone(),
3056 certified_checkpoint_output,
3057 state.clone(),
3058 metrics.clone(),
3059 );
3060
3061 let (send_to_hasher, receive_from_builder) = mpsc::channel(16);
3062
3063 let ckpt_state_hasher = CheckpointStateHasher::new(
3064 epoch_store.clone(),
3065 global_state_hasher.clone(),
3066 receive_from_builder,
3067 );
3068
3069 let builder = CheckpointBuilder::new(
3070 state.clone(),
3071 checkpoint_store.clone(),
3072 epoch_store.clone(),
3073 notify_builder.clone(),
3074 effects_store,
3075 global_state_hasher,
3076 send_to_hasher,
3077 checkpoint_output,
3078 notify_aggregator.clone(),
3079 highest_currently_built_seq_tx.clone(),
3080 metrics.clone(),
3081 max_transactions_per_checkpoint,
3082 max_checkpoint_size_bytes,
3083 );
3084
3085 let last_signature_index = epoch_store
3086 .get_last_checkpoint_signature_index()
3087 .expect("should not cross end of epoch");
3088 let last_signature_index = Mutex::new(last_signature_index);
3089
3090 Arc::new(Self {
3091 tables: checkpoint_store,
3092 notify_builder,
3093 notify_aggregator,
3094 last_signature_index,
3095 highest_currently_built_seq_tx,
3096 highest_previously_built_seq,
3097 metrics,
3098 state: Mutex::new(CheckpointServiceState::Unstarted((
3099 builder,
3100 aggregator,
3101 ckpt_state_hasher,
3102 ))),
3103 })
3104 }
3105
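    /// Starts the builder, aggregator, and state hasher tasks. State hashes above the
    /// highest executed checkpoint are cleared first (per the log message below, to keep
    /// ECMH computation consistent), and the call then waits up to 120 seconds for
    /// previously built checkpoints to be rebuilt or for the builder to finish.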
3106 pub async fn spawn(
3114 &self,
3115 epoch_store: Arc<AuthorityPerEpochStore>,
3116 consensus_replay_waiter: Option<ReplayWaiter>,
3117 ) {
3118 let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();
3119
3120 if let Some(last_committed_seq) = self
3123 .tables
3124 .get_highest_executed_checkpoint()
3125 .expect("Failed to get highest executed checkpoint")
3126 .map(|checkpoint| *checkpoint.sequence_number())
3127 {
3128 if let Err(e) = builder
3129 .epoch_store
3130 .clear_state_hashes_after_checkpoint(last_committed_seq)
3131 {
3132 error!(
3133 "Failed to clear state hashes after checkpoint {}: {:?}",
3134 last_committed_seq, e
3135 );
3136 } else {
3137 info!(
3138 "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
3139 last_committed_seq
3140 );
3141 }
3142 }
3143
3144 let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();
3145
3146 let state_hasher_task = spawn_monitored_task!(state_hasher.run());
3147 let aggregator_task = spawn_monitored_task!(aggregator.run());
3148
3149 spawn_monitored_task!(async move {
3150 epoch_store
3151 .within_alive_epoch(async move {
3152 builder.run(consensus_replay_waiter).await;
3153 builder_finished_tx.send(()).ok();
3154 })
3155 .await
3156 .ok();
3157
3158 state_hasher_task
3160 .await
3161 .expect("state hasher should exit normally");
3162
3163 aggregator_task.abort();
3166 aggregator_task.await.ok();
3167 });
3168
3169 if tokio::time::timeout(Duration::from_secs(120), async move {
3175 tokio::select! {
3176 _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
3177 _ = self.wait_for_rebuilt_checkpoints() => (),
3178 }
3179 })
3180 .await
3181 .is_err()
3182 {
3183 debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
3184 }
3185 }
3186}
3187
3188impl CheckpointService {
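    /// Blocks until the builder has re-created checkpoints up to the highest sequence
    /// number that had already been built locally before the restart, observing progress
    /// through the `highest_currently_built_seq_tx` watch channel.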
3189 pub async fn wait_for_rebuilt_checkpoints(&self) {
3195 let highest_previously_built_seq = self.highest_previously_built_seq;
3196 let mut rx = self.highest_currently_built_seq_tx.subscribe();
3197 let mut highest_currently_built_seq = *rx.borrow_and_update();
3198 info!(
3199 "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3200 );
3201 loop {
3202 if highest_currently_built_seq >= highest_previously_built_seq {
3203 info!("Checkpoint rebuild complete");
3204 break;
3205 }
3206 rx.changed().await.unwrap();
3207 highest_currently_built_seq = *rx.borrow_and_update();
3208 }
3209 }
3210
3211 #[cfg(test)]
3212 fn write_and_notify_checkpoint_for_testing(
3213 &self,
3214 epoch_store: &AuthorityPerEpochStore,
3215 checkpoint: PendingCheckpoint,
3216 ) -> SuiResult {
3217 use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
3218
3219 let mut output = ConsensusCommitOutput::new(0);
3220 epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
3221 output.set_default_commit_stats_for_testing();
3222 epoch_store.push_consensus_output_for_tests(output);
3223 self.notify_checkpoint()?;
3224 Ok(())
3225 }
3226}
3227
3228impl CheckpointServiceNotify for CheckpointService {
3229 fn notify_checkpoint_signature(
3230 &self,
3231 epoch_store: &AuthorityPerEpochStore,
3232 info: &CheckpointSignatureMessage,
3233 ) -> SuiResult {
3234 let sequence = info.summary.sequence_number;
3235 let signer = info.summary.auth_sig().authority.concise();
3236
3237 if let Some(highest_verified_checkpoint) = self
3238 .tables
3239 .get_highest_verified_checkpoint()?
3240 .map(|x| *x.sequence_number())
3241 && sequence <= highest_verified_checkpoint
3242 {
3243 trace!(
3244 checkpoint_seq = sequence,
3245 "Ignore checkpoint signature from {} - already certified", signer,
3246 );
3247 self.metrics
3248 .last_ignored_checkpoint_signature_received
3249 .set(sequence as i64);
3250 return Ok(());
3251 }
3252 trace!(
3253 checkpoint_seq = sequence,
3254 "Received checkpoint signature, digest {} from {}",
3255 info.summary.digest(),
3256 signer,
3257 );
3258 self.metrics
3259 .last_received_checkpoint_signatures
3260 .with_label_values(&[&signer.to_string()])
3261 .set(sequence as i64);
3262 let mut index = self.last_signature_index.lock();
3265 *index += 1;
3266 epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
3267 self.notify_aggregator.notify_one();
3268 Ok(())
3269 }
3270
3271 fn notify_checkpoint(&self) -> SuiResult {
3272 self.notify_builder.notify_one();
3273 Ok(())
3274 }
3275}
3276
3277pub struct CheckpointServiceNoop {}
3279impl CheckpointServiceNotify for CheckpointServiceNoop {
3280 fn notify_checkpoint_signature(
3281 &self,
3282 _: &AuthorityPerEpochStore,
3283 _: &CheckpointSignatureMessage,
3284 ) -> SuiResult {
3285 Ok(())
3286 }
3287
3288 fn notify_checkpoint(&self) -> SuiResult {
3289 Ok(())
3290 }
3291}
3292
3293impl PendingCheckpoint {
3294 pub fn height(&self) -> CheckpointHeight {
3295 self.details.checkpoint_height
3296 }
3297
3298 pub fn roots(&self) -> &Vec<TransactionKey> {
3299 &self.roots
3300 }
3301
3302 pub fn details(&self) -> &PendingCheckpointInfo {
3303 &self.details
3304 }
3305}
3306
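// Future adapter that counts how many times the inner future is polled and yields
// `(poll_count, output)` when it completes; see `poll_count` below.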
3307pin_project! {
3308 pub struct PollCounter<Fut> {
3309 #[pin]
3310 future: Fut,
3311 count: usize,
3312 }
3313}
3314
3315impl<Fut> PollCounter<Fut> {
3316 pub fn new(future: Fut) -> Self {
3317 Self { future, count: 0 }
3318 }
3319
3320 pub fn count(&self) -> usize {
3321 self.count
3322 }
3323}
3324
3325impl<Fut: Future> Future for PollCounter<Fut> {
3326 type Output = (usize, Fut::Output);
3327
3328 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3329 let this = self.project();
3330 *this.count += 1;
3331 match this.future.poll(cx) {
3332 Poll::Ready(output) => Poll::Ready((*this.count, output)),
3333 Poll::Pending => Poll::Pending,
3334 }
3335 }
3336}
3337
3338fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
3339 PollCounter::new(future)
3340}
3341
3342#[cfg(test)]
3343mod tests {
3344 use super::*;
3345 use crate::authority::test_authority_builder::TestAuthorityBuilder;
3346 use crate::transaction_outputs::TransactionOutputs;
3347 use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3348 use futures::FutureExt as _;
3349 use futures::future::BoxFuture;
3350 use std::collections::HashMap;
3351 use std::ops::Deref;
3352 use sui_macros::sim_test;
3353 use sui_protocol_config::{Chain, ProtocolConfig};
3354 use sui_types::accumulator_event::AccumulatorEvent;
3355 use sui_types::authenticator_state::ActiveJwk;
3356 use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3357 use sui_types::crypto::Signature;
3358 use sui_types::effects::{TransactionEffects, TransactionEvents};
3359 use sui_types::messages_checkpoint::SignedCheckpointSummary;
3360 use sui_types::transaction::VerifiedTransaction;
3361 use tokio::sync::mpsc;
3362
3363 #[tokio::test]
3364 async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
3365 let store = CheckpointStore::new_for_tests();
3366 let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
3367 for seq in 70u64..=80u64 {
3368 let contents =
3369 sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
3370 [sui_types::base_types::ExecutionDigests::new(
3371 sui_types::digests::TransactionDigest::random(),
3372 sui_types::digests::TransactionEffectsDigest::ZERO,
3373 )],
3374 );
3375 let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
3376 &protocol,
3377 0,
3378 seq,
3379 0,
3380 &contents,
3381 None,
3382 sui_types::gas::GasCostSummary::default(),
3383 None,
3384 0,
3385 Vec::new(),
3386 Vec::new(),
3387 );
3388 store
3389 .tables
3390 .locally_computed_checkpoints
3391 .insert(&seq, &summary)
3392 .unwrap();
3393 }
3394
3395 store
3396 .clear_locally_computed_checkpoints_from(76)
3397 .expect("clear should succeed");
3398
3399 assert!(
3401 store
3402 .tables
3403 .locally_computed_checkpoints
3404 .get(&75)
3405 .unwrap()
3406 .is_some()
3407 );
3408 assert!(
3409 store
3410 .tables
3411 .locally_computed_checkpoints
3412 .get(&76)
3413 .unwrap()
3414 .is_none()
3415 );
3416
3417 for seq in 70u64..76u64 {
3418 assert!(
3419 store
3420 .tables
3421 .locally_computed_checkpoints
3422 .get(&seq)
3423 .unwrap()
3424 .is_some()
3425 );
3426 }
3427 for seq in 76u64..=80u64 {
3428 assert!(
3429 store
3430 .tables
3431 .locally_computed_checkpoints
3432 .get(&seq)
3433 .unwrap()
3434 .is_none()
3435 );
3436 }
3437 }
3438
3439 #[tokio::test]
3440 async fn test_fork_detection_storage() {
3441 let store = CheckpointStore::new_for_tests();
3442 let seq_num = 42;
3444 let digest = CheckpointDigest::random();
3445
3446 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3447
3448 store
3449 .record_checkpoint_fork_detected(seq_num, digest)
3450 .unwrap();
3451
3452 let retrieved = store.get_checkpoint_fork_detected().unwrap();
3453 assert!(retrieved.is_some());
3454 let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3455 assert_eq!(retrieved_seq, seq_num);
3456 assert_eq!(retrieved_digest, digest);
3457
3458 store.clear_checkpoint_fork_detected().unwrap();
3459 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3460
3461 let tx_digest = TransactionDigest::random();
3463 let expected_effects = TransactionEffectsDigest::random();
3464 let actual_effects = TransactionEffectsDigest::random();
3465
3466 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3467
3468 store
3469 .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3470 .unwrap();
3471
3472 let retrieved = store.get_transaction_fork_detected().unwrap();
3473 assert!(retrieved.is_some());
3474 let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3475 assert_eq!(retrieved_tx, tx_digest);
3476 assert_eq!(retrieved_expected, expected_effects);
3477 assert_eq!(retrieved_actual, actual_effects);
3478
3479 store.clear_transaction_fork_detected().unwrap();
3480 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3481 }
3482
3483 #[sim_test]
3484 pub async fn checkpoint_builder_test() {
3485 telemetry_subscribers::init_for_testing();
3486
3487 let mut protocol_config =
3488 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
3489 protocol_config.disable_accumulators_for_testing();
3490 protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
3491 let state = TestAuthorityBuilder::new()
3492 .with_protocol_config(protocol_config)
3493 .build()
3494 .await;
3495
3496 let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
3497 0,
3498 0,
3499 vec![],
3500 SequenceNumber::new(),
3501 );
3502
3503 let jwks = {
3504 let mut jwks = Vec::new();
3505 while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
3506 jwks.push(ActiveJwk {
3507 jwk_id: JwkId::new(
3508 "https://accounts.google.com".to_string(),
3509 "1234567890".to_string(),
3510 ),
3511 jwk: JWK {
3512 kty: "RSA".to_string(),
3513 e: "AQAB".to_string(),
3514 n: "1234567890".to_string(),
3515 alg: "RS256".to_string(),
3516 },
3517 epoch: 0,
3518 });
3519 }
3520 jwks
3521 };
3522
3523 let dummy_tx_with_data =
3524 VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());
3525
3526 for i in 0..15 {
3527 state
3528 .database_for_testing()
3529 .perpetual_tables
3530 .transactions
3531 .insert(&d(i), dummy_tx.serializable_ref())
3532 .unwrap();
3533 }
3534 for i in 15..20 {
3535 state
3536 .database_for_testing()
3537 .perpetual_tables
3538 .transactions
3539 .insert(&d(i), dummy_tx_with_data.serializable_ref())
3540 .unwrap();
3541 }
3542
3543 let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
3544 commit_cert_for_test(
3545 &mut store,
3546 state.clone(),
3547 d(1),
3548 vec![d(2), d(3)],
3549 GasCostSummary::new(11, 12, 11, 1),
3550 );
3551 commit_cert_for_test(
3552 &mut store,
3553 state.clone(),
3554 d(2),
3555 vec![d(3), d(4)],
3556 GasCostSummary::new(21, 22, 21, 1),
3557 );
3558 commit_cert_for_test(
3559 &mut store,
3560 state.clone(),
3561 d(3),
3562 vec![],
3563 GasCostSummary::new(31, 32, 31, 1),
3564 );
3565 commit_cert_for_test(
3566 &mut store,
3567 state.clone(),
3568 d(4),
3569 vec![],
3570 GasCostSummary::new(41, 42, 41, 1),
3571 );
3572 for i in [5, 6, 7, 10, 11, 12, 13] {
3573 commit_cert_for_test(
3574 &mut store,
3575 state.clone(),
3576 d(i),
3577 vec![],
3578 GasCostSummary::new(41, 42, 41, 1),
3579 );
3580 }
3581 for i in [15, 16, 17] {
3582 commit_cert_for_test(
3583 &mut store,
3584 state.clone(),
3585 d(i),
3586 vec![],
3587 GasCostSummary::new(51, 52, 51, 1),
3588 );
3589 }
3590 let all_digests: Vec<_> = store.keys().copied().collect();
3591 for digest in all_digests {
3592 let signature = Signature::Ed25519SuiSignature(Default::default()).into();
3593 state
3594 .epoch_store_for_testing()
3595 .test_insert_user_signature(digest, vec![(signature, None)]);
3596 }
3597
3598 let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
3599 let (certified_output, mut certified_result) =
3600 mpsc::channel::<CertifiedCheckpointSummary>(10);
3601 let store = Arc::new(store);
3602
3603 let ckpt_dir = tempfile::tempdir().unwrap();
3604 let checkpoint_store =
3605 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
3606 let epoch_store = state.epoch_store_for_testing();
3607
3608 let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
3609 state.get_global_state_hash_store().clone(),
3610 ));
3611
3612 let checkpoint_service = CheckpointService::build(
3613 state.clone(),
3614 checkpoint_store,
3615 epoch_store.clone(),
3616 store,
3617 Arc::downgrade(&global_state_hasher),
3618 Box::new(output),
3619 Box::new(certified_output),
3620 CheckpointMetrics::new_for_tests(),
3621 3,
3622 100_000,
3623 );
3624 checkpoint_service.spawn(epoch_store.clone(), None).await;
3625
3626 checkpoint_service
3627 .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
3628 .unwrap();
3629 checkpoint_service
3630 .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
3631 .unwrap();
3632 checkpoint_service
3633 .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
3634 .unwrap();
3635 checkpoint_service
3636 .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
3637 .unwrap();
3638 checkpoint_service
3639 .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
3640 .unwrap();
3641 checkpoint_service
3642 .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
3643 .unwrap();
3644
3645 let (c1c, c1s) = result.recv().await.unwrap();
3646 let (c2c, c2s) = result.recv().await.unwrap();
3647
3648 let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3649 let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3650 assert_eq!(c1t, vec![d(4)]);
3651 assert_eq!(c1s.previous_digest, None);
3652 assert_eq!(c1s.sequence_number, 0);
3653 assert_eq!(
3654 c1s.epoch_rolling_gas_cost_summary,
3655 GasCostSummary::new(41, 42, 41, 1)
3656 );
3657
3658 assert_eq!(c2t, vec![d(3), d(2), d(1)]);
3659 assert_eq!(c2s.previous_digest, Some(c1s.digest()));
3660 assert_eq!(c2s.sequence_number, 1);
3661 assert_eq!(
3662 c2s.epoch_rolling_gas_cost_summary,
3663 GasCostSummary::new(104, 108, 104, 4)
3664 );
3665
3666 let (c3c, c3s) = result.recv().await.unwrap();
3669 let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3670 let (c4c, c4s) = result.recv().await.unwrap();
3671 let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3672 assert_eq!(c3s.sequence_number, 2);
3673 assert_eq!(c3s.previous_digest, Some(c2s.digest()));
3674 assert_eq!(c4s.sequence_number, 3);
3675 assert_eq!(c4s.previous_digest, Some(c3s.digest()));
3676 assert_eq!(c3t, vec![d(10), d(11), d(12)]);
3677 assert_eq!(c4t, vec![d(13)]);
3678
3679 let (c5c, c5s) = result.recv().await.unwrap();
3682 let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3683 let (c6c, c6s) = result.recv().await.unwrap();
3684 let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3685 assert_eq!(c5s.sequence_number, 4);
3686 assert_eq!(c5s.previous_digest, Some(c4s.digest()));
3687 assert_eq!(c6s.sequence_number, 5);
3688 assert_eq!(c6s.previous_digest, Some(c5s.digest()));
3689 assert_eq!(c5t, vec![d(15), d(16)]);
3690 assert_eq!(c6t, vec![d(17)]);
3691
3692 let (c7c, c7s) = result.recv().await.unwrap();
3695 let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3696 assert_eq!(c7t, vec![d(5), d(6)]);
3697 assert_eq!(c7s.previous_digest, Some(c6s.digest()));
3698 assert_eq!(c7s.sequence_number, 6);
3699
3700 let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
3701 let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);
3702
3703 checkpoint_service
3704 .notify_checkpoint_signature(
3705 &epoch_store,
3706 &CheckpointSignatureMessage { summary: c2ss },
3707 )
3708 .unwrap();
3709 checkpoint_service
3710 .notify_checkpoint_signature(
3711 &epoch_store,
3712 &CheckpointSignatureMessage { summary: c1ss },
3713 )
3714 .unwrap();
3715
3716 let c1sc = certified_result.recv().await.unwrap();
3717 let c2sc = certified_result.recv().await.unwrap();
3718 assert_eq!(c1sc.sequence_number, 0);
3719 assert_eq!(c2sc.sequence_number, 1);
3720 }
3721
3722 impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
3723 fn notify_read_executed_effects(
3724 &self,
3725 _: &str,
3726 digests: &[TransactionDigest],
3727 ) -> BoxFuture<'_, Vec<TransactionEffects>> {
3728 std::future::ready(
3729 digests
3730 .iter()
3731 .map(|d| self.get(d).expect("effects not found").clone())
3732 .collect(),
3733 )
3734 .boxed()
3735 }
3736
3737 fn notify_read_executed_effects_digests(
3738 &self,
3739 _: &str,
3740 digests: &[TransactionDigest],
3741 ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
3742 std::future::ready(
3743 digests
3744 .iter()
3745 .map(|d| {
3746 self.get(d)
3747 .map(|fx| fx.digest())
3748 .expect("effects not found")
3749 })
3750 .collect(),
3751 )
3752 .boxed()
3753 }
3754
3755 fn multi_get_executed_effects(
3756 &self,
3757 digests: &[TransactionDigest],
3758 ) -> Vec<Option<TransactionEffects>> {
3759 digests.iter().map(|d| self.get(d).cloned()).collect()
3760 }
3761
3762 fn multi_get_transaction_blocks(
3768 &self,
3769 _: &[TransactionDigest],
3770 ) -> Vec<Option<Arc<VerifiedTransaction>>> {
3771 unimplemented!()
3772 }
3773
3774 fn multi_get_executed_effects_digests(
3775 &self,
3776 _: &[TransactionDigest],
3777 ) -> Vec<Option<TransactionEffectsDigest>> {
3778 unimplemented!()
3779 }
3780
3781 fn multi_get_effects(
3782 &self,
3783 _: &[TransactionEffectsDigest],
3784 ) -> Vec<Option<TransactionEffects>> {
3785 unimplemented!()
3786 }
3787
3788 fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
3789 unimplemented!()
3790 }
3791
3792 fn get_mysticeti_fastpath_outputs(
3793 &self,
3794 _: &TransactionDigest,
3795 ) -> Option<Arc<TransactionOutputs>> {
3796 unimplemented!()
3797 }
3798
3799 fn notify_read_fastpath_transaction_outputs<'a>(
3800 &'a self,
3801 _: &'a [TransactionDigest],
3802 ) -> BoxFuture<'a, Vec<Arc<crate::transaction_outputs::TransactionOutputs>>> {
3803 unimplemented!()
3804 }
3805
3806 fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
3807 unimplemented!()
3808 }
3809
3810 fn get_unchanged_loaded_runtime_objects(
3811 &self,
3812 _digest: &TransactionDigest,
3813 ) -> Option<Vec<sui_types::storage::ObjectKey>> {
3814 unimplemented!()
3815 }
3816 }
3817
3818 #[async_trait::async_trait]
3819 impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
3820 async fn checkpoint_created(
3821 &self,
3822 summary: &CheckpointSummary,
3823 contents: &CheckpointContents,
3824 _epoch_store: &Arc<AuthorityPerEpochStore>,
3825 _checkpoint_store: &Arc<CheckpointStore>,
3826 ) -> SuiResult {
3827 self.try_send((contents.clone(), summary.clone())).unwrap();
3828 Ok(())
3829 }
3830 }
3831
3832 #[async_trait::async_trait]
3833 impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
3834 async fn certified_checkpoint_created(
3835 &self,
3836 summary: &CertifiedCheckpointSummary,
3837 ) -> SuiResult {
3838 self.try_send(summary.clone()).unwrap();
3839 Ok(())
3840 }
3841 }
3842
3843 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
3844 PendingCheckpoint {
3845 roots: t
3846 .into_iter()
3847 .map(|t| TransactionKey::Digest(d(t)))
3848 .collect(),
3849 details: PendingCheckpointInfo {
3850 timestamp_ms,
3851 last_of_epoch: false,
3852 checkpoint_height: i,
3853 consensus_commit_ref: CommitRef::default(),
3854 rejected_transactions_digest: Digest::default(),
3855 },
3856 }
3857 }
3858
3859 fn d(i: u8) -> TransactionDigest {
3860 let mut bytes: [u8; 32] = Default::default();
3861 bytes[0] = i;
3862 TransactionDigest::new(bytes)
3863 }
3864
3865 fn e(
3866 transaction_digest: TransactionDigest,
3867 dependencies: Vec<TransactionDigest>,
3868 gas_used: GasCostSummary,
3869 ) -> TransactionEffects {
3870 let mut effects = TransactionEffects::default();
3871 *effects.transaction_digest_mut_for_testing() = transaction_digest;
3872 *effects.dependencies_mut_for_testing() = dependencies;
3873 *effects.gas_cost_summary_mut_for_testing() = gas_used;
3874 effects
3875 }
3876
3877 fn commit_cert_for_test(
3878 store: &mut HashMap<TransactionDigest, TransactionEffects>,
3879 state: Arc<AuthorityState>,
3880 digest: TransactionDigest,
3881 dependencies: Vec<TransactionDigest>,
3882 gas_used: GasCostSummary,
3883 ) {
3884 let epoch_store = state.epoch_store_for_testing();
3885 let effects = e(digest, dependencies, gas_used);
3886 store.insert(digest, effects.clone());
3887 epoch_store.insert_executed_in_epoch(&digest);
3888 }
3889}