1mod causal_order;
5pub mod checkpoint_executor;
6mod checkpoint_output;
7mod metrics;
8
9use crate::accumulators::AccumulatorSettlementTxBuilder;
10use crate::authority::AuthorityState;
11use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
12use crate::authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config};
13use crate::checkpoints::causal_order::CausalOrder;
14use crate::checkpoints::checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput};
15pub use crate::checkpoints::checkpoint_output::{
16 LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
17};
18pub use crate::checkpoints::metrics::CheckpointMetrics;
19use crate::consensus_manager::ReplayWaiter;
20use crate::execution_cache::TransactionCacheRead;
21
22use crate::execution_scheduler::balance_withdraw_scheduler::BalanceSettlement;
23use crate::global_state_hasher::GlobalStateHasher;
24use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
25use diffy::create_patch;
26use itertools::Itertools;
27use mysten_common::random::get_rng;
28use mysten_common::sync::notify_read::{CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME, NotifyRead};
29use mysten_common::{assert_reachable, debug_fatal, fatal};
30use mysten_metrics::{MonitoredFutureExt, monitored_scope, spawn_monitored_task};
31use nonempty::NonEmpty;
32use parking_lot::Mutex;
33use pin_project_lite::pin_project;
34use serde::{Deserialize, Serialize};
35use sui_macros::fail_point_arg;
36use sui_network::default_mysten_network_config;
37use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
38use sui_types::base_types::ConciseableName;
39use sui_types::executable_transaction::VerifiedExecutableTransaction;
40use sui_types::execution::ExecutionTimeObservationKey;
41use sui_types::messages_checkpoint::{CheckpointArtifacts, CheckpointCommitment};
42use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
43use tokio::sync::{mpsc, watch};
44use typed_store::rocks::{DBOptions, ReadWriteOptions, default_db_options};
45
46use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
47use crate::authority::authority_store_pruner::PrunerWatermarks;
48use crate::consensus_handler::SequencedConsensusTransactionKey;
49use rand::seq::SliceRandom;
50use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
51use std::fs::File;
52use std::future::Future;
53use std::io::Write;
54use std::path::Path;
55use std::pin::Pin;
56use std::sync::Arc;
57use std::sync::Weak;
58use std::task::{Context, Poll};
59use std::time::{Duration, SystemTime};
60use sui_protocol_config::ProtocolVersion;
61use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
62use sui_types::committee::StakeUnit;
63use sui_types::crypto::AuthorityStrongQuorumSignInfo;
64use sui_types::digests::{CheckpointContentsDigest, CheckpointDigest, TransactionEffectsDigest};
65use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
66use sui_types::error::{SuiErrorKind, SuiResult};
67use sui_types::gas::GasCostSummary;
68use sui_types::message_envelope::Message;
69use sui_types::messages_checkpoint::{
70 CertifiedCheckpointSummary, CheckpointContents, CheckpointResponseV2, CheckpointSequenceNumber,
71 CheckpointSignatureMessage, CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp,
72 EndOfEpochData, FullCheckpointContents, TrustedCheckpoint, VerifiedCheckpoint,
73 VerifiedCheckpointContents,
74};
75use sui_types::messages_checkpoint::{CheckpointRequestV2, SignedCheckpointSummary};
76use sui_types::messages_consensus::ConsensusTransactionKey;
77use sui_types::signature::GenericSignature;
78use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
79use sui_types::transaction::{
80 TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction,
81};
82use tokio::{sync::Notify, time::timeout};
83use tracing::{debug, error, info, instrument, trace, warn};
84use typed_store::DBMapUtils;
85use typed_store::Map;
86use typed_store::{
87 TypedStoreError,
88 rocks::{DBMap, MetricConf},
89};
90
/// The single key under which the `transaction_fork_detected` table stores its
/// (at most one) fork record.
const TRANSACTION_FORK_DETECTED_KEY: u8 = 0;
92
/// Height value attached to a checkpoint (a plain `u64`).
pub type CheckpointHeight = u64;
94
/// Per-epoch totals, as computed by `CheckpointStore::get_epoch_stats`.
pub struct EpochStats {
    /// Number of checkpoints in the epoch (first through last, inclusive).
    pub checkpoint_count: u64,
    /// Network transaction total at the epoch's last checkpoint minus the total
    /// at the previous epoch's last checkpoint.
    pub transaction_count: u64,
    /// Computation cost taken from the last checkpoint's rolling gas cost summary.
    pub total_gas_reward: u64,
}
100
/// Metadata carried alongside a `PendingCheckpoint`.
#[derive(Clone, Debug)]
pub struct PendingCheckpointInfo {
    /// Checkpoint timestamp; presumably milliseconds, per the field name — confirm.
    pub timestamp_ms: CheckpointTimestamp,
    /// Whether this pending checkpoint is the final one of its epoch.
    pub last_of_epoch: bool,
    /// Height assigned to this pending checkpoint.
    // NOTE(review): how the height is derived is not visible in this chunk.
    pub checkpoint_height: CheckpointHeight,
}
109
/// A checkpoint that has been scheduled but not yet built: a set of root
/// transaction keys plus the associated `PendingCheckpointInfo`.
#[derive(Clone, Debug)]
pub struct PendingCheckpoint {
    /// Transaction keys that seed the checkpoint.
    // NOTE(review): presumably expanded via causal ordering by the builder — confirm.
    pub roots: Vec<TransactionKey>,
    /// Timestamp, end-of-epoch flag and height for this pending checkpoint.
    pub details: PendingCheckpointInfo,
}
115
/// A checkpoint summary together with bookkeeping about where it was built
/// from. Serializable, so it can be persisted.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    /// The checkpoint summary itself.
    pub summary: CheckpointSummary,
    /// Height at which the summary was built, when known.
    // NOTE(review): the conditions under which this is `None` are not visible here.
    pub checkpoint_height: Option<CheckpointHeight>,
    /// Index of this checkpoint within its commit.
    // NOTE(review): presumably a consensus commit — confirm against the builder.
    pub position_in_commit: usize,
}
123
/// Typed tables backing the checkpoint store. `DBMapUtils` derives the
/// open/read-only helpers used by the `impl` below; the field names double as
/// the table names referenced in the tidehunter configuration.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct CheckpointStoreTables {
    /// Checkpoint contents keyed by their contents digest.
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Reverse index: contents digest -> checkpoint sequence number.
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Full checkpoint contents keyed by sequence number. Uses the custom
    /// options from `full_checkpoint_content_table_default_config`.
    #[default_options_override_fn = "full_checkpoint_content_table_default_config"]
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Certified checkpoints keyed by sequence number.
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Certified checkpoints keyed by checkpoint digest.
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Summaries built locally, keyed by sequence number; compared against
    /// certified checkpoints to detect forks.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// Sequence number of the last checkpoint of each epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// One `(sequence number, digest)` entry per `CheckpointWatermark` variant.
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,

    /// Record of a detected transaction fork, stored under
    /// `TRANSACTION_FORK_DETECTED_KEY` as
    /// `(transaction digest, expected effects digest, actual effects digest)`.
    pub(crate) transaction_fork_detected: DBMap<
        u8,
        (
            TransactionDigest,
            TransactionEffectsDigest,
            TransactionEffectsDigest,
        ),
    >,
}
167
168fn full_checkpoint_content_table_default_config() -> DBOptions {
169 DBOptions {
170 options: default_db_options().options,
171 rw_options: ReadWriteOptions::default().set_log_value_hash(true),
175 }
176}
177
178impl CheckpointStoreTables {
179 #[cfg(not(tidehunter))]
180 pub fn new(path: &Path, metric_name: &'static str, _: Arc<PrunerWatermarks>) -> Self {
181 Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
182 }
183
    /// Tidehunter build: opens the checkpoint tables read-write at `path` with
    /// an explicit per-table configuration.
    ///
    /// `pruner_watermarks` feeds the relocation filters installed on several
    /// tables via `apply_relocation_filter`.
    // NOTE(review): the numeric tuning constants below are opaque from this
    // file; the comments on them are assumptions to confirm against
    // `typed_store::tidehunter_util`.
    #[cfg(tidehunter)]
    pub fn new(
        path: &Path,
        metric_name: &'static str,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        tracing::warn!("Checkpoint DB using tidehunter");
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        use typed_store::tidehunter_util::{
            Decision, KeySpaceConfig, KeyType, ThConfig, default_cells_per_mutex,
            default_mutex_count, default_value_cache_size,
        };
        // Four times the default mutex count is shared by all non-trivial tables.
        let mutexes = default_mutex_count() * 4;
        // Key type for u64 sequence-number keys: a 48-bit (6 * 8) prefix.
        let u64_sequence_key = KeyType::from_prefix_bits(6 * 8);
        let override_dirty_keys_config = KeySpaceConfig::new()
            .with_max_dirty_keys(64_000)
            .with_value_cache_size(default_value_cache_size());
        // Config for tables keyed by a sequence number / epoch id
        // (first argument presumably the 8-byte key length — confirm).
        let config_u64 = ThConfig::new_with_config(
            8,
            mutexes,
            u64_sequence_key,
            override_dirty_keys_config.clone(),
        );
        // Config for tables keyed by a digest (first argument presumably the
        // 32-byte key length; the trailing vector is the prefix to strip — confirm).
        let digest_config = ThConfig::new_with_rm_prefix(
            32,
            mutexes,
            KeyType::uniform(default_cells_per_mutex()),
            KeySpaceConfig::default(),
            vec![0, 0, 0, 0, 0, 0, 0, 32],
        );
        // Small tables (watermarks, fork record): tiny value cache, never unloaded.
        let watermarks_config = KeySpaceConfig::new()
            .with_value_cache_size(10)
            .disable_unload();
        let lru_config = KeySpaceConfig::new().with_value_cache_size(default_value_cache_size());
        // One (table name, config) pair per field of `CheckpointStoreTables`.
        let configs = vec![
            (
                "checkpoint_content",
                digest_config.clone().with_config(
                    lru_config
                        .clone()
                        .with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
            (
                "checkpoint_sequence_by_contents_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    false,
                )),
            ),
            (
                "full_checkpoint_content",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |sequence_number: CheckpointSequenceNumber| sequence_number,
                    true,
                )),
            ),
            ("certified_checkpoints", config_u64.clone()),
            (
                "checkpoint_by_digest",
                digest_config.clone().with_config(apply_relocation_filter(
                    lru_config,
                    pruner_watermarks.epoch_id.clone(),
                    |checkpoint: TrustedCheckpoint| checkpoint.inner().epoch,
                    false,
                )),
            ),
            (
                "locally_computed_checkpoints",
                config_u64.clone().with_config(apply_relocation_filter(
                    override_dirty_keys_config.clone(),
                    pruner_watermarks.checkpoint_id.clone(),
                    |checkpoint_id: CheckpointSequenceNumber| checkpoint_id,
                    true,
                )),
            ),
            ("epoch_last_checkpoint_map", config_u64.clone()),
            (
                "watermarks",
                ThConfig::new_with_config(
                    4,
                    1,
                    KeyType::uniform(1),
                    apply_relocation_filter(
                        watermarks_config.clone(),
                        pruner_watermarks.checkpoint_id.clone(),
                        |(watermark, _): (CheckpointSequenceNumber, CheckpointDigest)| watermark,
                        false,
                    ),
                ),
            ),
            (
                "transaction_fork_detected",
                ThConfig::new_with_config(
                    1,
                    1,
                    KeyType::uniform(1),
                    watermarks_config.with_relocation_filter(|_, _| Decision::Remove),
                ),
            ),
        ];
        // Table names are converted to owned strings for the derived opener.
        Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new(metric_name),
            configs
                .into_iter()
                .map(|(cf, config)| (cf.to_string(), config))
                .collect(),
        )
    }
