sui_core/checkpoints/checkpoint_executor/
data_ingestion_handler.rs1use crate::checkpoints::checkpoint_executor::{CheckpointExecutionData, CheckpointTransactionData};
5use crate::execution_cache::TransactionCacheRead;
6use prost::Message;
7use std::collections::{BTreeSet, HashMap};
8use std::path::Path;
9use sui_rpc::field::FieldMask;
10use sui_rpc::field::FieldMaskUtil;
11use sui_rpc::merge::Merge;
12use sui_rpc::proto::sui::rpc;
13use sui_types::effects::TransactionEffectsAPI;
14use sui_types::error::{SuiErrorKind, SuiResult};
15use sui_types::full_checkpoint_content::{
16 Checkpoint, CheckpointData, ExecutedTransaction, ObjectSet,
17};
18use sui_types::storage::ObjectStore;
19
20pub(crate) fn store_checkpoint_locally(
21 path: impl AsRef<Path>,
22 checkpoint_data: &CheckpointData,
23) -> SuiResult {
24 let path = path.as_ref();
25 let sequence_number = checkpoint_data.checkpoint_summary.sequence_number;
26
27 std::fs::create_dir_all(path).map_err(|err| {
28 SuiErrorKind::FileIOError(format!(
29 "failed to save full checkpoint content locally {:?}",
30 err
31 ))
32 })?;
33
34 let checkpoint: Checkpoint = checkpoint_data.clone().into();
35
36 let mask = FieldMask::from_paths([
37 rpc::v2::Checkpoint::path_builder().sequence_number(),
38 rpc::v2::Checkpoint::path_builder().summary().bcs().value(),
39 rpc::v2::Checkpoint::path_builder().signature().finish(),
40 rpc::v2::Checkpoint::path_builder().contents().bcs().value(),
41 rpc::v2::Checkpoint::path_builder()
42 .transactions()
43 .transaction()
44 .bcs()
45 .value(),
46 rpc::v2::Checkpoint::path_builder()
47 .transactions()
48 .effects()
49 .bcs()
50 .value(),
51 rpc::v2::Checkpoint::path_builder()
52 .transactions()
53 .effects()
54 .unchanged_loaded_runtime_objects()
55 .finish(),
56 rpc::v2::Checkpoint::path_builder()
57 .transactions()
58 .events()
59 .bcs()
60 .value(),
61 rpc::v2::Checkpoint::path_builder()
62 .objects()
63 .objects()
64 .bcs()
65 .value(),
66 ]);
67
68 let proto_checkpoint = rpc::v2::Checkpoint::merge_from(&checkpoint, &mask.into());
69 let proto_bytes = proto_checkpoint.encode_to_vec();
70 let compressed = zstd::encode_all(&proto_bytes[..], 3).map_err(|_| {
71 SuiErrorKind::TransactionSerializationError {
72 error: "failed to compress checkpoint content".to_string(),
73 }
74 })?;
75
76 let file_name = format!("{}.binpb.zst", sequence_number);
77 std::fs::write(path.join(file_name), compressed).map_err(|_| {
78 SuiErrorKind::FileIOError("failed to save full checkpoint content locally".to_string())
79 })?;
80
81 Ok(())
82}
83
84pub(crate) fn load_checkpoint(
85 ckpt_data: &CheckpointExecutionData,
86 ckpt_tx_data: &CheckpointTransactionData,
87 object_store: &dyn ObjectStore,
88 transaction_cache_reader: &dyn TransactionCacheRead,
89) -> SuiResult<Checkpoint> {
90 let event_tx_digests = ckpt_tx_data
91 .effects
92 .iter()
93 .flat_map(|fx| fx.events_digest().map(|_| fx.transaction_digest()).copied())
94 .collect::<Vec<_>>();
95
96 let mut events = transaction_cache_reader
97 .multi_get_events(&event_tx_digests)
98 .into_iter()
99 .zip(event_tx_digests)
100 .map(|(maybe_event, tx_digest)| {
101 maybe_event
102 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest }.into())
103 .map(|event| (tx_digest, event))
104 })
105 .collect::<SuiResult<HashMap<_, _>>>()?;
106
107 let mut transactions = Vec::with_capacity(ckpt_tx_data.transactions.len());
108 for (tx, fx) in ckpt_tx_data
109 .transactions
110 .iter()
111 .zip(ckpt_tx_data.effects.iter())
112 {
113 let events = fx.events_digest().map(|_event_digest| {
114 events
115 .remove(fx.transaction_digest())
116 .expect("event was already checked to be present")
117 });
118
119 let transaction = ExecutedTransaction {
120 transaction: tx.transaction_data().clone(),
121 signatures: tx.tx_signatures().to_vec(),
122 effects: fx.clone(),
123 events,
124 unchanged_loaded_runtime_objects: transaction_cache_reader
125 .get_unchanged_loaded_runtime_objects(tx.digest())
126 .unwrap_or_default(),
129 };
130 transactions.push(transaction);
131 }
132
133 let object_set = {
134 let refs = transactions
135 .iter()
136 .flat_map(|tx| {
137 sui_types::storage::get_transaction_object_set(
138 &tx.transaction,
139 &tx.effects,
140 &tx.unchanged_loaded_runtime_objects,
141 )
142 })
143 .collect::<BTreeSet<_>>()
144 .into_iter()
145 .collect::<Vec<_>>();
146
147 let objects = object_store.multi_get_objects_by_key(&refs);
148
149 let mut object_set = ObjectSet::default();
150 for (idx, object) in objects.into_iter().enumerate() {
151 object_set.insert(object.ok_or_else(|| {
152 sui_types::storage::error::Error::custom(format!(
153 "unabled to load object {:?}",
154 refs[idx]
155 ))
156 })?);
157 }
158 object_set
159 };
160 let checkpoint = Checkpoint {
161 summary: ckpt_data.checkpoint.clone().into(),
162 contents: ckpt_data.checkpoint_contents.clone(),
163 transactions,
164 object_set,
165 };
166 Ok(checkpoint)
167}