1use std::{
5 collections::{BTreeMap, VecDeque},
6 ops::Bound::Included,
7 time::Duration,
8};
9
10use bytes::Bytes;
11use consensus_config::AuthorityIndex;
12use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
13use sui_macros::fail_point;
14use typed_store::{
15 DBMapUtils, Map as _,
16 metrics::SamplingInterval,
17 rocks::{DBMap, DBMapTableConfigMap, MetricConf, default_db_options},
18};
19
20use super::{CommitInfo, Store, WriteBatch};
21use crate::{
22 block::{BlockAPI as _, SignedBlock, VerifiedBlock},
23 commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
24 error::{ConsensusError, ConsensusResult},
25};
26
/// Persistent consensus storage backed by typed_store column families.
/// Each field maps to one column family; names are generated from the field
/// names by `DBMapUtils` (see the CF name constants in the inherent impl).
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct RocksDBStore {
    /// Serialized `SignedBlock`s keyed by (round, author, digest),
    /// so blocks can be range-scanned by round.
    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
    /// Secondary index of block refs keyed by (author, round, digest),
    /// used to scan an authority's blocks by round. The CF is renamed to
    /// "digests" to keep the on-disk name stable.
    #[rename = "digests"]
    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
    /// Serialized `TrustedCommit`s keyed by (commit index, commit digest).
    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
    /// Commit votes observed in blocks: the key records which block
    /// (`BlockRef`) voted for which commit (index, digest). Value is unit —
    /// the key itself is the data.
    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
    /// Auxiliary `CommitInfo` recorded per commit ref; only written for a
    /// subset of commits (see `WriteBatch::commit_info`).
    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
    /// For each finalized commit, the rejected transaction indices per block.
    finalized_commits:
        DBMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
}
47
impl RocksDBStore {
    // Column family names. These must match the struct field names as seen by
    // `DBMapUtils` — note `digests_by_authorities` is renamed to "digests"
    // via the `#[rename]` attribute.
    const BLOCKS_CF: &'static str = "blocks";
    const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
    const COMMITS_CF: &'static str = "commits";
    const COMMIT_VOTES_CF: &'static str = "commit_votes";
    const COMMIT_INFO_CF: &'static str = "commit_info";
    const FINALIZED_COMMITS_CF: &'static str = "finalized_commits";

