sui_core/checkpoints/
checkpoint_output.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::StableSyncAuthoritySigner;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::consensus_adapter::SubmitToConsensus;
7use crate::epoch::reconfiguration::ReconfigurationInitiator;
8use async_trait::async_trait;
9use std::sync::Arc;
10use sui_types::base_types::AuthorityName;
11use sui_types::error::SuiResult;
12use sui_types::message_envelope::Message;
13use sui_types::messages_checkpoint::{
14    CertifiedCheckpointSummary, CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
15    SignedCheckpointSummary, VerifiedCheckpoint,
16};
17use sui_types::messages_consensus::ConsensusTransaction;
18use tracing::{debug, info, instrument, trace};
19
20use super::{CheckpointMetrics, CheckpointStore};
21
22#[async_trait]
23pub trait CheckpointOutput: Sync + Send + 'static {
24    async fn checkpoint_created(
25        &self,
26        summary: &CheckpointSummary,
27        contents: &CheckpointContents,
28        epoch_store: &Arc<AuthorityPerEpochStore>,
29        checkpoint_store: &Arc<CheckpointStore>,
30    ) -> SuiResult;
31}
32
33#[async_trait]
34pub trait CertifiedCheckpointOutput: Sync + Send + 'static {
35    async fn certified_checkpoint_created(&self, summary: &CertifiedCheckpointSummary)
36    -> SuiResult;
37}
38
39pub struct SubmitCheckpointToConsensus<T> {
40    pub sender: T,
41    pub signer: StableSyncAuthoritySigner,
42    pub authority: AuthorityName,
43    pub next_reconfiguration_timestamp_ms: u64,
44    pub metrics: Arc<CheckpointMetrics>,
45}
46
47pub struct LogCheckpointOutput;
48
49impl LogCheckpointOutput {
50    pub fn boxed() -> Box<dyn CheckpointOutput> {
51        Box::new(Self)
52    }
53
54    pub fn boxed_certified() -> Box<dyn CertifiedCheckpointOutput> {
55        Box::new(Self)
56    }
57}
58
59#[async_trait]
60impl<T: SubmitToConsensus + ReconfigurationInitiator> CheckpointOutput
61    for SubmitCheckpointToConsensus<T>
62{
63    #[instrument(level = "debug", skip_all)]
64    async fn checkpoint_created(
65        &self,
66        summary: &CheckpointSummary,
67        contents: &CheckpointContents,
68        epoch_store: &Arc<AuthorityPerEpochStore>,
69        checkpoint_store: &Arc<CheckpointStore>,
70    ) -> SuiResult {
71        LogCheckpointOutput
72            .checkpoint_created(summary, contents, epoch_store, checkpoint_store)
73            .await?;
74
75        let checkpoint_timestamp = summary.timestamp_ms;
76        let checkpoint_seq = summary.sequence_number;
77        self.metrics.checkpoint_creation_latency.observe(
78            summary
79                .timestamp()
80                .elapsed()
81                .unwrap_or_default()
82                .as_secs_f64(),
83        );
84        self.metrics.checkpoint_creation_latency_ms.observe(
85            summary
86                .timestamp()
87                .elapsed()
88                .unwrap_or_default()
89                .as_millis() as u64,
90        );
91
92        let highest_verified_checkpoint = checkpoint_store
93            .get_highest_verified_checkpoint()?
94            .map(|x| *x.sequence_number());
95
96        if Some(checkpoint_seq) > highest_verified_checkpoint {
97            debug!(
98                "Sending checkpoint signature at sequence {checkpoint_seq} to consensus, timestamp {checkpoint_timestamp}.
99                {}ms left till end of epoch at timestamp {}",
100                self.next_reconfiguration_timestamp_ms.saturating_sub(checkpoint_timestamp), self.next_reconfiguration_timestamp_ms
101            );
102
103            let summary = SignedCheckpointSummary::new(
104                epoch_store.epoch(),
105                summary.clone(),
106                &*self.signer,
107                self.authority,
108            );
109
110            let message = CheckpointSignatureMessage { summary };
111            assert!(
112                epoch_store
113                    .protocol_config()
114                    .consensus_checkpoint_signature_key_includes_digest()
115            );
116            let transaction = ConsensusTransaction::new_checkpoint_signature_message_v2(message);
117            self.sender
118                .submit_to_consensus(&[transaction], epoch_store)?;
119            self.metrics
120                .last_sent_checkpoint_signature
121                .set(checkpoint_seq as i64);
122        } else {
123            debug!(
124                "Checkpoint at sequence {checkpoint_seq} is already certified, skipping signature submission to consensus",
125            );
126            self.metrics
127                .last_skipped_checkpoint_signature_submission
128                .set(checkpoint_seq as i64);
129        }
130
131        if checkpoint_timestamp >= self.next_reconfiguration_timestamp_ms {
132            // close_epoch is ok if called multiple times
133            info!(
134                "Closing epoch at sequence {checkpoint_seq} at timestamp {checkpoint_timestamp}. next_reconfiguration_timestamp_ms {}",
135                self.next_reconfiguration_timestamp_ms
136            );
137            self.sender.close_epoch(epoch_store);
138        }
139        Ok(())
140    }
141}
142
143#[async_trait]
144impl CheckpointOutput for LogCheckpointOutput {
145    async fn checkpoint_created(
146        &self,
147        summary: &CheckpointSummary,
148        contents: &CheckpointContents,
149        _epoch_store: &Arc<AuthorityPerEpochStore>,
150        _checkpoint_store: &Arc<CheckpointStore>,
151    ) -> SuiResult {
152        trace!(
153            "Including following transactions in checkpoint {}: {:?}",
154            summary.sequence_number, contents
155        );
156        debug!(
157            "Creating checkpoint {:?} at epoch {}, sequence {}, previous digest {:?}, transactions count {}, content digest {:?}, end_of_epoch_data {:?}",
158            summary.digest(),
159            summary.epoch,
160            summary.sequence_number,
161            summary.previous_digest,
162            contents.size(),
163            summary.content_digest,
164            summary.end_of_epoch_data,
165        );
166
167        Ok(())
168    }
169}
170
171#[async_trait]
172impl CertifiedCheckpointOutput for LogCheckpointOutput {
173    async fn certified_checkpoint_created(
174        &self,
175        summary: &CertifiedCheckpointSummary,
176    ) -> SuiResult {
177        info!(
178            "Certified checkpoint with sequence {} and digest {}",
179            summary.sequence_number,
180            summary.digest()
181        );
182        Ok(())
183    }
184}
185
186pub struct SendCheckpointToStateSync {
187    handle: sui_network::state_sync::Handle,
188}
189
190impl SendCheckpointToStateSync {
191    pub fn new(handle: sui_network::state_sync::Handle) -> Self {
192        Self { handle }
193    }
194}
195
196#[async_trait]
197impl CertifiedCheckpointOutput for SendCheckpointToStateSync {
198    #[instrument(level = "debug", skip_all)]
199    async fn certified_checkpoint_created(
200        &self,
201        summary: &CertifiedCheckpointSummary,
202    ) -> SuiResult {
203        info!(
204            "Certified checkpoint with sequence {} and digest {}",
205            summary.sequence_number,
206            summary.digest()
207        );
208        self.handle
209            .send_checkpoint(VerifiedCheckpoint::new_unchecked(summary.to_owned()))
210            .await;
211
212        Ok(())
213    }
214}