1use std::{
5 collections::{BTreeMap, VecDeque},
6 ops::Bound::{Excluded, Included},
7 time::Duration,
8};
9
10use bytes::Bytes;
11use consensus_config::AuthorityIndex;
12use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
13use mysten_common::ZipDebugEqIteratorExt;
14use sui_macros::fail_point;
15use typed_store::{
16 DBMapUtils, Map as _,
17 metrics::SamplingInterval,
18 rocks::{DBMap, DBMapTableConfigMap, MetricConf, default_db_options},
19};
20
21use super::{CommitInfo, Store, WriteBatch};
22use crate::{
23 block::{BlockAPI as _, SignedBlock, VerifiedBlock},
24 commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
25 error::{ConsensusError, ConsensusResult},
26};
27
/// Persistent consensus storage backed by RocksDB (or tidehunter when the
/// `tidehunter` cfg is set), with tables generated by the `DBMapUtils` derive.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct RocksDBStore {
    /// Serialized block bytes keyed by (round, author, digest).
    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
    /// Secondary index of block refs keyed by (author, round, digest); values are unit.
    /// `#[rename]` preserves the historical on-disk column family name "digests".
    #[rename = "digests"]
    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
    /// Serialized commit bytes keyed by (commit index, commit digest).
    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
    /// Commit votes carried in blocks, keyed by
    /// (commit index, commit digest, voting block ref); values are unit.
    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
    /// Auxiliary `CommitInfo` recorded per commit ref.
    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
    /// Per finalized commit: rejected transaction indices grouped by block ref.
    finalized_commits:
        DBMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
}
48
impl RocksDBStore {
    // Column family names. These must match the struct field names above
    // (after `#[rename]`), since `DBMapUtils` derives the tables from them.
    const BLOCKS_CF: &'static str = "blocks";
    const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
    const COMMITS_CF: &'static str = "commits";
    const COMMIT_VOTES_CF: &'static str = "commit_votes";
    const COMMIT_INFO_CF: &'static str = "commit_info";
    const FINALIZED_COMMITS_CF: &'static str = "finalized_commits";

