// consensus_core/storage/rocksdb_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, VecDeque},
6    ops::Bound::Included,
7    time::Duration,
8};
9
10use bytes::Bytes;
11use consensus_config::AuthorityIndex;
12use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
13use sui_macros::fail_point;
14use typed_store::{
15    DBMapUtils, Map as _,
16    metrics::SamplingInterval,
17    rocks::{DBMap, DBMapTableConfigMap, MetricConf, default_db_options},
18};
19
20use super::{CommitInfo, Store, WriteBatch};
21use crate::{
22    block::{BlockAPI as _, SignedBlock, VerifiedBlock},
23    commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
24    error::{ConsensusError, ConsensusResult},
25};
26
/// Persistent storage with RocksDB.
///
/// Each field below becomes a RocksDB column family (via `DBMapUtils`); tuple
/// keys are ordered lexicographically by their serialized components, which is
/// what the range scans in the `Store` impl rely on.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct RocksDBStore {
    /// Stores SignedBlock by refs.
    /// Keyed (round, author, digest) so commits/blocks can be scanned by round.
    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
    /// A secondary index that orders refs first by authors.
    /// Values are empty; the key alone supports per-authority round scans.
    #[rename = "digests"]
    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
    /// Maps commit index to Commit.
    /// Serialized commit bytes, keyed by (index, digest).
    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
    /// Collects votes on commits.
    /// One row per (commit, voting block) pair.
    /// TODO: batch multiple votes into a single row.
    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
    /// Stores info related to Commit that helps recovery.
    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
    /// Maps finalized commits to the transactions rejected in the commit.
    finalized_commits:
        DBMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
}
47
impl RocksDBStore {
    // Column family names. These must stay in sync with the struct fields above
    // (`digests_by_authorities` is persisted under the renamed CF "digests").
    const BLOCKS_CF: &'static str = "blocks";
    const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
    const COMMITS_CF: &'static str = "commits";
    const COMMIT_VOTES_CF: &'static str = "commit_votes";
    const COMMIT_INFO_CF: &'static str = "commit_info";
    const FINALIZED_COMMITS_CF: &'static str = "finalized_commits";

