sui_core/checkpoints/checkpoint_executor/
data_ingestion_handler.rs1use mysten_common::ZipDebugEqIteratorExt;
5
6use crate::checkpoints::checkpoint_executor::{CheckpointExecutionData, CheckpointTransactionData};
7use crate::execution_cache::TransactionCacheRead;
8use prost::Message;
9use std::collections::{BTreeSet, HashMap};
10use std::path::Path;
11use sui_rpc::field::FieldMask;
12use sui_rpc::field::FieldMaskUtil;
13use sui_rpc::merge::Merge;
14use sui_rpc::proto::sui::rpc;
15use sui_types::effects::TransactionEffectsAPI;
16use sui_types::error::{SuiErrorKind, SuiResult};
17use sui_types::full_checkpoint_content::{Checkpoint, ExecutedTransaction, ObjectSet};
18use sui_types::storage::ObjectStore;
19
20pub(crate) fn store_checkpoint_locally(
21 path: impl AsRef<Path>,
22 checkpoint: &Checkpoint,
23) -> SuiResult {
24 let path = path.as_ref();
25 let sequence_number = checkpoint.summary.sequence_number;
26
27 std::fs::create_dir_all(path).map_err(|err| {
28 SuiErrorKind::FileIOError(format!(
29 "failed to save full checkpoint content locally {:?}",
30 err
31 ))
32 })?;
33
34 let mask = FieldMask::from_paths([
35 rpc::v2::Checkpoint::path_builder().sequence_number(),
36 rpc::v2::Checkpoint::path_builder().summary().bcs().value(),
37 rpc::v2::Checkpoint::path_builder().signature().finish(),
38 rpc::v2::Checkpoint::path_builder().contents().bcs().value(),
39 rpc::v2::Checkpoint::path_builder()
40 .transactions()
41 .transaction()
42 .bcs()
43 .value(),
44 rpc::v2::Checkpoint::path_builder()
45 .transactions()
46 .effects()
47 .bcs()
48 .value(),
49 rpc::v2::Checkpoint::path_builder()
50 .transactions()
51 .effects()
52 .unchanged_loaded_runtime_objects()
53 .finish(),
54 rpc::v2::Checkpoint::path_builder()
55 .transactions()
56 .events()
57 .bcs()
58 .value(),
59 rpc::v2::Checkpoint::path_builder()
60 .objects()
61 .objects()
62 .bcs()
63 .value(),
64 ]);
65
66 let proto_checkpoint = rpc::v2::Checkpoint::merge_from(checkpoint, &mask.into());
67 let proto_bytes = proto_checkpoint.encode_to_vec();
68 let compressed = zstd::encode_all(&proto_bytes[..], 3).map_err(|_| {
69 SuiErrorKind::TransactionSerializationError {
70 error: "failed to compress checkpoint content".to_string(),
71 }
72 })?;
73
74 let file_name = format!("{}.binpb.zst", sequence_number);
75 std::fs::write(path.join(file_name), compressed).map_err(|_| {
76 SuiErrorKind::FileIOError("failed to save full checkpoint content locally".to_string())
77 })?;
78
79 Ok(())
80}
81
82pub(crate) fn load_checkpoint(
83 ckpt_data: &CheckpointExecutionData,
84 ckpt_tx_data: &CheckpointTransactionData,
85 object_store: &dyn ObjectStore,
86 transaction_cache_reader: &dyn TransactionCacheRead,
87) -> SuiResult<Checkpoint> {
88 let event_tx_digests = ckpt_tx_data
89 .effects
90 .iter()
91 .flat_map(|fx| fx.events_digest().map(|_| fx.transaction_digest()).copied())
92 .collect::<Vec<_>>();
93
94 let mut events = transaction_cache_reader
95 .multi_get_events(&event_tx_digests)
96 .into_iter()
97 .zip_debug_eq(event_tx_digests)
98 .map(|(maybe_event, tx_digest)| {
99 maybe_event
100 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest }.into())
101 .map(|event| (tx_digest, event))
102 })
103 .collect::<SuiResult<HashMap<_, _>>>()?;
104
105 let mut transactions = Vec::with_capacity(ckpt_tx_data.transactions.len());
106 for (tx, fx) in ckpt_tx_data
107 .transactions
108 .iter()
109 .zip_debug_eq(ckpt_tx_data.effects.iter())
110 {
111 let events = fx.events_digest().map(|_event_digest| {
112 events
113 .remove(fx.transaction_digest())
114 .expect("event was already checked to be present")
115 });
116
117 let transaction = ExecutedTransaction {
118 transaction: tx.transaction_data().clone(),
119 signatures: tx.tx_signatures().to_vec(),
120 effects: fx.clone(),
121 events,
122 unchanged_loaded_runtime_objects: transaction_cache_reader
123 .get_unchanged_loaded_runtime_objects(tx.digest())
124 .unwrap_or_default(),
127 };
128 transactions.push(transaction);
129 }
130
131 let object_set = {
132 let refs = transactions
133 .iter()
134 .flat_map(|tx| {
135 sui_types::storage::get_transaction_object_set(
136 &tx.transaction,
137 &tx.effects,
138 &tx.unchanged_loaded_runtime_objects,
139 )
140 })
141 .collect::<BTreeSet<_>>()
142 .into_iter()
143 .collect::<Vec<_>>();
144
145 let objects = object_store.multi_get_objects_by_key(&refs);
146
147 let mut object_set = ObjectSet::default();
148 for (idx, object) in objects.into_iter().enumerate() {
149 object_set.insert(object.ok_or_else(|| {
150 sui_types::storage::error::Error::custom(format!(
151 "unabled to load object {:?}",
152 refs[idx]
153 ))
154 })?);
155 }
156 object_set
157 };
158 let checkpoint = Checkpoint {
159 summary: ckpt_data.checkpoint.clone().into(),
160 contents: ckpt_data.checkpoint_contents.clone(),
161 transactions,
162 object_set,
163 };
164 Ok(checkpoint)
165}