sui_light_client/
checkpoint.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::committee::extract_new_committee_info;
5use crate::config::Config;
6use crate::graphql::query_last_checkpoint_of_epoch;
7use crate::object_store::SuiObjectStore;
8use anyhow::{Result, anyhow};
9use serde::{Deserialize, Serialize};
10use std::collections::HashSet;
11use std::io::Read;
12use std::{fs, io::Write};
13use sui_config::genesis::Genesis;
14use sui_rpc_api::Client;
15use sui_storage::object_store::util::end_of_epoch_data;
16use sui_types::{
17    crypto::AuthorityQuorumSignInfo, message_envelope::Envelope,
18    messages_checkpoint::CheckpointSummary,
19};
20use tracing::info;
21
22#[derive(Debug, Clone, Deserialize, Serialize)]
23pub struct CheckpointsList {
24    pub checkpoints: Vec<u64>,
25}
26
27pub fn read_checkpoint_list(config: &Config) -> Result<CheckpointsList> {
28    let checkpoints_path = config.checkpoint_list_path();
29    let reader = fs::File::open(checkpoints_path)?;
30    Ok(serde_yaml::from_reader(reader)?)
31}
32
33pub fn write_checkpoint_list(config: &Config, checkpoints_list: &CheckpointsList) -> Result<()> {
34    let checkpoints_path = config.checkpoint_list_path();
35    let mut writer = fs::File::create(checkpoints_path)?;
36    let bytes = serde_yaml::to_vec(checkpoints_list)?;
37    writer
38        .write_all(&bytes)
39        .map_err(|e| anyhow!("Unable to serialize checkpoint list: {}", e))
40}
41
42pub fn read_checkpoint(
43    config: &Config,
44    seq: u64,
45) -> Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
46    read_checkpoint_general(config, seq, None)
47}
48
49fn read_checkpoint_general(
50    config: &Config,
51    seq: u64,
52    path: Option<&str>,
53) -> Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
54    let checkpoint_path = config.checkpoint_path(seq, path);
55    let mut reader = fs::File::open(checkpoint_path)?;
56    let mut buffer = Vec::new();
57    reader.read_to_end(&mut buffer)?;
58    bcs::from_bytes(&buffer).map_err(|_| anyhow!("Unable to parse checkpoint file"))
59}
60
61pub fn write_checkpoint(
62    config: &Config,
63    summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
64) -> Result<()> {
65    write_checkpoint_general(config, summary, None)
66}
67
68fn write_checkpoint_general(
69    config: &Config,
70    summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
71    path: Option<&str>,
72) -> Result<()> {
73    let checkpoint_path = config.checkpoint_path(*summary.sequence_number(), path);
74    let mut writer = fs::File::create(checkpoint_path)?;
75    let bytes =
76        bcs::to_bytes(summary).map_err(|_| anyhow!("Unable to serialize checkpoint summary"))?;
77    writer.write_all(&bytes)?;
78    Ok(())
79}
80
81/// Downloads the list of end of epoch checkpoints from the archive store or the GraphQL endpoint
82async fn sync_checkpoint_list_to_latest(config: &Config) -> anyhow::Result<CheckpointsList> {
83    // Try getting checkpoints from GraphQL if URL is configured
84    let graphql_list = if config.graphql_url.is_some() {
85        match sync_checkpoint_list_to_latest_using_graphql(config).await {
86            Ok(list) => list,
87            Err(e) => {
88                info!("Failed to get checkpoints from GraphQL: {}", e);
89                CheckpointsList {
90                    checkpoints: vec![],
91                }
92            }
93        }
94    } else {
95        CheckpointsList {
96            checkpoints: vec![],
97        }
98    };
99
100    // Try getting checkpoints from archive store if configured
101    let archive_list = match sync_checkpoint_list_to_latest_using_checkpoint_bucket(
102        config.object_store_url.clone(),
103    )
104    .await
105    {
106        Ok(list) => list,
107        Err(e) => {
108            info!("Failed to get checkpoints from archive: {}", e);
109            CheckpointsList {
110                checkpoints: vec![],
111            }
112        }
113    };
114
115    // Verify we have at least some checkpoints
116    if graphql_list.checkpoints.is_empty() && archive_list.checkpoints.is_empty() {
117        return Err(anyhow!(
118            "Could not retrieve any checkpoints from configured sources"
119        ));
120    }
121
122    let merged_checkpoints = merge_checkpoint_lists(&graphql_list, &archive_list);
123    Ok(CheckpointsList {
124        checkpoints: merged_checkpoints,
125    })
126}
127
128/// Merges two checkpoint lists, removing duplicates and ensuring the result is sorted
129fn merge_checkpoint_lists(list1: &CheckpointsList, list2: &CheckpointsList) -> Vec<u64> {
130    // Combine both lists into a HashSet to remove duplicates
131    let unique_checkpoints: HashSet<u64> = list1
132        .checkpoints
133        .iter()
134        .chain(list2.checkpoints.iter())
135        .copied()
136        .collect();
137
138    // Convert to sorted vector
139    let mut sorted_checkpoints: Vec<_> = unique_checkpoints.into_iter().collect();
140    sorted_checkpoints.sort();
141
142    sorted_checkpoints
143}
144
145/// Downloads the list of end of epoch checkpoints from the archive store
146async fn sync_checkpoint_list_to_latest_using_checkpoint_bucket(
147    archive_url: String,
148) -> anyhow::Result<CheckpointsList> {
149    info!("Syncing checkpoints from Archive store");
150    let checkpoints = end_of_epoch_data(&archive_url, vec![]).await?;
151    Ok(CheckpointsList { checkpoints })
152}
153
154/// Run binary search to for each end of epoch checkpoint that is missing
155/// between the latest on the list and the latest checkpoint.
156async fn sync_checkpoint_list_to_latest_using_graphql(
157    config: &Config,
158) -> anyhow::Result<CheckpointsList> {
159    info!("Syncing checkpoints from GraphQL");
160    // Get the local checkpoint list, or create an empty one if it doesn't exist
161    let mut checkpoints_list = match read_checkpoint_list(config) {
162        Ok(list) => list,
163        Err(e) => {
164            info!(
165                "Could not read existing checkpoint list, starting with empty list: {}",
166                e
167            );
168            CheckpointsList {
169                checkpoints: vec![],
170            }
171        }
172    };
173
174    // If list is empty, we can't proceed with the normal algorithm
175    // as we need a starting checkpoint
176    if checkpoints_list.checkpoints.is_empty() {
177        return Err(anyhow!(
178            "Empty checkpoint list and no initial checkpoint to start from"
179        ));
180    }
181
182    let latest_in_list = checkpoints_list.checkpoints.last().unwrap();
183    // Create object store
184    let object_store = SuiObjectStore::new(config)?;
185
186    // Download the latest in list checkpoint
187    let summary = object_store
188        .download_checkpoint_summary(*latest_in_list)
189        .await?;
190    let mut last_epoch = summary.epoch();
191
192    // Download the very latest checkpoint
193    let mut client =
194        Client::new(config.full_node_url.as_str()).expect("Cannot connect to full node");
195
196    let latest_seq = client.get_latest_checkpoint().await?.sequence_number;
197    let latest = object_store.download_checkpoint_summary(latest_seq).await?;
198
199    // Sequentially record all the missing end of epoch checkpoints numbers
200    while last_epoch + 1 < latest.epoch() {
201        let target_epoch = last_epoch + 1;
202        let target_last_checkpoint_number =
203            query_last_checkpoint_of_epoch(config, target_epoch).await?;
204
205        // Add to the list
206        checkpoints_list
207            .checkpoints
208            .push(target_last_checkpoint_number);
209
210        // Update
211        last_epoch = target_epoch;
212
213        info!(
214            "Last Epoch: {} Last Checkpoint: {}",
215            target_epoch, target_last_checkpoint_number
216        );
217    }
218
219    Ok(checkpoints_list)
220}
221
222pub async fn check_and_sync_checkpoints(config: &Config) -> anyhow::Result<()> {
223    let checkpoints_list = sync_checkpoint_list_to_latest(config)
224        .await
225        .map_err(|e| anyhow!(format!("Cannot refresh list: {e}")))?;
226
227    // Write the fetched checkpoint list to disk
228    write_checkpoint_list(config, &checkpoints_list)?;
229
230    // Load the genesis committee
231    let mut genesis_path = config.checkpoint_summary_dir.clone();
232    genesis_path.push(&config.genesis_filename);
233    let genesis_committee = Genesis::load(&genesis_path)?.committee();
234
235    // Check the signatures of all checkpoints
236    // And download any missing ones
237
238    let mut prev_committee = genesis_committee;
239    let object_store = SuiObjectStore::new(config)?;
240    for ckp_id in &checkpoints_list.checkpoints {
241        // check if there is a file with this name ckp_id.yaml in the checkpoint_summary_dir
242        let mut checkpoint_path = config.checkpoint_summary_dir.clone();
243        checkpoint_path.push(format!("{}.yaml", ckp_id));
244
245        // If file exists read the file otherwise download it from the server
246        let summary = if checkpoint_path.exists() {
247            read_checkpoint(config, *ckp_id)
248                .map_err(|e| anyhow!(format!("Cannot read checkpoint: {e}")))?
249        } else {
250            // Download the checkpoint from the server
251            let summary = object_store
252                .download_checkpoint_summary(*ckp_id)
253                .await
254                .map_err(|e| anyhow!(format!("Cannot download summary: {e}")))?;
255            summary.clone().try_into_verified(&prev_committee)?;
256            // Write the checkpoint summary to a file
257            write_checkpoint(config, &summary)?;
258            summary
259        };
260
261        // Print the id of the checkpoint and the epoch number
262        info!(
263            "Epoch: {} Checkpoint ID: {}",
264            summary.epoch(),
265            summary.digest()
266        );
267
268        // Extract the new committee information
269        prev_committee = extract_new_committee_info(&summary)?;
270    }
271
272    Ok(())
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use roaring::RoaringBitmap;
    use sui_types::{
        gas::GasCostSummary, messages_checkpoint::CheckpointContents,
        supported_protocol_versions::ProtocolConfig,
    };
    use tempfile::TempDir;

    /// Builds a `Config` whose checkpoint directory is a fresh temp dir. The
    /// `TempDir` is returned so it lives (and is cleaned up) with the test.
    fn create_test_config() -> (Config, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = Config {
            checkpoint_summary_dir: temp_dir.path().to_path_buf(),
            ..Default::default()
        };
        (config, temp_dir)
    }

    #[test]
    fn test_checkpoint_list_read_write() {
        let (config, _temp_dir) = create_test_config();
        let test_list = CheckpointsList {
            checkpoints: vec![1, 2, 3],
        };

        write_checkpoint_list(&config, &test_list).unwrap();
        let read_list = read_checkpoint_list(&config).unwrap();

        assert_eq!(test_list.checkpoints, read_list.checkpoints);
    }

    #[test]
    fn test_checkpoint_read_write() {
        let (config, _temp_dir) = create_test_config();
        let contents = CheckpointContents::new_with_digests_only_for_tests(vec![]);
        let summary = CheckpointSummary::new(
            &ProtocolConfig::get_for_max_version_UNSAFE(),
            0,
            0,
            0,
            &contents,
            None,
            GasCostSummary::default(),
            None,
            0,
            Vec::new(),
            Vec::new(),
        );
        let info = AuthorityQuorumSignInfo::<true> {
            epoch: 0,
            signature: Default::default(),
            signers_map: RoaringBitmap::new(),
        };
        let test_summary = Envelope::new_from_data_and_sig(summary, info);

        write_checkpoint(&config, &test_summary).unwrap();
        let read_summary = read_checkpoint(&config, 0).unwrap();

        assert_eq!(
            test_summary.sequence_number(),
            read_summary.sequence_number()
        );
        // Compare digests too, so the round trip is checked beyond the
        // sequence number alone.
        assert_eq!(test_summary.digest(), read_summary.digest());
    }

    #[test]
    fn test_merge_checkpoint_lists_sorts_and_dedups() {
        let first = CheckpointsList {
            checkpoints: vec![30, 10, 20],
        };
        let second = CheckpointsList {
            checkpoints: vec![20, 40, 10],
        };
        assert_eq!(
            merge_checkpoint_lists(&first, &second),
            vec![10, 20, 30, 40]
        );

        let empty = CheckpointsList {
            checkpoints: vec![],
        };
        assert!(merge_checkpoint_lists(&empty, &empty).is_empty());
    }
}