298
299 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
300 Self::get_read_only_handle(
301 path.to_path_buf(),
302 None,
303 None,
304 MetricConf::new("checkpoint_readonly"),
305 )
306 }
307}
308
/// Persistent checkpoint storage plus in-memory notification channels for the
/// synced and executed watermarks.
pub struct CheckpointStore {
    /// The underlying typed tables.
    pub(crate) tables: CheckpointStoreTables,
    /// Notified by `update_highest_synced_checkpoint`.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    /// Notified by `update_highest_executed_checkpoint`.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
314
315impl CheckpointStore {
316 pub fn new(path: &Path, pruner_watermarks: Arc<PrunerWatermarks>) -> Arc<Self> {
317 let tables = CheckpointStoreTables::new(path, "checkpoint", pruner_watermarks);
318 Arc::new(Self {
319 tables,
320 synced_checkpoint_notify_read: NotifyRead::new(),
321 executed_checkpoint_notify_read: NotifyRead::new(),
322 })
323 }
324
325 pub fn new_for_tests() -> Arc<Self> {
326 let ckpt_dir = mysten_common::tempdir().unwrap();
327 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()))
328 }
329
330 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
331 let tables = CheckpointStoreTables::new(
332 path,
333 "db_checkpoint",
334 Arc::new(PrunerWatermarks::default()),
335 );
336 Arc::new(Self {
337 tables,
338 synced_checkpoint_notify_read: NotifyRead::new(),
339 executed_checkpoint_notify_read: NotifyRead::new(),
340 })
341 }
342
343 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
344 CheckpointStoreTables::open_readonly(path)
345 }
346
347 #[instrument(level = "info", skip_all)]
348 pub fn insert_genesis_checkpoint(
349 &self,
350 checkpoint: VerifiedCheckpoint,
351 contents: CheckpointContents,
352 epoch_store: &AuthorityPerEpochStore,
353 ) {
354 assert_eq!(
355 checkpoint.epoch(),
356 0,
357 "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
358 );
359 assert_eq!(
360 *checkpoint.sequence_number(),
361 0,
362 "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
363 );
364
365 if self
367 .get_checkpoint_by_digest(checkpoint.digest())
368 .unwrap()
369 .is_none()
370 {
371 if epoch_store.epoch() == checkpoint.epoch {
372 epoch_store
373 .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
374 .unwrap();
375 } else {
376 debug!(
377 validator_epoch =% epoch_store.epoch(),
378 genesis_epoch =% checkpoint.epoch(),
379 "Not inserting checkpoint builder data for genesis checkpoint",
380 );
381 }
382 self.insert_checkpoint_contents(contents).unwrap();
383 self.insert_verified_checkpoint(&checkpoint).unwrap();
384 self.update_highest_synced_checkpoint(&checkpoint).unwrap();
385 }
386 }
387
388 pub fn get_checkpoint_by_digest(
389 &self,
390 digest: &CheckpointDigest,
391 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
392 self.tables
393 .checkpoint_by_digest
394 .get(digest)
395 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
396 }
397
398 pub fn get_checkpoint_by_sequence_number(
399 &self,
400 sequence_number: CheckpointSequenceNumber,
401 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
402 self.tables
403 .certified_checkpoints
404 .get(&sequence_number)
405 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
406 }
407
408 pub fn get_locally_computed_checkpoint(
409 &self,
410 sequence_number: CheckpointSequenceNumber,
411 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
412 self.tables
413 .locally_computed_checkpoints
414 .get(&sequence_number)
415 }
416
417 pub fn multi_get_locally_computed_checkpoints(
418 &self,
419 sequence_numbers: &[CheckpointSequenceNumber],
420 ) -> Result<Vec<Option<CheckpointSummary>>, TypedStoreError> {
421 let checkpoints = self
422 .tables
423 .locally_computed_checkpoints
424 .multi_get(sequence_numbers)?;
425
426 Ok(checkpoints)
427 }
428
429 pub fn get_sequence_number_by_contents_digest(
430 &self,
431 digest: &CheckpointContentsDigest,
432 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
433 self.tables
434 .checkpoint_sequence_by_contents_digest
435 .get(digest)
436 }
437
438 pub fn delete_contents_digest_sequence_number_mapping(
439 &self,
440 digest: &CheckpointContentsDigest,
441 ) -> Result<(), TypedStoreError> {
442 self.tables
443 .checkpoint_sequence_by_contents_digest
444 .remove(digest)
445 }
446
447 pub fn get_latest_certified_checkpoint(
448 &self,
449 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
450 Ok(self
451 .tables
452 .certified_checkpoints
453 .reversed_safe_iter_with_bounds(None, None)?
454 .next()
455 .transpose()?
456 .map(|(_, v)| v.into()))
457 }
458
459 pub fn get_latest_locally_computed_checkpoint(
460 &self,
461 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
462 Ok(self
463 .tables
464 .locally_computed_checkpoints
465 .reversed_safe_iter_with_bounds(None, None)?
466 .next()
467 .transpose()?
468 .map(|(_, v)| v))
469 }
470
471 pub fn multi_get_checkpoint_by_sequence_number(
472 &self,
473 sequence_numbers: &[CheckpointSequenceNumber],
474 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
475 let checkpoints = self
476 .tables
477 .certified_checkpoints
478 .multi_get(sequence_numbers)?
479 .into_iter()
480 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
481 .collect();
482
483 Ok(checkpoints)
484 }
485
486 pub fn multi_get_checkpoint_content(
487 &self,
488 contents_digest: &[CheckpointContentsDigest],
489 ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
490 self.tables.checkpoint_content.multi_get(contents_digest)
491 }
492
493 pub fn get_highest_verified_checkpoint(
494 &self,
495 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
496 let highest_verified = if let Some(highest_verified) = self
497 .tables
498 .watermarks
499 .get(&CheckpointWatermark::HighestVerified)?
500 {
501 highest_verified
502 } else {
503 return Ok(None);
504 };
505 self.get_checkpoint_by_digest(&highest_verified.1)
506 }
507
508 pub fn get_highest_synced_checkpoint(
509 &self,
510 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
511 let highest_synced = if let Some(highest_synced) = self
512 .tables
513 .watermarks
514 .get(&CheckpointWatermark::HighestSynced)?
515 {
516 highest_synced
517 } else {
518 return Ok(None);
519 };
520 self.get_checkpoint_by_digest(&highest_synced.1)
521 }
522
523 pub fn get_highest_synced_checkpoint_seq_number(
524 &self,
525 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
526 if let Some(highest_synced) = self
527 .tables
528 .watermarks
529 .get(&CheckpointWatermark::HighestSynced)?
530 {
531 Ok(Some(highest_synced.0))
532 } else {
533 Ok(None)
534 }
535 }
536
537 pub fn get_highest_executed_checkpoint_seq_number(
538 &self,
539 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
540 if let Some(highest_executed) = self
541 .tables
542 .watermarks
543 .get(&CheckpointWatermark::HighestExecuted)?
544 {
545 Ok(Some(highest_executed.0))
546 } else {
547 Ok(None)
548 }
549 }
550
551 pub fn get_highest_executed_checkpoint(
552 &self,
553 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
554 let highest_executed = if let Some(highest_executed) = self
555 .tables
556 .watermarks
557 .get(&CheckpointWatermark::HighestExecuted)?
558 {
559 highest_executed
560 } else {
561 return Ok(None);
562 };
563 self.get_checkpoint_by_digest(&highest_executed.1)
564 }
565
566 pub fn get_highest_pruned_checkpoint_seq_number(
567 &self,
568 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
569 self.tables
570 .watermarks
571 .get(&CheckpointWatermark::HighestPruned)
572 .map(|watermark| watermark.map(|w| w.0))
573 }
574
575 pub fn get_checkpoint_contents(
576 &self,
577 digest: &CheckpointContentsDigest,
578 ) -> Result<Option<CheckpointContents>, TypedStoreError> {
579 self.tables.checkpoint_content.get(digest)
580 }
581
582 pub fn get_full_checkpoint_contents_by_sequence_number(
583 &self,
584 seq: CheckpointSequenceNumber,
585 ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
586 self.tables.full_checkpoint_content.get(&seq)
587 }
588
589 fn prune_local_summaries(&self) -> SuiResult {
590 if let Some((last_local_summary, _)) = self
591 .tables
592 .locally_computed_checkpoints
593 .reversed_safe_iter_with_bounds(None, None)?
594 .next()
595 .transpose()?
596 {
597 let mut batch = self.tables.locally_computed_checkpoints.batch();
598 batch.schedule_delete_range(
599 &self.tables.locally_computed_checkpoints,
600 &0,
601 &last_local_summary,
602 )?;
603 batch.write()?;
604 info!("Pruned local summaries up to {:?}", last_local_summary);
605 }
606 Ok(())
607 }
608
609 pub fn clear_locally_computed_checkpoints_from(
610 &self,
611 from_seq: CheckpointSequenceNumber,
612 ) -> SuiResult {
613 let keys: Vec<_> = self
614 .tables
615 .locally_computed_checkpoints
616 .safe_iter_with_bounds(Some(from_seq), None)
617 .map(|r| r.map(|(k, _)| k))
618 .collect::<Result<_, _>>()?;
619 if let Some(&last_local_summary) = keys.last() {
620 let mut batch = self.tables.locally_computed_checkpoints.batch();
621 batch
622 .delete_batch(&self.tables.locally_computed_checkpoints, keys.iter())
623 .expect("Failed to delete locally computed checkpoints");
624 batch
625 .write()
626 .expect("Failed to delete locally computed checkpoints");
627 warn!(
628 from_seq,
629 last_local_summary,
630 "Cleared locally_computed_checkpoints from {} (inclusive) through {} (inclusive)",
631 from_seq,
632 last_local_summary
633 );
634 }
635 Ok(())
636 }
637
638 fn check_for_checkpoint_fork(
639 &self,
640 local_checkpoint: &CheckpointSummary,
641 verified_checkpoint: &VerifiedCheckpoint,
642 ) {
643 if local_checkpoint != verified_checkpoint.data() {
644 let verified_contents = self
645 .get_checkpoint_contents(&verified_checkpoint.content_digest)
646 .map(|opt_contents| {
647 opt_contents
648 .map(|contents| format!("{:?}", contents))
649 .unwrap_or_else(|| {
650 format!(
651 "Verified checkpoint contents not found, digest: {:?}",
652 verified_checkpoint.content_digest,
653 )
654 })
655 })
656 .map_err(|e| {
657 format!(
658 "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
659 verified_checkpoint.content_digest, e
660 )
661 })
662 .unwrap_or_else(|err_msg| err_msg);
663
664 let local_contents = self
665 .get_checkpoint_contents(&local_checkpoint.content_digest)
666 .map(|opt_contents| {
667 opt_contents
668 .map(|contents| format!("{:?}", contents))
669 .unwrap_or_else(|| {
670 format!(
671 "Local checkpoint contents not found, digest: {:?}",
672 local_checkpoint.content_digest
673 )
674 })
675 })
676 .map_err(|e| {
677 format!(
678 "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
679 local_checkpoint.content_digest, e
680 )
681 })
682 .unwrap_or_else(|err_msg| err_msg);
683
684 error!(
686 verified_checkpoint = ?verified_checkpoint.data(),
687 ?verified_contents,
688 ?local_checkpoint,
689 ?local_contents,
690 "Local checkpoint fork detected!",
691 );
692
693 if let Err(e) = self.record_checkpoint_fork_detected(
695 *local_checkpoint.sequence_number(),
696 local_checkpoint.digest(),
697 ) {
698 error!("Failed to record checkpoint fork in database: {:?}", e);
699 }
700
701 fail_point_arg!(
702 "kill_checkpoint_fork_node",
703 |checkpoint_overrides: std::sync::Arc<
704 std::sync::Mutex<std::collections::BTreeMap<u64, String>>,
705 >| {
706 #[cfg(msim)]
707 {
708 if let Ok(mut overrides) = checkpoint_overrides.lock() {
709 overrides.insert(
710 local_checkpoint.sequence_number,
711 verified_checkpoint.digest().to_string(),
712 );
713 }
714 tracing::error!(
715 fatal = true,
716 "Fork recovery test: killing node due to checkpoint fork for sequence number: {}, using verified digest: {}",
717 local_checkpoint.sequence_number(),
718 verified_checkpoint.digest()
719 );
720 sui_simulator::task::shutdown_current_node();
721 }
722 }
723 );
724
725 fatal!(
726 "Local checkpoint fork detected for sequence number: {}",
727 local_checkpoint.sequence_number()
728 );
729 }
730 }
731
732 pub fn insert_certified_checkpoint(
738 &self,
739 checkpoint: &VerifiedCheckpoint,
740 ) -> Result<(), TypedStoreError> {
741 debug!(
742 checkpoint_seq = checkpoint.sequence_number(),
743 "Inserting certified checkpoint",
744 );
745 let mut batch = self.tables.certified_checkpoints.batch();
746 batch
747 .insert_batch(
748 &self.tables.certified_checkpoints,
749 [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
750 )?
751 .insert_batch(
752 &self.tables.checkpoint_by_digest,
753 [(checkpoint.digest(), checkpoint.serializable_ref())],
754 )?;
755 if checkpoint.next_epoch_committee().is_some() {
756 batch.insert_batch(
757 &self.tables.epoch_last_checkpoint_map,
758 [(&checkpoint.epoch(), checkpoint.sequence_number())],
759 )?;
760 }
761 batch.write()?;
762
763 if let Some(local_checkpoint) = self
764 .tables
765 .locally_computed_checkpoints
766 .get(checkpoint.sequence_number())?
767 {
768 self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
769 }
770
771 Ok(())
772 }
773
774 #[instrument(level = "debug", skip_all)]
777 pub fn insert_verified_checkpoint(
778 &self,
779 checkpoint: &VerifiedCheckpoint,
780 ) -> Result<(), TypedStoreError> {
781 self.insert_certified_checkpoint(checkpoint)?;
782 self.update_highest_verified_checkpoint(checkpoint)
783 }
784
785 pub fn update_highest_verified_checkpoint(
786 &self,
787 checkpoint: &VerifiedCheckpoint,
788 ) -> Result<(), TypedStoreError> {
789 if Some(*checkpoint.sequence_number())
790 > self
791 .get_highest_verified_checkpoint()?
792 .map(|x| *x.sequence_number())
793 {
794 debug!(
795 checkpoint_seq = checkpoint.sequence_number(),
796 "Updating highest verified checkpoint",
797 );
798 self.tables.watermarks.insert(
799 &CheckpointWatermark::HighestVerified,
800 &(*checkpoint.sequence_number(), *checkpoint.digest()),
801 )?;
802 }
803
804 Ok(())
805 }
806
807 pub fn update_highest_synced_checkpoint(
808 &self,
809 checkpoint: &VerifiedCheckpoint,
810 ) -> Result<(), TypedStoreError> {
811 let seq = *checkpoint.sequence_number();
812 debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
813 self.tables.watermarks.insert(
814 &CheckpointWatermark::HighestSynced,
815 &(seq, *checkpoint.digest()),
816 )?;
817 self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
818 Ok(())
819 }
820
821 async fn notify_read_checkpoint_watermark<F>(
822 &self,
823 notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
824 seq: CheckpointSequenceNumber,
825 get_watermark: F,
826 ) -> VerifiedCheckpoint
827 where
828 F: Fn() -> Option<CheckpointSequenceNumber>,
829 {
830 notify_read
831 .read("notify_read_checkpoint_watermark", &[seq], |seqs| {
832 let seq = seqs[0];
833 let Some(highest) = get_watermark() else {
834 return vec![None];
835 };
836 if highest < seq {
837 return vec![None];
838 }
839 let checkpoint = self
840 .get_checkpoint_by_sequence_number(seq)
841 .expect("db error")
842 .expect("checkpoint not found");
843 vec![Some(checkpoint)]
844 })
845 .await
846 .into_iter()
847 .next()
848 .unwrap()
849 }
850
851 pub async fn notify_read_synced_checkpoint(
852 &self,
853 seq: CheckpointSequenceNumber,
854 ) -> VerifiedCheckpoint {
855 self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
856 self.get_highest_synced_checkpoint_seq_number()
857 .expect("db error")
858 })
859 .await
860 }
861
862 pub async fn notify_read_executed_checkpoint(
863 &self,
864 seq: CheckpointSequenceNumber,
865 ) -> VerifiedCheckpoint {
866 self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
867 self.get_highest_executed_checkpoint_seq_number()
868 .expect("db error")
869 })
870 .await
871 }
872
873 pub fn update_highest_executed_checkpoint(
874 &self,
875 checkpoint: &VerifiedCheckpoint,
876 ) -> Result<(), TypedStoreError> {
877 if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
878 if seq_number >= *checkpoint.sequence_number() {
879 return Ok(());
880 }
881 assert_eq!(
882 seq_number + 1,
883 *checkpoint.sequence_number(),
884 "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
885 checkpoint.sequence_number(),
886 seq_number
887 );
888 }
889 let seq = *checkpoint.sequence_number();
890 debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
891 self.tables.watermarks.insert(
892 &CheckpointWatermark::HighestExecuted,
893 &(seq, *checkpoint.digest()),
894 )?;
895 self.executed_checkpoint_notify_read
896 .notify(&seq, checkpoint);
897 Ok(())
898 }
899
900 pub fn update_highest_pruned_checkpoint(
901 &self,
902 checkpoint: &VerifiedCheckpoint,
903 ) -> Result<(), TypedStoreError> {
904 self.tables.watermarks.insert(
905 &CheckpointWatermark::HighestPruned,
906 &(*checkpoint.sequence_number(), *checkpoint.digest()),
907 )
908 }
909
910 pub fn set_highest_executed_checkpoint_subtle(
915 &self,
916 checkpoint: &VerifiedCheckpoint,
917 ) -> Result<(), TypedStoreError> {
918 self.tables.watermarks.insert(
919 &CheckpointWatermark::HighestExecuted,
920 &(*checkpoint.sequence_number(), *checkpoint.digest()),
921 )
922 }
923
924 pub fn insert_checkpoint_contents(
925 &self,
926 contents: CheckpointContents,
927 ) -> Result<(), TypedStoreError> {
928 debug!(
929 checkpoint_seq = ?contents.digest(),
930 "Inserting checkpoint contents",
931 );
932 self.tables
933 .checkpoint_content
934 .insert(contents.digest(), &contents)
935 }
936
937 pub fn insert_verified_checkpoint_contents(
938 &self,
939 checkpoint: &VerifiedCheckpoint,
940 full_contents: VerifiedCheckpointContents,
941 ) -> Result<(), TypedStoreError> {
942 let mut batch = self.tables.full_checkpoint_content.batch();
943 batch.insert_batch(
944 &self.tables.checkpoint_sequence_by_contents_digest,
945 [(&checkpoint.content_digest, checkpoint.sequence_number())],
946 )?;
947 let full_contents = full_contents.into_inner();
948 batch.insert_batch(
949 &self.tables.full_checkpoint_content,
950 [(checkpoint.sequence_number(), &full_contents)],
951 )?;
952
953 let contents = full_contents.into_checkpoint_contents();
954 assert_eq!(&checkpoint.content_digest, contents.digest());
955
956 batch.insert_batch(
957 &self.tables.checkpoint_content,
958 [(contents.digest(), &contents)],
959 )?;
960
961 batch.write()
962 }
963
964 pub fn delete_full_checkpoint_contents(
965 &self,
966 seq: CheckpointSequenceNumber,
967 ) -> Result<(), TypedStoreError> {
968 self.tables.full_checkpoint_content.remove(&seq)
969 }
970
971 pub fn get_epoch_last_checkpoint(
972 &self,
973 epoch_id: EpochId,
974 ) -> SuiResult<Option<VerifiedCheckpoint>> {
975 let seq = self.get_epoch_last_checkpoint_seq_number(epoch_id)?;
976 let checkpoint = match seq {
977 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
978 None => None,
979 };
980 Ok(checkpoint)
981 }
982
983 pub fn get_epoch_last_checkpoint_seq_number(
984 &self,
985 epoch_id: EpochId,
986 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
987 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
988 Ok(seq)
989 }
990
991 pub fn insert_epoch_last_checkpoint(
992 &self,
993 epoch_id: EpochId,
994 checkpoint: &VerifiedCheckpoint,
995 ) -> SuiResult {
996 self.tables
997 .epoch_last_checkpoint_map
998 .insert(&epoch_id, checkpoint.sequence_number())?;
999 Ok(())
1000 }
1001
1002 pub fn get_epoch_state_commitments(
1003 &self,
1004 epoch: EpochId,
1005 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1006 let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
1007 checkpoint
1008 .end_of_epoch_data
1009 .as_ref()
1010 .expect("Last checkpoint of epoch expected to have EndOfEpochData")
1011 .epoch_commitments
1012 .clone()
1013 });
1014 Ok(commitments)
1015 }
1016
1017 pub fn get_epoch_stats(
1019 &self,
1020 epoch: EpochId,
1021 last_checkpoint: &CheckpointSummary,
1022 ) -> Option<EpochStats> {
1023 let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
1024 (0, 0)
1025 } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
1026 (
1027 checkpoint.sequence_number + 1,
1028 checkpoint.network_total_transactions,
1029 )
1030 } else {
1031 return None;
1032 };
1033 Some(EpochStats {
1034 checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
1035 transaction_count: last_checkpoint.network_total_transactions
1036 - prev_epoch_network_transactions,
1037 total_gas_reward: last_checkpoint
1038 .epoch_rolling_gas_cost_summary
1039 .computation_cost,
1040 })
1041 }
1042
1043 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
1044 self.tables
1046 .checkpoint_content
1047 .checkpoint_db(path)
1048 .map_err(Into::into)
1049 }
1050
1051 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
1052 let mut wb = self.tables.watermarks.batch();
1053 wb.delete_batch(
1054 &self.tables.watermarks,
1055 std::iter::once(CheckpointWatermark::HighestExecuted),
1056 )?;
1057 wb.write()?;
1058 Ok(())
1059 }
1060
1061 pub fn reset_db_for_execution_since_genesis(&self) -> SuiResult {
1062 self.delete_highest_executed_checkpoint_test_only()?;
1063 Ok(())
1064 }
1065
1066 pub fn record_checkpoint_fork_detected(
1067 &self,
1068 checkpoint_seq: CheckpointSequenceNumber,
1069 checkpoint_digest: CheckpointDigest,
1070 ) -> Result<(), TypedStoreError> {
1071 info!(
1072 checkpoint_seq = checkpoint_seq,
1073 checkpoint_digest = ?checkpoint_digest,
1074 "Recording checkpoint fork detection in database"
1075 );
1076 self.tables.watermarks.insert(
1077 &CheckpointWatermark::CheckpointForkDetected,
1078 &(checkpoint_seq, checkpoint_digest),
1079 )
1080 }
1081
1082 pub fn get_checkpoint_fork_detected(
1083 &self,
1084 ) -> Result<Option<(CheckpointSequenceNumber, CheckpointDigest)>, TypedStoreError> {
1085 self.tables
1086 .watermarks
1087 .get(&CheckpointWatermark::CheckpointForkDetected)
1088 }
1089
1090 pub fn clear_checkpoint_fork_detected(&self) -> Result<(), TypedStoreError> {
1091 self.tables
1092 .watermarks
1093 .remove(&CheckpointWatermark::CheckpointForkDetected)
1094 }
1095
1096 pub fn record_transaction_fork_detected(
1097 &self,
1098 tx_digest: TransactionDigest,
1099 expected_effects_digest: TransactionEffectsDigest,
1100 actual_effects_digest: TransactionEffectsDigest,
1101 ) -> Result<(), TypedStoreError> {
1102 info!(
1103 tx_digest = ?tx_digest,
1104 expected_effects_digest = ?expected_effects_digest,
1105 actual_effects_digest = ?actual_effects_digest,
1106 "Recording transaction fork detection in database"
1107 );
1108 self.tables.transaction_fork_detected.insert(
1109 &TRANSACTION_FORK_DETECTED_KEY,
1110 &(tx_digest, expected_effects_digest, actual_effects_digest),
1111 )
1112 }
1113
1114 pub fn get_transaction_fork_detected(
1115 &self,
1116 ) -> Result<
1117 Option<(
1118 TransactionDigest,
1119 TransactionEffectsDigest,
1120 TransactionEffectsDigest,
1121 )>,
1122 TypedStoreError,
1123 > {
1124 self.tables
1125 .transaction_fork_detected
1126 .get(&TRANSACTION_FORK_DETECTED_KEY)
1127 }
1128
1129 pub fn clear_transaction_fork_detected(&self) -> Result<(), TypedStoreError> {
1130 self.tables
1131 .transaction_fork_detected
1132 .remove(&TRANSACTION_FORK_DETECTED_KEY)
1133 }
1134}
1135
/// Keys used with the checkpoint store's `watermarks` table.
///
/// NOTE(review): the enum derives `Serialize`/`Deserialize` and is used as a table key, so the
/// variant order is presumably part of the on-disk encoding — confirm before reordering.
1136#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
1137pub enum CheckpointWatermark {
// NOTE(review): presumably the highest verified checkpoint — confirm against its writers.
1138 HighestVerified,
// NOTE(review): presumably the highest synced checkpoint — confirm against its writers.
1139 HighestSynced,
// Deleted by `delete_highest_executed_checkpoint_test_only`.
1140 HighestExecuted,
// NOTE(review): presumably the highest pruned checkpoint — confirm against its writers.
1141 HighestPruned,
// Written by `record_checkpoint_fork_detected` with a `(sequence number, digest)` value.
1142 CheckpointForkDetected,
1143}
1144
/// Task state that receives `(sequence number, effects)` pairs over a channel and passes each
/// one to `GlobalStateHasher::accumulate_checkpoint`.
1145struct CheckpointStateHasher {
// Passed by reference to every `accumulate_checkpoint` call.
1146 epoch_store: Arc<AuthorityPerEpochStore>,
// Weak handle; `run` stops once it can no longer be upgraded.
1147 hasher: Weak<GlobalStateHasher>,
// Receiving end of the channel; `run` ends when it yields `None`.
1148 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1149}
1150
1151impl CheckpointStateHasher {
1152 fn new(
1153 epoch_store: Arc<AuthorityPerEpochStore>,
1154 hasher: Weak<GlobalStateHasher>,
1155 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1156 ) -> Self {
1157 Self {
1158 epoch_store,
1159 hasher,
1160 receive_from_builder,
1161 }
1162 }
1163
1164 async fn run(self) {
1165 let Self {
1166 epoch_store,
1167 hasher,
1168 mut receive_from_builder,
1169 } = self;
1170 while let Some((seq, effects)) = receive_from_builder.recv().await {
1171 let Some(hasher) = hasher.upgrade() else {
1172 info!("Object state hasher was dropped, stopping checkpoint accumulation");
1173 break;
1174 };
1175 hasher
1176 .accumulate_checkpoint(&effects, seq, &epoch_store)
1177 .expect("epoch ended while accumulating checkpoint");
1178 }
1179 }
1180}
1181
/// Errors surfaced by the checkpoint builder.
///
/// `CheckpointBuilder::run` stops on the first two variants and retries after a delay on
/// `Retry`.
1182#[derive(Debug)]
1183pub(crate) enum CheckpointBuilderError {
// Terminal: the builder loop logs and returns.
1184 ChangeEpochTxAlreadyExecuted,
// Terminal: the builder loop logs and returns.
1185 SystemPackagesMissing,
// Any other error; produced by the blanket `From` impl for std errors.
1186 Retry(anyhow::Error),
1187}
1188
1189impl<SuiError: std::error::Error + Send + Sync + 'static> From<SuiError>
1190 for CheckpointBuilderError
1191{
1192 fn from(e: SuiError) -> Self {
1193 Self::Retry(e.into())
1194 }
1195}
1196
/// `Result` alias whose error type is `CheckpointBuilderError`; the success type defaults to `()`.
1197pub(crate) type CheckpointBuilderResult<T = ()> = Result<T, CheckpointBuilderError>;
1198
/// Builds checkpoints from pending checkpoints read from the epoch store and writes them to
/// the checkpoint store.
1199pub struct CheckpointBuilder {
// Used for the transaction cache reader, the execution scheduler and the advance-epoch tx.
1200 state: Arc<AuthorityState>,
// Checkpoint store that receives built contents and locally computed summaries.
1201 store: Arc<CheckpointStore>,
// Per-epoch store: source of pending checkpoints, protocol config and epoch number.
1202 epoch_store: Arc<AuthorityPerEpochStore>,
// Awaited by `run` between build attempts.
1203 notify: Arc<Notify>,
// Signalled with `notify_one` after new checkpoints have been written.
1204 notify_aggregator: Arc<Notify>,
// Watch channel advanced to the highest sequence number built so far.
1205 last_built: watch::Sender<CheckpointSequenceNumber>,
// Used to wait for and read executed transaction effects.
1206 effects_store: Arc<dyn TransactionCacheRead>,
// Upgraded when building the last checkpoint of an epoch to compute the root state digest.
1207 global_state_hasher: Weak<GlobalStateHasher>,
// Receives `(sequence number, effects)` for every checkpoint that is not the last of the epoch.
1208 send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
// `checkpoint_created` is invoked on it for each written checkpoint.
1209 output: Box<dyn CheckpointOutput>,
1210 metrics: Arc<CheckpointMetrics>,
// Chunking limit: maximum number of transactions in one checkpoint.
1211 max_transactions_per_checkpoint: usize,
// Chunking limit: maximum accumulated size in bytes of one checkpoint.
1212 max_checkpoint_size_bytes: usize,
1213}
1214
/// State of the checkpoint aggregator.
///
/// NOTE(review): field roles are inferred from names and types only — confirm against the
/// `impl` block.
1215pub struct CheckpointAggregator {
1216 store: Arc<CheckpointStore>,
1217 epoch_store: Arc<AuthorityPerEpochStore>,
// NOTE(review): presumably the same `Notify` the builder signals after writing — confirm.
1218 notify: Arc<Notify>,
// In-progress signature aggregation, if any.
1219 current: Option<CheckpointSignatureAggregator>,
1220 output: Box<dyn CertifiedCheckpointOutput>,
1221 state: Arc<AuthorityState>,
1222 metrics: Arc<CheckpointMetrics>,
1223}
1224
/// Signature aggregation state for a single checkpoint summary.
///
/// NOTE(review): field roles are inferred from names and types only — confirm against the
/// `impl` block.
1225pub struct CheckpointSignatureAggregator {
1227 next_index: u64,
1228 summary: CheckpointSummary,
1229 digest: CheckpointDigest,
// Stake aggregator keyed by checkpoint digest.
1230 signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
1232 store: Arc<CheckpointStore>,
1233 state: Arc<AuthorityState>,
1234 metrics: Arc<CheckpointMetrics>,
1235}
1236
1237impl CheckpointBuilder {
1238 fn new(
1239 state: Arc<AuthorityState>,
1240 store: Arc<CheckpointStore>,
1241 epoch_store: Arc<AuthorityPerEpochStore>,
1242 notify: Arc<Notify>,
1243 effects_store: Arc<dyn TransactionCacheRead>,
1244 global_state_hasher: Weak<GlobalStateHasher>,
1246 send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
1248 output: Box<dyn CheckpointOutput>,
1249 notify_aggregator: Arc<Notify>,
1250 last_built: watch::Sender<CheckpointSequenceNumber>,
1251 metrics: Arc<CheckpointMetrics>,
1252 max_transactions_per_checkpoint: usize,
1253 max_checkpoint_size_bytes: usize,
1254 ) -> Self {
1255 Self {
1256 state,
1257 store,
1258 epoch_store,
1259 notify,
1260 effects_store,
1261 global_state_hasher,
1262 send_to_hasher,
1263 output,
1264 notify_aggregator,
1265 last_built,
1266 metrics,
1267 max_transactions_per_checkpoint,
1268 max_checkpoint_size_bytes,
1269 }
1270 }
1271
1272 async fn run(mut self, consensus_replay_waiter: Option<ReplayWaiter>) {
1280 if let Some(replay_waiter) = consensus_replay_waiter {
1281 info!("Waiting for consensus commits to replay ...");
1282 replay_waiter.wait_for_replay().await;
1283 info!("Consensus commits finished replaying");
1284 }
1285 info!("Starting CheckpointBuilder");
1286 loop {
1287 match self.maybe_build_checkpoints().await {
1288 Ok(()) => {}
1289 err @ Err(
1290 CheckpointBuilderError::ChangeEpochTxAlreadyExecuted
1291 | CheckpointBuilderError::SystemPackagesMissing,
1292 ) => {
1293 info!("CheckpointBuilder stopping: {:?}", err);
1294 return;
1295 }
1296 Err(CheckpointBuilderError::Retry(inner)) => {
1297 let msg = format!("{:?}", inner);
1298 debug_fatal!("Error while making checkpoint, will retry in 1s: {}", msg);
1299 tokio::time::sleep(Duration::from_secs(1)).await;
1300 self.metrics.checkpoint_errors.inc();
1301 continue;
1302 }
1303 }
1304
1305 self.notify.notified().await;
1306 }
1307 }
1308
1309 async fn maybe_build_checkpoints(&mut self) -> CheckpointBuilderResult {
1310 let _scope = monitored_scope("BuildCheckpoints");
1311
1312 let summary = self
1314 .epoch_store
1315 .last_built_checkpoint_builder_summary()
1316 .expect("epoch should not have ended");
1317 let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
1318 let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
1319
1320 let min_checkpoint_interval_ms = self
1321 .epoch_store
1322 .protocol_config()
1323 .min_checkpoint_interval_ms_as_option()
1324 .unwrap_or_default();
1325 let mut grouped_pending_checkpoints = Vec::new();
1326 let mut checkpoints_iter = self
1327 .epoch_store
1328 .get_pending_checkpoints(last_height)
1329 .expect("unexpected epoch store error")
1330 .into_iter()
1331 .peekable();
1332 while let Some((height, pending)) = checkpoints_iter.next() {
1333 let current_timestamp = pending.details().timestamp_ms;
1336 let can_build = match last_timestamp {
1337 Some(last_timestamp) => {
1338 current_timestamp >= last_timestamp + min_checkpoint_interval_ms
1339 }
1340 None => true,
1341 } || checkpoints_iter
1344 .peek()
1345 .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
1346 || pending.details().last_of_epoch;
1348 grouped_pending_checkpoints.push(pending);
1349 if !can_build {
1350 debug!(
1351 checkpoint_commit_height = height,
1352 ?last_timestamp,
1353 ?current_timestamp,
1354 "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
1355 );
1356 continue;
1357 }
1358
1359 last_height = Some(height);
1361 last_timestamp = Some(current_timestamp);
1362 debug!(
1363 checkpoint_commit_height_from = grouped_pending_checkpoints
1364 .first()
1365 .unwrap()
1366 .details()
1367 .checkpoint_height,
1368 checkpoint_commit_height_to = last_height,
1369 "Making checkpoint with commit height range"
1370 );
1371
1372 let seq = self
1373 .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
1374 .await?;
1375
1376 self.last_built.send_if_modified(|cur| {
1377 if seq > *cur {
1379 *cur = seq;
1380 true
1381 } else {
1382 false
1383 }
1384 });
1385
1386 tokio::task::yield_now().await;
1389 }
1390 debug!(
1391 "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1392 grouped_pending_checkpoints.len(),
1393 );
1394
1395 Ok(())
1396 }
1397
1398 #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1399 async fn make_checkpoint(
1400 &mut self,
1401 pendings: Vec<PendingCheckpoint>,
1402 ) -> CheckpointBuilderResult<CheckpointSequenceNumber> {
1403 let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1404 let last_details = pendings.last().unwrap().details().clone();
1405
1406 let highest_executed_sequence = self
1409 .store
1410 .get_highest_executed_checkpoint_seq_number()
1411 .expect("db error")
1412 .unwrap_or(0);
1413
1414 let (poll_count, result) = poll_count(self.resolve_checkpoint_transactions(pendings)).await;
1415 let (sorted_tx_effects_included_in_checkpoint, all_roots) = result?;
1416
1417 let new_checkpoints = self
1418 .create_checkpoints(
1419 sorted_tx_effects_included_in_checkpoint,
1420 &last_details,
1421 &all_roots,
1422 )
1423 .await?;
1424 let highest_sequence = *new_checkpoints.last().0.sequence_number();
1425 if highest_sequence <= highest_executed_sequence && poll_count > 1 {
1426 debug_fatal!(
1427 "resolve_checkpoint_transactions should be instantaneous when executed checkpoint is ahead of checkpoint builder"
1428 );
1429 }
1430
1431 self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1432 .await?;
1433 Ok(highest_sequence)
1434 }
1435
1436 async fn construct_and_execute_settlement_transactions(
1437 &self,
1438 sorted_tx_effects_included_in_checkpoint: &[TransactionEffects],
1439 checkpoint_height: CheckpointHeight,
1440 tx_index_offset: u64,
1441 ) -> (TransactionKey, Vec<TransactionEffects>) {
1442 let _scope =
1443 monitored_scope("CheckpointBuilder::construct_and_execute_settlement_transactions");
1444
1445 let tx_key =
1446 TransactionKey::AccumulatorSettlement(self.epoch_store.epoch(), checkpoint_height);
1447
1448 let epoch = self.epoch_store.epoch();
1449 let accumulator_root_obj_initial_shared_version = self
1450 .epoch_store
1451 .epoch_start_config()
1452 .accumulator_root_obj_initial_shared_version()
1453 .expect("accumulator root object must exist");
1454
1455 let builder = AccumulatorSettlementTxBuilder::new(
1456 Some(self.effects_store.as_ref()),
1457 sorted_tx_effects_included_in_checkpoint,
1458 tx_index_offset,
1459 );
1460
1461 let accumulator_changes = builder.collect_accumulator_changes();
1462 let num_updates = builder.num_updates();
1463 let (settlement_txns, barrier_tx) = builder.build_tx(
1464 self.epoch_store.protocol_config(),
1465 epoch,
1466 accumulator_root_obj_initial_shared_version,
1467 checkpoint_height,
1468 );
1469
1470 let settlement_txns: Vec<_> = settlement_txns
1471 .into_iter()
1472 .chain(std::iter::once(barrier_tx))
1473 .map(|tx| {
1474 VerifiedExecutableTransaction::new_system(
1475 VerifiedTransaction::new_system_transaction(tx),
1476 self.epoch_store.epoch(),
1477 )
1478 })
1479 .collect();
1480
1481 let settlement_digests: Vec<_> = settlement_txns.iter().map(|tx| *tx.digest()).collect();
1482
1483 debug!(
1484 ?settlement_digests,
1485 ?tx_key,
1486 "created settlement transactions with {num_updates} updates"
1487 );
1488
1489 self.epoch_store
1490 .notify_settlement_transactions_ready(tx_key, settlement_txns);
1491
1492 let settlement_effects = loop {
1493 match tokio::time::timeout(Duration::from_secs(5), async {
1494 self.effects_store
1495 .notify_read_executed_effects(
1496 "CheckpointBuilder::notify_read_settlement_effects",
1497 &settlement_digests,
1498 )
1499 .await
1500 })
1501 .await
1502 {
1503 Ok(effects) => break effects,
1504 Err(_) => {
1505 debug_fatal!(
1506 "Timeout waiting for settlement transactions to be executed {:?}, retrying...",
1507 tx_key
1508 );
1509 }
1510 }
1511 };
1512
1513 let mut next_accumulator_version = None;
1514 for fx in settlement_effects.iter() {
1515 assert!(
1516 fx.status().is_ok(),
1517 "settlement transaction cannot fail (digest: {:?}) {:#?}",
1518 fx.transaction_digest(),
1519 fx
1520 );
1521 if let Some(version) = fx
1522 .mutated()
1523 .iter()
1524 .find_map(|(oref, _)| (oref.0 == SUI_ACCUMULATOR_ROOT_OBJECT_ID).then_some(oref.1))
1525 {
1526 assert!(
1527 next_accumulator_version.is_none(),
1528 "Only one settlement transaction should mutate the accumulator root object"
1529 );
1530 next_accumulator_version = Some(version);
1531 }
1532 }
1533 let settlements = BalanceSettlement {
1534 next_accumulator_version: next_accumulator_version
1535 .expect("Accumulator root object should be mutated in the settlement transactions"),
1536 balance_changes: accumulator_changes,
1537 };
1538
1539 self.state
1540 .execution_scheduler()
1541 .settle_balances(settlements);
1542
1543 (tx_key, settlement_effects)
1544 }
1545
/// Resolves the given pending checkpoints into an ordered list of transaction effects.
///
/// For each pending checkpoint, in order: waits for the digests and executed effects of its
/// roots, optionally pulls out the consensus commit prologue, expands the roots through
/// `complete_checkpoint_effects`, orders them with `CausalOrder::causal_sort`, and, when
/// accumulators are enabled, executes the settlement transactions and appends their effects.
///
/// Returns the concatenated effects of all pending checkpoints and the set of all root digests.
///
/// # Errors
/// Propagates errors from resolving root keys to digests, from extracting the prologue and
/// from `complete_checkpoint_effects`.
1546 #[instrument(level = "debug", skip_all)]
1551 async fn resolve_checkpoint_transactions(
1552 &self,
1553 pending_checkpoints: Vec<PendingCheckpoint>,
1554 ) -> SuiResult<(Vec<TransactionEffects>, HashSet<TransactionDigest>)> {
1555 let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1556
// Shared across all pending checkpoints in this call; `complete_checkpoint_effects`
// receives it mutably on every invocation.
1557 let mut effects_in_current_checkpoint = BTreeSet::new();
1562
// Accumulated outputs: effects in final order, and the digests of every root seen.
1563 let mut tx_effects = Vec::new();
1564 let mut tx_roots = HashSet::new();
1565
1566 for pending_checkpoint in pending_checkpoints.into_iter() {
1567 let mut pending = pending_checkpoint;
1568 debug!(
1569 checkpoint_commit_height = pending.details.checkpoint_height,
1570 "Resolving checkpoint transactions for pending checkpoint.",
1571 );
1572
1573 trace!(
1574 "roots for pending checkpoint {:?}: {:?}",
1575 pending.details.checkpoint_height, pending.roots,
1576 );
1577
// With accumulators enabled, the last root must be the settlement key; it is removed from
// the roots here and checked against the executed settlement key further down.
1578 let settlement_root = if self.epoch_store.accumulators_enabled() {
1579 let Some(settlement_root @ TransactionKey::AccumulatorSettlement(..)) =
1580 pending.roots.pop()
1581 else {
1582 fatal!("No settlement root found");
1583 };
1584 Some(settlement_root)
1585 } else {
1586 None
1587 };
1588
1589 let roots = &pending.roots;
1590
1591 self.metrics
1592 .checkpoint_roots_count
1593 .inc_by(roots.len() as u64);
1594
// Wait until every root key has a digest, then until every digest has executed effects.
1595 let root_digests = self
1596 .epoch_store
1597 .notify_read_tx_key_to_digest(roots)
1598 .in_monitored_scope("CheckpointNotifyDigests")
1599 .await?;
1600 let root_effects = self
1601 .effects_store
1602 .notify_read_executed_effects(
1603 CHECKPOINT_BUILDER_NOTIFY_READ_TASK_NAME,
1604 &root_digests,
1605 )
1606 .in_monitored_scope("CheckpointNotifyRead")
1607 .await;
1608
// Only when the protocol config asks for the prologue to be prepended: extract it and run
// it through `complete_checkpoint_effects` on its own, which must yield exactly that one
// transaction.
1609 let consensus_commit_prologue = if self
1610 .epoch_store
1611 .protocol_config()
1612 .prepend_prologue_tx_in_consensus_commit_in_checkpoints()
1613 {
1614 let consensus_commit_prologue =
1618 self.extract_consensus_commit_prologue(&root_digests, &root_effects)?;
1619
1620 if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1623 let unsorted_ccp = self.complete_checkpoint_effects(
1624 vec![ccp_effects.clone()],
1625 &mut effects_in_current_checkpoint,
1626 )?;
1627
1628 if unsorted_ccp.len() != 1 {
1631 fatal!(
1632 "Expected 1 consensus commit prologue, got {:?}",
1633 unsorted_ccp
1634 .iter()
1635 .map(|e| e.transaction_digest())
1636 .collect::<Vec<_>>()
1637 );
1638 }
1639 assert_eq!(unsorted_ccp.len(), 1);
1640 assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1641 }
1642 consensus_commit_prologue
1643 } else {
1644 None
1645 };
1646
1647 let unsorted =
1648 self.complete_checkpoint_effects(root_effects, &mut effects_in_current_checkpoint)?;
1649
1650 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
// `+ 1` leaves room for the prologue, which is placed ahead of the causally sorted effects.
1651 let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1652 if let Some((ccp_digest, ccp_effects)) = consensus_commit_prologue {
// Debug builds only: the prologue must not also appear among the remaining effects.
1653 if cfg!(debug_assertions) {
1654 for tx in unsorted.iter() {
1656 assert!(tx.transaction_digest() != &ccp_digest);
1657 }
1658 }
1659 sorted.push(ccp_effects);
1660 }
1661 sorted.extend(CausalOrder::causal_sort(unsorted));
1662
1663 if let Some(settlement_root) = settlement_root {
// Offset passed to the settlement builder: the number of effects already collected from
// earlier pending checkpoints in this call.
1664 let tx_index_offset = tx_effects.len() as u64;
1667 let (tx_key, settlement_effects) = self
1668 .construct_and_execute_settlement_transactions(
1669 &sorted,
1670 pending.details.checkpoint_height,
1671 tx_index_offset,
1672 )
1673 .await;
1674 debug!(?tx_key, "executed settlement transactions");
1675
// The key popped from the roots above must match the key that was just executed.
1676 assert_eq!(settlement_root, tx_key);
1677
1678 sorted.extend(settlement_effects);
1685 }
1686
1687 #[cfg(msim)]
1688 {
1689 self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1691 }
1692
1693 tx_effects.extend(sorted);
1694 tx_roots.extend(root_digests);
1695 }
1696
1697 Ok((tx_effects, tx_roots))
1698 }
1699
1700 fn extract_consensus_commit_prologue(
1705 &self,
1706 root_digests: &[TransactionDigest],
1707 root_effects: &[TransactionEffects],
1708 ) -> SuiResult<Option<(TransactionDigest, TransactionEffects)>> {
1709 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1710 if root_digests.is_empty() {
1711 return Ok(None);
1712 }
1713
1714 let first_tx = self
1719 .state
1720 .get_transaction_cache_reader()
1721 .get_transaction_block(&root_digests[0])
1722 .expect("Transaction block must exist");
1723
1724 Ok(first_tx
1725 .transaction_data()
1726 .is_consensus_commit_prologue()
1727 .then(|| {
1728 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1729 (*first_tx.digest(), root_effects[0].clone())
1730 }))
1731 }
1732
1733 #[instrument(level = "debug", skip_all)]
1734 async fn write_checkpoints(
1735 &mut self,
1736 height: CheckpointHeight,
1737 new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1738 ) -> SuiResult {
1739 let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1740 let mut batch = self.store.tables.checkpoint_content.batch();
1741 let mut all_tx_digests =
1742 Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1743
1744 for (summary, contents) in &new_checkpoints {
1745 debug!(
1746 checkpoint_commit_height = height,
1747 checkpoint_seq = summary.sequence_number,
1748 contents_digest = ?contents.digest(),
1749 "writing checkpoint",
1750 );
1751
1752 if let Some(previously_computed_summary) = self
1753 .store
1754 .tables
1755 .locally_computed_checkpoints
1756 .get(&summary.sequence_number)?
1757 && previously_computed_summary.digest() != summary.digest()
1758 {
1759 fatal!(
1760 "Checkpoint {} was previously built with a different result: previously_computed_summary {:?} vs current_summary {:?}",
1761 summary.sequence_number,
1762 previously_computed_summary.digest(),
1763 summary.digest()
1764 );
1765 }
1766
1767 all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1768
1769 self.metrics
1770 .transactions_included_in_checkpoint
1771 .inc_by(contents.size() as u64);
1772 let sequence_number = summary.sequence_number;
1773 self.metrics
1774 .last_constructed_checkpoint
1775 .set(sequence_number as i64);
1776
1777 batch.insert_batch(
1778 &self.store.tables.checkpoint_content,
1779 [(contents.digest(), contents)],
1780 )?;
1781
1782 batch.insert_batch(
1783 &self.store.tables.locally_computed_checkpoints,
1784 [(sequence_number, summary)],
1785 )?;
1786 }
1787
1788 batch.write()?;
1789
1790 for (summary, contents) in &new_checkpoints {
1792 self.output
1793 .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1794 .await?;
1795 }
1796
1797 for (local_checkpoint, _) in &new_checkpoints {
1798 if let Some(certified_checkpoint) = self
1799 .store
1800 .tables
1801 .certified_checkpoints
1802 .get(local_checkpoint.sequence_number())?
1803 {
1804 self.store
1805 .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1806 }
1807 }
1808
1809 self.notify_aggregator.notify_one();
1810 self.epoch_store
1811 .process_constructed_checkpoint(height, new_checkpoints);
1812 Ok(())
1813 }
1814
1815 #[allow(clippy::type_complexity)]
1816 fn split_checkpoint_chunks(
1817 &self,
1818 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1819 signatures: Vec<Vec<GenericSignature>>,
1820 ) -> CheckpointBuilderResult<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1821 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1822 let mut chunks = Vec::new();
1823 let mut chunk = Vec::new();
1824 let mut chunk_size: usize = 0;
1825 for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1826 .into_iter()
1827 .zip(signatures.into_iter())
1828 {
1829 let size = transaction_size
1834 + bcs::serialized_size(&effects)?
1835 + bcs::serialized_size(&signatures)?;
1836 if chunk.len() == self.max_transactions_per_checkpoint
1837 || (chunk_size + size) > self.max_checkpoint_size_bytes
1838 {
1839 if chunk.is_empty() {
1840 warn!(
1842 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1843 self.max_checkpoint_size_bytes
1844 );
1845 } else {
1846 chunks.push(chunk);
1847 chunk = Vec::new();
1848 chunk_size = 0;
1849 }
1850 }
1851
1852 chunk.push((effects, signatures));
1853 chunk_size += size;
1854 }
1855
1856 if !chunk.is_empty() || chunks.is_empty() {
1857 chunks.push(chunk);
1862 }
1867 Ok(chunks)
1868 }
1869
1870 fn load_last_built_checkpoint_summary(
1871 epoch_store: &AuthorityPerEpochStore,
1872 store: &CheckpointStore,
1873 ) -> SuiResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1874 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1875 if last_checkpoint.is_none() {
1876 let epoch = epoch_store.epoch();
1877 if epoch > 0 {
1878 let previous_epoch = epoch - 1;
1879 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1880 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1881 if let Some((ref seq, _)) = last_checkpoint {
1882 debug!(
1883 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1884 );
1885 } else {
1886 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1888 }
1889 }
1890 }
1891 Ok(last_checkpoint)
1892 }
1893
/// Turns the given effects into one or more `(summary, contents)` pairs that continue the
/// chain after the last built checkpoint.
///
/// The effects are split into chunks by `split_checkpoint_chunks`, and each chunk becomes one
/// checkpoint. When `details.last_of_epoch` is set, the final chunk additionally receives the
/// advance-epoch transaction and the end-of-epoch data.
///
/// # Errors
/// Propagates errors from loading the last checkpoint, reading transactions, waiting on
/// consensus, chunking, the advance-epoch transaction, state hashing and the hasher channel.
///
/// # Panics
/// Panics if an executed transaction cannot be found or if the state hasher has been dropped
/// while building the last checkpoint of the epoch.
1894 #[instrument(level = "debug", skip_all)]
1895 async fn create_checkpoints(
1896 &self,
1897 all_effects: Vec<TransactionEffects>,
1898 details: &PendingCheckpointInfo,
1899 all_roots: &HashSet<TransactionDigest>,
1900 ) -> CheckpointBuilderResult<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1901 let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1902
1903 let total = all_effects.len();
// Starting point of the chain; updated after every checkpoint created in the loop below.
1904 let mut last_checkpoint =
1905 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1906 let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1907 info!(
1908 checkpoint_commit_height = details.checkpoint_height,
1909 next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1910 checkpoint_timestamp = details.timestamp_ms,
1911 "Creating checkpoint(s) for {} transactions",
1912 all_effects.len(),
1913 );
1914
1915 let all_digests: Vec<_> = all_effects
1916 .iter()
1917 .map(|effect| *effect.transaction_digest())
1918 .collect();
1919 let transactions_and_sizes = self
1920 .state
1921 .get_transaction_cache_reader()
1922 .get_transactions_and_serialized_sizes(&all_digests)?;
1923 let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1924 let mut transactions = Vec::with_capacity(all_effects.len());
1925 let mut transaction_keys = Vec::with_capacity(all_effects.len());
// Randomness round per `RandomnessStateUpdate` transaction digest.
1926 let mut randomness_rounds = BTreeMap::new();
1927 {
1928 let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1929 debug!(
1930 ?last_checkpoint_seq,
1931 "Waiting for {:?} certificates to appear in consensus",
1932 all_effects.len()
1933 );
1934
1935 for (effects, transaction_and_size) in all_effects
1936 .into_iter()
1937 .zip(transactions_and_sizes.into_iter())
1938 {
1939 let (transaction, size) = transaction_and_size
1940 .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
// Classify by transaction kind; only non-root programmable transactions add a key to wait
// on.
1941 match transaction.inner().transaction_data().kind() {
1942 TransactionKind::ConsensusCommitPrologue(_)
1943 | TransactionKind::ConsensusCommitPrologueV2(_)
1944 | TransactionKind::ConsensusCommitPrologueV3(_)
1945 | TransactionKind::ConsensusCommitPrologueV4(_)
1946 | TransactionKind::AuthenticatorStateUpdate(_) => {
1947 }
1950 TransactionKind::ProgrammableSystemTransaction(_) => {
1951 }
// These kinds must never appear in the incoming effects.
1953 TransactionKind::ChangeEpoch(_)
1954 | TransactionKind::Genesis(_)
1955 | TransactionKind::EndOfEpochTransaction(_) => {
1956 fatal!(
1957 "unexpected transaction in checkpoint effects: {:?}",
1958 transaction
1959 );
1960 }
1961 TransactionKind::RandomnessStateUpdate(rsu) => {
1962 randomness_rounds
1963 .insert(*effects.transaction_digest(), rsu.randomness_round);
1964 }
1965 TransactionKind::ProgrammableTransaction(_) => {
1966 let digest = *effects.transaction_digest();
// Root transactions are skipped; only non-root certificates are waited on.
1970 if !all_roots.contains(&digest) {
1971 transaction_keys.push(SequencedConsensusTransactionKey::External(
1972 ConsensusTransactionKey::Certificate(digest),
1973 ));
1974 }
1975 }
1976 }
1977 transactions.push(transaction);
1978 all_effects_and_transaction_sizes.push((effects, size));
1979 }
1980
// Wait until consensus has processed every collected certificate key.
1981 self.epoch_store
1982 .consensus_messages_processed_notify(transaction_keys)
1983 .await?;
1984 }
1985
1986 let signatures = self
1987 .epoch_store
1988 .user_signatures_for_checkpoint(&transactions, &all_digests);
1989 debug!(
1990 ?last_checkpoint_seq,
1991 "Received {} checkpoint user signatures from consensus",
1992 signatures.len()
1993 );
1994
// Collected only for the last pending checkpoint of the epoch: one observation key per
// command of every programmable transaction. Consumed when the final chunk is built.
1995 let mut end_of_epoch_observation_keys: Option<Vec<_>> = if details.last_of_epoch {
1996 Some(
1997 transactions
1998 .iter()
1999 .flat_map(|tx| {
2000 if let TransactionKind::ProgrammableTransaction(ptb) =
2001 tx.transaction_data().kind()
2002 {
2003 itertools::Either::Left(
2004 ptb.commands
2005 .iter()
2006 .map(ExecutionTimeObservationKey::from_command),
2007 )
2008 } else {
2009 itertools::Either::Right(std::iter::empty())
2010 }
2011 })
2012 .collect(),
2013 )
2014 } else {
2015 None
2016 };
2017
2018 let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
2019 let chunks_count = chunks.len();
2020
2021 let mut checkpoints = Vec::with_capacity(chunks_count);
2022 debug!(
2023 ?last_checkpoint_seq,
2024 "Creating {} checkpoints with {} transactions", chunks_count, total,
2025 );
2026
2027 let epoch = self.epoch_store.epoch();
2028 for (index, transactions) in chunks.into_iter().enumerate() {
// First chunk, and the previous checkpoint is from another epoch or does not exist.
2029 let first_checkpoint_of_epoch = index == 0
2030 && last_checkpoint
2031 .as_ref()
2032 .map(|(_, c)| c.epoch != epoch)
2033 .unwrap_or(true);
2034 if first_checkpoint_of_epoch {
2035 self.epoch_store
2036 .record_epoch_first_checkpoint_creation_time_metric();
2037 }
// Only the final chunk of an end-of-epoch pending checkpoint closes the epoch.
2038 let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
2039
// Previous sequence number plus one, or the default (0) when there is no previous checkpoint.
2040 let sequence_number = last_checkpoint
2041 .as_ref()
2042 .map(|(_, c)| c.sequence_number + 1)
2043 .unwrap_or_default();
2044 let mut timestamp_ms = details.timestamp_ms;
// If the timestamp would go backwards, log it and, when the protocol config enforces
// monotonicity, reuse the previous checkpoint's timestamp instead.
2045 if let Some((_, last_checkpoint)) = &last_checkpoint
2046 && last_checkpoint.timestamp_ms > timestamp_ms
2047 {
2048 debug!(
2050 "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
2051 sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
2052 );
2053 if self
2054 .epoch_store
2055 .protocol_config()
2056 .enforce_checkpoint_timestamp_monotonicity()
2057 {
2058 timestamp_ms = last_checkpoint.timestamp_ms;
2059 }
2060 }
2061
2062 let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
// Computed before the advance-epoch effects are appended below, so it does not include them.
2063 let epoch_rolling_gas_cost_summary =
2064 self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
2065
2066 let end_of_epoch_data = if last_checkpoint_of_epoch {
// Appends the advance-epoch transaction's effects and an empty signature list in place.
2067 let system_state_obj = self
2068 .augment_epoch_last_checkpoint(
2069 &epoch_rolling_gas_cost_summary,
2070 timestamp_ms,
2071 &mut effects,
2072 &mut signatures,
2073 sequence_number,
2074 std::mem::take(&mut end_of_epoch_observation_keys).expect("end_of_epoch_observation_keys must be populated for the last checkpoint"),
2075 last_checkpoint_seq.unwrap_or_default(),
2076 )
2077 .await?;
2078
2079 let committee = system_state_obj
2080 .get_current_epoch_committee()
2081 .committee()
2082 .clone();
2083
// The last checkpoint is accumulated inline here rather than sent over `send_to_hasher`:
// accumulate it, wait for the previous running root, fold it in, then digest the epoch.
2084 let root_state_digest = {
2087 let state_acc = self
2088 .global_state_hasher
2089 .upgrade()
2090 .expect("No checkpoints should be getting built after local configuration");
2091 let acc = state_acc.accumulate_checkpoint(
2092 &effects,
2093 sequence_number,
2094 &self.epoch_store,
2095 )?;
2096
2097 state_acc
2098 .wait_for_previous_running_root(&self.epoch_store, sequence_number)
2099 .await?;
2100
2101 state_acc.accumulate_running_root(
2102 &self.epoch_store,
2103 sequence_number,
2104 Some(acc),
2105 )?;
2106 state_acc
2107 .digest_epoch(self.epoch_store.clone(), sequence_number)
2108 .await?
2109 };
2110 self.metrics.highest_accumulated_epoch.set(epoch as i64);
2111 info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
2112
// The root state digest is committed only when the protocol config supports it.
2113 let epoch_commitments = if self
2114 .epoch_store
2115 .protocol_config()
2116 .check_commit_root_state_digest_supported()
2117 {
2118 vec![root_state_digest.into()]
2119 } else {
2120 vec![]
2121 };
2122
2123 Some(EndOfEpochData {
2124 next_epoch_committee: committee.voting_rights,
2125 next_epoch_protocol_version: ProtocolVersion::new(
2126 system_state_obj.protocol_version(),
2127 ),
2128 epoch_commitments,
2129 })
2130 } else {
// Every other checkpoint is handed to the hasher task through the channel.
2131 self.send_to_hasher
2132 .send((sequence_number, effects.clone()))
2133 .await?;
2134
2135 None
2136 };
2137 let contents = CheckpointContents::new_with_digests_and_signatures(
2138 effects.iter().map(TransactionEffects::execution_digests),
2139 signatures,
2140 );
2141
2142 let num_txns = contents.size() as u64;
2143
// Running total: the previous checkpoint's total plus this checkpoint's transactions.
2144 let network_total_transactions = last_checkpoint
2145 .as_ref()
2146 .map(|(_, c)| c.network_total_transactions + num_txns)
2147 .unwrap_or(num_txns);
2148
2149 let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
2150
// Rounds of the randomness updates contained in this chunk, in effects order.
2151 let matching_randomness_rounds: Vec<_> = effects
2152 .iter()
2153 .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
2154 .copied()
2155 .collect();
2156
2157 let checkpoint_commitments = if self
2158 .epoch_store
2159 .protocol_config()
2160 .include_checkpoint_artifacts_digest_in_summary()
2161 {
2162 let artifacts = CheckpointArtifacts::from(&effects[..]);
2163 let artifacts_digest = artifacts.digest()?;
2164 vec![artifacts_digest.into()]
2165 } else {
2166 Default::default()
2167 };
2168
2169 let summary = CheckpointSummary::new(
2170 self.epoch_store.protocol_config(),
2171 epoch,
2172 sequence_number,
2173 network_total_transactions,
2174 &contents,
2175 previous_digest,
2176 epoch_rolling_gas_cost_summary,
2177 end_of_epoch_data,
2178 timestamp_ms,
2179 matching_randomness_rounds,
2180 checkpoint_commitments,
2181 );
2182 summary.report_checkpoint_age(
2183 &self.metrics.last_created_checkpoint_age,
2184 &self.metrics.last_created_checkpoint_age_ms,
2185 );
2186 if last_checkpoint_of_epoch {
2187 info!(
2188 checkpoint_seq = sequence_number,
2189 "creating last checkpoint of epoch {}", epoch
2190 );
2191 if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
2192 self.epoch_store
2193 .report_epoch_metrics_at_last_checkpoint(stats);
2194 }
2195 }
// The next chunk chains onto the checkpoint just created.
2196 last_checkpoint = Some((sequence_number, summary.clone()));
2197 checkpoints.push((summary, contents));
2198 }
2199
2200 Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
2201 }
2202
2203 fn get_epoch_total_gas_cost(
2204 &self,
2205 last_checkpoint: Option<&CheckpointSummary>,
2206 cur_checkpoint_effects: &[TransactionEffects],
2207 ) -> GasCostSummary {
2208 let (previous_epoch, previous_gas_costs) = last_checkpoint
2209 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
2210 .unwrap_or_default();
2211 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
2212 if previous_epoch == self.epoch_store.epoch() {
2213 GasCostSummary::new(
2215 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
2216 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
2217 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
2218 previous_gas_costs.non_refundable_storage_fee
2219 + current_gas_costs.non_refundable_storage_fee,
2220 )
2221 } else {
2222 current_gas_costs
2223 }
2224 }
2225
2226 #[instrument(level = "error", skip_all)]
2227 async fn augment_epoch_last_checkpoint(
2228 &self,
2229 epoch_total_gas_cost: &GasCostSummary,
2230 epoch_start_timestamp_ms: CheckpointTimestamp,
2231 checkpoint_effects: &mut Vec<TransactionEffects>,
2232 signatures: &mut Vec<Vec<GenericSignature>>,
2233 checkpoint: CheckpointSequenceNumber,
2234 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
2235 last_checkpoint: CheckpointSequenceNumber,
2238 ) -> CheckpointBuilderResult<SuiSystemState> {
2239 let (system_state, effects) = self
2240 .state
2241 .create_and_execute_advance_epoch_tx(
2242 &self.epoch_store,
2243 epoch_total_gas_cost,
2244 checkpoint,
2245 epoch_start_timestamp_ms,
2246 end_of_epoch_observation_keys,
2247 last_checkpoint,
2248 )
2249 .await?;
2250 checkpoint_effects.push(effects);
2251 signatures.push(vec![]);
2252 Ok(system_state)
2253 }
2254
2255 #[instrument(level = "debug", skip_all)]
2262 fn complete_checkpoint_effects(
2263 &self,
2264 mut roots: Vec<TransactionEffects>,
2265 existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
2266 ) -> SuiResult<Vec<TransactionEffects>> {
2267 let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
2268 let mut results = vec![];
2269 let mut seen = HashSet::new();
2270 loop {
2271 let mut pending = HashSet::new();
2272
2273 let transactions_included = self
2274 .epoch_store
2275 .builder_included_transactions_in_checkpoint(
2276 roots.iter().map(|e| e.transaction_digest()),
2277 )?;
2278
2279 for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
2280 let digest = effect.transaction_digest();
2281 seen.insert(*digest);
2283
2284 if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
2286 continue;
2287 }
2288
2289 if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
2291 continue;
2292 }
2293
2294 let existing_effects = self
2295 .epoch_store
2296 .transactions_executed_in_cur_epoch(effect.dependencies())?;
2297
2298 for (dependency, effects_signature_exists) in
2299 effect.dependencies().iter().zip(existing_effects.iter())
2300 {
2301 if !effects_signature_exists {
2306 continue;
2307 }
2308 if seen.insert(*dependency) {
2309 pending.insert(*dependency);
2310 }
2311 }
2312 results.push(effect);
2313 }
2314 if pending.is_empty() {
2315 break;
2316 }
2317 let pending = pending.into_iter().collect::<Vec<_>>();
2318 let effects = self.effects_store.multi_get_executed_effects(&pending);
2319 let effects = effects
2320 .into_iter()
2321 .zip(pending)
2322 .map(|(opt, digest)| match opt {
2323 Some(x) => x,
2324 None => panic!(
2325 "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
2326 digest
2327 ),
2328 })
2329 .collect::<Vec<_>>();
2330 roots = effects;
2331 }
2332
2333 existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
2334 Ok(results)
2335 }
2336
/// Simulation-only sanity check on consensus commit prologue placement.
///
/// Does nothing unless the protocol config enables
/// `prepend_prologue_tx_in_consensus_commit_in_checkpoints`. Otherwise it
/// asserts that:
/// * the roots contain at most one consensus commit prologue transaction;
/// * if the roots contain one, it is the first transaction in `sorted`;
/// * no other loaded transaction in `sorted` is a consensus commit prologue.
///
/// Transactions that the cache reader cannot find are skipped, except the
/// first sorted transaction when a prologue is expected, which must exist.
#[cfg(msim)]
fn expensive_consensus_commit_prologue_invariants_check(
    &self,
    root_digests: &[TransactionDigest],
    sorted: &[TransactionEffects],
) {
    let prologue_is_prepended = self
        .epoch_store
        .protocol_config()
        .prepend_prologue_tx_in_consensus_commit_in_checkpoints();
    if !prologue_is_prepended {
        return;
    }

    let cache_reader = self.state.get_transaction_cache_reader();

    // Consensus commit prologues among the roots; there may be at most one.
    let root_txs = cache_reader.multi_get_transaction_blocks(root_digests);
    let ccps: Vec<_> = root_txs
        .iter()
        .flatten()
        .filter(|tx| tx.transaction_data().is_consensus_commit_prologue())
        .collect();
    assert!(ccps.len() <= 1);

    let sorted_digests: Vec<_> = sorted
        .iter()
        .map(|fx| *fx.transaction_digest())
        .collect();
    let txs = cache_reader.multi_get_transaction_blocks(&sorted_digests);

    // Number of leading transactions allowed (and required) to be the prologue.
    let prologue_slots = match ccps.first() {
        None => 0,
        Some(expected_prologue) => {
            let first_sorted = txs[0].as_ref().unwrap();
            assert!(first_sorted.transaction_data().is_consensus_commit_prologue());

            assert_eq!(expected_prologue.digest(), txs[0].as_ref().unwrap().digest());
            1
        }
    };

    for tx in txs.iter().skip(prologue_slots).flatten() {
        assert!(!tx.transaction_data().is_consensus_commit_prologue());
    }
}
2414}
2415
2416impl CheckpointAggregator {
2417 fn new(
2418 tables: Arc<CheckpointStore>,
2419 epoch_store: Arc<AuthorityPerEpochStore>,
2420 notify: Arc<Notify>,
2421 output: Box<dyn CertifiedCheckpointOutput>,
2422 state: Arc<AuthorityState>,
2423 metrics: Arc<CheckpointMetrics>,
2424 ) -> Self {
2425 let current = None;
2426 Self {
2427 store: tables,
2428 epoch_store,
2429 notify,
2430 current,
2431 output,
2432 state,
2433 metrics,
2434 }
2435 }
2436
2437 async fn run(mut self) {
2438 info!("Starting CheckpointAggregator");
2439 loop {
2440 if let Err(e) = self.run_and_notify().await {
2441 error!(
2442 "Error while aggregating checkpoint, will retry in 1s: {:?}",
2443 e
2444 );
2445 self.metrics.checkpoint_errors.inc();
2446 tokio::time::sleep(Duration::from_secs(1)).await;
2447 continue;
2448 }
2449
2450 let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
2451 }
2452 }
2453
2454 async fn run_and_notify(&mut self) -> SuiResult {
2455 let summaries = self.run_inner()?;
2456 for summary in summaries {
2457 self.output.certified_checkpoint_created(&summary).await?;
2458 }
2459 Ok(())
2460 }
2461
/// Performs one aggregation pass and returns the checkpoints certified
/// during it (possibly none).
///
/// The pass repeatedly picks the next sequence number to certify, feeds the
/// pending signatures recorded for it into a signature aggregator, and, once
/// a quorum is reached, stores the certified checkpoint and moves on to the
/// next sequence number. It returns when there is no locally built summary
/// for the next checkpoint or when the recorded signatures run out.
fn run_inner(&mut self) -> SuiResult<Vec<CertifiedCheckpointSummary>> {
    let _scope = monitored_scope("CheckpointAggregator");
    let mut result = vec![];
    'outer: loop {
        let next_to_certify = self.next_checkpoint_to_certify()?;
        let current = if let Some(current) = &mut self.current {
            // The certified-checkpoints table has already moved past the
            // checkpoint we were aggregating, so drop that aggregator and
            // start over with the new target.
            // NOTE(review): presumably the checkpoint was certified through
            // another path (e.g. state sync) — confirm.
            if current.summary.sequence_number < next_to_certify {
                assert_reachable!("skip checkpoint certification");
                self.current = None;
                continue;
            }
            current
        } else {
            // No aggregator in progress: start one from the locally built
            // summary, or stop if that checkpoint has not been built yet.
            let Some(summary) = self
                .epoch_store
                .get_built_checkpoint_summary(next_to_certify)?
            else {
                return Ok(result);
            };
            self.current = Some(CheckpointSignatureAggregator {
                next_index: 0,
                digest: summary.digest(),
                summary,
                signatures_by_digest: MultiStakeAggregator::new(
                    self.epoch_store.committee().clone(),
                ),
                store: self.store.clone(),
                state: self.state.clone(),
                metrics: self.metrics.clone(),
            });
            self.current.as_mut().unwrap()
        };

        let epoch_tables = self
            .epoch_store
            .tables()
            .expect("should not run past end of epoch");
        // Resume from the first signature of this checkpoint that has not
        // been processed yet; keys are (sequence number, signature index).
        let iter = epoch_tables
            .pending_checkpoint_signatures
            .safe_iter_with_bounds(
                Some((current.summary.sequence_number, current.next_index)),
                None,
            );
        for item in iter {
            let ((seq, index), data) = item?;
            // An entry for a different sequence number means every recorded
            // signature for the current checkpoint has been consumed
            // without reaching a quorum.
            if seq != current.summary.sequence_number {
                trace!(
                    checkpoint_seq =? current.summary.sequence_number,
                    "Not enough checkpoint signatures",
                );
                return Ok(result);
            }
            trace!(
                checkpoint_seq = current.summary.sequence_number,
                "Processing signature for checkpoint (digest: {:?}) from {:?}",
                current.summary.digest(),
                data.summary.auth_sig().authority.concise()
            );
            self.metrics
                .checkpoint_participation
                .with_label_values(&[&format!(
                    "{:?}",
                    data.summary.auth_sig().authority.concise()
                )])
                .inc();
            if let Ok(auth_signature) = current.try_aggregate(data) {
                debug!(
                    checkpoint_seq = current.summary.sequence_number,
                    "Successfully aggregated signatures for checkpoint (digest: {:?})",
                    current.summary.digest(),
                );
                let summary = VerifiedCheckpoint::new_unchecked(
                    CertifiedCheckpointSummary::new_from_data_and_sig(
                        current.summary.clone(),
                        auth_signature,
                    ),
                );

                self.store.insert_certified_checkpoint(&summary)?;
                self.metrics
                    .last_certified_checkpoint
                    .set(current.summary.sequence_number as i64);
                current.summary.report_checkpoint_age(
                    &self.metrics.last_certified_checkpoint_age,
                    &self.metrics.last_certified_checkpoint_age_ms,
                );
                result.push(summary.into_inner());
                // Quorum reached: reset and move on to the next checkpoint.
                self.current = None;
                continue 'outer;
            } else {
                // Remember where to resume so this signature is not fed to
                // the aggregator again on the next pass.
                current.next_index = index + 1;
            }
        }
        // The signature table is exhausted; nothing more to do this pass.
        break;
    }
    Ok(result)
}
2565
2566 fn next_checkpoint_to_certify(&self) -> SuiResult<CheckpointSequenceNumber> {
2567 Ok(self
2568 .store
2569 .tables
2570 .certified_checkpoints
2571 .reversed_safe_iter_with_bounds(None, None)?
2572 .next()
2573 .transpose()?
2574 .map(|(seq, _)| seq + 1)
2575 .unwrap_or_default())
2576 }
2577}
2578
2579impl CheckpointSignatureAggregator {
2580 #[allow(clippy::result_unit_err)]
2581 pub fn try_aggregate(
2582 &mut self,
2583 data: CheckpointSignatureMessage,
2584 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2585 let their_digest = *data.summary.digest();
2586 let (_, signature) = data.summary.into_data_and_sig();
2587 let author = signature.authority;
2588 let envelope =
2589 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2590 match self.signatures_by_digest.insert(their_digest, envelope) {
2591 InsertResult::Failed { error }
2593 if matches!(
2594 error.as_inner(),
2595 SuiErrorKind::StakeAggregatorRepeatedSigner {
2596 conflicting_sig: false,
2597 ..
2598 },
2599 ) =>
2600 {
2601 Err(())
2602 }
2603 InsertResult::Failed { error } => {
2604 warn!(
2605 checkpoint_seq = self.summary.sequence_number,
2606 "Failed to aggregate new signature from validator {:?}: {:?}",
2607 author.concise(),
2608 error
2609 );
2610 self.check_for_split_brain();
2611 Err(())
2612 }
2613 InsertResult::QuorumReached(cert) => {
2614 if their_digest != self.digest {
2617 self.metrics.remote_checkpoint_forks.inc();
2618 warn!(
2619 checkpoint_seq = self.summary.sequence_number,
2620 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2621 author.concise(),
2622 their_digest,
2623 self.digest
2624 );
2625 return Err(());
2626 }
2627 Ok(cert)
2628 }
2629 InsertResult::NotEnoughVotes {
2630 bad_votes: _,
2631 bad_authorities: _,
2632 } => {
2633 self.check_for_split_brain();
2634 Err(())
2635 }
2636 }
2637 }
2638
2639 fn check_for_split_brain(&self) {
2643 debug!(
2644 checkpoint_seq = self.summary.sequence_number,
2645 "Checking for split brain condition"
2646 );
2647 if self.signatures_by_digest.quorum_unreachable() {
2648 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2654 let digests_by_stake_messages = all_unique_values
2655 .iter()
2656 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2657 .map(|(digest, (_authorities, total_stake))| {
2658 format!("{:?} (total stake: {})", digest, total_stake)
2659 })
2660 .collect::<Vec<String>>();
2661 fail_point_arg!("kill_split_brain_node", |(
2662 checkpoint_overrides,
2663 forked_authorities,
2664 ): (
2665 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<u64, String>>>,
2666 std::sync::Arc<std::sync::Mutex<std::collections::HashSet<AuthorityName>>>,
2667 )| {
2668 #[cfg(msim)]
2669 {
2670 if let (Ok(mut overrides), Ok(forked_authorities_set)) =
2671 (checkpoint_overrides.lock(), forked_authorities.lock())
2672 {
2673 let correct_digest = all_unique_values
2675 .iter()
2676 .find(|(_, (authorities, _))| {
2677 authorities
2679 .iter()
2680 .any(|auth| !forked_authorities_set.contains(auth))
2681 })
2682 .map(|(digest, _)| digest.to_string())
2683 .unwrap_or_else(|| {
2684 all_unique_values
2686 .iter()
2687 .max_by_key(|(_, (_, stake))| *stake)
2688 .map(|(digest, _)| digest.to_string())
2689 .unwrap_or_else(|| self.digest.to_string())
2690 });
2691
2692 overrides.insert(self.summary.sequence_number, correct_digest.clone());
2693
2694 tracing::error!(
2695 fatal = true,
2696 "Fork recovery test: detected split-brain for sequence number: {}, using digest: {}",
2697 self.summary.sequence_number,
2698 correct_digest
2699 );
2700 }
2701 }
2702 });
2703
2704 debug_fatal!(
2705 "Split brain detected in checkpoint signature aggregation for checkpoint {:?}. Remaining stake: {:?}, Digests by stake: {:?}",
2706 self.summary.sequence_number,
2707 self.signatures_by_digest.uncommitted_stake(),
2708 digests_by_stake_messages
2709 );
2710 self.metrics.split_brain_checkpoint_forks.inc();
2711
2712 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2713 let local_summary = self.summary.clone();
2714 let state = self.state.clone();
2715 let tables = self.store.clone();
2716
2717 tokio::spawn(async move {
2718 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2719 });
2720 }
2721 }
2722}
2723
2724async fn diagnose_split_brain(
2730 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2731 local_summary: CheckpointSummary,
2732 state: Arc<AuthorityState>,
2733 tables: Arc<CheckpointStore>,
2734) {
2735 debug!(
2736 checkpoint_seq = local_summary.sequence_number,
2737 "Running split brain diagnostics..."
2738 );
2739 let time = SystemTime::now();
2740 let digest_to_validator = all_unique_values
2742 .iter()
2743 .filter_map(|(digest, (validators, _))| {
2744 if *digest != local_summary.digest() {
2745 let random_validator = validators.choose(&mut get_rng()).unwrap();
2746 Some((*digest, *random_validator))
2747 } else {
2748 None
2749 }
2750 })
2751 .collect::<HashMap<_, _>>();
2752 if digest_to_validator.is_empty() {
2753 panic!(
2754 "Given split brain condition, there should be at \
2755 least one validator that disagrees with local signature"
2756 );
2757 }
2758
2759 let epoch_store = state.load_epoch_store_one_call_per_task();
2760 let committee = epoch_store
2761 .epoch_start_state()
2762 .get_sui_committee_with_network_metadata();
2763 let network_config = default_mysten_network_config();
2764 let network_clients =
2765 make_network_authority_clients_with_network_config(&committee, &network_config);
2766
2767 let response_futures = digest_to_validator
2769 .values()
2770 .cloned()
2771 .map(|validator| {
2772 let client = network_clients
2773 .get(&validator)
2774 .expect("Failed to get network client");
2775 let request = CheckpointRequestV2 {
2776 sequence_number: Some(local_summary.sequence_number),
2777 request_content: true,
2778 certified: false,
2779 };
2780 client.handle_checkpoint_v2(request)
2781 })
2782 .collect::<Vec<_>>();
2783
2784 let digest_name_pair = digest_to_validator.iter();
2785 let response_data = futures::future::join_all(response_futures)
2786 .await
2787 .into_iter()
2788 .zip(digest_name_pair)
2789 .filter_map(|(response, (digest, name))| match response {
2790 Ok(response) => match response {
2791 CheckpointResponseV2 {
2792 checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2793 contents: Some(contents),
2794 } => Some((*name, *digest, summary, contents)),
2795 CheckpointResponseV2 {
2796 checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2797 contents: _,
2798 } => {
2799 panic!("Expected pending checkpoint, but got certified checkpoint");
2800 }
2801 CheckpointResponseV2 {
2802 checkpoint: None,
2803 contents: _,
2804 } => {
2805 error!(
2806 "Summary for checkpoint {:?} not found on validator {:?}",
2807 local_summary.sequence_number, name
2808 );
2809 None
2810 }
2811 CheckpointResponseV2 {
2812 checkpoint: _,
2813 contents: None,
2814 } => {
2815 error!(
2816 "Contents for checkpoint {:?} not found on validator {:?}",
2817 local_summary.sequence_number, name
2818 );
2819 None
2820 }
2821 },
2822 Err(e) => {
2823 error!(
2824 "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2825 e
2826 );
2827 None
2828 }
2829 })
2830 .collect::<Vec<_>>();
2831
2832 let local_checkpoint_contents = tables
2833 .get_checkpoint_contents(&local_summary.content_digest)
2834 .unwrap_or_else(|_| {
2835 panic!(
2836 "Could not find checkpoint contents for digest {:?}",
2837 local_summary.digest()
2838 )
2839 })
2840 .unwrap_or_else(|| {
2841 panic!(
2842 "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2843 local_summary.sequence_number,
2844 local_summary.digest()
2845 )
2846 });
2847 let local_contents_text = format!("{local_checkpoint_contents:?}");
2848
2849 let local_summary_text = format!("{local_summary:?}");
2850 let local_validator = state.name.concise();
2851 let diff_patches = response_data
2852 .iter()
2853 .map(|(name, other_digest, other_summary, contents)| {
2854 let other_contents_text = format!("{contents:?}");
2855 let other_summary_text = format!("{other_summary:?}");
2856 let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2857 .enumerate_transactions(&local_summary)
2858 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2859 .unzip();
2860 let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2861 .enumerate_transactions(other_summary)
2862 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2863 .unzip();
2864 let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2865 let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2866 let local_transactions_text = format!("{local_transactions:#?}");
2867 let other_transactions_text = format!("{other_transactions:#?}");
2868 let transactions_patch =
2869 create_patch(&local_transactions_text, &other_transactions_text);
2870 let local_effects_text = format!("{local_effects:#?}");
2871 let other_effects_text = format!("{other_effects:#?}");
2872 let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2873 let seq_number = local_summary.sequence_number;
2874 let local_digest = local_summary.digest();
2875 let other_validator = name.concise();
2876 format!(
2877 "Checkpoint: {seq_number:?}\n\
2878 Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2879 Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2880 Summary Diff: \n{summary_patch}\n\n\
2881 Contents Diff: \n{contents_patch}\n\n\
2882 Transactions Diff: \n{transactions_patch}\n\n\
2883 Effects Diff: \n{effects_patch}",
2884 )
2885 })
2886 .collect::<Vec<_>>()
2887 .join("\n\n\n");
2888
2889 let header = format!(
2890 "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2891 Datetime: {:?}",
2892 time
2893 );
2894 let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2895 let path = tempfile::tempdir()
2896 .expect("Failed to create tempdir")
2897 .keep()
2898 .join(Path::new("checkpoint_fork_dump.txt"));
2899 let mut file = File::create(path).unwrap();
2900 write!(file, "{}", fork_logs_text).unwrap();
2901 debug!("{}", fork_logs_text);
2902}
2903
/// Notification interface of the checkpoint service.
///
/// This file provides two implementations: `CheckpointService`, which
/// records signatures and wakes its background tasks, and
/// `CheckpointServiceNoop`, which ignores every notification.
pub trait CheckpointServiceNotify {
    /// Reports a checkpoint signature message (`info`) received for the
    /// epoch represented by `epoch_store`.
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> SuiResult;

