consensus_core/storage/
rocksdb_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, VecDeque},
6    ops::Bound::{Excluded, Included},
7    time::Duration,
8};
9
10use bytes::Bytes;
11use consensus_config::AuthorityIndex;
12use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
13use mysten_common::ZipDebugEqIteratorExt;
14use sui_macros::fail_point;
15use typed_store::{
16    DBMapUtils, Map as _,
17    metrics::SamplingInterval,
18    rocks::{DBMap, DBMapTableConfigMap, MetricConf, default_db_options},
19};
20
21use super::{CommitInfo, Store, WriteBatch};
22use crate::{
23    block::{BlockAPI as _, SignedBlock, VerifiedBlock},
24    commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
25    error::{ConsensusError, ConsensusResult},
26};
27
28/// Persistent consensus storage with RocksDB; each field below is one table of the store.
29#[derive(DBMapUtils)]
30#[cfg_attr(tidehunter, tidehunter)]
31pub struct RocksDBStore {
32    /// Stores serialized SignedBlock bytes, keyed by the block ref as (round, author, digest).
33    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
34    /// A secondary index that orders refs first by authors; written for every stored block and used by the per-author scans.
35    #[rename = "digests"]
36    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
37    /// Maps (commit index, commit digest) to the serialized Commit.
38    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
39    /// Collects votes on commits: one row per (commit index, commit digest, ref of the voting block).
40    /// TODO: batch multiple votes into a single row.
41    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
42    /// Stores info related to Commit that helps recovery, keyed by (commit index, commit digest).
43    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
44    /// Maps finalized commits to the transactions rejected in the commit, grouped by block ref.
45    finalized_commits:
46        DBMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
47}
48
49impl RocksDBStore {
50    const BLOCKS_CF: &'static str = "blocks";
51    const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
52    const COMMITS_CF: &'static str = "commits";
53    const COMMIT_VOTES_CF: &'static str = "commit_votes";
54    const COMMIT_INFO_CF: &'static str = "commit_info";
55    const FINALIZED_COMMITS_CF: &'static str = "finalized_commits";
56
57    /// Creates a new instance of RocksDB storage.
58    #[cfg(not(tidehunter))]
59    pub fn new(path: &str) -> Self {
60        // Consensus data has high write throughput (all transactions) and is rarely read
61        // (only during recovery and when helping peers catch up).
62        let db_options =
63            default_db_options().optimize_db_for_write_throughput(2, /* unlimited */ true);
64        let mut metrics_conf = MetricConf::new("consensus");
65        metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
66        let cf_options = default_db_options().optimize_for_no_deletion();
67        let column_family_options = DBMapTableConfigMap::new(BTreeMap::from([
68            (
69                Self::BLOCKS_CF.to_string(),
70                cf_options
71                    .clone()
72                    // Using larger block is ok since there is not much point reads on the cf.
73                    .set_block_options(512, 128 << 10),
74            ),
75            (
76                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
77                cf_options.clone(),
78            ),
79            (Self::COMMITS_CF.to_string(), cf_options.clone()),
80            (Self::COMMIT_VOTES_CF.to_string(), cf_options.clone()),
81            (Self::COMMIT_INFO_CF.to_string(), cf_options.clone()),
82            (Self::FINALIZED_COMMITS_CF.to_string(), cf_options.clone()),
83        ]));
84        Self::open_tables_read_write(
85            path.into(),
86            metrics_conf,
87            Some(db_options.options),
88            Some(column_family_options),
89        )
90    }
91
92    #[cfg(tidehunter)]
93    pub fn new(path: &str) -> Self {
94        tracing::warn!("Consensus store using tidehunter");
95        use typed_store::tidehunter_util::{
96            KeyIndexing, KeySpaceConfig, KeyType, ThConfig, default_mutex_count,
97        };
98        let mutexes = default_mutex_count();
99        let index_digest_key = KeyIndexing::key_reduction(36, 0..12);
100        let index_index_digest_key = KeyIndexing::key_reduction(40, 0..24);
101        let commit_vote_key = KeyIndexing::key_reduction(76, 0..60);
102        let u32_prefix = KeyType::from_prefix_bits(3 * 8);
103        let u64_prefix = KeyType::from_prefix_bits(6 * 8);
104        let configs = vec![
105            (
106                Self::BLOCKS_CF.to_string(),
107                ThConfig::new_with_config_indexing(
108                    index_index_digest_key.clone(),
109                    mutexes,
110                    u32_prefix.clone(),
111                    KeySpaceConfig::new(),
112                ),
113            ),
114            (
115                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
116                ThConfig::new_with_config_indexing(
117                    index_index_digest_key.clone(),
118                    mutexes,
119                    u64_prefix.clone(),
120                    KeySpaceConfig::new(),
121                ),
122            ),
123            (
124                Self::COMMITS_CF.to_string(),
125                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
126            ),
127            (
128                Self::COMMIT_VOTES_CF.to_string(),
129                ThConfig::new_with_config_indexing(
130                    commit_vote_key,
131                    mutexes,
132                    u32_prefix.clone(),
133                    KeySpaceConfig::new(),
134                ),
135            ),
136            (
137                Self::COMMIT_INFO_CF.to_string(),
138                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
139            ),
140            (
141                Self::FINALIZED_COMMITS_CF.to_string(),
142                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
143            ),
144        ];
145        Self::open_tables_read_write(
146            path.into(),
147            MetricConf::new("consensus")
148                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0))
149                .with_th_batch_compression(),
150            configs.into_iter().collect(),
151        )
152    }
153}
154
155impl Store for RocksDBStore {
156    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
157        fail_point!("consensus-store-before-write");
158
159        let mut batch = self.blocks.batch();
160        for block in write_batch.blocks {
161            let block_ref = block.reference();
162            batch
163                .insert_batch(
164                    &self.blocks,
165                    [(
166                        (block_ref.round, block_ref.author, block_ref.digest),
167                        block.serialized(),
168                    )],
169                )
170                .map_err(ConsensusError::RocksDBFailure)?;
171            batch
172                .insert_batch(
173                    &self.digests_by_authorities,
174                    [((block_ref.author, block_ref.round, block_ref.digest), ())],
175                )
176                .map_err(ConsensusError::RocksDBFailure)?;
177            for vote in block.commit_votes() {
178                batch
179                    .insert_batch(
180                        &self.commit_votes,
181                        [((vote.index, vote.digest, block_ref), ())],
182                    )
183                    .map_err(ConsensusError::RocksDBFailure)?;
184            }
185        }
186
187        for commit in write_batch.commits {
188            batch
189                .insert_batch(
190                    &self.commits,
191                    [((commit.index(), commit.digest()), commit.serialized())],
192                )
193                .map_err(ConsensusError::RocksDBFailure)?;
194        }
195
196        for (commit_ref, commit_info) in write_batch.commit_info {
197            batch
198                .insert_batch(
199                    &self.commit_info,
200                    [((commit_ref.index, commit_ref.digest), commit_info)],
201                )
202                .map_err(ConsensusError::RocksDBFailure)?;
203        }
204
205        for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
206            batch
207                .insert_batch(
208                    &self.finalized_commits,
209                    [((commit_ref.index, commit_ref.digest), rejected_transactions)],
210                )
211                .map_err(ConsensusError::RocksDBFailure)?;
212        }
213
214        batch.write()?;
215        fail_point!("consensus-store-after-write");
216        Ok(())
217    }
218
219    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
220        let keys = refs
221            .iter()
222            .map(|r| (r.round, r.author, r.digest))
223            .collect::<Vec<_>>();
224        let serialized = self.blocks.multi_get(keys)?;
225        let mut blocks = vec![];
226        for (key, serialized) in refs.iter().zip_debug_eq(serialized) {
227            if let Some(serialized) = serialized {
228                let signed_block: SignedBlock =
229                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
230                // Only accepted blocks should have been written to storage.
231                let block = VerifiedBlock::new_verified(signed_block, serialized);
232                // Makes sure block data is not corrupted, by comparing digests.
233                assert_eq!(*key, block.reference());
234                blocks.push(Some(block));
235            } else {
236                blocks.push(None);
237            }
238        }
239        Ok(blocks)
240    }
241
242    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
243        let refs = refs
244            .iter()
245            .map(|r| (r.round, r.author, r.digest))
246            .collect::<Vec<_>>();
247        let exist = self.blocks.multi_contains_keys(refs)?;
248        Ok(exist)
249    }
250
251    fn scan_blocks_by_author(
252        &self,
253        author: AuthorityIndex,
254        start_round: Round,
255    ) -> ConsensusResult<Vec<VerifiedBlock>> {
256        self.scan_blocks_by_author_in_range(author, start_round, Round::MAX, usize::MAX)
257    }
258
259    fn scan_blocks_by_author_in_range(
260        &self,
261        author: AuthorityIndex,
262        start_round: Round,
263        end_round: Round,
264        limit: usize,
265    ) -> ConsensusResult<Vec<VerifiedBlock>> {
266        let mut refs = vec![];
267        for kv in self.digests_by_authorities.safe_range_iter((
268            Included((author, start_round, BlockDigest::MIN)),
269            Excluded((author, end_round, BlockDigest::MIN)),
270        )) {
271            let ((author, round, digest), _) = kv?;
272            refs.push(BlockRef::new(round, author, digest));
273            if refs.len() >= limit {
274                break;
275            }
276        }
277        let results = self.read_blocks(refs.as_slice())?;
278        let mut blocks = Vec::with_capacity(refs.len());
279        for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
280            blocks.push(
281                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
282            );
283        }
284        Ok(blocks)
285    }
286
287    // The method returns the last `num_of_rounds` rounds blocks by author in round ascending order.
288    // When a `before_round` is defined then the blocks of round `<=before_round` are returned. If not
289    // then the max value for round will be used as cut off.
290    fn scan_last_blocks_by_author(
291        &self,
292        author: AuthorityIndex,
293        num_of_rounds: u64,
294        before_round: Option<Round>,
295    ) -> ConsensusResult<Vec<VerifiedBlock>> {
296        let before_round = before_round.unwrap_or(Round::MAX);
297        let mut refs = VecDeque::new();
298        for kv in self
299            .digests_by_authorities
300            .reversed_safe_iter_with_bounds(
301                Some((author, Round::MIN, BlockDigest::MIN)),
302                Some((author, before_round, BlockDigest::MAX)),
303            )?
304            .take(num_of_rounds as usize)
305        {
306            let ((author, round, digest), _) = kv?;
307            refs.push_front(BlockRef::new(round, author, digest));
308        }
309        let refs_slice = refs.make_contiguous();
310        let results = self.read_blocks(refs_slice)?;
311        let mut blocks = vec![];
312        for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
313            blocks.push(
314                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
315            );
316        }
317        Ok(blocks)
318    }
319
320    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
321        let Some(result) = self
322            .commits
323            .reversed_safe_iter_with_bounds(None, None)?
324            .next()
325        else {
326            return Ok(None);
327        };
328        let ((_index, digest), serialized) = result?;
329        let commit = TrustedCommit::new_trusted(
330            bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
331            serialized,
332        );
333        assert_eq!(commit.digest(), digest);
334        Ok(Some(commit))
335    }
336
337    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
338        let mut commits = vec![];
339        for result in self.commits.safe_range_iter((
340            Included((range.start(), CommitDigest::MIN)),
341            Included((range.end(), CommitDigest::MAX)),
342        )) {
343            let ((_index, digest), serialized) = result?;
344            let commit = TrustedCommit::new_trusted(
345                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
346                serialized,
347            );
348            assert_eq!(commit.digest(), digest);
349            commits.push(commit);
350        }
351        Ok(commits)
352    }
353
354    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
355        let mut votes = Vec::new();
356        for vote in self.commit_votes.safe_range_iter((
357            Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
358            Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
359        )) {
360            let ((_, _, block_ref), _) = vote?;
361            votes.push(block_ref);
362        }
363        Ok(votes)
364    }
365
366    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
367        let Some(result) = self
368            .commit_info
369            .reversed_safe_iter_with_bounds(None, None)?
370            .next()
371        else {
372            return Ok(None);
373        };
374        let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
375        Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
376    }
377
378    fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
379        let Some(result) = self
380            .finalized_commits
381            .reversed_safe_iter_with_bounds(None, None)?
382            .next()
383        else {
384            return Ok(None);
385        };
386        let ((index, digest), _) = result.map_err(ConsensusError::RocksDBFailure)?;
387        Ok(Some(CommitRef::new(index, digest)))
388    }
389
390    fn read_rejected_transactions(
391        &self,
392        commit_ref: CommitRef,
393    ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
394        let result = self
395            .finalized_commits
396            .get(&(commit_ref.index, commit_ref.digest))?;
397        Ok(result)
398    }
399}