sui_core/checkpoints/checkpoint_executor/
data_ingestion_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use mysten_common::ZipDebugEqIteratorExt;
5
6use crate::checkpoints::checkpoint_executor::{CheckpointExecutionData, CheckpointTransactionData};
7use crate::execution_cache::TransactionCacheRead;
8use prost::Message;
9use std::collections::{BTreeSet, HashMap};
10use std::path::Path;
11use sui_rpc::field::FieldMask;
12use sui_rpc::field::FieldMaskUtil;
13use sui_rpc::merge::Merge;
14use sui_rpc::proto::sui::rpc;
15use sui_types::effects::TransactionEffectsAPI;
16use sui_types::error::{SuiErrorKind, SuiResult};
17use sui_types::full_checkpoint_content::{Checkpoint, ExecutedTransaction, ObjectSet};
18use sui_types::storage::ObjectStore;
19
20pub(crate) fn store_checkpoint_locally(
21    path: impl AsRef<Path>,
22    checkpoint: &Checkpoint,
23) -> SuiResult {
24    let path = path.as_ref();
25    let sequence_number = checkpoint.summary.sequence_number;
26
27    std::fs::create_dir_all(path).map_err(|err| {
28        SuiErrorKind::FileIOError(format!(
29            "failed to save full checkpoint content locally {:?}",
30            err
31        ))
32    })?;
33
34    let mask = FieldMask::from_paths([
35        rpc::v2::Checkpoint::path_builder().sequence_number(),
36        rpc::v2::Checkpoint::path_builder().summary().bcs().value(),
37        rpc::v2::Checkpoint::path_builder().signature().finish(),
38        rpc::v2::Checkpoint::path_builder().contents().bcs().value(),
39        rpc::v2::Checkpoint::path_builder()
40            .transactions()
41            .transaction()
42            .bcs()
43            .value(),
44        rpc::v2::Checkpoint::path_builder()
45            .transactions()
46            .effects()
47            .bcs()
48            .value(),
49        rpc::v2::Checkpoint::path_builder()
50            .transactions()
51            .effects()
52            .unchanged_loaded_runtime_objects()
53            .finish(),
54        rpc::v2::Checkpoint::path_builder()
55            .transactions()
56            .events()
57            .bcs()
58            .value(),
59        rpc::v2::Checkpoint::path_builder()
60            .objects()
61            .objects()
62            .bcs()
63            .value(),
64    ]);
65
66    let proto_checkpoint = rpc::v2::Checkpoint::merge_from(checkpoint, &mask.into());
67    let proto_bytes = proto_checkpoint.encode_to_vec();
68    let compressed = zstd::encode_all(&proto_bytes[..], 3).map_err(|_| {
69        SuiErrorKind::TransactionSerializationError {
70            error: "failed to compress checkpoint content".to_string(),
71        }
72    })?;
73
74    let file_name = format!("{}.binpb.zst", sequence_number);
75    std::fs::write(path.join(file_name), compressed).map_err(|_| {
76        SuiErrorKind::FileIOError("failed to save full checkpoint content locally".to_string())
77    })?;
78
79    Ok(())
80}
81
82pub(crate) fn load_checkpoint(
83    ckpt_data: &CheckpointExecutionData,
84    ckpt_tx_data: &CheckpointTransactionData,
85    object_store: &dyn ObjectStore,
86    transaction_cache_reader: &dyn TransactionCacheRead,
87) -> SuiResult<Checkpoint> {
88    let event_tx_digests = ckpt_tx_data
89        .effects
90        .iter()
91        .flat_map(|fx| fx.events_digest().map(|_| fx.transaction_digest()).copied())
92        .collect::<Vec<_>>();
93
94    let mut events = transaction_cache_reader
95        .multi_get_events(&event_tx_digests)
96        .into_iter()
97        .zip_debug_eq(event_tx_digests)
98        .map(|(maybe_event, tx_digest)| {
99            maybe_event
100                .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest }.into())
101                .map(|event| (tx_digest, event))
102        })
103        .collect::<SuiResult<HashMap<_, _>>>()?;
104
105    let mut transactions = Vec::with_capacity(ckpt_tx_data.transactions.len());
106    for (tx, fx) in ckpt_tx_data
107        .transactions
108        .iter()
109        .zip_debug_eq(ckpt_tx_data.effects.iter())
110    {
111        let events = fx.events_digest().map(|_event_digest| {
112            events
113                .remove(fx.transaction_digest())
114                .expect("event was already checked to be present")
115        });
116
117        let transaction = ExecutedTransaction {
118            transaction: tx.transaction_data().clone(),
119            signatures: tx.tx_signatures().to_vec(),
120            effects: fx.clone(),
121            events,
122            unchanged_loaded_runtime_objects: transaction_cache_reader
123                .get_unchanged_loaded_runtime_objects(tx.digest())
124                // We don't write empty sets to the DB to save space, so if this load went through
125                // the writeback cache to the DB itself it wouldn't find an entry.
126                .unwrap_or_default(),
127        };
128        transactions.push(transaction);
129    }
130
131    let object_set = {
132        let refs = transactions
133            .iter()
134            .flat_map(|tx| {
135                sui_types::storage::get_transaction_object_set(
136                    &tx.transaction,
137                    &tx.effects,
138                    &tx.unchanged_loaded_runtime_objects,
139                )
140            })
141            .collect::<BTreeSet<_>>()
142            .into_iter()
143            .collect::<Vec<_>>();
144
145        let objects = object_store.multi_get_objects_by_key(&refs);
146
147        let mut object_set = ObjectSet::default();
148        for (idx, object) in objects.into_iter().enumerate() {
149            object_set.insert(object.ok_or_else(|| {
150                sui_types::storage::error::Error::custom(format!(
151                    "unabled to load object {:?}",
152                    refs[idx]
153                ))
154            })?);
155        }
156        object_set
157    };
158    let checkpoint = Checkpoint {
159        summary: ckpt_data.checkpoint.clone().into(),
160        contents: ckpt_data.checkpoint_contents.clone(),
161        transactions,
162        object_set,
163    };
164    Ok(checkpoint)
165}