#![allow(dead_code)]

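//! Common storage primitives for Sui: a small file header (magic bytes,
//! storage format, compression codec) over blob-encoded content, SHA3-256
//! checksum helpers, and checkpoint chain verification used by state sync.
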
use crate::blob::BlobIter;
use anyhow::{Result, anyhow};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use bytes::{Buf, Bytes};
use fastcrypto::hash::{HashFunction, Sha3_256};
use futures::StreamExt;
use itertools::Itertools;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufReader, Read, Write};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{fs, io};
use sui_types::committee::Committee;
use sui_types::messages_checkpoint::{
    CertifiedCheckpointSummary, CheckpointSequenceNumber, VerifiedCheckpoint,
};
use sui_types::storage::WriteStore;
use tracing::debug;

pub mod blob;
pub mod http_key_value_store;
pub mod key_value_store;
pub mod key_value_store_metrics;
pub mod mutex_table;
pub mod object_store;
pub mod package_object_cache;
pub mod sharded_lru;
pub mod write_path_pending_tx_log;

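/// Length in bytes of a SHA3-256 digest.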
pub const SHA3_BYTES: usize = 32;

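/// Encoding of the file body that follows the header; currently only the
/// blob encoding implemented in the [`blob`] module is supported.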
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum StorageFormat {
    Blob = 0,
}

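/// Compression codec applied to stored files: `Zstd` compresses at level 1,
/// while `None` leaves the bytes untouched.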
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum FileCompression {
    None = 0,
    Zstd,
}

impl FileCompression {
    pub fn zstd_compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<()> {
        let mut encoder = zstd::Encoder::new(writer, 1)?;
        io::copy(reader, &mut encoder)?;
        encoder.finish()?;
        Ok(())
    }
    pub fn compress(&self, source: &std::path::Path) -> io::Result<()> {
        match self {
            FileCompression::Zstd => {
                let mut input = File::open(source)?;
                // Compress into a sibling ".tmp" file, then rename it over
                // the original so the source is swapped out in a single step.
                let tmp_file_name = source.with_extension("tmp");
                let mut output = File::create(&tmp_file_name)?;
                Self::zstd_compress(&mut input, &mut output)?;
                fs::rename(tmp_file_name, source)?;
            }
            FileCompression::None => {}
        }
        Ok(())
    }
    pub fn decompress(&self, source: &PathBuf) -> Result<Box<dyn Read>> {
        let file = File::open(source)?;
        let res: Box<dyn Read> = match self {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(file)?),
            FileCompression::None => Box::new(BufReader::new(file)),
        };
        Ok(res)
    }
    pub fn bytes_decompress(&self, bytes: Bytes) -> Result<Box<dyn Read>> {
        let res: Box<dyn Read> = match self {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(bytes.reader())?),
            FileCompression::None => Box::new(BufReader::new(bytes.reader())),
        };
        Ok(res)
    }
}

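/// Computes the SHA3-256 digest of an in-memory byte buffer.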
pub fn compute_sha3_checksum_for_bytes(bytes: Bytes) -> Result<[u8; 32]> {
    let mut hasher = Sha3_256::default();
    io::copy(&mut bytes.reader(), &mut hasher)?;
    Ok(hasher.finalize().digest)
}

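/// Computes the SHA3-256 digest of an already-open file, reading from its
/// current position to EOF.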
pub fn compute_sha3_checksum_for_file(file: &mut File) -> Result<[u8; 32]> {
    let mut hasher = Sha3_256::default();
    io::copy(file, &mut hasher)?;
    Ok(hasher.finalize().digest)
}

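/// Opens the file at `source` and computes its SHA3-256 digest.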
pub fn compute_sha3_checksum(source: &std::path::Path) -> Result<[u8; 32]> {
    let mut file = fs::File::open(source)?;
    compute_sha3_checksum_for_file(&mut file)
}

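/// Copies the file header (magic, storage format, compression flag) verbatim
/// from `reader` to `writer`, then compresses the remaining body according to
/// the compression flag found in the header.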
pub fn compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> Result<()> {
    let magic = reader.read_u32::<BigEndian>()?;
    writer.write_u32::<BigEndian>(magic)?;
    let storage_format = reader.read_u8()?;
    writer.write_u8(storage_format)?;
    let file_compression = FileCompression::try_from(reader.read_u8()?)?;
    writer.write_u8(file_compression.into())?;
    match file_compression {
        FileCompression::Zstd => {
            FileCompression::zstd_compress(reader, writer)?;
        }
        FileCompression::None => {}
    }
    Ok(())
}

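/// Validates the magic number at the start of `reader`, then returns a reader
/// over the (decompressed) body together with the storage format declared in
/// the header.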
pub fn read<R: Read + 'static>(
    expected_magic: u32,
    mut reader: R,
) -> Result<(Box<dyn Read>, StorageFormat)> {
    let magic = reader.read_u32::<BigEndian>()?;
    if magic != expected_magic {
        Err(anyhow!(
            "Unexpected magic bytes in file: {:?}, expected: {:?}",
            magic,
            expected_magic
        ))
    } else {
        let storage_format = StorageFormat::try_from(reader.read_u8()?)?;
        let file_compression = FileCompression::try_from(reader.read_u8()?)?;
        let reader: Box<dyn Read> = match file_compression {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(reader)?),
            FileCompression::None => Box::new(BufReader::new(reader)),
        };
        Ok((reader, storage_format))
    }
}

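/// Checks the file header via [`read`] and returns an iterator that
/// deserializes each encoded entry in the body into `T`.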
pub fn make_iterator<T: DeserializeOwned, R: Read + 'static>(
    expected_magic: u32,
    reader: R,
) -> Result<impl Iterator<Item = T>> {
    let (reader, storage_format) = read(expected_magic, reader)?;
    match storage_format {
        StorageFormat::Blob => Ok(BlobIter::new(reader)),
    }
}

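/// Verifies that `checkpoint` directly extends `current`: it must carry the
/// next sequence number, reference `current`'s digest as its predecessor,
/// belong to the same epoch or the one immediately following, and have valid
/// authority signatures from `committee`. On failure the unverified summary
/// is returned as the error.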
pub fn verify_checkpoint_with_committee(
    committee: Arc<Committee>,
    current: &VerifiedCheckpoint,
    checkpoint: CertifiedCheckpointSummary,
) -> Result<VerifiedCheckpoint, Box<CertifiedCheckpointSummary>> {
    assert_eq!(
        *checkpoint.sequence_number(),
        current.sequence_number().checked_add(1).unwrap()
    );

    if Some(*current.digest()) != checkpoint.previous_digest {
        debug!(
            current_checkpoint_seq = current.sequence_number(),
            current_digest =% current.digest(),
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_digest =% checkpoint.digest(),
            checkpoint_previous_digest =? checkpoint.previous_digest,
            "checkpoint not on same chain"
        );
        return Err(Box::new(checkpoint));
    }

    let current_epoch = current.epoch();
    if checkpoint.epoch() != current_epoch
        && checkpoint.epoch() != current_epoch.checked_add(1).unwrap()
    {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_epoch = checkpoint.epoch(),
            current_checkpoint_seq = current.sequence_number(),
            current_epoch = current_epoch,
            "cannot verify checkpoint with too high of an epoch",
        );
        return Err(Box::new(checkpoint));
    }

    if checkpoint.epoch() == current_epoch.checked_add(1).unwrap()
        && current.next_epoch_committee().is_none()
    {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_epoch = checkpoint.epoch(),
            current_checkpoint_seq = current.sequence_number(),
            current_epoch = current_epoch,
            "next checkpoint claims to be from the next epoch but the latest verified \
             checkpoint does not indicate that it is the last checkpoint of an epoch"
        );
        return Err(Box::new(checkpoint));
    }

    checkpoint
        .verify_authority_signatures(&committee)
        .map_err(|e| {
            debug!("error verifying checkpoint: {e}");
            checkpoint.clone()
        })?;
    Ok(VerifiedCheckpoint::new_unchecked(checkpoint))
}

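/// Like [`verify_checkpoint_with_committee`], but looks up the committee for
/// the checkpoint's epoch from `store`. Panics if the committee is missing,
/// since summary sync is expected to have inserted it already.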
pub fn verify_checkpoint<S>(
    current: &VerifiedCheckpoint,
    store: S,
    checkpoint: CertifiedCheckpointSummary,
) -> Result<VerifiedCheckpoint, Box<CertifiedCheckpointSummary>>
where
    S: WriteStore,
{
    let committee = store.get_committee(checkpoint.epoch()).unwrap_or_else(|| {
        panic!(
            "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
            checkpoint.epoch(),
            checkpoint.sequence_number()
        )
    });

    verify_checkpoint_with_committee(committee, current, checkpoint)
}

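/// Verifies every adjacent pair of checkpoints in `checkpoint_range`
/// concurrently, with at most `max_concurrency` verifications in flight,
/// bumping `checkpoint_counter` as each verification task completes and
/// finally recording the end of the range as the highest verified checkpoint.
/// Panics if any checkpoint or committee is missing, or verification fails.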
pub async fn verify_checkpoint_range<S>(
    checkpoint_range: Range<CheckpointSequenceNumber>,
    store: S,
    checkpoint_counter: Arc<AtomicU64>,
    max_concurrency: usize,
) where
    S: WriteStore + Clone,
{
    let range_clone = checkpoint_range.clone();
    futures::stream::iter(range_clone.into_iter().tuple_windows())
        .map(|(a, b)| {
            let current = store
                .get_checkpoint_by_sequence_number(a)
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint {} should exist in store after summary sync but does not",
                        a
                    );
                });
            let next = store
                .get_checkpoint_by_sequence_number(b)
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint {} should exist in store after summary sync but does not",
                        b
                    );
                });
            let committee = store.get_committee(next.epoch()).unwrap_or_else(|| {
                panic!(
                    "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
                    next.epoch(),
                    next.sequence_number()
                )
            });
            tokio::spawn(async move {
                verify_checkpoint_with_committee(committee, &current, next.clone().into())
                    .expect("Checkpoint verification failed");
            })
        })
        .buffer_unordered(max_concurrency)
        .for_each(|result| {
            result.expect("Checkpoint verification task failed");
            checkpoint_counter.fetch_add(1, Ordering::Relaxed);
            futures::future::ready(())
        })
        .await;
    let last = checkpoint_range
        .last()
        .expect("Received empty checkpoint range");
    let final_checkpoint = store
        .get_checkpoint_by_sequence_number(last)
        .expect("Expected end of checkpoint range to exist in store");
    store
        .update_highest_verified_checkpoint(&final_checkpoint)
        .expect("Failed to update highest verified checkpoint");
}
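
// A minimal round-trip sketch of the header and compression helpers above,
// kept self-contained with in-memory buffers. The module name, magic value,
// and payload are hypothetical choices for illustration, not constants or
// tests that exist elsewhere in this crate.
#[cfg(test)]
mod sketch_tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn zstd_compress_round_trips() -> Result<()> {
        let payload: &[u8] = b"example payload";
        let mut reader = payload;
        let mut compressed = Vec::new();
        FileCompression::zstd_compress(&mut reader, &mut compressed)?;
        // Decode with the same zstd stream decoder that `decompress` uses.
        let mut decoded = Vec::new();
        zstd::stream::Decoder::new(Cursor::new(compressed))?.read_to_end(&mut decoded)?;
        assert_eq!(payload, decoded.as_slice());
        Ok(())
    }

    #[test]
    fn read_validates_magic_and_returns_body() -> Result<()> {
        let magic: u32 = 0x00C0_FFEE; // hypothetical magic for this sketch
        let mut file = Vec::new();
        file.write_u32::<BigEndian>(magic)?;
        file.write_u8(StorageFormat::Blob.into())?;
        file.write_u8(FileCompression::None.into())?;
        file.extend_from_slice(b"body");

        let (mut body, format) = read(magic, Cursor::new(file.clone()))?;
        assert_eq!(format, StorageFormat::Blob);
        let mut rest = Vec::new();
        body.read_to_end(&mut rest)?;
        assert_eq!(rest, b"body");

        // A different expected magic must be rejected.
        assert!(read(magic.wrapping_add(1), Cursor::new(file)).is_err());
        Ok(())
    }
}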