    /// Creates a new instance of RocksDB storage.
    #[cfg(not(tidehunter))]
    pub fn new(path: &str) -> Self {
        // Consensus data has high write throughput (all transactions) and is rarely read
        // (only during recovery and when helping peers catch up).
        let db_options = default_db_options().optimize_db_for_write_throughput(2);
        let mut metrics_conf = MetricConf::new("consensus");
        // Sample read metrics only once per minute; reads are rare so fine-grained
        // sampling would add overhead without insight.
        metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
        let cf_options = default_db_options().optimize_for_write_throughput();
        let column_family_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                Self::BLOCKS_CF.to_string(),
                // NOTE(review): the no-deletion variant is used here — presumably rows
                // in the blocks CF are never individually deleted; confirm before
                // adding any pruning of this CF.
                default_db_options()
                    .optimize_for_write_throughput_no_deletion()
                    // Using larger block is ok since there is not much point reads on the cf.
                    .set_block_options(512, 128 << 10),
            ),
            (
                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
                cf_options.clone(),
            ),
            (Self::COMMITS_CF.to_string(), cf_options.clone()),
            (Self::COMMIT_VOTES_CF.to_string(), cf_options.clone()),
            (Self::COMMIT_INFO_CF.to_string(), cf_options.clone()),
            (Self::FINALIZED_COMMITS_CF.to_string(), cf_options.clone()),
        ]));
        Self::open_tables_read_write(
            path.into(),
            metrics_conf,
            Some(db_options.options),
            Some(column_family_options),
        )
    }

    /// Creates a new instance backed by the tidehunter storage engine instead of RocksDB.
    #[cfg(tidehunter)]
    pub fn new(path: &str) -> Self {
        tracing::warn!("Consensus store using tidehunter");
        use typed_store::tidehunter_util::{
            KeyIndexing, KeySpaceConfig, KeyType, ThConfig, default_mutex_count,
        };
        let mutexes = default_mutex_count();
        // Key reductions: (full serialized key length, byte range kept for indexing).
        // NOTE(review): 36 presumably = 4-byte index + 32-byte digest; 40 and 76
        // presumably correspond to the blocks/digests and commit-votes key encodings
        // respectively — confirm against the actual serialized key layouts.
        let index_digest_key = KeyIndexing::key_reduction(36, 0..12);
        let index_index_digest_key = KeyIndexing::key_reduction(40, 0..24);
        let commit_vote_key = KeyIndexing::key_reduction(76, 0..60);
        // Prefix widths for key-space partitioning: 24 bits and 48 bits.
        let u32_prefix = KeyType::from_prefix_bits(3 * 8);
        let u64_prefix = KeyType::from_prefix_bits(6 * 8);
        let configs = vec![
            (
                Self::BLOCKS_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    index_index_digest_key.clone(),
                    mutexes,
                    u32_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    index_index_digest_key.clone(),
                    mutexes,
                    u64_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::COMMITS_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
            (
                Self::COMMIT_VOTES_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    commit_vote_key,
                    mutexes,
                    u32_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::COMMIT_INFO_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
            (
                Self::FINALIZED_COMMITS_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
        ];
        Self::open_tables_read_write(
            path.into(),
            // Same sparse read-metrics sampling as the RocksDB variant.
            MetricConf::new("consensus")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
}
151
152impl Store for RocksDBStore {
    /// Persists blocks, commits, commit info and finalized-commit records
    /// atomically in a single write batch.
    ///
    /// For each block, three kinds of rows are written: the primary row keyed by
    /// (round, author, digest), the secondary index row keyed by
    /// (author, round, digest), and one row per commit vote carried in the block.
    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
        // Test-only failure injection points bracket the actual write.
        fail_point!("consensus-store-before-write");

        let mut batch = self.blocks.batch();
        for block in write_batch.blocks {
            let block_ref = block.reference();
            batch
                .insert_batch(
                    &self.blocks,
                    [(
                        (block_ref.round, block_ref.author, block_ref.digest),
                        block.serialized(),
                    )],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            batch
                .insert_batch(
                    &self.digests_by_authorities,
                    [((block_ref.author, block_ref.round, block_ref.digest), ())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            // Each vote row records which block (block_ref) voted for which commit.
            for vote in block.commit_votes() {
                batch
                    .insert_batch(
                        &self.commit_votes,
                        [((vote.index, vote.digest, block_ref), ())],
                    )
                    .map_err(ConsensusError::RocksDBFailure)?;
            }
        }

        for commit in write_batch.commits {
            batch
                .insert_batch(
                    &self.commits,
                    [((commit.index(), commit.digest()), commit.serialized())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        for (commit_ref, commit_info) in write_batch.commit_info {
            batch
                .insert_batch(
                    &self.commit_info,
                    [((commit_ref.index, commit_ref.digest), commit_info)],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
            batch
                .insert_batch(
                    &self.finalized_commits,
                    [((commit_ref.index, commit_ref.digest), rejected_transactions)],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        // All rows above land atomically in one batch write.
        batch.write()?;
        fail_point!("consensus-store-after-write");
        Ok(())
    }
215
216    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
217        let keys = refs
218            .iter()
219            .map(|r| (r.round, r.author, r.digest))
220            .collect::<Vec<_>>();
221        let serialized = self.blocks.multi_get(keys)?;
222        let mut blocks = vec![];
223        for (key, serialized) in refs.iter().zip(serialized) {
224            if let Some(serialized) = serialized {
225                let signed_block: SignedBlock =
226                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
227                // Only accepted blocks should have been written to storage.
228                let block = VerifiedBlock::new_verified(signed_block, serialized);
229                // Makes sure block data is not corrupted, by comparing digests.
230                assert_eq!(*key, block.reference());
231                blocks.push(Some(block));
232            } else {
233                blocks.push(None);
234            }
235        }
236        Ok(blocks)
237    }
238
239    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
240        let refs = refs
241            .iter()
242            .map(|r| (r.round, r.author, r.digest))
243            .collect::<Vec<_>>();
244        let exist = self.blocks.multi_contains_keys(refs)?;
245        Ok(exist)
246    }
247
248    fn scan_blocks_by_author(
249        &self,
250        author: AuthorityIndex,
251        start_round: Round,
252    ) -> ConsensusResult<Vec<VerifiedBlock>> {
253        let mut refs = vec![];
254        for kv in self.digests_by_authorities.safe_range_iter((
255            Included((author, start_round, BlockDigest::MIN)),
256            Included((author, Round::MAX, BlockDigest::MAX)),
257        )) {
258            let ((author, round, digest), _) = kv?;
259            refs.push(BlockRef::new(round, author, digest));
260        }
261        let results = self.read_blocks(refs.as_slice())?;
262        let mut blocks = Vec::with_capacity(refs.len());
263        for (r, block) in refs.into_iter().zip(results.into_iter()) {
264            blocks.push(
265                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
266            );
267        }
268        Ok(blocks)
269    }
270
271    // The method returns the last `num_of_rounds` rounds blocks by author in round ascending order.
272    // When a `before_round` is defined then the blocks of round `<=before_round` are returned. If not
273    // then the max value for round will be used as cut off.
274    fn scan_last_blocks_by_author(
275        &self,
276        author: AuthorityIndex,
277        num_of_rounds: u64,
278        before_round: Option<Round>,
279    ) -> ConsensusResult<Vec<VerifiedBlock>> {
280        let before_round = before_round.unwrap_or(Round::MAX);
281        let mut refs = VecDeque::new();
282        for kv in self
283            .digests_by_authorities
284            .reversed_safe_iter_with_bounds(
285                Some((author, Round::MIN, BlockDigest::MIN)),
286                Some((author, before_round, BlockDigest::MAX)),
287            )?
288            .take(num_of_rounds as usize)
289        {
290            let ((author, round, digest), _) = kv?;
291            refs.push_front(BlockRef::new(round, author, digest));
292        }
293        let refs_slice = refs.make_contiguous();
294        let results = self.read_blocks(refs_slice)?;
295        let mut blocks = vec![];
296        for (r, block) in refs.into_iter().zip(results.into_iter()) {
297            blocks.push(
298                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
299            );
300        }
301        Ok(blocks)
302    }
303
304    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
305        let Some(result) = self
306            .commits
307            .reversed_safe_iter_with_bounds(None, None)?
308            .next()
309        else {
310            return Ok(None);
311        };
312        let ((_index, digest), serialized) = result?;
313        let commit = TrustedCommit::new_trusted(
314            bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
315            serialized,
316        );
317        assert_eq!(commit.digest(), digest);
318        Ok(Some(commit))
319    }
320
321    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
322        let mut commits = vec![];
323        for result in self.commits.safe_range_iter((
324            Included((range.start(), CommitDigest::MIN)),
325            Included((range.end(), CommitDigest::MAX)),
326        )) {
327            let ((_index, digest), serialized) = result?;
328            let commit = TrustedCommit::new_trusted(
329                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
330                serialized,
331            );
332            assert_eq!(commit.digest(), digest);
333            commits.push(commit);
334        }
335        Ok(commits)
336    }
337
338    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
339        let mut votes = Vec::new();
340        for vote in self.commit_votes.safe_range_iter((
341            Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
342            Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
343        )) {
344            let ((_, _, block_ref), _) = vote?;
345            votes.push(block_ref);
346        }
347        Ok(votes)
348    }
349
350    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
351        let Some(result) = self
352            .commit_info
353            .reversed_safe_iter_with_bounds(None, None)?
354            .next()
355        else {
356            return Ok(None);
357        };
358        let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
359        Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
360    }
361
362    fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
363        let Some(result) = self
364            .finalized_commits
365            .reversed_safe_iter_with_bounds(None, None)?
366            .next()
367        else {
368            return Ok(None);
369        };
370        let ((index, digest), _) = result.map_err(ConsensusError::RocksDBFailure)?;
371        Ok(Some(CommitRef::new(index, digest)))
372    }
373
374    fn read_rejected_transactions(
375        &self,
376        commit_ref: CommitRef,
377    ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
378        let result = self
379            .finalized_commits
380            .get(&(commit_ref.index, commit_ref.digest))?;
381        Ok(result)
382    }
383}