consensus_core/storage/
rocksdb_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, VecDeque},
6    ops::Bound::Included,
7    time::Duration,
8};
9
10use bytes::Bytes;
11use consensus_config::AuthorityIndex;
12use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
13use sui_macros::fail_point;
14use typed_store::{
15    DBMapUtils, Map as _,
16    metrics::SamplingInterval,
17    rocks::{DBMap, DBMapTableConfigMap, MetricConf, default_db_options},
18};
19
20use super::{CommitInfo, Store, WriteBatch};
21use crate::{
22    block::{BlockAPI as _, SignedBlock, VerifiedBlock},
23    commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
24    error::{ConsensusError, ConsensusResult},
25};
26
/// Persistent storage with RocksDB.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct RocksDBStore {
    /// Stores SignedBlock by refs.
    /// Keyed by (round, author, digest), so blocks can be range-scanned by round.
    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
    /// A secondary index that orders refs first by authors.
    /// The #[rename] attribute keeps the on-disk column family name "digests".
    #[rename = "digests"]
    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
    /// Maps commit index to Commit.
    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
    /// Collects votes on commits.
    /// Each row records one voting block's ref for a (commit index, commit digest).
    /// TODO: batch multiple votes into a single row.
    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
    /// Stores info related to Commit that helps recovery.
    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
    /// Maps finalized commits to the transactions rejected in the commit.
    /// The value maps each block to the indices of its rejected transactions.
    finalized_commits:
        DBMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
}
47
48impl RocksDBStore {
49    const BLOCKS_CF: &'static str = "blocks";
50    const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
51    const COMMITS_CF: &'static str = "commits";
52    const COMMIT_VOTES_CF: &'static str = "commit_votes";
53    const COMMIT_INFO_CF: &'static str = "commit_info";
54    const FINALIZED_COMMITS_CF: &'static str = "finalized_commits";
55
56    /// Creates a new instance of RocksDB storage.
57    #[cfg(not(tidehunter))]
58    pub fn new(path: &str) -> Self {
59        // Consensus data has high write throughput (all transactions) and is rarely read
60        // (only during recovery and when helping peers catch up).
61        let db_options = default_db_options().optimize_db_for_write_throughput(2);
62        let mut metrics_conf = MetricConf::new("consensus");
63        metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
64        let cf_options = default_db_options().optimize_for_write_throughput();
65        let column_family_options = DBMapTableConfigMap::new(BTreeMap::from([
66            (
67                Self::BLOCKS_CF.to_string(),
68                default_db_options()
69                    .optimize_for_write_throughput_no_deletion()
70                    // Using larger block is ok since there is not much point reads on the cf.
71                    .set_block_options(512, 128 << 10),
72            ),
73            (
74                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
75                cf_options.clone(),
76            ),
77            (Self::COMMITS_CF.to_string(), cf_options.clone()),
78            (Self::COMMIT_VOTES_CF.to_string(), cf_options.clone()),
79            (Self::COMMIT_INFO_CF.to_string(), cf_options.clone()),
80            (Self::FINALIZED_COMMITS_CF.to_string(), cf_options.clone()),
81        ]));
82        Self::open_tables_read_write(
83            path.into(),
84            metrics_conf,
85            Some(db_options.options),
86            Some(column_family_options),
87        )
88    }
89
90    #[cfg(tidehunter)]
91    pub fn new(path: &str) -> Self {
92        tracing::warn!("Consensus store using tidehunter");
93        use typed_store::tidehunter_util::{
94            KeyIndexing, KeySpaceConfig, KeyType, ThConfig, default_mutex_count,
95        };
96        let mutexes = default_mutex_count();
97        let index_digest_key = KeyIndexing::key_reduction(36, 0..12);
98        let index_index_digest_key = KeyIndexing::key_reduction(40, 0..24);
99        let commit_vote_key = KeyIndexing::key_reduction(76, 0..60);
100        let u32_prefix = KeyType::from_prefix_bits(3 * 8);
101        let u64_prefix = KeyType::from_prefix_bits(6 * 8);
102        let override_dirty_keys_config = KeySpaceConfig::new().with_max_dirty_keys(4_000);
103        let configs = vec![
104            (
105                Self::BLOCKS_CF.to_string(),
106                ThConfig::new_with_config_indexing(
107                    index_index_digest_key.clone(),
108                    mutexes,
109                    u32_prefix.clone(),
110                    override_dirty_keys_config.clone(),
111                ),
112            ),
113            (
114                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
115                ThConfig::new_with_config_indexing(
116                    index_index_digest_key.clone(),
117                    mutexes,
118                    u64_prefix.clone(),
119                    override_dirty_keys_config.clone(),
120                ),
121            ),
122            (
123                Self::COMMITS_CF.to_string(),
124                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
125            ),
126            (
127                Self::COMMIT_VOTES_CF.to_string(),
128                ThConfig::new_with_config_indexing(
129                    commit_vote_key,
130                    mutexes,
131                    u32_prefix.clone(),
132                    override_dirty_keys_config.clone(),
133                ),
134            ),
135            (
136                Self::COMMIT_INFO_CF.to_string(),
137                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
138            ),
139            (
140                Self::FINALIZED_COMMITS_CF.to_string(),
141                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
142            ),
143        ];
144        Self::open_tables_read_write(
145            path.into(),
146            MetricConf::new("consensus")
147                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
148            configs.into_iter().collect(),
149        )
150    }
151}
152
153impl Store for RocksDBStore {
    /// Atomically persists all entries of `write_batch` in a single RocksDB write batch.
    ///
    /// Blocks are staged together with their author-ordered secondary index entries and
    /// any commit votes embedded in them, followed by commits, commit info and finalized
    /// commits. Nothing becomes durable until the final `batch.write()` succeeds.
    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
        // Fail point used by crash-recovery tests to simulate a crash before persisting.
        fail_point!("consensus-store-before-write");

        let mut batch = self.blocks.batch();
        for block in write_batch.blocks {
            let block_ref = block.reference();
            // Primary block table, keyed by (round, author, digest).
            batch
                .insert_batch(
                    &self.blocks,
                    [(
                        (block_ref.round, block_ref.author, block_ref.digest),
                        block.serialized(),
                    )],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            // Secondary index keyed by (author, round, digest); enables per-author scans.
            batch
                .insert_batch(
                    &self.digests_by_authorities,
                    [((block_ref.author, block_ref.round, block_ref.digest), ())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            // Record each commit vote carried by this block, attributed to the block's ref.
            for vote in block.commit_votes() {
                batch
                    .insert_batch(
                        &self.commit_votes,
                        [((vote.index, vote.digest, block_ref), ())],
                    )
                    .map_err(ConsensusError::RocksDBFailure)?;
            }
        }

        for commit in write_batch.commits {
            batch
                .insert_batch(
                    &self.commits,
                    [((commit.index(), commit.digest()), commit.serialized())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        for (commit_ref, commit_info) in write_batch.commit_info {
            batch
                .insert_batch(
                    &self.commit_info,
                    [((commit_ref.index, commit_ref.digest), commit_info)],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
            batch
                .insert_batch(
                    &self.finalized_commits,
                    [((commit_ref.index, commit_ref.digest), rejected_transactions)],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        // Commit everything staged above atomically.
        batch.write()?;
        // Fail point used by crash-recovery tests to simulate a crash right after persisting.
        fail_point!("consensus-store-after-write");
        Ok(())
    }
216
217    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
218        let keys = refs
219            .iter()
220            .map(|r| (r.round, r.author, r.digest))
221            .collect::<Vec<_>>();
222        let serialized = self.blocks.multi_get(keys)?;
223        let mut blocks = vec![];
224        for (key, serialized) in refs.iter().zip(serialized) {
225            if let Some(serialized) = serialized {
226                let signed_block: SignedBlock =
227                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
228                // Only accepted blocks should have been written to storage.
229                let block = VerifiedBlock::new_verified(signed_block, serialized);
230                // Makes sure block data is not corrupted, by comparing digests.
231                assert_eq!(*key, block.reference());
232                blocks.push(Some(block));
233            } else {
234                blocks.push(None);
235            }
236        }
237        Ok(blocks)
238    }
239
240    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
241        let refs = refs
242            .iter()
243            .map(|r| (r.round, r.author, r.digest))
244            .collect::<Vec<_>>();
245        let exist = self.blocks.multi_contains_keys(refs)?;
246        Ok(exist)
247    }
248
249    fn scan_blocks_by_author(
250        &self,
251        author: AuthorityIndex,
252        start_round: Round,
253    ) -> ConsensusResult<Vec<VerifiedBlock>> {
254        let mut refs = vec![];
255        for kv in self.digests_by_authorities.safe_range_iter((
256            Included((author, start_round, BlockDigest::MIN)),
257            Included((author, Round::MAX, BlockDigest::MAX)),
258        )) {
259            let ((author, round, digest), _) = kv?;
260            refs.push(BlockRef::new(round, author, digest));
261        }
262        let results = self.read_blocks(refs.as_slice())?;
263        let mut blocks = Vec::with_capacity(refs.len());
264        for (r, block) in refs.into_iter().zip(results.into_iter()) {
265            blocks.push(
266                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
267            );
268        }
269        Ok(blocks)
270    }
271
272    // The method returns the last `num_of_rounds` rounds blocks by author in round ascending order.
273    // When a `before_round` is defined then the blocks of round `<=before_round` are returned. If not
274    // then the max value for round will be used as cut off.
275    fn scan_last_blocks_by_author(
276        &self,
277        author: AuthorityIndex,
278        num_of_rounds: u64,
279        before_round: Option<Round>,
280    ) -> ConsensusResult<Vec<VerifiedBlock>> {
281        let before_round = before_round.unwrap_or(Round::MAX);
282        let mut refs = VecDeque::new();
283        for kv in self
284            .digests_by_authorities
285            .reversed_safe_iter_with_bounds(
286                Some((author, Round::MIN, BlockDigest::MIN)),
287                Some((author, before_round, BlockDigest::MAX)),
288            )?
289            .take(num_of_rounds as usize)
290        {
291            let ((author, round, digest), _) = kv?;
292            refs.push_front(BlockRef::new(round, author, digest));
293        }
294        let results = self.read_blocks(refs.as_slices().0)?;
295        let mut blocks = vec![];
296        for (r, block) in refs.into_iter().zip(results.into_iter()) {
297            blocks.push(
298                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
299            );
300        }
301        Ok(blocks)
302    }
303
304    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
305        let Some(result) = self
306            .commits
307            .reversed_safe_iter_with_bounds(None, None)?
308            .next()
309        else {
310            return Ok(None);
311        };
312        let ((_index, digest), serialized) = result?;
313        let commit = TrustedCommit::new_trusted(
314            bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
315            serialized,
316        );
317        assert_eq!(commit.digest(), digest);
318        Ok(Some(commit))
319    }
320
321    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
322        let mut commits = vec![];
323        for result in self.commits.safe_range_iter((
324            Included((range.start(), CommitDigest::MIN)),
325            Included((range.end(), CommitDigest::MAX)),
326        )) {
327            let ((_index, digest), serialized) = result?;
328            let commit = TrustedCommit::new_trusted(
329                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
330                serialized,
331            );
332            assert_eq!(commit.digest(), digest);
333            commits.push(commit);
334        }
335        Ok(commits)
336    }
337
338    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
339        let mut votes = Vec::new();
340        for vote in self.commit_votes.safe_range_iter((
341            Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
342            Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
343        )) {
344            let ((_, _, block_ref), _) = vote?;
345            votes.push(block_ref);
346        }
347        Ok(votes)
348    }
349
350    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
351        let Some(result) = self
352            .commit_info
353            .reversed_safe_iter_with_bounds(None, None)?
354            .next()
355        else {
356            return Ok(None);
357        };
358        let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
359        Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
360    }
361
362    fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
363        let Some(result) = self
364            .finalized_commits
365            .reversed_safe_iter_with_bounds(None, None)?
366            .next()
367        else {
368            return Ok(None);
369        };
370        let ((index, digest), _) = result.map_err(ConsensusError::RocksDBFailure)?;
371        Ok(Some(CommitRef::new(index, digest)))
372    }
373
374    fn read_rejected_transactions(
375        &self,
376        commit_ref: CommitRef,
377    ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
378        let result = self
379            .finalized_commits
380            .get(&(commit_ref.index, commit_ref.digest))?;
381        Ok(result)
382    }
383}