    /// Signals that there may be new checkpoint work for the builder.
    fn notify_checkpoint(&self) -> SuiResult;
}
2913
/// Lifecycle state of a `CheckpointService`.
///
/// The service is constructed holding its three workers; they are taken
/// out exactly once when the service is started.
// The size difference between the variants is accepted: `Unstarted` owns
// the workers inline while `Started` carries nothing.
#[allow(clippy::large_enum_variant)]
enum CheckpointServiceState {
    /// Not yet started: owns the builder, aggregator and state hasher.
    Unstarted(
        (
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        ),
    ),
    /// The workers have been handed out.
    Started,
}
2925
2926impl CheckpointServiceState {
2927 fn take_unstarted(
2928 &mut self,
2929 ) -> (
2930 CheckpointBuilder,
2931 CheckpointAggregator,
2932 CheckpointStateHasher,
2933 ) {
2934 let mut state = CheckpointServiceState::Started;
2935 std::mem::swap(self, &mut state);
2936
2937 match state {
2938 CheckpointServiceState::Unstarted((builder, aggregator, hasher)) => {
2939 (builder, aggregator, hasher)
2940 }
2941 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2942 }
2943 }
2944}
2945
/// Handle to the checkpoint subsystem: owns the not-yet-started workers and
/// the notification primitives used to wake them.
pub struct CheckpointService {
    // Checkpoint store shared with the workers.
    tables: Arc<CheckpointStore>,
    // Wakes the checkpoint builder (see `notify_checkpoint`).
    notify_builder: Arc<Notify>,
    // Wakes the signature aggregator (see `notify_checkpoint_signature`).
    notify_aggregator: Arc<Notify>,
    // Index assigned to the most recently stored checkpoint signature;
    // incremented under the lock for every accepted signature.
    last_signature_index: Mutex<u64>,
    // Watch channel carrying the highest checkpoint sequence number built in
    // the current run; the builder receives a clone of this sender.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // Sequence number of the latest locally computed checkpoint found in the
    // store when the service was built (0 if there was none).
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    // Holds the workers until `spawn` takes them.
    state: Mutex<CheckpointServiceState>,
}
2959
2960impl CheckpointService {
2961 pub fn build(
2963 state: Arc<AuthorityState>,
2964 checkpoint_store: Arc<CheckpointStore>,
2965 epoch_store: Arc<AuthorityPerEpochStore>,
2966 effects_store: Arc<dyn TransactionCacheRead>,
2967 global_state_hasher: Weak<GlobalStateHasher>,
2968 checkpoint_output: Box<dyn CheckpointOutput>,
2969 certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2970 metrics: Arc<CheckpointMetrics>,
2971 max_transactions_per_checkpoint: usize,
2972 max_checkpoint_size_bytes: usize,
2973 ) -> Arc<Self> {
2974 info!(
2975 "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
2976 );
2977 let notify_builder = Arc::new(Notify::new());
2978 let notify_aggregator = Arc::new(Notify::new());
2979
2980 let highest_previously_built_seq = checkpoint_store
2982 .get_latest_locally_computed_checkpoint()
2983 .expect("failed to get latest locally computed checkpoint")
2984 .map(|s| s.sequence_number)
2985 .unwrap_or(0);
2986
2987 let highest_currently_built_seq =
2988 CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
2989 .expect("epoch should not have ended")
2990 .map(|(seq, _)| seq)
2991 .unwrap_or(0);
2992
2993 let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
2994
2995 let aggregator = CheckpointAggregator::new(
2996 checkpoint_store.clone(),
2997 epoch_store.clone(),
2998 notify_aggregator.clone(),
2999 certified_checkpoint_output,
3000 state.clone(),
3001 metrics.clone(),
3002 );
3003
3004 let (send_to_hasher, receive_from_builder) = mpsc::channel(16);
3005
3006 let ckpt_state_hasher = CheckpointStateHasher::new(
3007 epoch_store.clone(),
3008 global_state_hasher.clone(),
3009 receive_from_builder,
3010 );
3011
3012 let builder = CheckpointBuilder::new(
3013 state.clone(),
3014 checkpoint_store.clone(),
3015 epoch_store.clone(),
3016 notify_builder.clone(),
3017 effects_store,
3018 global_state_hasher,
3019 send_to_hasher,
3020 checkpoint_output,
3021 notify_aggregator.clone(),
3022 highest_currently_built_seq_tx.clone(),
3023 metrics.clone(),
3024 max_transactions_per_checkpoint,
3025 max_checkpoint_size_bytes,
3026 );
3027
3028 let last_signature_index = epoch_store
3029 .get_last_checkpoint_signature_index()
3030 .expect("should not cross end of epoch");
3031 let last_signature_index = Mutex::new(last_signature_index);
3032
3033 Arc::new(Self {
3034 tables: checkpoint_store,
3035 notify_builder,
3036 notify_aggregator,
3037 last_signature_index,
3038 highest_currently_built_seq_tx,
3039 highest_previously_built_seq,
3040 metrics,
3041 state: Mutex::new(CheckpointServiceState::Unstarted((
3042 builder,
3043 aggregator,
3044 ckpt_state_hasher,
3045 ))),
3046 })
3047 }
3048
/// Starts the checkpoint builder, aggregator and state hasher tasks, then
/// waits (for up to 120 seconds) until either the builder finishes or the
/// checkpoints built before this start have been rebuilt.
///
/// # Panics
/// Panics if the service was already started, or if the highest executed
/// checkpoint cannot be read from the store.
pub async fn spawn(
    &self,
    epoch_store: Arc<AuthorityPerEpochStore>,
    consensus_replay_waiter: Option<ReplayWaiter>,
) {
    let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();

    // Drop state hashes recorded after the highest executed checkpoint
    // before the builder runs. Failure is logged but does not stop startup.
    if let Some(last_committed_seq) = self
        .tables
        .get_highest_executed_checkpoint()
        .expect("Failed to get highest executed checkpoint")
        .map(|checkpoint| *checkpoint.sequence_number())
    {
        if let Err(e) = builder
            .epoch_store
            .clear_state_hashes_after_checkpoint(last_committed_seq)
        {
            error!(
                "Failed to clear state hashes after checkpoint {}: {:?}",
                last_committed_seq, e
            );
        } else {
            info!(
                "Cleared state hashes after checkpoint {} to ensure consistent ECMH computation",
                last_committed_seq
            );
        }
    }

    // Lets the wait at the bottom observe the builder finishing.
    let (builder_finished_tx, builder_finished_rx) = tokio::sync::oneshot::channel();

    let state_hasher_task = spawn_monitored_task!(state_hasher.run());
    let aggregator_task = spawn_monitored_task!(aggregator.run());

    // Supervisor task: runs the builder within the epoch's lifetime, then
    // winds down the other two tasks.
    spawn_monitored_task!(async move {
        epoch_store
            .within_alive_epoch(async move {
                builder.run(consensus_replay_waiter).await;
                builder_finished_tx.send(()).ok();
            })
            .await
            .ok();

        // NOTE(review): presumably the hasher exits on its own once the
        // builder (and with it the channel sender) is gone — confirm.
        state_hasher_task
            .await
            .expect("state hasher should exit normally");

        // The aggregator loops forever, so it has to be aborted explicitly;
        // the await only observes the cancellation.
        aggregator_task.abort();
        aggregator_task.await.ok();
    });

    if tokio::time::timeout(Duration::from_secs(120), async move {
        tokio::select! {
            _ = builder_finished_rx => { debug!("CheckpointBuilder finished"); }
            _ = self.wait_for_rebuilt_checkpoints() => (),
        }
    })
    .await
    .is_err()
    {
        debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
    }
}
3129}
3130
3131impl CheckpointService {
3132 pub async fn wait_for_rebuilt_checkpoints(&self) {
3138 let highest_previously_built_seq = self.highest_previously_built_seq;
3139 let mut rx = self.highest_currently_built_seq_tx.subscribe();
3140 let mut highest_currently_built_seq = *rx.borrow_and_update();
3141 info!(
3142 "Waiting for checkpoints to be rebuilt, previously built seq: {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
3143 );
3144 loop {
3145 if highest_currently_built_seq >= highest_previously_built_seq {
3146 info!("Checkpoint rebuild complete");
3147 break;
3148 }
3149 rx.changed().await.unwrap();
3150 highest_currently_built_seq = *rx.borrow_and_update();
3151 }
3152 }
3153
/// Test helper: records `checkpoint` as a pending checkpoint through a
/// consensus commit output, pushes that output to the epoch store and then
/// notifies the builder.
#[cfg(test)]
fn write_and_notify_checkpoint_for_testing(
    &self,
    epoch_store: &AuthorityPerEpochStore,
    checkpoint: PendingCheckpoint,
) -> SuiResult {
    use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;

    let mut commit_output = ConsensusCommitOutput::new(0);
    epoch_store.write_pending_checkpoint(&mut commit_output, &checkpoint)?;
    commit_output.set_default_commit_stats_for_testing();
    epoch_store.push_consensus_output_for_tests(commit_output);

    self.notify_checkpoint()
}
3169}
3170
3171impl CheckpointServiceNotify for CheckpointService {
3172 fn notify_checkpoint_signature(
3173 &self,
3174 epoch_store: &AuthorityPerEpochStore,
3175 info: &CheckpointSignatureMessage,
3176 ) -> SuiResult {
3177 let sequence = info.summary.sequence_number;
3178 let signer = info.summary.auth_sig().authority.concise();
3179
3180 if let Some(highest_verified_checkpoint) = self
3181 .tables
3182 .get_highest_verified_checkpoint()?
3183 .map(|x| *x.sequence_number())
3184 && sequence <= highest_verified_checkpoint
3185 {
3186 trace!(
3187 checkpoint_seq = sequence,
3188 "Ignore checkpoint signature from {} - already certified", signer,
3189 );
3190 self.metrics
3191 .last_ignored_checkpoint_signature_received
3192 .set(sequence as i64);
3193 return Ok(());
3194 }
3195 trace!(
3196 checkpoint_seq = sequence,
3197 "Received checkpoint signature, digest {} from {}",
3198 info.summary.digest(),
3199 signer,
3200 );
3201 self.metrics
3202 .last_received_checkpoint_signatures
3203 .with_label_values(&[&signer.to_string()])
3204 .set(sequence as i64);
3205 let mut index = self.last_signature_index.lock();
3208 *index += 1;
3209 epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
3210 self.notify_aggregator.notify_one();
3211 Ok(())
3212 }
3213
3214 fn notify_checkpoint(&self) -> SuiResult {
3215 self.notify_builder.notify_one();
3216 Ok(())
3217 }
3218}
3219
/// A `CheckpointServiceNotify` implementation that ignores every
/// notification and always reports success.
pub struct CheckpointServiceNoop {}
impl CheckpointServiceNotify for CheckpointServiceNoop {
    /// Discards the signature and returns `Ok(())`.
    fn notify_checkpoint_signature(
        &self,
        _: &AuthorityPerEpochStore,
        _: &CheckpointSignatureMessage,
    ) -> SuiResult {
        Ok(())
    }