    /// Opens (or creates) the store at `path` using RocksDB.
    ///
    /// With `use_fifo_compaction`, every column family is tuned for a
    /// no-deletion workload; otherwise for write throughput. Read metrics are
    /// sampled at most once per 60 seconds.
    #[cfg(not(tidehunter))]
    pub fn new(path: &str, use_fifo_compaction: bool) -> Self {
        let db_options =
            default_db_options().optimize_db_for_write_throughput(2, use_fifo_compaction);
        let mut metrics_conf = MetricConf::new("consensus");
        metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
        // Options shared by all non-blocks column families.
        let cf_options = if use_fifo_compaction {
            default_db_options().optimize_for_no_deletion()
        } else {
            default_db_options().optimize_for_write_throughput()
        };
        // The blocks CF differs in the non-FIFO case: it is tuned for write
        // throughput *without* deletions.
        let blocks_cf_options = if use_fifo_compaction {
            default_db_options().optimize_for_no_deletion()
        } else {
            default_db_options().optimize_for_write_throughput_no_deletion()
        };
        let column_family_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                Self::BLOCKS_CF.to_string(),
                blocks_cf_options
                    // NOTE(review): 512 / 128 KiB block tuning — see
                    // typed_store's `set_block_options` for parameter meaning.
                    .set_block_options(512, 128 << 10),
            ),
            (
                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
                cf_options.clone(),
            ),
            (Self::COMMITS_CF.to_string(), cf_options.clone()),
            (Self::COMMIT_VOTES_CF.to_string(), cf_options.clone()),
            (Self::COMMIT_INFO_CF.to_string(), cf_options.clone()),
            (Self::FINALIZED_COMMITS_CF.to_string(), cf_options.clone()),
        ]));
        Self::open_tables_read_write(
            path.into(),
            metrics_conf,
            Some(db_options.options),
            Some(column_family_options),
        )
    }

    /// Opens (or creates) the store at `path` using tidehunter.
    /// FIFO compaction does not apply to this backend, so the flag is ignored.
    #[cfg(tidehunter)]
    pub fn new(path: &str, _use_fifo_compaction: bool) -> Self {
        tracing::warn!("Consensus store using tidehunter");
        use typed_store::tidehunter_util::{
            KeyIndexing, KeySpaceConfig, KeyType, ThConfig, default_mutex_count,
        };
        let mutexes = default_mutex_count();
        // Key reductions below depend on the serialized widths of the key
        // tuples. Presumably: 36 = 4 (CommitIndex) + 32 (CommitDigest);
        // 40 = 4 (Round) + 4 (AuthorityIndex) + 32 (BlockDigest);
        // 76 = 36 + 40 (BlockRef) — TODO confirm against key serialization.
        let index_digest_key = KeyIndexing::key_reduction(36, 0..12);
        let index_index_digest_key = KeyIndexing::key_reduction(40, 0..24);
        let commit_vote_key = KeyIndexing::key_reduction(76, 0..60);
        let u32_prefix = KeyType::from_prefix_bits(3 * 8);
        let u64_prefix = KeyType::from_prefix_bits(6 * 8);
        let configs = vec![
            (
                Self::BLOCKS_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    index_index_digest_key.clone(),
                    mutexes,
                    u32_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    index_index_digest_key.clone(),
                    mutexes,
                    u64_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::COMMITS_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
            (
                Self::COMMIT_VOTES_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    commit_vote_key,
                    mutexes,
                    u32_prefix.clone(),
                    KeySpaceConfig::new(),
                ),
            ),
            (
                Self::COMMIT_INFO_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
            (
                Self::FINALIZED_COMMITS_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
        ];
        Self::open_tables_read_write(
            path.into(),
            MetricConf::new("consensus")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
}
164
impl Store for RocksDBStore {
    /// Persists all blocks, commit-vote indices, commits, commit info, and
    /// finalized commits from `write_batch` in a single DB write batch, so the
    /// writes land together.
    fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
        fail_point!("consensus-store-before-write");

        let mut batch = self.blocks.batch();
        for block in write_batch.blocks {
            let block_ref = block.reference();
            // Serialized block bytes keyed by (round, author, digest).
            batch
                .insert_batch(
                    &self.blocks,
                    [(
                        (block_ref.round, block_ref.author, block_ref.digest),
                        block.serialized(),
                    )],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            // Secondary index keyed by (author, round, digest), used by the
            // per-author scans below.
            batch
                .insert_batch(
                    &self.digests_by_authorities,
                    [((block_ref.author, block_ref.round, block_ref.digest), ())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
            // Index every commit vote carried by this block under the commit
            // it votes for.
            for vote in block.commit_votes() {
                batch
                    .insert_batch(
                        &self.commit_votes,
                        [((vote.index, vote.digest, block_ref), ())],
                    )
                    .map_err(ConsensusError::RocksDBFailure)?;
            }
        }

        for commit in write_batch.commits {
            batch
                .insert_batch(
                    &self.commits,
                    [((commit.index(), commit.digest()), commit.serialized())],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        for (commit_ref, commit_info) in write_batch.commit_info {
            batch
                .insert_batch(
                    &self.commit_info,
                    [((commit_ref.index, commit_ref.digest), commit_info)],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
            batch
                .insert_batch(
                    &self.finalized_commits,
                    [((commit_ref.index, commit_ref.digest), rejected_transactions)],
                )
                .map_err(ConsensusError::RocksDBFailure)?;
        }

        // All inserts above are committed here at once.
        batch.write()?;
        fail_point!("consensus-store-after-write");
        Ok(())
    }

    /// Reads the blocks for `refs`, returning `None` for any ref not in
    /// storage. Asserts that each stored block's recomputed reference matches
    /// the key it was fetched under.
    fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
        let keys = refs
            .iter()
            .map(|r| (r.round, r.author, r.digest))
            .collect::<Vec<_>>();
        let serialized = self.blocks.multi_get(keys)?;
        let mut blocks = vec![];
        for (key, serialized) in refs.iter().zip_debug_eq(serialized) {
            if let Some(serialized) = serialized {
                let signed_block: SignedBlock =
                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
                // Blocks are verified before being written, so they can be
                // treated as verified on read without re-checking signatures.
                let block = VerifiedBlock::new_verified(signed_block, serialized);
                assert_eq!(*key, block.reference());
                blocks.push(Some(block));
            } else {
                blocks.push(None);
            }
        }
        Ok(blocks)
    }

    /// Returns, for each ref, whether the corresponding block exists in
    /// storage (without deserializing it).
    fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
        let refs = refs
            .iter()
            .map(|r| (r.round, r.author, r.digest))
            .collect::<Vec<_>>();
        let exist = self.blocks.multi_contains_keys(refs)?;
        Ok(exist)
    }

    /// Scans all blocks by `author` from `start_round` onward, with no upper
    /// round bound and no result limit.
    fn scan_blocks_by_author(
        &self,
        author: AuthorityIndex,
        start_round: Round,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        self.scan_blocks_by_author_in_range(author, start_round, Round::MAX, usize::MAX)
    }

    /// Scans blocks by `author` with round in `[start_round, end_round)`,
    /// returning at most `limit` blocks in index order.
    /// Panics if an indexed digest has no corresponding block (storage
    /// inconsistency).
    fn scan_blocks_by_author_in_range(
        &self,
        author: AuthorityIndex,
        start_round: Round,
        end_round: Round,
        limit: usize,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let mut refs = vec![];
        // End bound is Excluded at (author, end_round, MIN), so blocks at
        // `end_round` itself are not returned.
        for kv in self.digests_by_authorities.safe_range_iter((
            Included((author, start_round, BlockDigest::MIN)),
            Excluded((author, end_round, BlockDigest::MIN)),
        )) {
            let ((author, round, digest), _) = kv?;
            refs.push(BlockRef::new(round, author, digest));
            if refs.len() >= limit {
                break;
            }
        }
        let results = self.read_blocks(refs.as_slice())?;
        let mut blocks = Vec::with_capacity(refs.len());
        for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
            blocks.push(
                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
            );
        }
        Ok(blocks)
    }

    /// Returns the latest blocks by `author` before `before_round` (inclusive
    /// of that round's digests up to `BlockDigest::MAX`), in ascending order.
    ///
    /// NOTE(review): `take(num_of_rounds)` limits the number of *index
    /// entries* (blocks), which equals rounds only when the author has at most
    /// one stored block per round — confirm intent with callers.
    fn scan_last_blocks_by_author(
        &self,
        author: AuthorityIndex,
        num_of_rounds: u64,
        before_round: Option<Round>,
    ) -> ConsensusResult<Vec<VerifiedBlock>> {
        let before_round = before_round.unwrap_or(Round::MAX);
        let mut refs = VecDeque::new();
        for kv in self
            .digests_by_authorities
            .reversed_safe_iter_with_bounds(
                Some((author, Round::MIN, BlockDigest::MIN)),
                Some((author, before_round, BlockDigest::MAX)),
            )?
            .take(num_of_rounds as usize)
        {
            let ((author, round, digest), _) = kv?;
            // Iteration is newest-first; push_front restores ascending order.
            refs.push_front(BlockRef::new(round, author, digest));
        }
        let refs_slice = refs.make_contiguous();
        let results = self.read_blocks(refs_slice)?;
        let mut blocks = vec![];
        for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
            blocks.push(
                block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
            );
        }
        Ok(blocks)
    }

    /// Returns the commit with the highest (index, digest) key, or `None` when
    /// no commit has been stored. Asserts the stored digest matches the
    /// deserialized commit's digest.
    fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
        let Some(result) = self
            .commits
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
        else {
            return Ok(None);
        };
        let ((_index, digest), serialized) = result?;
        let commit = TrustedCommit::new_trusted(
            bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
            serialized,
        );
        assert_eq!(commit.digest(), digest);
        Ok(Some(commit))
    }

    /// Scans commits with index in the inclusive range
    /// `[range.start(), range.end()]`, in ascending key order.
    fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
        let mut commits = vec![];
        for result in self.commits.safe_range_iter((
            Included((range.start(), CommitDigest::MIN)),
            Included((range.end(), CommitDigest::MAX)),
        )) {
            let ((_index, digest), serialized) = result?;
            let commit = TrustedCommit::new_trusted(
                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
                serialized,
            );
            assert_eq!(commit.digest(), digest);
            commits.push(commit);
        }
        Ok(commits)
    }

    /// Returns the refs of all blocks that voted for any commit at
    /// `commit_index` (across all commit digests at that index).
    fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
        let mut votes = Vec::new();
        for vote in self.commit_votes.safe_range_iter((
            Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
            Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
        )) {
            let ((_, _, block_ref), _) = vote?;
            votes.push(block_ref);
        }
        Ok(votes)
    }

    /// Returns the `CommitInfo` with the highest (index, digest) key, together
    /// with its commit ref, or `None` when the table is empty.
    fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
        let Some(result) = self
            .commit_info
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
        else {
            return Ok(None);
        };
        let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
        Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
    }

    /// Returns the ref of the finalized commit with the highest
    /// (index, digest) key, or `None` when no commit has been finalized.
    fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
        let Some(result) = self
            .finalized_commits
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
        else {
            return Ok(None);
        };
        let ((index, digest), _) = result.map_err(ConsensusError::RocksDBFailure)?;
        Ok(Some(CommitRef::new(index, digest)))
    }

    /// Returns the rejected transaction indices recorded for `commit_ref`
    /// (grouped by block ref), or `None` if the commit is not finalized.
    fn read_rejected_transactions(
        &self,
        commit_ref: CommitRef,
    ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
        let result = self
            .finalized_commits
            .get(&(commit_ref.index, commit_ref.digest))?;
        Ok(result)
    }
}