sui_core/checkpoints/checkpoint_executor/
data_ingestion_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::checkpoints::checkpoint_executor::{CheckpointExecutionData, CheckpointTransactionData};
5use crate::execution_cache::TransactionCacheRead;
6use prost::Message;
7use std::collections::{BTreeSet, HashMap};
8use std::path::Path;
9use sui_rpc::field::FieldMask;
10use sui_rpc::field::FieldMaskUtil;
11use sui_rpc::merge::Merge;
12use sui_rpc::proto::sui::rpc;
13use sui_types::effects::TransactionEffectsAPI;
14use sui_types::error::{SuiErrorKind, SuiResult};
15use sui_types::full_checkpoint_content::{
16    Checkpoint, CheckpointData, ExecutedTransaction, ObjectSet,
17};
18use sui_types::storage::ObjectStore;
19
20pub(crate) fn store_checkpoint_locally(
21    path: impl AsRef<Path>,
22    checkpoint_data: &CheckpointData,
23) -> SuiResult {
24    let path = path.as_ref();
25    let sequence_number = checkpoint_data.checkpoint_summary.sequence_number;
26
27    std::fs::create_dir_all(path).map_err(|err| {
28        SuiErrorKind::FileIOError(format!(
29            "failed to save full checkpoint content locally {:?}",
30            err
31        ))
32    })?;
33
34    let checkpoint: Checkpoint = checkpoint_data.clone().into();
35
36    let mask = FieldMask::from_paths([
37        rpc::v2::Checkpoint::path_builder().sequence_number(),
38        rpc::v2::Checkpoint::path_builder().summary().bcs().value(),
39        rpc::v2::Checkpoint::path_builder().signature().finish(),
40        rpc::v2::Checkpoint::path_builder().contents().bcs().value(),
41        rpc::v2::Checkpoint::path_builder()
42            .transactions()
43            .transaction()
44            .bcs()
45            .value(),
46        rpc::v2::Checkpoint::path_builder()
47            .transactions()
48            .effects()
49            .bcs()
50            .value(),
51        rpc::v2::Checkpoint::path_builder()
52            .transactions()
53            .effects()
54            .unchanged_loaded_runtime_objects()
55            .finish(),
56        rpc::v2::Checkpoint::path_builder()
57            .transactions()
58            .events()
59            .bcs()
60            .value(),
61        rpc::v2::Checkpoint::path_builder()
62            .objects()
63            .objects()
64            .bcs()
65            .value(),
66    ]);
67
68    let proto_checkpoint = rpc::v2::Checkpoint::merge_from(&checkpoint, &mask.into());
69    let proto_bytes = proto_checkpoint.encode_to_vec();
70    let compressed = zstd::encode_all(&proto_bytes[..], 3).map_err(|_| {
71        SuiErrorKind::TransactionSerializationError {
72            error: "failed to compress checkpoint content".to_string(),
73        }
74    })?;
75
76    let file_name = format!("{}.binpb.zst", sequence_number);
77    std::fs::write(path.join(file_name), compressed).map_err(|_| {
78        SuiErrorKind::FileIOError("failed to save full checkpoint content locally".to_string())
79    })?;
80
81    Ok(())
82}
83
84pub(crate) fn load_checkpoint(
85    ckpt_data: &CheckpointExecutionData,
86    ckpt_tx_data: &CheckpointTransactionData,
87    object_store: &dyn ObjectStore,
88    transaction_cache_reader: &dyn TransactionCacheRead,
89) -> SuiResult<Checkpoint> {
90    let event_tx_digests = ckpt_tx_data
91        .effects
92        .iter()
93        .flat_map(|fx| fx.events_digest().map(|_| fx.transaction_digest()).copied())
94        .collect::<Vec<_>>();
95
96    let mut events = transaction_cache_reader
97        .multi_get_events(&event_tx_digests)
98        .into_iter()
99        .zip(event_tx_digests)
100        .map(|(maybe_event, tx_digest)| {
101            maybe_event
102                .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest }.into())
103                .map(|event| (tx_digest, event))
104        })
105        .collect::<SuiResult<HashMap<_, _>>>()?;
106
107    let mut transactions = Vec::with_capacity(ckpt_tx_data.transactions.len());
108    for (tx, fx) in ckpt_tx_data
109        .transactions
110        .iter()
111        .zip(ckpt_tx_data.effects.iter())
112    {
113        let events = fx.events_digest().map(|_event_digest| {
114            events
115                .remove(fx.transaction_digest())
116                .expect("event was already checked to be present")
117        });
118
119        let transaction = ExecutedTransaction {
120            transaction: tx.transaction_data().clone(),
121            signatures: tx.tx_signatures().to_vec(),
122            effects: fx.clone(),
123            events,
124            unchanged_loaded_runtime_objects: transaction_cache_reader
125                .get_unchanged_loaded_runtime_objects(tx.digest())
126                // We don't write empty sets to the DB to save space, so if this load went through
127                // the writeback cache to the DB itself it wouldn't find an entry.
128                .unwrap_or_default(),
129        };
130        transactions.push(transaction);
131    }
132
133    let object_set = {
134        let refs = transactions
135            .iter()
136            .flat_map(|tx| {
137                sui_types::storage::get_transaction_object_set(
138                    &tx.transaction,
139                    &tx.effects,
140                    &tx.unchanged_loaded_runtime_objects,
141                )
142            })
143            .collect::<BTreeSet<_>>()
144            .into_iter()
145            .collect::<Vec<_>>();
146
147        let objects = object_store.multi_get_objects_by_key(&refs);
148
149        let mut object_set = ObjectSet::default();
150        for (idx, object) in objects.into_iter().enumerate() {
151            object_set.insert(object.ok_or_else(|| {
152                sui_types::storage::error::Error::custom(format!(
153                    "unabled to load object {:?}",
154                    refs[idx]
155                ))
156            })?);
157        }
158        object_set
159    };
160    let checkpoint = Checkpoint {
161        summary: ckpt_data.checkpoint.clone().into(),
162        contents: ckpt_data.checkpoint_contents.clone(),
163        transactions,
164        object_set,
165    };
166    Ok(checkpoint)
167}