    /// Does nothing and returns `Ok(())`.
    fn notify_checkpoint(&self) -> SuiResult {
        Ok(())
    }
}
3235
/// Read-only accessors for `PendingCheckpoint`.
impl PendingCheckpoint {
    /// Returns the checkpoint height stored in this pending checkpoint's details.
    pub fn height(&self) -> CheckpointHeight {
        self.details.checkpoint_height
    }

    /// Returns the root transaction keys of this pending checkpoint.
    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.roots
    }

    /// Returns the details record of this pending checkpoint.
    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.details
    }
}
3249
// `PollCounter` wraps a future and counts how many times it has been
// polled. The wrapped future is structurally pinned (`#[pin]`); the counter
// is a plain field.
pin_project! {
    pub struct PollCounter<Fut> {
        #[pin]
        future: Fut,
        count: usize,
    }
}
3257
impl<Fut> PollCounter<Fut> {
    /// Wraps `future` with a poll counter starting at zero.
    pub fn new(future: Fut) -> Self {
        Self { future, count: 0 }
    }

    /// Returns how many times the wrapper has been polled so far.
    pub fn count(&self) -> usize {
        self.count
    }
}
3267
3268impl<Fut: Future> Future for PollCounter<Fut> {
3269 type Output = (usize, Fut::Output);
3270
3271 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3272 let this = self.project();
3273 *this.count += 1;
3274 match this.future.poll(cx) {
3275 Poll::Ready(output) => Poll::Ready((*this.count, output)),
3276 Poll::Pending => Poll::Pending,
3277 }
3278 }
3279}
3280
/// Wraps `future` in a `PollCounter`, whose output pairs the number of
/// polls with the original output.
fn poll_count<Fut>(future: Fut) -> PollCounter<Fut> {
    PollCounter::new(future)
}
3284
3285#[cfg(test)]
3286mod tests {
3287 use super::*;
3288 use crate::authority::test_authority_builder::TestAuthorityBuilder;
3289 use crate::transaction_outputs::TransactionOutputs;
3290 use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
3291 use futures::FutureExt as _;
3292 use futures::future::BoxFuture;
3293 use std::collections::HashMap;
3294 use std::ops::Deref;
3295 use sui_macros::sim_test;
3296 use sui_protocol_config::{Chain, ProtocolConfig};
3297 use sui_types::accumulator_event::AccumulatorEvent;
3298 use sui_types::authenticator_state::ActiveJwk;
3299 use sui_types::base_types::{SequenceNumber, TransactionEffectsDigest};
3300 use sui_types::crypto::Signature;
3301 use sui_types::effects::{TransactionEffects, TransactionEvents};
3302 use sui_types::messages_checkpoint::SignedCheckpointSummary;
3303 use sui_types::transaction::VerifiedTransaction;
3304 use tokio::sync::mpsc;
3305
/// Verifies that `clear_locally_computed_checkpoints_from(76)` removes the
/// locally computed checkpoints 76..=80 and keeps 70..=75.
#[tokio::test]
async fn test_clear_locally_computed_checkpoints_from_deletes_inclusive_range() {
    let store = CheckpointStore::new_for_tests();
    let protocol = sui_protocol_config::ProtocolConfig::get_for_max_version_UNSAFE();
    // Populate sequence numbers 70..=80 with minimal one-transaction checkpoints.
    for seq in 70u64..=80u64 {
        let contents =
            sui_types::messages_checkpoint::CheckpointContents::new_with_digests_only_for_tests(
                [sui_types::base_types::ExecutionDigests::new(
                    sui_types::digests::TransactionDigest::random(),
                    sui_types::digests::TransactionEffectsDigest::ZERO,
                )],
            );
        let summary = sui_types::messages_checkpoint::CheckpointSummary::new(
            &protocol,
            0,
            seq,
            0,
            &contents,
            None,
            sui_types::gas::GasCostSummary::default(),
            None,
            0,
            Vec::new(),
            Vec::new(),
        );
        store
            .tables
            .locally_computed_checkpoints
            .insert(&seq, &summary)
            .unwrap();
    }

    store
        .clear_locally_computed_checkpoints_from(76)
        .expect("clear should succeed");

    // Boundary check: 75 survives, 76 (the starting point) is removed.
    assert!(
        store
            .tables
            .locally_computed_checkpoints
            .get(&75)
            .unwrap()
            .is_some()
    );
    assert!(
        store
            .tables
            .locally_computed_checkpoints
            .get(&76)
            .unwrap()
            .is_none()
    );

    // Everything below the starting point is still present.
    for seq in 70u64..76u64 {
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&seq)
                .unwrap()
                .is_some()
        );
    }
    // Everything from the starting point onward is gone.
    for seq in 76u64..=80u64 {
        assert!(
            store
                .tables
                .locally_computed_checkpoints
                .get(&seq)
                .unwrap()
                .is_none()
        );
    }
}
3381
3382 #[tokio::test]
3383 async fn test_fork_detection_storage() {
3384 let store = CheckpointStore::new_for_tests();
3385 let seq_num = 42;
3387 let digest = CheckpointDigest::random();
3388
3389 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3390
3391 store
3392 .record_checkpoint_fork_detected(seq_num, digest)
3393 .unwrap();
3394
3395 let retrieved = store.get_checkpoint_fork_detected().unwrap();
3396 assert!(retrieved.is_some());
3397 let (retrieved_seq, retrieved_digest) = retrieved.unwrap();
3398 assert_eq!(retrieved_seq, seq_num);
3399 assert_eq!(retrieved_digest, digest);
3400
3401 store.clear_checkpoint_fork_detected().unwrap();
3402 assert!(store.get_checkpoint_fork_detected().unwrap().is_none());
3403
3404 let tx_digest = TransactionDigest::random();
3406 let expected_effects = TransactionEffectsDigest::random();
3407 let actual_effects = TransactionEffectsDigest::random();
3408
3409 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3410
3411 store
3412 .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
3413 .unwrap();
3414
3415 let retrieved = store.get_transaction_fork_detected().unwrap();
3416 assert!(retrieved.is_some());
3417 let (retrieved_tx, retrieved_expected, retrieved_actual) = retrieved.unwrap();
3418 assert_eq!(retrieved_tx, tx_digest);
3419 assert_eq!(retrieved_expected, expected_effects);
3420 assert_eq!(retrieved_actual, actual_effects);
3421
3422 store.clear_transaction_fork_detected().unwrap();
3423 assert!(store.get_transaction_fork_detected().unwrap().is_none());
3424 }
3425
3426 #[sim_test]
3427 pub async fn checkpoint_builder_test() {
3428 telemetry_subscribers::init_for_testing();
3429
3430 let mut protocol_config =
3431 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
3432 protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
3433 let state = TestAuthorityBuilder::new()
3434 .with_protocol_config(protocol_config)
3435 .build()
3436 .await;
3437
3438 let dummy_tx = VerifiedTransaction::new_authenticator_state_update(
3439 0,
3440 0,
3441 vec![],
3442 SequenceNumber::new(),
3443 );
3444
3445 let jwks = {
3446 let mut jwks = Vec::new();
3447 while bcs::to_bytes(&jwks).unwrap().len() < 40_000 {
3448 jwks.push(ActiveJwk {
3449 jwk_id: JwkId::new(
3450 "https://accounts.google.com".to_string(),
3451 "1234567890".to_string(),
3452 ),
3453 jwk: JWK {
3454 kty: "RSA".to_string(),
3455 e: "AQAB".to_string(),
3456 n: "1234567890".to_string(),
3457 alg: "RS256".to_string(),
3458 },
3459 epoch: 0,
3460 });
3461 }
3462 jwks
3463 };
3464
3465 let dummy_tx_with_data =
3466 VerifiedTransaction::new_authenticator_state_update(0, 1, jwks, SequenceNumber::new());
3467
3468 for i in 0..15 {
3469 state
3470 .database_for_testing()
3471 .perpetual_tables
3472 .transactions
3473 .insert(&d(i), dummy_tx.serializable_ref())
3474 .unwrap();
3475 }
3476 for i in 15..20 {
3477 state
3478 .database_for_testing()
3479 .perpetual_tables
3480 .transactions
3481 .insert(&d(i), dummy_tx_with_data.serializable_ref())
3482 .unwrap();
3483 }
3484
3485 let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
3486 commit_cert_for_test(
3487 &mut store,
3488 state.clone(),
3489 d(1),
3490 vec![d(2), d(3)],
3491 GasCostSummary::new(11, 12, 11, 1),
3492 );
3493 commit_cert_for_test(
3494 &mut store,
3495 state.clone(),
3496 d(2),
3497 vec![d(3), d(4)],
3498 GasCostSummary::new(21, 22, 21, 1),
3499 );
3500 commit_cert_for_test(
3501 &mut store,
3502 state.clone(),
3503 d(3),
3504 vec![],
3505 GasCostSummary::new(31, 32, 31, 1),
3506 );
3507 commit_cert_for_test(
3508 &mut store,
3509 state.clone(),
3510 d(4),
3511 vec![],
3512 GasCostSummary::new(41, 42, 41, 1),
3513 );
3514 for i in [5, 6, 7, 10, 11, 12, 13] {
3515 commit_cert_for_test(
3516 &mut store,
3517 state.clone(),
3518 d(i),
3519 vec![],
3520 GasCostSummary::new(41, 42, 41, 1),
3521 );
3522 }
3523 for i in [15, 16, 17] {
3524 commit_cert_for_test(
3525 &mut store,
3526 state.clone(),
3527 d(i),
3528 vec![],
3529 GasCostSummary::new(51, 52, 51, 1),
3530 );
3531 }
3532 let all_digests: Vec<_> = store.keys().copied().collect();
3533 for digest in all_digests {
3534 let signature = Signature::Ed25519SuiSignature(Default::default()).into();
3535 state
3536 .epoch_store_for_testing()
3537 .test_insert_user_signature(digest, vec![signature]);
3538 }
3539
3540 let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
3541 let (certified_output, mut certified_result) =
3542 mpsc::channel::<CertifiedCheckpointSummary>(10);
3543 let store = Arc::new(store);
3544
3545 let ckpt_dir = tempfile::tempdir().unwrap();
3546 let checkpoint_store =
3547 CheckpointStore::new(ckpt_dir.path(), Arc::new(PrunerWatermarks::default()));
3548 let epoch_store = state.epoch_store_for_testing();
3549
3550 let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
3551 state.get_global_state_hash_store().clone(),
3552 ));
3553
3554 let checkpoint_service = CheckpointService::build(
3555 state.clone(),
3556 checkpoint_store,
3557 epoch_store.clone(),
3558 store,
3559 Arc::downgrade(&global_state_hasher),
3560 Box::new(output),
3561 Box::new(certified_output),
3562 CheckpointMetrics::new_for_tests(),
3563 3,
3564 100_000,
3565 );
3566 checkpoint_service.spawn(epoch_store.clone(), None).await;
3567
3568 checkpoint_service
3569 .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
3570 .unwrap();
3571 checkpoint_service
3572 .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
3573 .unwrap();
3574 checkpoint_service
3575 .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
3576 .unwrap();
3577 checkpoint_service
3578 .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
3579 .unwrap();
3580 checkpoint_service
3581 .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
3582 .unwrap();
3583 checkpoint_service
3584 .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
3585 .unwrap();
3586
3587 let (c1c, c1s) = result.recv().await.unwrap();
3588 let (c2c, c2s) = result.recv().await.unwrap();
3589
3590 let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3591 let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3592 assert_eq!(c1t, vec![d(4)]);
3593 assert_eq!(c1s.previous_digest, None);
3594 assert_eq!(c1s.sequence_number, 0);
3595 assert_eq!(
3596 c1s.epoch_rolling_gas_cost_summary,
3597 GasCostSummary::new(41, 42, 41, 1)
3598 );
3599
3600 assert_eq!(c2t, vec![d(3), d(2), d(1)]);
3601 assert_eq!(c2s.previous_digest, Some(c1s.digest()));
3602 assert_eq!(c2s.sequence_number, 1);
3603 assert_eq!(
3604 c2s.epoch_rolling_gas_cost_summary,
3605 GasCostSummary::new(104, 108, 104, 4)
3606 );
3607
3608 let (c3c, c3s) = result.recv().await.unwrap();
3611 let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3612 let (c4c, c4s) = result.recv().await.unwrap();
3613 let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3614 assert_eq!(c3s.sequence_number, 2);
3615 assert_eq!(c3s.previous_digest, Some(c2s.digest()));
3616 assert_eq!(c4s.sequence_number, 3);
3617 assert_eq!(c4s.previous_digest, Some(c3s.digest()));
3618 assert_eq!(c3t, vec![d(10), d(11), d(12)]);
3619 assert_eq!(c4t, vec![d(13)]);
3620
3621 let (c5c, c5s) = result.recv().await.unwrap();
3624 let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3625 let (c6c, c6s) = result.recv().await.unwrap();
3626 let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3627 assert_eq!(c5s.sequence_number, 4);
3628 assert_eq!(c5s.previous_digest, Some(c4s.digest()));
3629 assert_eq!(c6s.sequence_number, 5);
3630 assert_eq!(c6s.previous_digest, Some(c5s.digest()));
3631 assert_eq!(c5t, vec![d(15), d(16)]);
3632 assert_eq!(c6t, vec![d(17)]);
3633
3634 let (c7c, c7s) = result.recv().await.unwrap();
3637 let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
3638 assert_eq!(c7t, vec![d(5), d(6)]);
3639 assert_eq!(c7s.previous_digest, Some(c6s.digest()));
3640 assert_eq!(c7s.sequence_number, 6);
3641
3642 let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
3643 let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);
3644
3645 checkpoint_service
3646 .notify_checkpoint_signature(
3647 &epoch_store,
3648 &CheckpointSignatureMessage { summary: c2ss },
3649 )
3650 .unwrap();
3651 checkpoint_service
3652 .notify_checkpoint_signature(
3653 &epoch_store,
3654 &CheckpointSignatureMessage { summary: c1ss },
3655 )
3656 .unwrap();
3657
3658 let c1sc = certified_result.recv().await.unwrap();
3659 let c2sc = certified_result.recv().await.unwrap();
3660 assert_eq!(c1sc.sequence_number, 0);
3661 assert_eq!(c2sc.sequence_number, 1);
3662 }
3663
3664 impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
3665 fn notify_read_executed_effects(
3666 &self,
3667 _: &str,
3668 digests: &[TransactionDigest],
3669 ) -> BoxFuture<'_, Vec<TransactionEffects>> {
3670 std::future::ready(
3671 digests
3672 .iter()
3673 .map(|d| self.get(d).expect("effects not found").clone())
3674 .collect(),
3675 )
3676 .boxed()
3677 }
3678
3679 fn notify_read_executed_effects_digests(
3680 &self,
3681 _: &str,
3682 digests: &[TransactionDigest],
3683 ) -> BoxFuture<'_, Vec<TransactionEffectsDigest>> {
3684 std::future::ready(
3685 digests
3686 .iter()
3687 .map(|d| {
3688 self.get(d)
3689 .map(|fx| fx.digest())
3690 .expect("effects not found")
3691 })
3692 .collect(),
3693 )
3694 .boxed()
3695 }
3696
3697 fn multi_get_executed_effects(
3698 &self,
3699 digests: &[TransactionDigest],
3700 ) -> Vec<Option<TransactionEffects>> {
3701 digests.iter().map(|d| self.get(d).cloned()).collect()
3702 }
3703
3704 fn multi_get_transaction_blocks(
3710 &self,
3711 _: &[TransactionDigest],
3712 ) -> Vec<Option<Arc<VerifiedTransaction>>> {
3713 unimplemented!()
3714 }
3715
3716 fn multi_get_executed_effects_digests(
3717 &self,
3718 _: &[TransactionDigest],
3719 ) -> Vec<Option<TransactionEffectsDigest>> {
3720 unimplemented!()
3721 }
3722
3723 fn multi_get_effects(
3724 &self,
3725 _: &[TransactionEffectsDigest],
3726 ) -> Vec<Option<TransactionEffects>> {
3727 unimplemented!()
3728 }
3729
3730 fn multi_get_events(&self, _: &[TransactionDigest]) -> Vec<Option<TransactionEvents>> {
3731 unimplemented!()
3732 }
3733
3734 fn get_mysticeti_fastpath_outputs(
3735 &self,
3736 _: &TransactionDigest,
3737 ) -> Option<Arc<TransactionOutputs>> {
3738 unimplemented!()
3739 }
3740
3741 fn notify_read_fastpath_transaction_outputs<'a>(
3742 &'a self,
3743 _: &'a [TransactionDigest],
3744 ) -> BoxFuture<'a, Vec<Arc<crate::transaction_outputs::TransactionOutputs>>> {
3745 unimplemented!()
3746 }
3747
3748 fn take_accumulator_events(&self, _: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
3749 unimplemented!()
3750 }
3751
3752 fn get_unchanged_loaded_runtime_objects(
3753 &self,
3754 _digest: &TransactionDigest,
3755 ) -> Option<Vec<sui_types::storage::ObjectKey>> {
3756 unimplemented!()
3757 }
3758 }
3759
3760 #[async_trait::async_trait]
3761 impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
3762 async fn checkpoint_created(
3763 &self,
3764 summary: &CheckpointSummary,
3765 contents: &CheckpointContents,
3766 _epoch_store: &Arc<AuthorityPerEpochStore>,
3767 _checkpoint_store: &Arc<CheckpointStore>,
3768 ) -> SuiResult {
3769 self.try_send((contents.clone(), summary.clone())).unwrap();
3770 Ok(())
3771 }
3772 }
3773
3774 #[async_trait::async_trait]
3775 impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
3776 async fn certified_checkpoint_created(
3777 &self,
3778 summary: &CertifiedCheckpointSummary,
3779 ) -> SuiResult {
3780 self.try_send(summary.clone()).unwrap();
3781 Ok(())
3782 }
3783 }
3784
3785 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
3786 PendingCheckpoint {
3787 roots: t
3788 .into_iter()
3789 .map(|t| TransactionKey::Digest(d(t)))
3790 .collect(),
3791 details: PendingCheckpointInfo {
3792 timestamp_ms,
3793 last_of_epoch: false,
3794 checkpoint_height: i,
3795 },
3796 }
3797 }
3798
3799 fn d(i: u8) -> TransactionDigest {
3800 let mut bytes: [u8; 32] = Default::default();
3801 bytes[0] = i;
3802 TransactionDigest::new(bytes)
3803 }
3804
3805 fn e(
3806 transaction_digest: TransactionDigest,
3807 dependencies: Vec<TransactionDigest>,
3808 gas_used: GasCostSummary,
3809 ) -> TransactionEffects {
3810 let mut effects = TransactionEffects::default();
3811 *effects.transaction_digest_mut_for_testing() = transaction_digest;
3812 *effects.dependencies_mut_for_testing() = dependencies;
3813 *effects.gas_cost_summary_mut_for_testing() = gas_used;
3814 effects
3815 }
3816
3817 fn commit_cert_for_test(
3818 store: &mut HashMap<TransactionDigest, TransactionEffects>,
3819 state: Arc<AuthorityState>,
3820 digest: TransactionDigest,
3821 dependencies: Vec<TransactionDigest>,
3822 gas_used: GasCostSummary,
3823 ) {
3824 let epoch_store = state.epoch_store_for_testing();
3825 let effects = e(digest, dependencies, gas_used);
3826 store.insert(digest, effects.clone());
3827 epoch_store.insert_executed_in_epoch(&digest);
3828 }
3829}