consensus_core/storage/
rocksdb_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, VecDeque},
6    ops::Bound::{Excluded, Included},
7    time::Duration,
8};
9
10use bytes::Bytes;
11use consensus_config::AuthorityIndex;
12use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
13use mysten_common::ZipDebugEqIteratorExt;
14use sui_macros::fail_point;
15use typed_store::{
16    DBMapUtils, Map as _,
17    metrics::SamplingInterval,
18    rocks::{DBMap, DBMapTableConfigMap, MetricConf, default_db_options},
19};
20
21use super::{CommitInfo, Store, WriteBatch};
22use crate::{
23    block::{BlockAPI as _, SignedBlock, VerifiedBlock},
24    commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
25    error::{ConsensusError, ConsensusResult},
26};
27
28/// Persistent storage with RocksDB.
29#[derive(DBMapUtils)]
30#[cfg_attr(tidehunter, tidehunter)]
31pub struct RocksDBStore {
32    /// Stores SignedBlock by refs.
33    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
34    /// A secondary index that orders refs first by authors.
35    #[rename = "digests"]
36    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
37    /// Maps commit index to Commit.
38    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
39    /// Collects votes on commits.
40    /// TODO: batch multiple votes into a single row.
41    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
42    /// Stores info related to Commit that helps recovery.
43    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
44    /// Maps finalized commits to the transactions rejected in the commit.
45    finalized_commits:
46        DBMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
47}
48
49impl RocksDBStore {
50    const BLOCKS_CF: &'static str = "blocks";
51    const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
52    const COMMITS_CF: &'static str = "commits";
53    const COMMIT_VOTES_CF: &'static str = "commit_votes";
54    const COMMIT_INFO_CF: &'static str = "commit_info";
55    const FINALIZED_COMMITS_CF: &'static str = "finalized_commits";
56
57    /// Creates a new instance of RocksDB storage.
58    #[cfg(not(tidehunter))]
59    pub fn new(path: &str, use_fifo_compaction: bool) -> Self {
60        // Consensus data has high write throughput (all transactions) and is rarely read
61        // (only during recovery and when helping peers catch up).
62        let db_options =
63            default_db_options().optimize_db_for_write_throughput(2, use_fifo_compaction);
64        let mut metrics_conf = MetricConf::new("consensus");
65        metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
66        let cf_options = if use_fifo_compaction {
67            default_db_options().optimize_for_no_deletion()
68        } else {
69            default_db_options().optimize_for_write_throughput()
70        };
71        // BLOCKS_CF only receives inserts (no deletions) and is dropped with the DB at epoch
72        // boundary. Use Universal compaction (or FIFO when enabled) which handles this
73        // pattern better than Level compaction.
74        let blocks_cf_options = if use_fifo_compaction {
75            default_db_options().optimize_for_no_deletion()
76        } else {
77            default_db_options().optimize_for_write_throughput_no_deletion()
78        };
79        let column_family_options = DBMapTableConfigMap::new(BTreeMap::from([
80            (
81                Self::BLOCKS_CF.to_string(),
82                blocks_cf_options
83                    // Using larger block is ok since there is not much point reads on the cf.
84                    .set_block_options(512, 128 << 10),
85            ),
86            (
87                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
88                cf_options.clone(),
89            ),
90            (Self::COMMITS_CF.to_string(), cf_options.clone()),
91            (Self::COMMIT_VOTES_CF.to_string(), cf_options.clone()),
92            (Self::COMMIT_INFO_CF.to_string(), cf_options.clone()),
93            (Self::FINALIZED_COMMITS_CF.to_string(), cf_options.clone()),
94        ]));
95        Self::open_tables_read_write(
96            path.into(),
97            metrics_conf,
98            Some(db_options.options),
99            Some(column_family_options),
100        )
101    }
102
103    #[cfg(tidehunter)]
104    pub fn new(path: &str, _use_fifo_compaction: bool) -> Self {
105        tracing::warn!("Consensus store using tidehunter");
106        use typed_store::tidehunter_util::{
107            KeyIndexing, KeySpaceConfig, KeyType, ThConfig, default_mutex_count,
108        };
109        let mutexes = default_mutex_count();
110        let index_digest_key = KeyIndexing::key_reduction(36, 0..12);
111        let index_index_digest_key = KeyIndexing::key_reduction(40, 0..24);
112        let commit_vote_key = KeyIndexing::key_reduction(76, 0..60);
113        let u32_prefix = KeyType::from_prefix_bits(3 * 8);
114        let u64_prefix = KeyType::from_prefix_bits(6 * 8);
115        let configs = vec![
116            (
117                Self::BLOCKS_CF.to_string(),
118                ThConfig::new_with_config_indexing(
119                    index_index_digest_key.clone(),
120                    mutexes,
121                    u32_prefix.clone(),
122                    KeySpaceConfig::new(),
123                ),
124            ),
125            (
126                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
127                ThConfig::new_with_config_indexing(
128                    index_index_digest_key.clone(),
129                    mutexes,
130                    u64_prefix.clone(),
131                    KeySpaceConfig::new(),
132                ),
133            ),
134            (
135                Self::COMMITS_CF.to_string(),
136                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
137            ),
138            (
139                Self::COMMIT_VOTES_CF.to_string(),
140                ThConfig::new_with_config_indexing(
141                    commit_vote_key,
142                    mutexes,
143                    u32_prefix.clone(),
144                    KeySpaceConfig::new(),
145                ),
146            ),
147            (
148                Self::COMMIT_INFO_CF.to_string(),
149                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
150            ),
151            (
152                Self::FINALIZED_COMMITS_CF.to_string(),
153                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
154            ),
155        ];
156        Self::open_tables_read_write(
157            path.into(),
158            MetricConf::new("consensus")
159                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0))
160                .with_th_batch_compression(),
161            configs.into_iter().collect(),
162        )
163    }
164}
165
166impl Store for RocksDBStore {
167    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
168        fail_point!("consensus-store-before-write");
169
170        let mut batch = self.blocks.batch();
171        for block in write_batch.blocks {
172            let block_ref = block.reference();
173            batch
174                .insert_batch(
175                    &self.blocks,
176                    [(
177                        (block_ref.round, block_ref.author, block_ref.digest),
178                        block.serialized(),
179                    )],
180                )
181                .map_err(ConsensusError::RocksDBFailure)?;
182            batch
183                .insert_batch(
184                    &self.digests_by_authorities,
185                    [((block_ref.author, block_ref.round, block_ref.digest), ())],
186                )
187                .map_err(ConsensusError::RocksDBFailure)?;
188            for vote in block.commit_votes() {
189                batch
190                    .insert_batch(
191                        &self.commit_votes,
192                        [((vote.index, vote.digest, block_ref), ())],
193                    )
194                    .map_err(ConsensusError::RocksDBFailure)?;
195            }
196        }
197
198        for commit in write_batch.commits {
199            batch
200                .insert_batch(
201                    &self.commits,
202                    [((commit.index(), commit.digest()), commit.serialized())],
203                )
204                .map_err(ConsensusError::RocksDBFailure)?;
205        }
206
207        for (commit_ref, commit_info) in write_batch.commit_info {
208            batch
209                .insert_batch(
210                    &self.commit_info,
211                    [((commit_ref.index, commit_ref.digest), commit_info)],
212                )
213                .map_err(ConsensusError::RocksDBFailure)?;
214        }
215
216        for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
217            batch
218                .insert_batch(
219                    &self.finalized_commits,
220                    [((commit_ref.index, commit_ref.digest), rejected_transactions)],
221                )
222                .map_err(ConsensusError::RocksDBFailure)?;
223        }
224
225        batch.write()?;
226        fail_point!("consensus-store-after-write");
227        Ok(())
228    }
229
230    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
231        let keys = refs
232            .iter()
233            .map(|r| (r.round, r.author, r.digest))
234            .collect::<Vec<_>>();
235        let serialized = self.blocks.multi_get(keys)?;
236        let mut blocks = vec![];
237        for (key, serialized) in refs.iter().zip_debug_eq(serialized) {
238            if let Some(serialized) = serialized {
239                let signed_block: SignedBlock =
240                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
241                // Only accepted blocks should have been written to storage.
242                let block = VerifiedBlock::new_verified(signed_block, serialized);
243                // Makes sure block data is not corrupted, by comparing digests.
244                assert_eq!(*key, block.reference());
245                blocks.push(Some(block));
246            } else {
247                blocks.push(None);
248            }
249        }
250        Ok(blocks)
251    }
252
253    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
254        let refs = refs
255            .iter()
256            .map(|r| (r.round, r.author, r.digest))
257            .collect::<Vec<_>>();
258        let exist = self.blocks.multi_contains_keys(refs)?;
259        Ok(exist)
260    }
261
262    fn scan_blocks_by_author(
263        &self,
264        author: AuthorityIndex,
265        start_round: Round,
266    ) -> ConsensusResult<Vec<VerifiedBlock>> {
267        self.scan_blocks_by_author_in_range(author, start_round, Round::MAX, usize::MAX)
268    }
269
270    fn scan_blocks_by_author_in_range(
271        &self,
272        author: AuthorityIndex,
273        start_round: Round,
274        end_round: Round,
275        limit: usize,
276    ) -> ConsensusResult<Vec<VerifiedBlock>> {
277        let mut refs = vec![];
278        for kv in self.digests_by_authorities.safe_range_iter((
279            Included((author, start_round, BlockDigest::MIN)),
280            Excluded((author, end_round, BlockDigest::MIN)),
281        )) {
282            let ((author, round, digest), _) = kv?;
283            refs.push(BlockRef::new(round, author, digest));
284            if refs.len() >= limit {
285                break;
286            }
287        }
288        let results = self.read_blocks(refs.as_slice())?;
289        let mut blocks = Vec::with_capacity(refs.len());
290        for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
291            blocks.push(
292                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
293            );
294        }
295        Ok(blocks)
296    }
297
298    // The method returns the last `num_of_rounds` rounds blocks by author in round ascending order.
299    // When a `before_round` is defined then the blocks of round `<=before_round` are returned. If not
300    // then the max value for round will be used as cut off.
301    fn scan_last_blocks_by_author(
302        &self,
303        author: AuthorityIndex,
304        num_of_rounds: u64,
305        before_round: Option<Round>,
306    ) -> ConsensusResult<Vec<VerifiedBlock>> {
307        let before_round = before_round.unwrap_or(Round::MAX);
308        let mut refs = VecDeque::new();
309        for kv in self
310            .digests_by_authorities
311            .reversed_safe_iter_with_bounds(
312                Some((author, Round::MIN, BlockDigest::MIN)),
313                Some((author, before_round, BlockDigest::MAX)),
314            )?
315            .take(num_of_rounds as usize)
316        {
317            let ((author, round, digest), _) = kv?;
318            refs.push_front(BlockRef::new(round, author, digest));
319        }
320        let refs_slice = refs.make_contiguous();
321        let results = self.read_blocks(refs_slice)?;
322        let mut blocks = vec![];
323        for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
324            blocks.push(
325                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
326            );
327        }
328        Ok(blocks)
329    }
330
331    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
332        let Some(result) = self
333            .commits
334            .reversed_safe_iter_with_bounds(None, None)?
335            .next()
336        else {
337            return Ok(None);
338        };
339        let ((_index, digest), serialized) = result?;
340        let commit = TrustedCommit::new_trusted(
341            bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
342            serialized,
343        );
344        assert_eq!(commit.digest(), digest);
345        Ok(Some(commit))
346    }
347
348    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
349        let mut commits = vec![];
350        for result in self.commits.safe_range_iter((
351            Included((range.start(), CommitDigest::MIN)),
352            Included((range.end(), CommitDigest::MAX)),
353        )) {
354            let ((_index, digest), serialized) = result?;
355            let commit = TrustedCommit::new_trusted(
356                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
357                serialized,
358            );
359            assert_eq!(commit.digest(), digest);
360            commits.push(commit);
361        }
362        Ok(commits)
363    }
364
365    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
366        let mut votes = Vec::new();
367        for vote in self.commit_votes.safe_range_iter((
368            Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
369            Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
370        )) {
371            let ((_, _, block_ref), _) = vote?;
372            votes.push(block_ref);
373        }
374        Ok(votes)
375    }
376
377    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
378        let Some(result) = self
379            .commit_info
380            .reversed_safe_iter_with_bounds(None, None)?
381            .next()
382        else {
383            return Ok(None);
384        };
385        let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
386        Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
387    }
388
389    fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
390        let Some(result) = self
391            .finalized_commits
392            .reversed_safe_iter_with_bounds(None, None)?
393            .next()
394        else {
395            return Ok(None);
396        };
397        let ((index, digest), _) = result.map_err(ConsensusError::RocksDBFailure)?;
398        Ok(Some(CommitRef::new(index, digest)))
399    }
400
401    fn read_rejected_transactions(
402        &self,
403        commit_ref: CommitRef,
404    ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
405        let result = self
406            .finalized_commits
407            .get(&(commit_ref.index, commit_ref.digest))?;
408        Ok(result)
409    }
410}