// consensus_core/storage/rocksdb_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, VecDeque},
6    ops::Bound::{Excluded, Included},
7    time::Duration,
8};
9
10use bytes::Bytes;
11use consensus_config::AuthorityIndex;
12use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
13use mysten_common::ZipDebugEqIteratorExt;
14use sui_macros::fail_point;
15use typed_store::{
16    DBMapUtils, Map as _,
17    metrics::SamplingInterval,
18    rocks::{DBMap, DBMapTableConfigMap, MetricConf, default_db_options},
19};
20
21use super::{CommitInfo, Store, WriteBatch};
22use crate::{
23    block::{BlockAPI as _, SignedBlock, VerifiedBlock},
24    commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
25    error::{ConsensusError, ConsensusResult},
26};
27
/// Persistent storage with RocksDB.
///
/// Each field is a separate column family. Keys are ordered tuples so range
/// scans by round / author / commit index work via lexicographic iteration.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct RocksDBStore {
    /// Stores SignedBlock by refs. Keyed round-first so whole rounds can be
    /// scanned across authorities.
    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
    /// A secondary index that orders refs first by authors.
    /// NOTE: the on-disk column family name stays "digests" (see #[rename]);
    /// only the Rust field was renamed.
    #[rename = "digests"]
    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
    /// Maps commit index to Commit. Serialized commit bytes are stored so the
    /// digest can be re-verified on read.
    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
    /// Collects votes on commits; the voting block's ref is part of the key.
    /// TODO: batch multiple votes into a single row.
    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
    /// Stores info related to Commit that helps recovery.
    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
    /// Maps finalized commits to the transactions rejected in the commit.
    finalized_commits:
        DBMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
}
48
impl RocksDBStore {
    // Column family names. These identify on-disk data and must match the
    // struct field names (or the #[rename] attribute), so keep them stable.
    const BLOCKS_CF: &'static str = "blocks";
    const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
    const COMMITS_CF: &'static str = "commits";
    const COMMIT_VOTES_CF: &'static str = "commit_votes";
    const COMMIT_INFO_CF: &'static str = "commit_info";
    const FINALIZED_COMMITS_CF: &'static str = "finalized_commits";

