sui_core/checkpoints/
checkpoint_output.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::StableSyncAuthoritySigner;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::consensus_adapter::SubmitToConsensus;
7use crate::epoch::reconfiguration::ReconfigurationInitiator;
8use async_trait::async_trait;
9use std::sync::Arc;
10use sui_types::base_types::AuthorityName;
11use sui_types::error::SuiResult;
12use sui_types::message_envelope::Message;
13use sui_types::messages_checkpoint::{
14    CertifiedCheckpointSummary, CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
15    SignedCheckpointSummary, VerifiedCheckpoint,
16};
17use sui_types::messages_consensus::ConsensusTransaction;
18use tracing::{debug, info, instrument, trace};
19
20use super::{CheckpointMetrics, CheckpointStore};
21
22#[async_trait]
23pub trait CheckpointOutput: Sync + Send + 'static {
24    async fn checkpoint_created(
25        &self,
26        summary: &CheckpointSummary,
27        contents: &CheckpointContents,
28        epoch_store: &Arc<AuthorityPerEpochStore>,
29        checkpoint_store: &Arc<CheckpointStore>,
30    ) -> SuiResult;
31}
32
33#[async_trait]
34pub trait CertifiedCheckpointOutput: Sync + Send + 'static {
35    async fn certified_checkpoint_created(&self, summary: &CertifiedCheckpointSummary)
36    -> SuiResult;
37}
38
39pub struct SubmitCheckpointToConsensus<T> {
40    pub sender: T,
41    pub signer: StableSyncAuthoritySigner,
42    pub authority: AuthorityName,
43    pub next_reconfiguration_timestamp_ms: u64,
44    pub metrics: Arc<CheckpointMetrics>,
45}
46
/// Checkpoint output that only logs the checkpoints it is handed.
pub struct LogCheckpointOutput;
48
49impl LogCheckpointOutput {
50    pub fn boxed() -> Box<dyn CheckpointOutput> {
51        Box::new(Self)
52    }
53
54    pub fn boxed_certified() -> Box<dyn CertifiedCheckpointOutput> {
55        Box::new(Self)
56    }
57}
58
59#[async_trait]
60impl<T: SubmitToConsensus + ReconfigurationInitiator> CheckpointOutput
61    for SubmitCheckpointToConsensus<T>
62{
63    #[instrument(level = "debug", skip_all)]
64    async fn checkpoint_created(
65        &self,
66        summary: &CheckpointSummary,
67        contents: &CheckpointContents,
68        epoch_store: &Arc<AuthorityPerEpochStore>,
69        checkpoint_store: &Arc<CheckpointStore>,
70    ) -> SuiResult {
71        LogCheckpointOutput
72            .checkpoint_created(summary, contents, epoch_store, checkpoint_store)
73            .await?;
74
75        let checkpoint_timestamp = summary.timestamp_ms;
76        let checkpoint_seq = summary.sequence_number;
77        self.metrics.checkpoint_creation_latency.observe(
78            summary
79                .timestamp()
80                .elapsed()
81                .unwrap_or_default()
82                .as_secs_f64(),
83        );
84        self.metrics.checkpoint_creation_latency_ms.observe(
85            summary
86                .timestamp()
87                .elapsed()
88                .unwrap_or_default()
89                .as_millis() as u64,
90        );
91
92        let highest_verified_checkpoint = checkpoint_store
93            .get_highest_verified_checkpoint()?
94            .map(|x| *x.sequence_number());
95
96        if Some(checkpoint_seq) > highest_verified_checkpoint {
97            debug!(
98                "Sending checkpoint signature at sequence {checkpoint_seq} to consensus, timestamp {checkpoint_timestamp}.
99                {}ms left till end of epoch at timestamp {}",
100                self.next_reconfiguration_timestamp_ms.saturating_sub(checkpoint_timestamp), self.next_reconfiguration_timestamp_ms
101            );
102
103            let summary = SignedCheckpointSummary::new(
104                epoch_store.epoch(),
105                summary.clone(),
106                &*self.signer,
107                self.authority,
108            );
109
110            let message = CheckpointSignatureMessage { summary };
111            let transaction = if epoch_store
112                .protocol_config()
113                .consensus_checkpoint_signature_key_includes_digest()
114            {
115                ConsensusTransaction::new_checkpoint_signature_message_v2(message)
116            } else {
117                ConsensusTransaction::new_checkpoint_signature_message(message)
118            };
119            self.sender
120                .submit_to_consensus(&vec![transaction], epoch_store)?;
121            self.metrics
122                .last_sent_checkpoint_signature
123                .set(checkpoint_seq as i64);
124        } else {
125            debug!(
126                "Checkpoint at sequence {checkpoint_seq} is already certified, skipping signature submission to consensus",
127            );
128            self.metrics
129                .last_skipped_checkpoint_signature_submission
130                .set(checkpoint_seq as i64);
131        }
132
133        if checkpoint_timestamp >= self.next_reconfiguration_timestamp_ms {
134            // close_epoch is ok if called multiple times
135            info!(
136                "Closing epoch at sequence {checkpoint_seq} at timestamp {checkpoint_timestamp}. next_reconfiguration_timestamp_ms {}",
137                self.next_reconfiguration_timestamp_ms
138            );
139            self.sender.close_epoch(epoch_store);
140        }
141        Ok(())
142    }
143}
144
145#[async_trait]
146impl CheckpointOutput for LogCheckpointOutput {
147    async fn checkpoint_created(
148        &self,
149        summary: &CheckpointSummary,
150        contents: &CheckpointContents,
151        _epoch_store: &Arc<AuthorityPerEpochStore>,
152        _checkpoint_store: &Arc<CheckpointStore>,
153    ) -> SuiResult {
154        trace!(
155            "Including following transactions in checkpoint {}: {:?}",
156            summary.sequence_number, contents
157        );
158        info!(
159            "Creating checkpoint {:?} at epoch {}, sequence {}, previous digest {:?}, transactions count {}, content digest {:?}, end_of_epoch_data {:?}",
160            summary.digest(),
161            summary.epoch,
162            summary.sequence_number,
163            summary.previous_digest,
164            contents.size(),
165            summary.content_digest,
166            summary.end_of_epoch_data,
167        );
168
169        Ok(())
170    }
171}
172
173#[async_trait]
174impl CertifiedCheckpointOutput for LogCheckpointOutput {
175    async fn certified_checkpoint_created(
176        &self,
177        summary: &CertifiedCheckpointSummary,
178    ) -> SuiResult {
179        info!(
180            "Certified checkpoint with sequence {} and digest {}",
181            summary.sequence_number,
182            summary.digest()
183        );
184        Ok(())
185    }
186}
187
188pub struct SendCheckpointToStateSync {
189    handle: sui_network::state_sync::Handle,
190}
191
192impl SendCheckpointToStateSync {
193    pub fn new(handle: sui_network::state_sync::Handle) -> Self {
194        Self { handle }
195    }
196}
197
198#[async_trait]
199impl CertifiedCheckpointOutput for SendCheckpointToStateSync {
200    #[instrument(level = "debug", skip_all)]
201    async fn certified_checkpoint_created(
202        &self,
203        summary: &CertifiedCheckpointSummary,
204    ) -> SuiResult {
205        info!(
206            "Certified checkpoint with sequence {} and digest {}",
207            summary.sequence_number,
208            summary.digest()
209        );
210        self.handle
211            .send_checkpoint(VerifiedCheckpoint::new_unchecked(summary.to_owned()))
212            .await;
213
214        Ok(())
215    }
216}