    /// Opens (or creates) the store at `path` using the RocksDB backend,
    /// with write-throughput-optimized DB and per-CF options.
    #[cfg(not(tidehunter))]
    pub fn new(path: &str) -> Self {
        let db_options = default_db_options().optimize_db_for_write_throughput(2);
        // Sample read metrics once a minute to keep metric overhead low.
        let mut metrics_conf = MetricConf::new("consensus");
        metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
        let cf_options = default_db_options().optimize_for_write_throughput();
        let column_family_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                Self::BLOCKS_CF.to_string(),
                // Blocks are written once and never deleted individually;
                // block options: 512 (cache shards?) and 128 KiB block size —
                // NOTE(review): confirm parameter meaning against
                // `set_block_options` in typed_store.
                default_db_options()
                    .optimize_for_write_throughput_no_deletion()
                    .set_block_options(512, 128 << 10),
            ),
            (
                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
                cf_options.clone(),
            ),
            (Self::COMMITS_CF.to_string(), cf_options.clone()),
            (Self::COMMIT_VOTES_CF.to_string(), cf_options.clone()),
            (Self::COMMIT_INFO_CF.to_string(), cf_options.clone()),
            (Self::FINALIZED_COMMITS_CF.to_string(), cf_options.clone()),
        ]));
        Self::open_tables_read_write(
            path.into(),
            metrics_conf,
            Some(db_options.options),
            Some(column_family_options),
        )
    }

    /// Opens (or creates) the store at `path` using the tidehunter backend.
    /// Key indexing/prefix parameters below are tied to the serialized key
    /// layouts of the column families and must stay in sync with the struct's
    /// key tuple types.
    #[cfg(tidehunter)]
    pub fn new(path: &str) -> Self {
        tracing::warn!("Consensus store using tidehunter");
        use typed_store::tidehunter_util::{
            KeyIndexing, KeySpaceConfig, KeyType, ThConfig, default_mutex_count,
        };
        let mutexes = default_mutex_count();
        // Key reductions: (total serialized key length, byte range kept for
        // indexing). E.g. (index, digest) keys are 36 bytes with the first 12
        // indexed — presumably index + digest prefix; TODO(review): confirm
        // against tidehunter_util's KeyIndexing docs.
        let index_digest_key = KeyIndexing::key_reduction(36, 0..12);
        let index_index_digest_key = KeyIndexing::key_reduction(40, 0..24);
        let commit_vote_key = KeyIndexing::key_reduction(76, 0..60);
        // Prefix key types sized in bits (3 and 6 bytes respectively).
        let u32_prefix = KeyType::from_prefix_bits(3 * 8);
        let u64_prefix = KeyType::from_prefix_bits(6 * 8);
        // Allow more dirty keys before flushing for the high-churn key spaces.
        let override_dirty_keys_config = KeySpaceConfig::new().with_max_dirty_keys(4_000);
        let configs = vec![
            (
                Self::BLOCKS_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    index_index_digest_key.clone(),
                    mutexes,
                    u32_prefix.clone(),
                    override_dirty_keys_config.clone(),
                ),
            ),
            (
                Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    index_index_digest_key.clone(),
                    mutexes,
                    u64_prefix.clone(),
                    override_dirty_keys_config.clone(),
                ),
            ),
            (
                Self::COMMITS_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
            (
                Self::COMMIT_VOTES_CF.to_string(),
                ThConfig::new_with_config_indexing(
                    commit_vote_key,
                    mutexes,
                    u32_prefix.clone(),
                    override_dirty_keys_config.clone(),
                ),
            ),
            (
                Self::COMMIT_INFO_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
            (
                Self::FINALIZED_COMMITS_CF.to_string(),
                ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
            ),
        ];
        Self::open_tables_read_write(
            path.into(),
            // Same 60s metric sampling as the RocksDB backend.
            MetricConf::new("consensus")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
}
152
153impl Store for RocksDBStore {
154 fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
155 fail_point!("consensus-store-before-write");
156
157 let mut batch = self.blocks.batch();
158 for block in write_batch.blocks {
159 let block_ref = block.reference();
160 batch
161 .insert_batch(
162 &self.blocks,
163 [(
164 (block_ref.round, block_ref.author, block_ref.digest),
165 block.serialized(),
166 )],
167 )
168 .map_err(ConsensusError::RocksDBFailure)?;
169 batch
170 .insert_batch(
171 &self.digests_by_authorities,
172 [((block_ref.author, block_ref.round, block_ref.digest), ())],
173 )
174 .map_err(ConsensusError::RocksDBFailure)?;
175 for vote in block.commit_votes() {
176 batch
177 .insert_batch(
178 &self.commit_votes,
179 [((vote.index, vote.digest, block_ref), ())],
180 )
181 .map_err(ConsensusError::RocksDBFailure)?;
182 }
183 }
184
185 for commit in write_batch.commits {
186 batch
187 .insert_batch(
188 &self.commits,
189 [((commit.index(), commit.digest()), commit.serialized())],
190 )
191 .map_err(ConsensusError::RocksDBFailure)?;
192 }
193
194 for (commit_ref, commit_info) in write_batch.commit_info {
195 batch
196 .insert_batch(
197 &self.commit_info,
198 [((commit_ref.index, commit_ref.digest), commit_info)],
199 )
200 .map_err(ConsensusError::RocksDBFailure)?;
201 }
202
203 for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
204 batch
205 .insert_batch(
206 &self.finalized_commits,
207 [((commit_ref.index, commit_ref.digest), rejected_transactions)],
208 )
209 .map_err(ConsensusError::RocksDBFailure)?;
210 }
211
212 batch.write()?;
213 fail_point!("consensus-store-after-write");
214 Ok(())
215 }
216
217 fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
218 let keys = refs
219 .iter()
220 .map(|r| (r.round, r.author, r.digest))
221 .collect::<Vec<_>>();
222 let serialized = self.blocks.multi_get(keys)?;
223 let mut blocks = vec![];
224 for (key, serialized) in refs.iter().zip(serialized) {
225 if let Some(serialized) = serialized {
226 let signed_block: SignedBlock =
227 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
228 let block = VerifiedBlock::new_verified(signed_block, serialized);
230 assert_eq!(*key, block.reference());
232 blocks.push(Some(block));
233 } else {
234 blocks.push(None);
235 }
236 }
237 Ok(blocks)
238 }
239
240 fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
241 let refs = refs
242 .iter()
243 .map(|r| (r.round, r.author, r.digest))
244 .collect::<Vec<_>>();
245 let exist = self.blocks.multi_contains_keys(refs)?;
246 Ok(exist)
247 }
248
249 fn scan_blocks_by_author(
250 &self,
251 author: AuthorityIndex,
252 start_round: Round,
253 ) -> ConsensusResult<Vec<VerifiedBlock>> {
254 let mut refs = vec![];
255 for kv in self.digests_by_authorities.safe_range_iter((
256 Included((author, start_round, BlockDigest::MIN)),
257 Included((author, Round::MAX, BlockDigest::MAX)),
258 )) {
259 let ((author, round, digest), _) = kv?;
260 refs.push(BlockRef::new(round, author, digest));
261 }
262 let results = self.read_blocks(refs.as_slice())?;
263 let mut blocks = Vec::with_capacity(refs.len());
264 for (r, block) in refs.into_iter().zip(results.into_iter()) {
265 blocks.push(
266 block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
267 );
268 }
269 Ok(blocks)
270 }
271
272 fn scan_last_blocks_by_author(
276 &self,
277 author: AuthorityIndex,
278 num_of_rounds: u64,
279 before_round: Option<Round>,
280 ) -> ConsensusResult<Vec<VerifiedBlock>> {
281 let before_round = before_round.unwrap_or(Round::MAX);
282 let mut refs = VecDeque::new();
283 for kv in self
284 .digests_by_authorities
285 .reversed_safe_iter_with_bounds(
286 Some((author, Round::MIN, BlockDigest::MIN)),
287 Some((author, before_round, BlockDigest::MAX)),
288 )?
289 .take(num_of_rounds as usize)
290 {
291 let ((author, round, digest), _) = kv?;
292 refs.push_front(BlockRef::new(round, author, digest));
293 }
294 let results = self.read_blocks(refs.as_slices().0)?;
295 let mut blocks = vec![];
296 for (r, block) in refs.into_iter().zip(results.into_iter()) {
297 blocks.push(
298 block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
299 );
300 }
301 Ok(blocks)
302 }
303
304 fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
305 let Some(result) = self
306 .commits
307 .reversed_safe_iter_with_bounds(None, None)?
308 .next()
309 else {
310 return Ok(None);
311 };
312 let ((_index, digest), serialized) = result?;
313 let commit = TrustedCommit::new_trusted(
314 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
315 serialized,
316 );
317 assert_eq!(commit.digest(), digest);
318 Ok(Some(commit))
319 }
320
321 fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
322 let mut commits = vec![];
323 for result in self.commits.safe_range_iter((
324 Included((range.start(), CommitDigest::MIN)),
325 Included((range.end(), CommitDigest::MAX)),
326 )) {
327 let ((_index, digest), serialized) = result?;
328 let commit = TrustedCommit::new_trusted(
329 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
330 serialized,
331 );
332 assert_eq!(commit.digest(), digest);
333 commits.push(commit);
334 }
335 Ok(commits)
336 }
337
338 fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
339 let mut votes = Vec::new();
340 for vote in self.commit_votes.safe_range_iter((
341 Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
342 Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
343 )) {
344 let ((_, _, block_ref), _) = vote?;
345 votes.push(block_ref);
346 }
347 Ok(votes)
348 }
349
350 fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
351 let Some(result) = self
352 .commit_info
353 .reversed_safe_iter_with_bounds(None, None)?
354 .next()
355 else {
356 return Ok(None);
357 };
358 let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
359 Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
360 }
361
362 fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
363 let Some(result) = self
364 .finalized_commits
365 .reversed_safe_iter_with_bounds(None, None)?
366 .next()
367 else {
368 return Ok(None);
369 };
370 let ((index, digest), _) = result.map_err(ConsensusError::RocksDBFailure)?;
371 Ok(Some(CommitRef::new(index, digest)))
372 }
373
374 fn read_rejected_transactions(
375 &self,
376 commit_ref: CommitRef,
377 ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
378 let result = self
379 .finalized_commits
380 .get(&(commit_ref.index, commit_ref.digest))?;
381 Ok(result)
382 }
383}