sui_core/checkpoints/
checkpoint_output.rs1use crate::authority::StableSyncAuthoritySigner;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::consensus_adapter::SubmitToConsensus;
7use crate::epoch::reconfiguration::ReconfigurationInitiator;
8use async_trait::async_trait;
9use std::sync::Arc;
10use sui_types::base_types::AuthorityName;
11use sui_types::error::SuiResult;
12use sui_types::message_envelope::Message;
13use sui_types::messages_checkpoint::{
14 CertifiedCheckpointSummary, CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
15 SignedCheckpointSummary, VerifiedCheckpoint,
16};
17use sui_types::messages_consensus::ConsensusTransaction;
18use tracing::{debug, info, instrument, trace};
19
20use super::{CheckpointMetrics, CheckpointStore};
21
22#[async_trait]
23pub trait CheckpointOutput: Sync + Send + 'static {
24 async fn checkpoint_created(
25 &self,
26 summary: &CheckpointSummary,
27 contents: &CheckpointContents,
28 epoch_store: &Arc<AuthorityPerEpochStore>,
29 checkpoint_store: &Arc<CheckpointStore>,
30 ) -> SuiResult;
31}
32
33#[async_trait]
34pub trait CertifiedCheckpointOutput: Sync + Send + 'static {
35 async fn certified_checkpoint_created(&self, summary: &CertifiedCheckpointSummary)
36 -> SuiResult;
37}
38
39pub struct SubmitCheckpointToConsensus<T> {
40 pub sender: T,
41 pub signer: StableSyncAuthoritySigner,
42 pub authority: AuthorityName,
43 pub next_reconfiguration_timestamp_ms: u64,
44 pub metrics: Arc<CheckpointMetrics>,
45}
46
47pub struct LogCheckpointOutput;
48
49impl LogCheckpointOutput {
50 pub fn boxed() -> Box<dyn CheckpointOutput> {
51 Box::new(Self)
52 }
53
54 pub fn boxed_certified() -> Box<dyn CertifiedCheckpointOutput> {
55 Box::new(Self)
56 }
57}
58
59#[async_trait]
60impl<T: SubmitToConsensus + ReconfigurationInitiator> CheckpointOutput
61 for SubmitCheckpointToConsensus<T>
62{
63 #[instrument(level = "debug", skip_all)]
64 async fn checkpoint_created(
65 &self,
66 summary: &CheckpointSummary,
67 contents: &CheckpointContents,
68 epoch_store: &Arc<AuthorityPerEpochStore>,
69 checkpoint_store: &Arc<CheckpointStore>,
70 ) -> SuiResult {
71 LogCheckpointOutput
72 .checkpoint_created(summary, contents, epoch_store, checkpoint_store)
73 .await?;
74
75 let checkpoint_timestamp = summary.timestamp_ms;
76 let checkpoint_seq = summary.sequence_number;
77 self.metrics.checkpoint_creation_latency.observe(
78 summary
79 .timestamp()
80 .elapsed()
81 .unwrap_or_default()
82 .as_secs_f64(),
83 );
84 self.metrics.checkpoint_creation_latency_ms.observe(
85 summary
86 .timestamp()
87 .elapsed()
88 .unwrap_or_default()
89 .as_millis() as u64,
90 );
91
92 let highest_verified_checkpoint = checkpoint_store
93 .get_highest_verified_checkpoint()?
94 .map(|x| *x.sequence_number());
95
96 if Some(checkpoint_seq) > highest_verified_checkpoint {
97 debug!(
98 "Sending checkpoint signature at sequence {checkpoint_seq} to consensus, timestamp {checkpoint_timestamp}.
99 {}ms left till end of epoch at timestamp {}",
100 self.next_reconfiguration_timestamp_ms.saturating_sub(checkpoint_timestamp), self.next_reconfiguration_timestamp_ms
101 );
102
103 let summary = SignedCheckpointSummary::new(
104 epoch_store.epoch(),
105 summary.clone(),
106 &*self.signer,
107 self.authority,
108 );
109
110 let message = CheckpointSignatureMessage { summary };
111 let transaction = if epoch_store
112 .protocol_config()
113 .consensus_checkpoint_signature_key_includes_digest()
114 {
115 ConsensusTransaction::new_checkpoint_signature_message_v2(message)
116 } else {
117 ConsensusTransaction::new_checkpoint_signature_message(message)
118 };
119 self.sender
120 .submit_to_consensus(&vec![transaction], epoch_store)?;
121 self.metrics
122 .last_sent_checkpoint_signature
123 .set(checkpoint_seq as i64);
124 } else {
125 debug!(
126 "Checkpoint at sequence {checkpoint_seq} is already certified, skipping signature submission to consensus",
127 );
128 self.metrics
129 .last_skipped_checkpoint_signature_submission
130 .set(checkpoint_seq as i64);
131 }
132
133 if checkpoint_timestamp >= self.next_reconfiguration_timestamp_ms {
134 info!(
136 "Closing epoch at sequence {checkpoint_seq} at timestamp {checkpoint_timestamp}. next_reconfiguration_timestamp_ms {}",
137 self.next_reconfiguration_timestamp_ms
138 );
139 self.sender.close_epoch(epoch_store);
140 }
141 Ok(())
142 }
143}
144
145#[async_trait]
146impl CheckpointOutput for LogCheckpointOutput {
147 async fn checkpoint_created(
148 &self,
149 summary: &CheckpointSummary,
150 contents: &CheckpointContents,
151 _epoch_store: &Arc<AuthorityPerEpochStore>,
152 _checkpoint_store: &Arc<CheckpointStore>,
153 ) -> SuiResult {
154 trace!(
155 "Including following transactions in checkpoint {}: {:?}",
156 summary.sequence_number, contents
157 );
158 info!(
159 "Creating checkpoint {:?} at epoch {}, sequence {}, previous digest {:?}, transactions count {}, content digest {:?}, end_of_epoch_data {:?}",
160 summary.digest(),
161 summary.epoch,
162 summary.sequence_number,
163 summary.previous_digest,
164 contents.size(),
165 summary.content_digest,
166 summary.end_of_epoch_data,
167 );
168
169 Ok(())
170 }
171}
172
173#[async_trait]
174impl CertifiedCheckpointOutput for LogCheckpointOutput {
175 async fn certified_checkpoint_created(
176 &self,
177 summary: &CertifiedCheckpointSummary,
178 ) -> SuiResult {
179 info!(
180 "Certified checkpoint with sequence {} and digest {}",
181 summary.sequence_number,
182 summary.digest()
183 );
184 Ok(())
185 }
186}
187
188pub struct SendCheckpointToStateSync {
189 handle: sui_network::state_sync::Handle,
190}
191
192impl SendCheckpointToStateSync {
193 pub fn new(handle: sui_network::state_sync::Handle) -> Self {
194 Self { handle }
195 }
196}
197
198#[async_trait]
199impl CertifiedCheckpointOutput for SendCheckpointToStateSync {
200 #[instrument(level = "debug", skip_all)]
201 async fn certified_checkpoint_created(
202 &self,
203 summary: &CertifiedCheckpointSummary,
204 ) -> SuiResult {
205 info!(
206 "Certified checkpoint with sequence {} and digest {}",
207 summary.sequence_number,
208 summary.digest()
209 );
210 self.handle
211 .send_checkpoint(VerifiedCheckpoint::new_unchecked(summary.to_owned()))
212 .await;
213
214 Ok(())
215 }
216}