sui_core/checkpoints/
checkpoint_output.rs1use crate::authority::StableSyncAuthoritySigner;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::consensus_adapter::SubmitToConsensus;
7use crate::epoch::reconfiguration::ReconfigurationInitiator;
8use async_trait::async_trait;
9use std::sync::Arc;
10use sui_types::base_types::AuthorityName;
11use sui_types::error::SuiResult;
12use sui_types::message_envelope::Message;
13use sui_types::messages_checkpoint::{
14 CertifiedCheckpointSummary, CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
15 SignedCheckpointSummary, VerifiedCheckpoint,
16};
17use sui_types::messages_consensus::ConsensusTransaction;
18use tracing::{debug, info, instrument, trace};
19
20use super::{CheckpointMetrics, CheckpointStore};
21
22#[async_trait]
23pub trait CheckpointOutput: Sync + Send + 'static {
24 async fn checkpoint_created(
25 &self,
26 summary: &CheckpointSummary,
27 contents: &CheckpointContents,
28 epoch_store: &Arc<AuthorityPerEpochStore>,
29 checkpoint_store: &Arc<CheckpointStore>,
30 ) -> SuiResult;
31}
32
33#[async_trait]
34pub trait CertifiedCheckpointOutput: Sync + Send + 'static {
35 async fn certified_checkpoint_created(&self, summary: &CertifiedCheckpointSummary)
36 -> SuiResult;
37}
38
39pub struct SubmitCheckpointToConsensus<T> {
40 pub sender: T,
41 pub signer: StableSyncAuthoritySigner,
42 pub authority: AuthorityName,
43 pub next_reconfiguration_timestamp_ms: u64,
44 pub metrics: Arc<CheckpointMetrics>,
45}
46
/// Stateless checkpoint output whose trait implementations only emit log
/// records for the checkpoints they are given.
pub struct LogCheckpointOutput;
48
49impl LogCheckpointOutput {
50 pub fn boxed() -> Box<dyn CheckpointOutput> {
51 Box::new(Self)
52 }
53
54 pub fn boxed_certified() -> Box<dyn CertifiedCheckpointOutput> {
55 Box::new(Self)
56 }
57}
58
59#[async_trait]
60impl<T: SubmitToConsensus + ReconfigurationInitiator> CheckpointOutput
61 for SubmitCheckpointToConsensus<T>
62{
63 #[instrument(level = "debug", skip_all)]
64 async fn checkpoint_created(
65 &self,
66 summary: &CheckpointSummary,
67 contents: &CheckpointContents,
68 epoch_store: &Arc<AuthorityPerEpochStore>,
69 checkpoint_store: &Arc<CheckpointStore>,
70 ) -> SuiResult {
71 LogCheckpointOutput
72 .checkpoint_created(summary, contents, epoch_store, checkpoint_store)
73 .await?;
74
75 let checkpoint_timestamp = summary.timestamp_ms;
76 let checkpoint_seq = summary.sequence_number;
77 self.metrics.checkpoint_creation_latency.observe(
78 summary
79 .timestamp()
80 .elapsed()
81 .unwrap_or_default()
82 .as_secs_f64(),
83 );
84 self.metrics.checkpoint_creation_latency_ms.observe(
85 summary
86 .timestamp()
87 .elapsed()
88 .unwrap_or_default()
89 .as_millis() as u64,
90 );
91
92 let highest_verified_checkpoint = checkpoint_store
93 .get_highest_verified_checkpoint()?
94 .map(|x| *x.sequence_number());
95
96 if Some(checkpoint_seq) > highest_verified_checkpoint {
97 debug!(
98 "Sending checkpoint signature at sequence {checkpoint_seq} to consensus, timestamp {checkpoint_timestamp}.
99 {}ms left till end of epoch at timestamp {}",
100 self.next_reconfiguration_timestamp_ms.saturating_sub(checkpoint_timestamp), self.next_reconfiguration_timestamp_ms
101 );
102
103 let summary = SignedCheckpointSummary::new(
104 epoch_store.epoch(),
105 summary.clone(),
106 &*self.signer,
107 self.authority,
108 );
109
110 let message = CheckpointSignatureMessage { summary };
111 assert!(
112 epoch_store
113 .protocol_config()
114 .consensus_checkpoint_signature_key_includes_digest()
115 );
116 let transaction = ConsensusTransaction::new_checkpoint_signature_message_v2(message);
117 self.sender
118 .submit_to_consensus(&[transaction], epoch_store)?;
119 self.metrics
120 .last_sent_checkpoint_signature
121 .set(checkpoint_seq as i64);
122 } else {
123 debug!(
124 "Checkpoint at sequence {checkpoint_seq} is already certified, skipping signature submission to consensus",
125 );
126 self.metrics
127 .last_skipped_checkpoint_signature_submission
128 .set(checkpoint_seq as i64);
129 }
130
131 if checkpoint_timestamp >= self.next_reconfiguration_timestamp_ms {
132 info!(
134 "Closing epoch at sequence {checkpoint_seq} at timestamp {checkpoint_timestamp}. next_reconfiguration_timestamp_ms {}",
135 self.next_reconfiguration_timestamp_ms
136 );
137 self.sender.close_epoch(epoch_store);
138 }
139 Ok(())
140 }
141}
142
143#[async_trait]
144impl CheckpointOutput for LogCheckpointOutput {
145 async fn checkpoint_created(
146 &self,
147 summary: &CheckpointSummary,
148 contents: &CheckpointContents,
149 _epoch_store: &Arc<AuthorityPerEpochStore>,
150 _checkpoint_store: &Arc<CheckpointStore>,
151 ) -> SuiResult {
152 trace!(
153 "Including following transactions in checkpoint {}: {:?}",
154 summary.sequence_number, contents
155 );
156 debug!(
157 "Creating checkpoint {:?} at epoch {}, sequence {}, previous digest {:?}, transactions count {}, content digest {:?}, end_of_epoch_data {:?}",
158 summary.digest(),
159 summary.epoch,
160 summary.sequence_number,
161 summary.previous_digest,
162 contents.size(),
163 summary.content_digest,
164 summary.end_of_epoch_data,
165 );
166
167 Ok(())
168 }
169}
170
171#[async_trait]
172impl CertifiedCheckpointOutput for LogCheckpointOutput {
173 async fn certified_checkpoint_created(
174 &self,
175 summary: &CertifiedCheckpointSummary,
176 ) -> SuiResult {
177 info!(
178 "Certified checkpoint with sequence {} and digest {}",
179 summary.sequence_number,
180 summary.digest()
181 );
182 Ok(())
183 }
184}
185
186pub struct SendCheckpointToStateSync {
187 handle: sui_network::state_sync::Handle,
188}
189
190impl SendCheckpointToStateSync {
191 pub fn new(handle: sui_network::state_sync::Handle) -> Self {
192 Self { handle }
193 }
194}
195
196#[async_trait]
197impl CertifiedCheckpointOutput for SendCheckpointToStateSync {
198 #[instrument(level = "debug", skip_all)]
199 async fn certified_checkpoint_created(
200 &self,
201 summary: &CertifiedCheckpointSummary,
202 ) -> SuiResult {
203 info!(
204 "Certified checkpoint with sequence {} and digest {}",
205 summary.sequence_number,
206 summary.digest()
207 );
208 self.handle
209 .send_checkpoint(VerifiedCheckpoint::new_unchecked(summary.to_owned()))
210 .await;
211
212 Ok(())
213 }
214}