1use crate::committee::extract_new_committee_info;
5use crate::config::Config;
6use crate::graphql::query_last_checkpoint_of_epoch;
7use crate::object_store::SuiObjectStore;
8use anyhow::{Result, anyhow};
9use serde::{Deserialize, Serialize};
10use std::collections::HashSet;
11use std::io::Read;
12use std::{fs, io::Write};
13use sui_config::genesis::Genesis;
14use sui_data_ingestion_core::end_of_epoch_data;
15use sui_sdk::SuiClientBuilder;
16use sui_types::{
17 crypto::AuthorityQuorumSignInfo, message_envelope::Envelope,
18 messages_checkpoint::CheckpointSummary,
19};
20use tracing::info;
21
/// Timeout (in seconds, per its name) passed as the last argument to
/// `end_of_epoch_data` when listing checkpoints from the archive store.
const CHECKPOINT_BUCKET_TIMEOUT_SECS: u64 = 5;
23
/// Serializable list of checkpoint sequence numbers.
///
/// This is the structure persisted to / loaded from the YAML file located at
/// `Config::checkpoint_list_path()` by [`write_checkpoint_list`] and
/// [`read_checkpoint_list`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct CheckpointsList {
    /// Checkpoint sequence numbers. The sync code in this file appends the
    /// last checkpoint of each epoch and treats the final element as the most
    /// recent one.
    // NOTE(review): nothing here enforces ordering for lists loaded from
    // disk — presumably they are expected to be ascending; confirm.
    pub checkpoints: Vec<u64>,
}
28
29pub fn read_checkpoint_list(config: &Config) -> Result<CheckpointsList> {
30 let checkpoints_path = config.checkpoint_list_path();
31 let reader = fs::File::open(checkpoints_path)?;
32 Ok(serde_yaml::from_reader(reader)?)
33}
34
35pub fn write_checkpoint_list(config: &Config, checkpoints_list: &CheckpointsList) -> Result<()> {
36 let checkpoints_path = config.checkpoint_list_path();
37 let mut writer = fs::File::create(checkpoints_path)?;
38 let bytes = serde_yaml::to_vec(checkpoints_list)?;
39 writer
40 .write_all(&bytes)
41 .map_err(|e| anyhow!("Unable to serialize checkpoint list: {}", e))
42}
43
/// Reads the stored checkpoint summary with sequence number `seq`.
///
/// Thin wrapper around `read_checkpoint_general` that passes `None` for the
/// optional `path` argument.
///
/// # Errors
/// Propagates any error from `read_checkpoint_general` (file open/read
/// failure or BCS parse failure).
pub fn read_checkpoint(
    config: &Config,
    seq: u64,
) -> Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
    read_checkpoint_general(config, seq, None)
}
50
51fn read_checkpoint_general(
52 config: &Config,
53 seq: u64,
54 path: Option<&str>,
55) -> Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
56 let checkpoint_path = config.checkpoint_path(seq, path);
57 let mut reader = fs::File::open(checkpoint_path)?;
58 let mut buffer = Vec::new();
59 reader.read_to_end(&mut buffer)?;
60 bcs::from_bytes(&buffer).map_err(|_| anyhow!("Unable to parse checkpoint file"))
61}
62
/// Persists a checkpoint summary to disk.
///
/// Thin wrapper around `write_checkpoint_general` that passes `None` for the
/// optional `path` argument.
///
/// # Errors
/// Propagates any error from `write_checkpoint_general` (serialization or
/// file create/write failure).
pub fn write_checkpoint(
    config: &Config,
    summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
) -> Result<()> {
    write_checkpoint_general(config, summary, None)
}
69
70fn write_checkpoint_general(
71 config: &Config,
72 summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
73 path: Option<&str>,
74) -> Result<()> {
75 let checkpoint_path = config.checkpoint_path(*summary.sequence_number(), path);
76 let mut writer = fs::File::create(checkpoint_path)?;
77 let bytes =
78 bcs::to_bytes(summary).map_err(|_| anyhow!("Unable to serialize checkpoint summary"))?;
79 writer.write_all(&bytes)?;
80 Ok(())
81}
82
83async fn sync_checkpoint_list_to_latest(config: &Config) -> anyhow::Result<CheckpointsList> {
85 let graphql_list = if config.graphql_url.is_some() {
87 match sync_checkpoint_list_to_latest_using_graphql(config).await {
88 Ok(list) => list,
89 Err(e) => {
90 info!("Failed to get checkpoints from GraphQL: {}", e);
91 CheckpointsList {
92 checkpoints: vec![],
93 }
94 }
95 }
96 } else {
97 CheckpointsList {
98 checkpoints: vec![],
99 }
100 };
101
102 let archive_list = match sync_checkpoint_list_to_latest_using_checkpoint_bucket(
104 config.object_store_url.clone(),
105 )
106 .await
107 {
108 Ok(list) => list,
109 Err(e) => {
110 info!("Failed to get checkpoints from archive: {}", e);
111 CheckpointsList {
112 checkpoints: vec![],
113 }
114 }
115 };
116
117 if graphql_list.checkpoints.is_empty() && archive_list.checkpoints.is_empty() {
119 return Err(anyhow!(
120 "Could not retrieve any checkpoints from configured sources"
121 ));
122 }
123
124 let merged_checkpoints = merge_checkpoint_lists(&graphql_list, &archive_list);
125 Ok(CheckpointsList {
126 checkpoints: merged_checkpoints,
127 })
128}
129
130fn merge_checkpoint_lists(list1: &CheckpointsList, list2: &CheckpointsList) -> Vec<u64> {
132 let unique_checkpoints: HashSet<u64> = list1
134 .checkpoints
135 .iter()
136 .chain(list2.checkpoints.iter())
137 .copied()
138 .collect();
139
140 let mut sorted_checkpoints: Vec<_> = unique_checkpoints.into_iter().collect();
142 sorted_checkpoints.sort();
143
144 sorted_checkpoints
145}
146
147async fn sync_checkpoint_list_to_latest_using_checkpoint_bucket(
149 archive_url: String,
150) -> anyhow::Result<CheckpointsList> {
151 info!("Syncing checkpoints from Archive store");
152 let checkpoints =
153 end_of_epoch_data(archive_url, vec![], CHECKPOINT_BUCKET_TIMEOUT_SECS).await?;
154 Ok(CheckpointsList { checkpoints })
155}
156
157async fn sync_checkpoint_list_to_latest_using_graphql(
160 config: &Config,
161) -> anyhow::Result<CheckpointsList> {
162 info!("Syncing checkpoints from GraphQL");
163 let mut checkpoints_list = match read_checkpoint_list(config) {
165 Ok(list) => list,
166 Err(e) => {
167 info!(
168 "Could not read existing checkpoint list, starting with empty list: {}",
169 e
170 );
171 CheckpointsList {
172 checkpoints: vec![],
173 }
174 }
175 };
176
177 if checkpoints_list.checkpoints.is_empty() {
180 return Err(anyhow!(
181 "Empty checkpoint list and no initial checkpoint to start from"
182 ));
183 }
184
185 let latest_in_list = checkpoints_list.checkpoints.last().unwrap();
186 let object_store = SuiObjectStore::new(config)?;
188
189 let summary = object_store
191 .download_checkpoint_summary(*latest_in_list)
192 .await?;
193 let mut last_epoch = summary.epoch();
194
195 let client = SuiClientBuilder::default()
197 .build(config.full_node_url.as_str())
198 .await
199 .expect("Cannot connect to full node");
200
201 let latest_seq = client
202 .read_api()
203 .get_latest_checkpoint_sequence_number()
204 .await?;
205 let latest = object_store.download_checkpoint_summary(latest_seq).await?;
206
207 while last_epoch + 1 < latest.epoch() {
209 let target_epoch = last_epoch + 1;
210 let target_last_checkpoint_number =
211 query_last_checkpoint_of_epoch(config, target_epoch).await?;
212
213 checkpoints_list
215 .checkpoints
216 .push(target_last_checkpoint_number);
217
218 last_epoch = target_epoch;
220
221 info!(
222 "Last Epoch: {} Last Checkpoint: {}",
223 target_epoch, target_last_checkpoint_number
224 );
225 }
226
227 Ok(checkpoints_list)
228}
229
230pub async fn check_and_sync_checkpoints(config: &Config) -> anyhow::Result<()> {
231 let checkpoints_list = sync_checkpoint_list_to_latest(config)
232 .await
233 .map_err(|e| anyhow!(format!("Cannot refresh list: {e}")))?;
234
235 write_checkpoint_list(config, &checkpoints_list)?;
237
238 let mut genesis_path = config.checkpoint_summary_dir.clone();
240 genesis_path.push(&config.genesis_filename);
241 let genesis_committee = Genesis::load(&genesis_path)?
242 .committee()
243 .map_err(|e| anyhow!(format!("Cannot load Genesis: {e}")))?;
244
245 let mut prev_committee = genesis_committee;
249 let object_store = SuiObjectStore::new(config)?;
250 for ckp_id in &checkpoints_list.checkpoints {
251 let mut checkpoint_path = config.checkpoint_summary_dir.clone();
253 checkpoint_path.push(format!("{}.yaml", ckp_id));
254
255 let summary = if checkpoint_path.exists() {
257 read_checkpoint(config, *ckp_id)
258 .map_err(|e| anyhow!(format!("Cannot read checkpoint: {e}")))?
259 } else {
260 let summary = object_store
262 .download_checkpoint_summary(*ckp_id)
263 .await
264 .map_err(|e| anyhow!(format!("Cannot download summary: {e}")))?;
265 summary.clone().try_into_verified(&prev_committee)?;
266 write_checkpoint(config, &summary)?;
268 summary
269 };
270
271 info!(
273 "Epoch: {} Checkpoint ID: {}",
274 summary.epoch(),
275 summary.digest()
276 );
277
278 prev_committee = extract_new_committee_info(&summary)?;
280 }
281
282 Ok(())
283}
284
// Round-trip tests for the on-disk checkpoint list and checkpoint summary
// helpers. Each test works inside its own temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use roaring::RoaringBitmap;
    use sui_types::{
        gas::GasCostSummary, messages_checkpoint::CheckpointContents,
        supported_protocol_versions::ProtocolConfig,
    };
    use tempfile::TempDir;

    /// Builds a default `Config` whose `checkpoint_summary_dir` points at a
    /// fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the config so the caller keeps it
    /// alive for the duration of the test.
    fn create_test_config() -> (Config, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = Config {
            checkpoint_summary_dir: temp_dir.path().to_path_buf(),
            ..Default::default()
        };
        (config, temp_dir)
    }

    /// Writing a checkpoint list and reading it back yields the same
    /// sequence numbers.
    #[test]
    fn test_checkpoint_list_read_write() {
        let (config, _temp_dir) = create_test_config();
        let test_list = CheckpointsList {
            checkpoints: vec![1, 2, 3],
        };

        write_checkpoint_list(&config, &test_list).unwrap();
        let read_list = read_checkpoint_list(&config).unwrap();

        assert_eq!(test_list.checkpoints, read_list.checkpoints);
    }

    /// Writing a checkpoint summary and reading it back by sequence number
    /// yields a summary with the same sequence number.
    #[test]
    fn test_checkpoint_read_write() {
        let (config, _temp_dir) = create_test_config();
        // Minimal summary built from empty contents and zero/default values.
        let contents = CheckpointContents::new_with_digests_only_for_tests(vec![]);
        let summary = CheckpointSummary::new(
            &ProtocolConfig::get_for_max_version_UNSAFE(),
            0,
            0,
            0,
            &contents,
            None,
            GasCostSummary::default(),
            None,
            0,
            Vec::new(),
            Vec::new(),
        );
        // Placeholder signature info with a default signature and no signers;
        // this test only exercises (de)serialization and never verifies it.
        let info = AuthorityQuorumSignInfo::<true> {
            epoch: 0,
            signature: Default::default(),
            signers_map: RoaringBitmap::new(),
        };
        let test_summary = Envelope::new_from_data_and_sig(summary, info);

        write_checkpoint(&config, &test_summary).unwrap();
        let read_summary = read_checkpoint(&config, 0).unwrap();

        assert_eq!(
            test_summary.sequence_number(),
            read_summary.sequence_number()
        );
    }
}