sui_light_client/
checkpoint.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::committee::extract_new_committee_info;
5use crate::config::Config;
6use crate::graphql::query_last_checkpoint_of_epoch;
7use crate::object_store::SuiObjectStore;
8use anyhow::{Result, anyhow};
9use serde::{Deserialize, Serialize};
10use std::collections::HashSet;
11use std::io::Read;
12use std::{fs, io::Write};
13use sui_config::genesis::Genesis;
14use sui_data_ingestion_core::end_of_epoch_data;
15use sui_sdk::SuiClientBuilder;
16use sui_types::{
17    crypto::AuthorityQuorumSignInfo, message_envelope::Envelope,
18    messages_checkpoint::CheckpointSummary,
19};
20use tracing::info;
21
22const CHECKPOINT_BUCKET_TIMEOUT_SECS: u64 = 5;
23
24#[derive(Debug, Clone, Deserialize, Serialize)]
25pub struct CheckpointsList {
26    pub checkpoints: Vec<u64>,
27}
28
29pub fn read_checkpoint_list(config: &Config) -> Result<CheckpointsList> {
30    let checkpoints_path = config.checkpoint_list_path();
31    let reader = fs::File::open(checkpoints_path)?;
32    Ok(serde_yaml::from_reader(reader)?)
33}
34
35pub fn write_checkpoint_list(config: &Config, checkpoints_list: &CheckpointsList) -> Result<()> {
36    let checkpoints_path = config.checkpoint_list_path();
37    let mut writer = fs::File::create(checkpoints_path)?;
38    let bytes = serde_yaml::to_vec(checkpoints_list)?;
39    writer
40        .write_all(&bytes)
41        .map_err(|e| anyhow!("Unable to serialize checkpoint list: {}", e))
42}
43
44pub fn read_checkpoint(
45    config: &Config,
46    seq: u64,
47) -> Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
48    read_checkpoint_general(config, seq, None)
49}
50
51fn read_checkpoint_general(
52    config: &Config,
53    seq: u64,
54    path: Option<&str>,
55) -> Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
56    let checkpoint_path = config.checkpoint_path(seq, path);
57    let mut reader = fs::File::open(checkpoint_path)?;
58    let mut buffer = Vec::new();
59    reader.read_to_end(&mut buffer)?;
60    bcs::from_bytes(&buffer).map_err(|_| anyhow!("Unable to parse checkpoint file"))
61}
62
63pub fn write_checkpoint(
64    config: &Config,
65    summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
66) -> Result<()> {
67    write_checkpoint_general(config, summary, None)
68}
69
70fn write_checkpoint_general(
71    config: &Config,
72    summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
73    path: Option<&str>,
74) -> Result<()> {
75    let checkpoint_path = config.checkpoint_path(*summary.sequence_number(), path);
76    let mut writer = fs::File::create(checkpoint_path)?;
77    let bytes =
78        bcs::to_bytes(summary).map_err(|_| anyhow!("Unable to serialize checkpoint summary"))?;
79    writer.write_all(&bytes)?;
80    Ok(())
81}
82
83/// Downloads the list of end of epoch checkpoints from the archive store or the GraphQL endpoint
84async fn sync_checkpoint_list_to_latest(config: &Config) -> anyhow::Result<CheckpointsList> {
85    // Try getting checkpoints from GraphQL if URL is configured
86    let graphql_list = if config.graphql_url.is_some() {
87        match sync_checkpoint_list_to_latest_using_graphql(config).await {
88            Ok(list) => list,
89            Err(e) => {
90                info!("Failed to get checkpoints from GraphQL: {}", e);
91                CheckpointsList {
92                    checkpoints: vec![],
93                }
94            }
95        }
96    } else {
97        CheckpointsList {
98            checkpoints: vec![],
99        }
100    };
101
102    // Try getting checkpoints from archive store if configured
103    let archive_list = match sync_checkpoint_list_to_latest_using_checkpoint_bucket(
104        config.object_store_url.clone(),
105    )
106    .await
107    {
108        Ok(list) => list,
109        Err(e) => {
110            info!("Failed to get checkpoints from archive: {}", e);
111            CheckpointsList {
112                checkpoints: vec![],
113            }
114        }
115    };
116
117    // Verify we have at least some checkpoints
118    if graphql_list.checkpoints.is_empty() && archive_list.checkpoints.is_empty() {
119        return Err(anyhow!(
120            "Could not retrieve any checkpoints from configured sources"
121        ));
122    }
123
124    let merged_checkpoints = merge_checkpoint_lists(&graphql_list, &archive_list);
125    Ok(CheckpointsList {
126        checkpoints: merged_checkpoints,
127    })
128}
129
130/// Merges two checkpoint lists, removing duplicates and ensuring the result is sorted
131fn merge_checkpoint_lists(list1: &CheckpointsList, list2: &CheckpointsList) -> Vec<u64> {
132    // Combine both lists into a HashSet to remove duplicates
133    let unique_checkpoints: HashSet<u64> = list1
134        .checkpoints
135        .iter()
136        .chain(list2.checkpoints.iter())
137        .copied()
138        .collect();
139
140    // Convert to sorted vector
141    let mut sorted_checkpoints: Vec<_> = unique_checkpoints.into_iter().collect();
142    sorted_checkpoints.sort();
143
144    sorted_checkpoints
145}
146
147/// Downloads the list of end of epoch checkpoints from the archive store
148async fn sync_checkpoint_list_to_latest_using_checkpoint_bucket(
149    archive_url: String,
150) -> anyhow::Result<CheckpointsList> {
151    info!("Syncing checkpoints from Archive store");
152    let checkpoints =
153        end_of_epoch_data(archive_url, vec![], CHECKPOINT_BUCKET_TIMEOUT_SECS).await?;
154    Ok(CheckpointsList { checkpoints })
155}
156
157/// Run binary search to for each end of epoch checkpoint that is missing
158/// between the latest on the list and the latest checkpoint.
159async fn sync_checkpoint_list_to_latest_using_graphql(
160    config: &Config,
161) -> anyhow::Result<CheckpointsList> {
162    info!("Syncing checkpoints from GraphQL");
163    // Get the local checkpoint list, or create an empty one if it doesn't exist
164    let mut checkpoints_list = match read_checkpoint_list(config) {
165        Ok(list) => list,
166        Err(e) => {
167            info!(
168                "Could not read existing checkpoint list, starting with empty list: {}",
169                e
170            );
171            CheckpointsList {
172                checkpoints: vec![],
173            }
174        }
175    };
176
177    // If list is empty, we can't proceed with the normal algorithm
178    // as we need a starting checkpoint
179    if checkpoints_list.checkpoints.is_empty() {
180        return Err(anyhow!(
181            "Empty checkpoint list and no initial checkpoint to start from"
182        ));
183    }
184
185    let latest_in_list = checkpoints_list.checkpoints.last().unwrap();
186    // Create object store
187    let object_store = SuiObjectStore::new(config)?;
188
189    // Download the latest in list checkpoint
190    let summary = object_store
191        .download_checkpoint_summary(*latest_in_list)
192        .await?;
193    let mut last_epoch = summary.epoch();
194
195    // Download the very latest checkpoint
196    let client = SuiClientBuilder::default()
197        .build(config.full_node_url.as_str())
198        .await
199        .expect("Cannot connect to full node");
200
201    let latest_seq = client
202        .read_api()
203        .get_latest_checkpoint_sequence_number()
204        .await?;
205    let latest = object_store.download_checkpoint_summary(latest_seq).await?;
206
207    // Sequentially record all the missing end of epoch checkpoints numbers
208    while last_epoch + 1 < latest.epoch() {
209        let target_epoch = last_epoch + 1;
210        let target_last_checkpoint_number =
211            query_last_checkpoint_of_epoch(config, target_epoch).await?;
212
213        // Add to the list
214        checkpoints_list
215            .checkpoints
216            .push(target_last_checkpoint_number);
217
218        // Update
219        last_epoch = target_epoch;
220
221        info!(
222            "Last Epoch: {} Last Checkpoint: {}",
223            target_epoch, target_last_checkpoint_number
224        );
225    }
226
227    Ok(checkpoints_list)
228}
229
230pub async fn check_and_sync_checkpoints(config: &Config) -> anyhow::Result<()> {
231    let checkpoints_list = sync_checkpoint_list_to_latest(config)
232        .await
233        .map_err(|e| anyhow!(format!("Cannot refresh list: {e}")))?;
234
235    // Write the fetched checkpoint list to disk
236    write_checkpoint_list(config, &checkpoints_list)?;
237
238    // Load the genesis committee
239    let mut genesis_path = config.checkpoint_summary_dir.clone();
240    genesis_path.push(&config.genesis_filename);
241    let genesis_committee = Genesis::load(&genesis_path)?
242        .committee()
243        .map_err(|e| anyhow!(format!("Cannot load Genesis: {e}")))?;
244
245    // Check the signatures of all checkpoints
246    // And download any missing ones
247
248    let mut prev_committee = genesis_committee;
249    let object_store = SuiObjectStore::new(config)?;
250    for ckp_id in &checkpoints_list.checkpoints {
251        // check if there is a file with this name ckp_id.yaml in the checkpoint_summary_dir
252        let mut checkpoint_path = config.checkpoint_summary_dir.clone();
253        checkpoint_path.push(format!("{}.yaml", ckp_id));
254
255        // If file exists read the file otherwise download it from the server
256        let summary = if checkpoint_path.exists() {
257            read_checkpoint(config, *ckp_id)
258                .map_err(|e| anyhow!(format!("Cannot read checkpoint: {e}")))?
259        } else {
260            // Download the checkpoint from the server
261            let summary = object_store
262                .download_checkpoint_summary(*ckp_id)
263                .await
264                .map_err(|e| anyhow!(format!("Cannot download summary: {e}")))?;
265            summary.clone().try_into_verified(&prev_committee)?;
266            // Write the checkpoint summary to a file
267            write_checkpoint(config, &summary)?;
268            summary
269        };
270
271        // Print the id of the checkpoint and the epoch number
272        info!(
273            "Epoch: {} Checkpoint ID: {}",
274            summary.epoch(),
275            summary.digest()
276        );
277
278        // Extract the new committee information
279        prev_committee = extract_new_committee_info(&summary)?;
280    }
281
282    Ok(())
283}
284
285#[cfg(test)]
286mod tests {
287    use super::*;
288    use roaring::RoaringBitmap;
289    use sui_types::{
290        gas::GasCostSummary, messages_checkpoint::CheckpointContents,
291        supported_protocol_versions::ProtocolConfig,
292    };
293    use tempfile::TempDir;
294
295    fn create_test_config() -> (Config, TempDir) {
296        let temp_dir = TempDir::new().unwrap();
297        let config = Config {
298            checkpoint_summary_dir: temp_dir.path().to_path_buf(),
299            ..Default::default()
300        };
301        (config, temp_dir)
302    }
303
304    #[test]
305    fn test_checkpoint_list_read_write() {
306        let (config, _temp_dir) = create_test_config();
307        let test_list = CheckpointsList {
308            checkpoints: vec![1, 2, 3],
309        };
310
311        write_checkpoint_list(&config, &test_list).unwrap();
312        let read_list = read_checkpoint_list(&config).unwrap();
313
314        assert_eq!(test_list.checkpoints, read_list.checkpoints);
315    }
316
317    #[test]
318    fn test_checkpoint_read_write() {
319        let (config, _temp_dir) = create_test_config();
320        let contents = CheckpointContents::new_with_digests_only_for_tests(vec![]);
321        let summary = CheckpointSummary::new(
322            &ProtocolConfig::get_for_max_version_UNSAFE(),
323            0,
324            0,
325            0,
326            &contents,
327            None,
328            GasCostSummary::default(),
329            None,
330            0,
331            Vec::new(),
332            Vec::new(),
333        );
334        let info = AuthorityQuorumSignInfo::<true> {
335            epoch: 0,
336            signature: Default::default(),
337            signers_map: RoaringBitmap::new(),
338        };
339        let test_summary = Envelope::new_from_data_and_sig(summary, info);
340
341        write_checkpoint(&config, &test_summary).unwrap();
342        let read_summary = read_checkpoint(&config, 0).unwrap();
343
344        assert_eq!(
345            test_summary.sequence_number(),
346            read_summary.sequence_number()
347        );
348    }
349}