    /// Creates a new instance of RocksDB storage.
    ///
    /// `use_fifo_compaction` switches every column family to the
    /// no-deletion-optimized options; otherwise write-throughput-optimized
    /// options are used (with a dedicated no-deletion variant for blocks).
    #[cfg(not(tidehunter))]
    pub fn new(path: &str, use_fifo_compaction: bool) -> Self {
        // Consensus data has high write throughput (all transactions) and is rarely read
        // (only during recovery and when helping peers catch up).
        let db_options =
            default_db_options().optimize_db_for_write_throughput(2, use_fifo_compaction);
        let mut metrics_conf = MetricConf::new("consensus");
        // Sample reads infrequently: reads are rare and sampling every op
        // would add metric overhead.
        metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
        let cf_options = if use_fifo_compaction {
            default_db_options().optimize_for_no_deletion()
        } else {
            default_db_options().optimize_for_write_throughput()
        };
        // BLOCKS_CF only receives inserts (no deletions) and is dropped with the DB at epoch
        // boundary. Use Universal compaction (or FIFO when enabled) which handles this
        // pattern better than Level compaction.
        let blocks_cf_options = if use_fifo_compaction {
            default_db_options().optimize_for_no_deletion()
        } else {
            default_db_options().optimize_for_write_throughput_no_deletion()
        };
        let column_family_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                Self::BLOCKS_CF.to_string(),
                blocks_cf_options
                    // Using larger block is ok since there is not much point reads on the cf.
                    .set_block_options(512, 128 << 10),
            ),
            (
                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
                cf_options.clone(),
            ),
            (Self::COMMITS_CF.to_string(), cf_options.clone()),
            (Self::COMMIT_VOTES_CF.to_string(), cf_options.clone()),
            (Self::COMMIT_INFO_CF.to_string(), cf_options.clone()),
            (Self::FINALIZED_COMMITS_CF.to_string(), cf_options.clone()),
        ]));
        Self::open_tables_read_write(
            path.into(),
            metrics_conf,
            Some(db_options.options),
            Some(column_family_options),
        )
    }

    /// Creates a new instance backed by tidehunter instead of RocksDB.
    /// FIFO compaction is a RocksDB concept, so the flag is ignored here.
    #[cfg(tidehunter)]
    pub fn new(path: &str, _use_fifo_compaction: bool) -> Self {
        tracing::warn!("Consensus store using tidehunter");
        use typed_store::tidehunter_util::{
            KeyIndexing, KeySpaceConfig, KeyType, ThConfig, default_mutex_count,
        };
        let mutexes = default_mutex_count();
        // Key layouts below appear to mirror the serialized tuple-key widths:
        // 36 = u32 index + 32-byte digest; 40 = two u32s + 32-byte digest;
        // 76 = u32 + 32-byte digest + 40-byte BlockRef.
        // NOTE(review): the reduction ranges (0..12, 0..24, 0..60) presumably
        // select the indexed key prefix — confirm against tidehunter_util docs.
        let index_digest_key = KeyIndexing::key_reduction(36, 0..12);
        let index_index_digest_key = KeyIndexing::key_reduction(40, 0..24);
        let commit_vote_key = KeyIndexing::key_reduction(76, 0..60);
        // Prefix key types cover the leading bytes of the key (3 and 6 bytes).
        let u32_prefix = KeyType::from_prefix_bits(3 * 8);
        let u64_prefix = KeyType::from_prefix_bits(6 * 8);
        let configs = vec![
            (
                Self::BLOCKS_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    index_index_digest_key.clone(),
                    mutexes,
                    u32_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    index_index_digest_key.clone(),
                    mutexes,
                    u64_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::COMMITS_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
            (
                Self::COMMIT_VOTES_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    commit_vote_key,
                    mutexes,
                    u32_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::COMMIT_INFO_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
            (
                Self::FINALIZED_COMMITS_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
        ];
        Self::open_tables_read_write(
            path.into(),
            MetricConf::new("consensus")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
}
164
165impl Store for RocksDBStore {
166    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
167        fail_point!("consensus-store-before-write");
168
169        let mut batch = self.blocks.batch();
170        for block in write_batch.blocks {
171            let block_ref = block.reference();
172            batch
173                .insert_batch(
174                    &self.blocks,
175                    [(
176                        (block_ref.round, block_ref.author, block_ref.digest),
177                        block.serialized(),
178                    )],
179                )
180                .map_err(ConsensusError::RocksDBFailure)?;
181            batch
182                .insert_batch(
183                    &self.digests_by_authorities,
184                    [((block_ref.author, block_ref.round, block_ref.digest), ())],
185                )
186                .map_err(ConsensusError::RocksDBFailure)?;
187            for vote in block.commit_votes() {
188                batch
189                    .insert_batch(
190                        &self.commit_votes,
191                        [((vote.index, vote.digest, block_ref), ())],
192                    )
193                    .map_err(ConsensusError::RocksDBFailure)?;
194            }
195        }
196
197        for commit in write_batch.commits {
198            batch
199                .insert_batch(
200                    &self.commits,
201                    [((commit.index(), commit.digest()), commit.serialized())],
202                )
203                .map_err(ConsensusError::RocksDBFailure)?;
204        }
205
206        for (commit_ref, commit_info) in write_batch.commit_info {
207            batch
208                .insert_batch(
209                    &self.commit_info,
210                    [((commit_ref.index, commit_ref.digest), commit_info)],
211                )
212                .map_err(ConsensusError::RocksDBFailure)?;
213        }
214
215        for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
216            batch
217                .insert_batch(
218                    &self.finalized_commits,
219                    [((commit_ref.index, commit_ref.digest), rejected_transactions)],
220                )
221                .map_err(ConsensusError::RocksDBFailure)?;
222        }
223
224        batch.write()?;
225        fail_point!("consensus-store-after-write");
226        Ok(())
227    }
228
229    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
230        let keys = refs
231            .iter()
232            .map(|r| (r.round, r.author, r.digest))
233            .collect::<Vec<_>>();
234        let serialized = self.blocks.multi_get(keys)?;
235        let mut blocks = vec![];
236        for (key, serialized) in refs.iter().zip_debug_eq(serialized) {
237            if let Some(serialized) = serialized {
238                let signed_block: SignedBlock =
239                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
240                // Only accepted blocks should have been written to storage.
241                let block = VerifiedBlock::new_verified(signed_block, serialized);
242                // Makes sure block data is not corrupted, by comparing digests.
243                assert_eq!(*key, block.reference());
244                blocks.push(Some(block));
245            } else {
246                blocks.push(None);
247            }
248        }
249        Ok(blocks)
250    }
251
252    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
253        let refs = refs
254            .iter()
255            .map(|r| (r.round, r.author, r.digest))
256            .collect::<Vec<_>>();
257        let exist = self.blocks.multi_contains_keys(refs)?;
258        Ok(exist)
259    }
260
261    fn scan_blocks_by_author(
262        &self,
263        author: AuthorityIndex,
264        start_round: Round,
265    ) -> ConsensusResult<Vec<VerifiedBlock>> {
266        self.scan_blocks_by_author_in_range(author, start_round, Round::MAX, usize::MAX)
267    }
268
269    fn scan_blocks_by_author_in_range(
270        &self,
271        author: AuthorityIndex,
272        start_round: Round,
273        end_round: Round,
274        limit: usize,
275    ) -> ConsensusResult<Vec<VerifiedBlock>> {
276        let mut refs = vec![];
277        for kv in self.digests_by_authorities.safe_range_iter((
278            Included((author, start_round, BlockDigest::MIN)),
279            Excluded((author, end_round, BlockDigest::MIN)),
280        )) {
281            let ((author, round, digest), _) = kv?;
282            refs.push(BlockRef::new(round, author, digest));
283            if refs.len() >= limit {
284                break;
285            }
286        }
287        let results = self.read_blocks(refs.as_slice())?;
288        let mut blocks = Vec::with_capacity(refs.len());
289        for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
290            blocks.push(
291                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
292            );
293        }
294        Ok(blocks)
295    }
296
297    // The method returns the last `num_of_rounds` rounds blocks by author in round ascending order.
298    // When a `before_round` is defined then the blocks of round `<=before_round` are returned. If not
299    // then the max value for round will be used as cut off.
300    fn scan_last_blocks_by_author(
301        &self,
302        author: AuthorityIndex,
303        num_of_rounds: u64,
304        before_round: Option<Round>,
305    ) -> ConsensusResult<Vec<VerifiedBlock>> {
306        let before_round = before_round.unwrap_or(Round::MAX);
307        let mut refs = VecDeque::new();
308        for kv in self
309            .digests_by_authorities
310            .reversed_safe_iter_with_bounds(
311                Some((author, Round::MIN, BlockDigest::MIN)),
312                Some((author, before_round, BlockDigest::MAX)),
313            )?
314            .take(num_of_rounds as usize)
315        {
316            let ((author, round, digest), _) = kv?;
317            refs.push_front(BlockRef::new(round, author, digest));
318        }
319        let refs_slice = refs.make_contiguous();
320        let results = self.read_blocks(refs_slice)?;
321        let mut blocks = vec![];
322        for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
323            blocks.push(
324                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
325            );
326        }
327        Ok(blocks)
328    }
329
330    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
331        let Some(result) = self
332            .commits
333            .reversed_safe_iter_with_bounds(None, None)?
334            .next()
335        else {
336            return Ok(None);
337        };
338        let ((_index, digest), serialized) = result?;
339        let commit = TrustedCommit::new_trusted(
340            bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
341            serialized,
342        );
343        assert_eq!(commit.digest(), digest);
344        Ok(Some(commit))
345    }
346
347    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
348        let mut commits = vec![];
349        for result in self.commits.safe_range_iter((
350            Included((range.start(), CommitDigest::MIN)),
351            Included((range.end(), CommitDigest::MAX)),
352        )) {
353            let ((_index, digest), serialized) = result?;
354            let commit = TrustedCommit::new_trusted(
355                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
356                serialized,
357            );
358            assert_eq!(commit.digest(), digest);
359            commits.push(commit);
360        }
361        Ok(commits)
362    }
363
364    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
365        let mut votes = Vec::new();
366        for vote in self.commit_votes.safe_range_iter((
367            Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
368            Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
369        )) {
370            let ((_, _, block_ref), _) = vote?;
371            votes.push(block_ref);
372        }
373        Ok(votes)
374    }
375
376    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
377        let Some(result) = self
378            .commit_info
379            .reversed_safe_iter_with_bounds(None, None)?
380            .next()
381        else {
382            return Ok(None);
383        };
384        let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
385        Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
386    }
387
388    fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
389        let Some(result) = self
390            .finalized_commits
391            .reversed_safe_iter_with_bounds(None, None)?
392            .next()
393        else {
394            return Ok(None);
395        };
396        let ((index, digest), _) = result.map_err(ConsensusError::RocksDBFailure)?;
397        Ok(Some(CommitRef::new(index, digest)))
398    }
399
400    fn read_rejected_transactions(
401        &self,
402        commit_ref: CommitRef,
403    ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
404        let result = self
405            .finalized_commits
406            .get(&(commit_ref.index, commit_ref.digest))?;
407        Ok(result)
408    }
409}