sui_light_client/
checkpoint.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::committee::extract_new_committee_info;
5use crate::config::Config;
6use crate::graphql::query_last_checkpoint_of_epoch;
7use crate::object_store::SuiObjectStore;
8use anyhow::{Result, anyhow};
9use serde::{Deserialize, Serialize};
10use std::collections::HashSet;
11use std::io::Read;
12use std::{fs, io::Write};
13use sui_config::genesis::Genesis;
14use sui_data_ingestion_core::end_of_epoch_data;
15use sui_rpc_api::Client;
16use sui_types::{
17    crypto::AuthorityQuorumSignInfo, message_envelope::Envelope,
18    messages_checkpoint::CheckpointSummary,
19};
20use tracing::info;
21
/// Timeout value handed to `end_of_epoch_data` when listing end-of-epoch checkpoints from the
/// archive bucket.
/// NOTE(review): the `_SECS` suffix suggests the unit is seconds — confirm against
/// `end_of_epoch_data`.
22const CHECKPOINT_BUCKET_TIMEOUT_SECS: u64 = 5;
23
24#[derive(Debug, Clone, Deserialize, Serialize)]
25pub struct CheckpointsList {
26    pub checkpoints: Vec<u64>,
27}
28
29pub fn read_checkpoint_list(config: &Config) -> Result<CheckpointsList> {
30    let checkpoints_path = config.checkpoint_list_path();
31    let reader = fs::File::open(checkpoints_path)?;
32    Ok(serde_yaml::from_reader(reader)?)
33}
34
35pub fn write_checkpoint_list(config: &Config, checkpoints_list: &CheckpointsList) -> Result<()> {
36    let checkpoints_path = config.checkpoint_list_path();
37    let mut writer = fs::File::create(checkpoints_path)?;
38    let bytes = serde_yaml::to_vec(checkpoints_list)?;
39    writer
40        .write_all(&bytes)
41        .map_err(|e| anyhow!("Unable to serialize checkpoint list: {}", e))
42}
43
44pub fn read_checkpoint(
45    config: &Config,
46    seq: u64,
47) -> Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
48    read_checkpoint_general(config, seq, None)
49}
50
51fn read_checkpoint_general(
52    config: &Config,
53    seq: u64,
54    path: Option<&str>,
55) -> Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
56    let checkpoint_path = config.checkpoint_path(seq, path);
57    let mut reader = fs::File::open(checkpoint_path)?;
58    let mut buffer = Vec::new();
59    reader.read_to_end(&mut buffer)?;
60    bcs::from_bytes(&buffer).map_err(|_| anyhow!("Unable to parse checkpoint file"))
61}
62
63pub fn write_checkpoint(
64    config: &Config,
65    summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
66) -> Result<()> {
67    write_checkpoint_general(config, summary, None)
68}
69
70fn write_checkpoint_general(
71    config: &Config,
72    summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
73    path: Option<&str>,
74) -> Result<()> {
75    let checkpoint_path = config.checkpoint_path(*summary.sequence_number(), path);
76    let mut writer = fs::File::create(checkpoint_path)?;
77    let bytes =
78        bcs::to_bytes(summary).map_err(|_| anyhow!("Unable to serialize checkpoint summary"))?;
79    writer.write_all(&bytes)?;
80    Ok(())
81}
82
83/// Downloads the list of end of epoch checkpoints from the archive store or the GraphQL endpoint
84async fn sync_checkpoint_list_to_latest(config: &Config) -> anyhow::Result<CheckpointsList> {
85    // Try getting checkpoints from GraphQL if URL is configured
86    let graphql_list = if config.graphql_url.is_some() {
87        match sync_checkpoint_list_to_latest_using_graphql(config).await {
88            Ok(list) => list,
89            Err(e) => {
90                info!("Failed to get checkpoints from GraphQL: {}", e);
91                CheckpointsList {
92                    checkpoints: vec![],
93                }
94            }
95        }
96    } else {
97        CheckpointsList {
98            checkpoints: vec![],
99        }
100    };
101
102    // Try getting checkpoints from archive store if configured
103    let archive_list = match sync_checkpoint_list_to_latest_using_checkpoint_bucket(
104        config.object_store_url.clone(),
105    )
106    .await
107    {
108        Ok(list) => list,
109        Err(e) => {
110            info!("Failed to get checkpoints from archive: {}", e);
111            CheckpointsList {
112                checkpoints: vec![],
113            }
114        }
115    };
116
117    // Verify we have at least some checkpoints
118    if graphql_list.checkpoints.is_empty() && archive_list.checkpoints.is_empty() {
119        return Err(anyhow!(
120            "Could not retrieve any checkpoints from configured sources"
121        ));
122    }
123
124    let merged_checkpoints = merge_checkpoint_lists(&graphql_list, &archive_list);
125    Ok(CheckpointsList {
126        checkpoints: merged_checkpoints,
127    })
128}
129
130/// Merges two checkpoint lists, removing duplicates and ensuring the result is sorted
131fn merge_checkpoint_lists(list1: &CheckpointsList, list2: &CheckpointsList) -> Vec<u64> {
132    // Combine both lists into a HashSet to remove duplicates
133    let unique_checkpoints: HashSet<u64> = list1
134        .checkpoints
135        .iter()
136        .chain(list2.checkpoints.iter())
137        .copied()
138        .collect();
139
140    // Convert to sorted vector
141    let mut sorted_checkpoints: Vec<_> = unique_checkpoints.into_iter().collect();
142    sorted_checkpoints.sort();
143
144    sorted_checkpoints
145}
146
147/// Downloads the list of end of epoch checkpoints from the archive store
148async fn sync_checkpoint_list_to_latest_using_checkpoint_bucket(
149    archive_url: String,
150) -> anyhow::Result<CheckpointsList> {
151    info!("Syncing checkpoints from Archive store");
152    let checkpoints =
153        end_of_epoch_data(archive_url, vec![], CHECKPOINT_BUCKET_TIMEOUT_SECS).await?;
154    Ok(CheckpointsList { checkpoints })
155}
156
157/// Run binary search to for each end of epoch checkpoint that is missing
158/// between the latest on the list and the latest checkpoint.
159async fn sync_checkpoint_list_to_latest_using_graphql(
160    config: &Config,
161) -> anyhow::Result<CheckpointsList> {
162    info!("Syncing checkpoints from GraphQL");
163    // Get the local checkpoint list, or create an empty one if it doesn't exist
164    let mut checkpoints_list = match read_checkpoint_list(config) {
165        Ok(list) => list,
166        Err(e) => {
167            info!(
168                "Could not read existing checkpoint list, starting with empty list: {}",
169                e
170            );
171            CheckpointsList {
172                checkpoints: vec![],
173            }
174        }
175    };
176
177    // If list is empty, we can't proceed with the normal algorithm
178    // as we need a starting checkpoint
179    if checkpoints_list.checkpoints.is_empty() {
180        return Err(anyhow!(
181            "Empty checkpoint list and no initial checkpoint to start from"
182        ));
183    }
184
185    let latest_in_list = checkpoints_list.checkpoints.last().unwrap();
186    // Create object store
187    let object_store = SuiObjectStore::new(config)?;
188
189    // Download the latest in list checkpoint
190    let summary = object_store
191        .download_checkpoint_summary(*latest_in_list)
192        .await?;
193    let mut last_epoch = summary.epoch();
194
195    // Download the very latest checkpoint
196    let mut client =
197        Client::new(config.full_node_url.as_str()).expect("Cannot connect to full node");
198
199    let latest_seq = client.get_latest_checkpoint().await?.sequence_number;
200    let latest = object_store.download_checkpoint_summary(latest_seq).await?;
201
202    // Sequentially record all the missing end of epoch checkpoints numbers
203    while last_epoch + 1 < latest.epoch() {
204        let target_epoch = last_epoch + 1;
205        let target_last_checkpoint_number =
206            query_last_checkpoint_of_epoch(config, target_epoch).await?;
207
208        // Add to the list
209        checkpoints_list
210            .checkpoints
211            .push(target_last_checkpoint_number);
212
213        // Update
214        last_epoch = target_epoch;
215
216        info!(
217            "Last Epoch: {} Last Checkpoint: {}",
218            target_epoch, target_last_checkpoint_number
219        );
220    }
221
222    Ok(checkpoints_list)
223}
224
225pub async fn check_and_sync_checkpoints(config: &Config) -> anyhow::Result<()> {
226    let checkpoints_list = sync_checkpoint_list_to_latest(config)
227        .await
228        .map_err(|e| anyhow!(format!("Cannot refresh list: {e}")))?;
229
230    // Write the fetched checkpoint list to disk
231    write_checkpoint_list(config, &checkpoints_list)?;
232
233    // Load the genesis committee
234    let mut genesis_path = config.checkpoint_summary_dir.clone();
235    genesis_path.push(&config.genesis_filename);
236    let genesis_committee = Genesis::load(&genesis_path)?.committee();
237
238    // Check the signatures of all checkpoints
239    // And download any missing ones
240
241    let mut prev_committee = genesis_committee;
242    let object_store = SuiObjectStore::new(config)?;
243    for ckp_id in &checkpoints_list.checkpoints {
244        // check if there is a file with this name ckp_id.yaml in the checkpoint_summary_dir
245        let mut checkpoint_path = config.checkpoint_summary_dir.clone();
246        checkpoint_path.push(format!("{}.yaml", ckp_id));
247
248        // If file exists read the file otherwise download it from the server
249        let summary = if checkpoint_path.exists() {
250            read_checkpoint(config, *ckp_id)
251                .map_err(|e| anyhow!(format!("Cannot read checkpoint: {e}")))?
252        } else {
253            // Download the checkpoint from the server
254            let summary = object_store
255                .download_checkpoint_summary(*ckp_id)
256                .await
257                .map_err(|e| anyhow!(format!("Cannot download summary: {e}")))?;
258            summary.clone().try_into_verified(&prev_committee)?;
259            // Write the checkpoint summary to a file
260            write_checkpoint(config, &summary)?;
261            summary
262        };
263
264        // Print the id of the checkpoint and the epoch number
265        info!(
266            "Epoch: {} Checkpoint ID: {}",
267            summary.epoch(),
268            summary.digest()
269        );
270
271        // Extract the new committee information
272        prev_committee = extract_new_committee_info(&summary)?;
273    }
274
275    Ok(())
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use roaring::RoaringBitmap;
    use sui_types::{
        gas::GasCostSummary, messages_checkpoint::CheckpointContents,
        supported_protocol_versions::ProtocolConfig,
    };
    use tempfile::TempDir;

    /// Builds a default `Config` whose checkpoint summary directory is a fresh temporary
    /// directory. The `TempDir` is returned as well so the caller can keep it alive for the
    /// duration of the test.
    fn create_test_config() -> (Config, TempDir) {
        let scratch_dir = TempDir::new().unwrap();
        let checkpoint_summary_dir = scratch_dir.path().to_path_buf();
        let config = Config {
            checkpoint_summary_dir,
            ..Default::default()
        };
        (config, scratch_dir)
    }

    /// A checkpoint list written to disk must read back with the same sequence numbers.
    #[test]
    fn test_checkpoint_list_read_write() {
        let (config, _scratch_dir) = create_test_config();
        let written = CheckpointsList {
            checkpoints: vec![1, 2, 3],
        };

        write_checkpoint_list(&config, &written).unwrap();
        let CheckpointsList {
            checkpoints: read_back,
        } = read_checkpoint_list(&config).unwrap();

        assert_eq!(written.checkpoints, read_back);
    }

    /// A checkpoint summary written to disk must read back with the same sequence number.
    #[test]
    fn test_checkpoint_read_write() {
        let (config, _scratch_dir) = create_test_config();

        // Minimal summary for sequence number 0 with empty contents.
        let empty_contents = CheckpointContents::new_with_digests_only_for_tests(vec![]);
        let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        let summary = CheckpointSummary::new(
            &protocol_config,
            0,
            0,
            0,
            &empty_contents,
            None,
            GasCostSummary::default(),
            None,
            0,
            Vec::new(),
            Vec::new(),
        );

        // Placeholder quorum signature with no signers; it is never verified in this test.
        let quorum_sig = AuthorityQuorumSignInfo::<true> {
            epoch: 0,
            signature: Default::default(),
            signers_map: RoaringBitmap::new(),
        };
        let written = Envelope::new_from_data_and_sig(summary, quorum_sig);

        write_checkpoint(&config, &written).unwrap();
        let read_back = read_checkpoint(&config, 0).unwrap();

        assert_eq!(written.sequence_number(), read_back.sequence_number());
    }
}