1use crate::committee::extract_new_committee_info;
5use crate::config::Config;
6use crate::graphql::query_last_checkpoint_of_epoch;
7use crate::object_store::SuiObjectStore;
8use anyhow::{Result, anyhow};
9use serde::{Deserialize, Serialize};
10use std::collections::HashSet;
11use std::io::Read;
12use std::{fs, io::Write};
13use sui_config::genesis::Genesis;
14use sui_rpc_api::Client;
15use sui_storage::object_store::util::end_of_epoch_data;
16use sui_types::{
17 crypto::AuthorityQuorumSignInfo, message_envelope::Envelope,
18 messages_checkpoint::CheckpointSummary,
19};
20use tracing::info;
21
22#[derive(Debug, Clone, Deserialize, Serialize)]
23pub struct CheckpointsList {
24 pub checkpoints: Vec<u64>,
25}
26
27pub fn read_checkpoint_list(config: &Config) -> Result<CheckpointsList> {
28 let checkpoints_path = config.checkpoint_list_path();
29 let reader = fs::File::open(checkpoints_path)?;
30 Ok(serde_yaml::from_reader(reader)?)
31}
32
33pub fn write_checkpoint_list(config: &Config, checkpoints_list: &CheckpointsList) -> Result<()> {
34 let checkpoints_path = config.checkpoint_list_path();
35 let mut writer = fs::File::create(checkpoints_path)?;
36 let bytes = serde_yaml::to_vec(checkpoints_list)?;
37 writer
38 .write_all(&bytes)
39 .map_err(|e| anyhow!("Unable to serialize checkpoint list: {}", e))
40}
41
42pub fn read_checkpoint(
43 config: &Config,
44 seq: u64,
45) -> Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
46 read_checkpoint_general(config, seq, None)
47}
48
49fn read_checkpoint_general(
50 config: &Config,
51 seq: u64,
52 path: Option<&str>,
53) -> Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
54 let checkpoint_path = config.checkpoint_path(seq, path);
55 let mut reader = fs::File::open(checkpoint_path)?;
56 let mut buffer = Vec::new();
57 reader.read_to_end(&mut buffer)?;
58 bcs::from_bytes(&buffer).map_err(|_| anyhow!("Unable to parse checkpoint file"))
59}
60
61pub fn write_checkpoint(
62 config: &Config,
63 summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
64) -> Result<()> {
65 write_checkpoint_general(config, summary, None)
66}
67
68fn write_checkpoint_general(
69 config: &Config,
70 summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
71 path: Option<&str>,
72) -> Result<()> {
73 let checkpoint_path = config.checkpoint_path(*summary.sequence_number(), path);
74 let mut writer = fs::File::create(checkpoint_path)?;
75 let bytes =
76 bcs::to_bytes(summary).map_err(|_| anyhow!("Unable to serialize checkpoint summary"))?;
77 writer.write_all(&bytes)?;
78 Ok(())
79}
80
81async fn sync_checkpoint_list_to_latest(config: &Config) -> anyhow::Result<CheckpointsList> {
83 let graphql_list = if config.graphql_url.is_some() {
85 match sync_checkpoint_list_to_latest_using_graphql(config).await {
86 Ok(list) => list,
87 Err(e) => {
88 info!("Failed to get checkpoints from GraphQL: {}", e);
89 CheckpointsList {
90 checkpoints: vec![],
91 }
92 }
93 }
94 } else {
95 CheckpointsList {
96 checkpoints: vec![],
97 }
98 };
99
100 let archive_list = match sync_checkpoint_list_to_latest_using_checkpoint_bucket(
102 config.object_store_url.clone(),
103 )
104 .await
105 {
106 Ok(list) => list,
107 Err(e) => {
108 info!("Failed to get checkpoints from archive: {}", e);
109 CheckpointsList {
110 checkpoints: vec![],
111 }
112 }
113 };
114
115 if graphql_list.checkpoints.is_empty() && archive_list.checkpoints.is_empty() {
117 return Err(anyhow!(
118 "Could not retrieve any checkpoints from configured sources"
119 ));
120 }
121
122 let merged_checkpoints = merge_checkpoint_lists(&graphql_list, &archive_list);
123 Ok(CheckpointsList {
124 checkpoints: merged_checkpoints,
125 })
126}
127
128fn merge_checkpoint_lists(list1: &CheckpointsList, list2: &CheckpointsList) -> Vec<u64> {
130 let unique_checkpoints: HashSet<u64> = list1
132 .checkpoints
133 .iter()
134 .chain(list2.checkpoints.iter())
135 .copied()
136 .collect();
137
138 let mut sorted_checkpoints: Vec<_> = unique_checkpoints.into_iter().collect();
140 sorted_checkpoints.sort();
141
142 sorted_checkpoints
143}
144
145async fn sync_checkpoint_list_to_latest_using_checkpoint_bucket(
147 archive_url: String,
148) -> anyhow::Result<CheckpointsList> {
149 info!("Syncing checkpoints from Archive store");
150 let checkpoints = end_of_epoch_data(&archive_url, vec![]).await?;
151 Ok(CheckpointsList { checkpoints })
152}
153
154async fn sync_checkpoint_list_to_latest_using_graphql(
157 config: &Config,
158) -> anyhow::Result<CheckpointsList> {
159 info!("Syncing checkpoints from GraphQL");
160 let mut checkpoints_list = match read_checkpoint_list(config) {
162 Ok(list) => list,
163 Err(e) => {
164 info!(
165 "Could not read existing checkpoint list, starting with empty list: {}",
166 e
167 );
168 CheckpointsList {
169 checkpoints: vec![],
170 }
171 }
172 };
173
174 if checkpoints_list.checkpoints.is_empty() {
177 return Err(anyhow!(
178 "Empty checkpoint list and no initial checkpoint to start from"
179 ));
180 }
181
182 let latest_in_list = checkpoints_list.checkpoints.last().unwrap();
183 let object_store = SuiObjectStore::new(config)?;
185
186 let summary = object_store
188 .download_checkpoint_summary(*latest_in_list)
189 .await?;
190 let mut last_epoch = summary.epoch();
191
192 let mut client =
194 Client::new(config.full_node_url.as_str()).expect("Cannot connect to full node");
195
196 let latest_seq = client.get_latest_checkpoint().await?.sequence_number;
197 let latest = object_store.download_checkpoint_summary(latest_seq).await?;
198
199 while last_epoch + 1 < latest.epoch() {
201 let target_epoch = last_epoch + 1;
202 let target_last_checkpoint_number =
203 query_last_checkpoint_of_epoch(config, target_epoch).await?;
204
205 checkpoints_list
207 .checkpoints
208 .push(target_last_checkpoint_number);
209
210 last_epoch = target_epoch;
212
213 info!(
214 "Last Epoch: {} Last Checkpoint: {}",
215 target_epoch, target_last_checkpoint_number
216 );
217 }
218
219 Ok(checkpoints_list)
220}
221
222pub async fn check_and_sync_checkpoints(config: &Config) -> anyhow::Result<()> {
223 let checkpoints_list = sync_checkpoint_list_to_latest(config)
224 .await
225 .map_err(|e| anyhow!(format!("Cannot refresh list: {e}")))?;
226
227 write_checkpoint_list(config, &checkpoints_list)?;
229
230 let mut genesis_path = config.checkpoint_summary_dir.clone();
232 genesis_path.push(&config.genesis_filename);
233 let genesis_committee = Genesis::load(&genesis_path)?.committee();
234
235 let mut prev_committee = genesis_committee;
239 let object_store = SuiObjectStore::new(config)?;
240 for ckp_id in &checkpoints_list.checkpoints {
241 let mut checkpoint_path = config.checkpoint_summary_dir.clone();
243 checkpoint_path.push(format!("{}.yaml", ckp_id));
244
245 let summary = if checkpoint_path.exists() {
247 read_checkpoint(config, *ckp_id)
248 .map_err(|e| anyhow!(format!("Cannot read checkpoint: {e}")))?
249 } else {
250 let summary = object_store
252 .download_checkpoint_summary(*ckp_id)
253 .await
254 .map_err(|e| anyhow!(format!("Cannot download summary: {e}")))?;
255 summary.clone().try_into_verified(&prev_committee)?;
256 write_checkpoint(config, &summary)?;
258 summary
259 };
260
261 info!(
263 "Epoch: {} Checkpoint ID: {}",
264 summary.epoch(),
265 summary.digest()
266 );
267
268 prev_committee = extract_new_committee_info(&summary)?;
270 }
271
272 Ok(())
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use roaring::RoaringBitmap;
    use sui_types::{
        gas::GasCostSummary, messages_checkpoint::CheckpointContents,
        supported_protocol_versions::ProtocolConfig,
    };
    use tempfile::TempDir;

    /// Builds a default `Config` whose checkpoint summary directory is a fresh
    /// temporary directory. The `TempDir` is returned so the caller can keep it
    /// alive for the duration of the test.
    fn create_test_config() -> (Config, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = Config {
            checkpoint_summary_dir: temp_dir.path().to_path_buf(),
            ..Default::default()
        };
        (config, temp_dir)
    }

    /// A checkpoint list written to disk is read back unchanged.
    #[test]
    fn test_checkpoint_list_read_write() {
        let (config, _temp_dir) = create_test_config();
        let test_list = CheckpointsList {
            checkpoints: vec![1, 2, 3],
        };

        write_checkpoint_list(&config, &test_list).unwrap();
        let read_list = read_checkpoint_list(&config).unwrap();

        assert_eq!(test_list.checkpoints, read_list.checkpoints);
    }

    /// A checkpoint summary written to disk is read back with the same
    /// sequence number.
    #[test]
    fn test_checkpoint_read_write() {
        let (config, _temp_dir) = create_test_config();
        let contents = CheckpointContents::new_with_digests_only_for_tests(vec![]);
        let summary = CheckpointSummary::new(
            &ProtocolConfig::get_for_max_version_UNSAFE(),
            0,
            0,
            0,
            &contents,
            None,
            GasCostSummary::default(),
            None,
            0,
            Vec::new(),
            Vec::new(),
        );
        let info = AuthorityQuorumSignInfo::<true> {
            epoch: 0,
            signature: Default::default(),
            signers_map: RoaringBitmap::new(),
        };
        let test_summary = Envelope::new_from_data_and_sig(summary, info);

        write_checkpoint(&config, &test_summary).unwrap();
        let read_summary = read_checkpoint(&config, 0).unwrap();

        assert_eq!(
            test_summary.sequence_number(),
            read_summary.sequence_number()
        );
    }

    /// Merging yields the sorted union of both lists without duplicates,
    /// regardless of argument order.
    #[test]
    fn test_merge_checkpoint_lists_sorts_and_deduplicates() {
        let first = CheckpointsList {
            checkpoints: vec![30, 10, 20, 10],
        };
        let second = CheckpointsList {
            checkpoints: vec![20, 5, 40],
        };

        assert_eq!(
            merge_checkpoint_lists(&first, &second),
            vec![5, 10, 20, 30, 40]
        );
        assert_eq!(
            merge_checkpoint_lists(&second, &first),
            vec![5, 10, 20, 30, 40]
        );
    }

    /// Empty inputs are handled: an empty side contributes nothing.
    #[test]
    fn test_merge_checkpoint_lists_handles_empty_inputs() {
        let empty = CheckpointsList {
            checkpoints: vec![],
        };
        let non_empty = CheckpointsList {
            checkpoints: vec![7, 3],
        };

        assert!(merge_checkpoint_lists(&empty, &empty).is_empty());
        assert_eq!(merge_checkpoint_lists(&empty, &non_empty), vec![3, 7]);
        assert_eq!(merge_checkpoint_lists(&non_empty, &empty), vec![3, 7]);
    }
}