sui_storage/
lib.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
#![allow(dead_code)]

use crate::blob::BlobIter;
use anyhow::{Result, anyhow};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use bytes::{Buf, Bytes};
use fastcrypto::hash::{HashFunction, Sha3_256};
use futures::StreamExt;
use itertools::Itertools;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufReader, Read, Write};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{fs, io};
use sui_types::committee::Committee;
use sui_types::messages_checkpoint::{
    CertifiedCheckpointSummary, CheckpointSequenceNumber, VerifiedCheckpoint,
};
use sui_types::storage::WriteStore;
use tracing::debug;

pub mod blob;
pub mod http_key_value_store;
pub mod key_value_store;
pub mod key_value_store_metrics;
pub mod mutex_table;
pub mod object_store;
pub mod package_object_cache;
pub mod sharded_lru;
pub mod write_path_pending_tx_log;

pub const SHA3_BYTES: usize = 32;

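/// On-disk encoding of the entries in a storage file. `Blob` denotes
/// serialized records that are read back via `blob::BlobIter`.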
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum StorageFormat {
    Blob = 0,
}

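/// Compression applied to the body of a storage file. The `u8` discriminant
/// is persisted in file headers, so existing variant values must not change.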
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum FileCompression {
    None = 0,
    Zstd,
}

impl FileCompression {
    pub fn zstd_compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<()> {
        // TODO: Add zstd compression level as function argument
        let mut encoder = zstd::Encoder::new(writer, 1)?;
        io::copy(reader, &mut encoder)?;
        encoder.finish()?;
        Ok(())
    }
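
    /// Compresses the file at `source` in place: the compressed bytes are
    /// written to a sibling `.tmp` file, which is then renamed over the
    /// original. `FileCompression::None` leaves the file as-is.
    ///
    /// A minimal round-trip sketch (the path is hypothetical, and the import
    /// assumes this crate is named `sui_storage`):
    ///
    /// ```no_run
    /// use std::io::Read;
    /// use sui_storage::FileCompression;
    ///
    /// let path = std::path::PathBuf::from("checkpoints.chk");
    /// FileCompression::Zstd.compress(&path)?;
    /// let mut contents = Vec::new();
    /// FileCompression::Zstd.decompress(&path)?.read_to_end(&mut contents)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```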
    pub fn compress(&self, source: &std::path::Path) -> io::Result<()> {
        match self {
            FileCompression::Zstd => {
                let mut input = File::open(source)?;
                let tmp_file_name = source.with_extension("tmp");
                let mut output = File::create(&tmp_file_name)?;
                Self::zstd_compress(&mut input, &mut output)?;
                fs::rename(tmp_file_name, source)?;
            }
            FileCompression::None => {}
        }
        Ok(())
    }
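
    /// Opens the file at `source` and returns a reader yielding its
    /// decompressed contents.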
    pub fn decompress(&self, source: &PathBuf) -> Result<Box<dyn Read>> {
        let file = File::open(source)?;
        let res: Box<dyn Read> = match self {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(file)?),
            FileCompression::None => Box::new(BufReader::new(file)),
        };
        Ok(res)
    }

    pub fn bytes_decompress(&self, bytes: Bytes) -> Result<Box<dyn Read>> {
        let res: Box<dyn Read> = match self {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(bytes.reader())?),
            FileCompression::None => Box::new(BufReader::new(bytes.reader())),
        };
        Ok(res)
    }
}

pub fn compute_sha3_checksum_for_bytes(bytes: Bytes) -> Result<[u8; 32]> {
    let mut hasher = Sha3_256::default();
    io::copy(&mut bytes.reader(), &mut hasher)?;
    Ok(hasher.finalize().digest)
}

pub fn compute_sha3_checksum_for_file(file: &mut File) -> Result<[u8; 32]> {
    let mut hasher = Sha3_256::default();
    io::copy(file, &mut hasher)?;
    Ok(hasher.finalize().digest)
}

pub fn compute_sha3_checksum(source: &std::path::Path) -> Result<[u8; 32]> {
    let mut file = fs::File::open(source)?;
    compute_sha3_checksum_for_file(&mut file)
}

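/// Compresses the body of a serialized store file, streaming from `reader` to
/// `writer`. The three-field header (big-endian `u32` magic, `StorageFormat`
/// byte, `FileCompression` byte) is copied through unchanged; everything after
/// it is compressed according to the `FileCompression` the header declares.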
pub fn compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> Result<()> {
    let magic = reader.read_u32::<BigEndian>()?;
    writer.write_u32::<BigEndian>(magic)?;
    let storage_format = reader.read_u8()?;
    writer.write_u8(storage_format)?;
    let file_compression = FileCompression::try_from(reader.read_u8()?)?;
    writer.write_u8(file_compression.into())?;
    match file_compression {
        FileCompression::Zstd => {
            FileCompression::zstd_compress(reader, writer)?;
        }
        FileCompression::None => {}
    }
    Ok(())
}

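/// Opens a serialized store file for reading. Returns an error if the leading
/// magic does not match `expected_magic`; otherwise returns the declared
/// `StorageFormat` together with a reader, positioned just past the header,
/// that transparently decompresses the body.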
pub fn read<R: Read + 'static>(
    expected_magic: u32,
    mut reader: R,
) -> Result<(Box<dyn Read>, StorageFormat)> {
    let magic = reader.read_u32::<BigEndian>()?;
    if magic != expected_magic {
        Err(anyhow!(
            "Unexpected magic in file: {:?}, expected: {:?}",
            magic,
            expected_magic
        ))
    } else {
        let storage_format = StorageFormat::try_from(reader.read_u8()?)?;
        let file_compression = FileCompression::try_from(reader.read_u8()?)?;
        let reader: Box<dyn Read> = match file_compression {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(reader)?),
            FileCompression::None => Box::new(BufReader::new(reader)),
        };
        Ok((reader, storage_format))
    }
}

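/// Validates the header of `reader` against `expected_magic` and returns an
/// iterator over the deserialized entries in the file body.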
pub fn make_iterator<T: DeserializeOwned, R: Read + 'static>(
    expected_magic: u32,
    reader: R,
) -> Result<impl Iterator<Item = T>> {
    let (reader, storage_format) = read(expected_magic, reader)?;
    match storage_format {
        StorageFormat::Blob => Ok(BlobIter::new(reader)),
    }
}

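/// Verifies that `checkpoint` directly extends `current`: it must carry the
/// next sequence number (asserted), reference `current`'s digest as its
/// previous digest, belong to the current epoch or the one after it (the
/// latter only if `current` marks the end of an epoch), and have valid
/// authority signatures for `committee`. On failure, the unverified summary
/// is handed back to the caller.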
pub fn verify_checkpoint_with_committee(
    committee: Arc<Committee>,
    current: &VerifiedCheckpoint,
    checkpoint: CertifiedCheckpointSummary,
) -> Result<VerifiedCheckpoint, Box<CertifiedCheckpointSummary>> {
    assert_eq!(
        *checkpoint.sequence_number(),
        current.sequence_number().checked_add(1).unwrap()
    );

    if Some(*current.digest()) != checkpoint.previous_digest {
        debug!(
            current_checkpoint_seq = current.sequence_number(),
            current_digest =% current.digest(),
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_digest =% checkpoint.digest(),
            checkpoint_previous_digest =? checkpoint.previous_digest,
            "checkpoint not on same chain"
        );
        return Err(Box::new(checkpoint));
    }

    let current_epoch = current.epoch();
    if checkpoint.epoch() != current_epoch
        && checkpoint.epoch() != current_epoch.checked_add(1).unwrap()
    {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_epoch = checkpoint.epoch(),
            current_checkpoint_seq = current.sequence_number(),
            current_epoch = current_epoch,
            "cannot verify checkpoint with too high of an epoch",
        );
        return Err(Box::new(checkpoint));
    }

    if checkpoint.epoch() == current_epoch.checked_add(1).unwrap()
        && current.next_epoch_committee().is_none()
    {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_epoch = checkpoint.epoch(),
            current_checkpoint_seq = current.sequence_number(),
            current_epoch = current_epoch,
            "next checkpoint claims to be from the next epoch but the latest verified \
            checkpoint does not indicate that it is the last checkpoint of an epoch"
        );
        return Err(Box::new(checkpoint));
    }

    checkpoint
        .verify_authority_signatures(&committee)
        .map_err(|e| {
            debug!("error verifying checkpoint: {e}");
            checkpoint.clone()
        })?;
    Ok(VerifiedCheckpoint::new_unchecked(checkpoint))
}

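/// Like `verify_checkpoint_with_committee`, but fetches the committee for the
/// checkpoint's epoch from `store`. Panics if the committee is not present,
/// since committees are expected to be synced before their checkpoints are
/// verified.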
pub fn verify_checkpoint<S>(
    current: &VerifiedCheckpoint,
    store: S,
    checkpoint: CertifiedCheckpointSummary,
) -> Result<VerifiedCheckpoint, Box<CertifiedCheckpointSummary>>
where
    S: WriteStore,
{
    let committee = store.get_committee(checkpoint.epoch()).unwrap_or_else(|| {
        panic!(
            "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
            checkpoint.epoch(),
            checkpoint.sequence_number()
        )
    });

    verify_checkpoint_with_committee(committee, current, checkpoint)
}

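/// Verifies every checkpoint in `checkpoint_range` against its predecessor,
/// running up to `max_concurrency` verification tasks concurrently and
/// incrementing `checkpoint_counter` as each one finishes. Panics if any
/// checkpoint fails verification or is missing from `store`, and expects a
/// non-empty range. On success, records the end of the range as the highest
/// verified checkpoint.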
pub async fn verify_checkpoint_range<S>(
    checkpoint_range: Range<CheckpointSequenceNumber>,
    store: S,
    checkpoint_counter: Arc<AtomicU64>,
    max_concurrency: usize,
) where
    S: WriteStore + Clone,
{
    let range_clone = checkpoint_range.clone();
    futures::stream::iter(range_clone.into_iter().tuple_windows())
        .map(|(a, b)| {
            let current = store
                .get_checkpoint_by_sequence_number(a)
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint {} should exist in store after summary sync but does not",
                        a
                    );
                });
            let next = store
                .get_checkpoint_by_sequence_number(b)
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint {} should exist in store after summary sync but does not",
                        b
                    );
                });
            let committee = store.get_committee(next.epoch()).unwrap_or_else(|| {
                panic!(
                    "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
                    next.epoch(),
                    next.sequence_number()
                )
            });
            tokio::spawn(async move {
                verify_checkpoint_with_committee(committee, &current, next.clone().into())
                    .expect("Checkpoint verification failed");
            })
        })
        .buffer_unordered(max_concurrency)
        .for_each(|result| {
            result.expect("Checkpoint verification task failed");
            checkpoint_counter.fetch_add(1, Ordering::Relaxed);
            futures::future::ready(())
        })
        .await;
    let last = checkpoint_range
        .last()
        .expect("Received empty checkpoint range");
    let final_checkpoint = store
        .get_checkpoint_by_sequence_number(last)
        .expect("Expected end of checkpoint range to exist in store");
    store
        .update_highest_verified_checkpoint(&final_checkpoint)
        .expect("Failed to update highest verified checkpoint");
}
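
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch of the serialized-file header round trip: build an
    // uncompressed stream with an arbitrary magic value, compress its body
    // with `compress`, then check that `read` validates the magic and hands
    // back the original payload. `MAGIC` and the payload are test-only values.
    #[test]
    fn header_round_trip() -> Result<()> {
        const MAGIC: u32 = 0x00C0_FFEE;
        let mut plain = Vec::new();
        plain.write_u32::<BigEndian>(MAGIC)?;
        plain.write_u8(StorageFormat::Blob.into())?;
        plain.write_u8(FileCompression::Zstd.into())?;
        plain.write_all(b"payload bytes")?;

        let mut compressed = Vec::new();
        compress(&mut plain.as_slice(), &mut compressed)?;

        let (mut reader, storage_format) = read(MAGIC, std::io::Cursor::new(compressed))?;
        assert_eq!(storage_format, StorageFormat::Blob);
        let mut payload = Vec::new();
        reader.read_to_end(&mut payload)?;
        assert_eq!(payload.as_slice(), b"payload bytes");
        Ok(())
    }

    // A sketch of the in-place file compression round trip; the temp file
    // name is arbitrary.
    #[test]
    fn zstd_file_round_trip() -> Result<()> {
        let path = std::env::temp_dir().join("sui_storage_zstd_round_trip_test.bin");
        let payload = vec![42u8; 1024];
        fs::write(&path, &payload)?;
        FileCompression::Zstd.compress(&path)?;
        let mut reader = FileCompression::Zstd.decompress(&path)?;
        let mut decompressed = Vec::new();
        reader.read_to_end(&mut decompressed)?;
        assert_eq!(decompressed, payload);
        fs::remove_file(&path)?;
        Ok(())
    }
}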