1use std::{
5 collections::{BTreeMap, VecDeque},
6 ops::Bound::{Excluded, Included},
7 time::Duration,
8};
9
10use bytes::Bytes;
11use consensus_config::AuthorityIndex;
12use consensus_types::block::{BlockDigest, BlockRef, Round, TransactionIndex};
13use mysten_common::ZipDebugEqIteratorExt;
14use sui_macros::fail_point;
15use typed_store::{
16 DBMapUtils, Map as _,
17 metrics::SamplingInterval,
18 rocks::{DBMap, DBMapTableConfigMap, MetricConf, default_db_options},
19};
20
21use super::{CommitInfo, Store, WriteBatch};
22use crate::{
23 block::{BlockAPI as _, SignedBlock, VerifiedBlock},
24 commit::{CommitAPI as _, CommitDigest, CommitIndex, CommitRange, CommitRef, TrustedCommit},
25 error::{ConsensusError, ConsensusResult},
26};
27
/// Consensus storage built on `typed_store` tables: RocksDB by default, or tidehunter when
/// compiled with `cfg(tidehunter)`.
///
/// Each field is one column family (table). Per-table tuning is configured in
/// `RocksDBStore::new`.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct RocksDBStore {
    /// Serialized blocks (`VerifiedBlock::serialized()`), keyed by `(round, author, digest)`.
    blocks: DBMap<(Round, AuthorityIndex, BlockDigest), Bytes>,
    /// Author-first index over `blocks`, keyed by `(author, round, digest)`, so that the
    /// per-author scans can range over a single author's blocks in round order.
    /// Stored in the column family named "digests".
    #[rename = "digests"]
    digests_by_authorities: DBMap<(AuthorityIndex, Round, BlockDigest), ()>,
    /// Serialized commits, keyed by `(commit index, commit digest)`.
    commits: DBMap<(CommitIndex, CommitDigest), Bytes>,
    /// Commit votes carried by stored blocks: one row per
    /// `(voted commit index, voted commit digest, voting block ref)`.
    commit_votes: DBMap<(CommitIndex, CommitDigest, BlockRef), ()>,
    /// `CommitInfo` recorded for a commit, keyed by `(commit index, commit digest)`.
    commit_info: DBMap<(CommitIndex, CommitDigest), CommitInfo>,
    /// Finalized commits, keyed by `(commit index, commit digest)`. The value is the set of
    /// rejected transactions for that commit: a map from `BlockRef` to transaction indices.
    finalized_commits:
        DBMap<(CommitIndex, CommitDigest), BTreeMap<BlockRef, Vec<TransactionIndex>>>,
}
48
49impl RocksDBStore {
50 const BLOCKS_CF: &'static str = "blocks";
51 const DIGESTS_BY_AUTHORITIES_CF: &'static str = "digests";
52 const COMMITS_CF: &'static str = "commits";
53 const COMMIT_VOTES_CF: &'static str = "commit_votes";
54 const COMMIT_INFO_CF: &'static str = "commit_info";
55 const FINALIZED_COMMITS_CF: &'static str = "finalized_commits";
56
57 #[cfg(not(tidehunter))]
59 pub fn new(path: &str) -> Self {
60 let db_options =
63 default_db_options().optimize_db_for_write_throughput(2, true);
64 let mut metrics_conf = MetricConf::new("consensus");
65 metrics_conf.read_sample_interval = SamplingInterval::new(Duration::from_secs(60), 0);
66 let cf_options = default_db_options().optimize_for_no_deletion();
67 let column_family_options = DBMapTableConfigMap::new(BTreeMap::from([
68 (
69 Self::BLOCKS_CF.to_string(),
70 cf_options
71 .clone()
72 .set_block_options(512, 128 << 10),
74 ),
75 (
76 Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
77 cf_options.clone(),
78 ),
79 (Self::COMMITS_CF.to_string(), cf_options.clone()),
80 (Self::COMMIT_VOTES_CF.to_string(), cf_options.clone()),
81 (Self::COMMIT_INFO_CF.to_string(), cf_options.clone()),
82 (Self::FINALIZED_COMMITS_CF.to_string(), cf_options.clone()),
83 ]));
84 Self::open_tables_read_write(
85 path.into(),
86 metrics_conf,
87 Some(db_options.options),
88 Some(column_family_options),
89 )
90 }
91
92 #[cfg(tidehunter)]
93 pub fn new(path: &str) -> Self {
94 tracing::warn!("Consensus store using tidehunter");
95 use typed_store::tidehunter_util::{
96 KeyIndexing, KeySpaceConfig, KeyType, ThConfig, default_mutex_count,
97 };
98 let mutexes = default_mutex_count();
99 let index_digest_key = KeyIndexing::key_reduction(36, 0..12);
100 let index_index_digest_key = KeyIndexing::key_reduction(40, 0..24);
101 let commit_vote_key = KeyIndexing::key_reduction(76, 0..60);
102 let u32_prefix = KeyType::from_prefix_bits(3 * 8);
103 let u64_prefix = KeyType::from_prefix_bits(6 * 8);
104 let configs = vec![
105 (
106 Self::BLOCKS_CF.to_string(),
107 ThConfig::new_with_config_indexing(
108 index_index_digest_key.clone(),
109 mutexes,
110 u32_prefix.clone(),
111 KeySpaceConfig::new(),
112 ),
113 ),
114 (
115 Self::DIGESTS_BY_AUTHORITIES_CF.to_string(),
116 ThConfig::new_with_config_indexing(
117 index_index_digest_key.clone(),
118 mutexes,
119 u64_prefix.clone(),
120 KeySpaceConfig::new(),
121 ),
122 ),
123 (
124 Self::COMMITS_CF.to_string(),
125 ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
126 ),
127 (
128 Self::COMMIT_VOTES_CF.to_string(),
129 ThConfig::new_with_config_indexing(
130 commit_vote_key,
131 mutexes,
132 u32_prefix.clone(),
133 KeySpaceConfig::new(),
134 ),
135 ),
136 (
137 Self::COMMIT_INFO_CF.to_string(),
138 ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
139 ),
140 (
141 Self::FINALIZED_COMMITS_CF.to_string(),
142 ThConfig::new_with_indexing(index_digest_key.clone(), mutexes, u32_prefix.clone()),
143 ),
144 ];
145 Self::open_tables_read_write(
146 path.into(),
147 MetricConf::new("consensus")
148 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0))
149 .with_th_batch_compression(),
150 configs.into_iter().collect(),
151 )
152 }
153}
154
155impl Store for RocksDBStore {
156 fn write(&self, write_batch: WriteBatch) -> ConsensusResult<()> {
157 fail_point!("consensus-store-before-write");
158
159 let mut batch = self.blocks.batch();
160 for block in write_batch.blocks {
161 let block_ref = block.reference();
162 batch
163 .insert_batch(
164 &self.blocks,
165 [(
166 (block_ref.round, block_ref.author, block_ref.digest),
167 block.serialized(),
168 )],
169 )
170 .map_err(ConsensusError::RocksDBFailure)?;
171 batch
172 .insert_batch(
173 &self.digests_by_authorities,
174 [((block_ref.author, block_ref.round, block_ref.digest), ())],
175 )
176 .map_err(ConsensusError::RocksDBFailure)?;
177 for vote in block.commit_votes() {
178 batch
179 .insert_batch(
180 &self.commit_votes,
181 [((vote.index, vote.digest, block_ref), ())],
182 )
183 .map_err(ConsensusError::RocksDBFailure)?;
184 }
185 }
186
187 for commit in write_batch.commits {
188 batch
189 .insert_batch(
190 &self.commits,
191 [((commit.index(), commit.digest()), commit.serialized())],
192 )
193 .map_err(ConsensusError::RocksDBFailure)?;
194 }
195
196 for (commit_ref, commit_info) in write_batch.commit_info {
197 batch
198 .insert_batch(
199 &self.commit_info,
200 [((commit_ref.index, commit_ref.digest), commit_info)],
201 )
202 .map_err(ConsensusError::RocksDBFailure)?;
203 }
204
205 for (commit_ref, rejected_transactions) in write_batch.finalized_commits {
206 batch
207 .insert_batch(
208 &self.finalized_commits,
209 [((commit_ref.index, commit_ref.digest), rejected_transactions)],
210 )
211 .map_err(ConsensusError::RocksDBFailure)?;
212 }
213
214 batch.write()?;
215 fail_point!("consensus-store-after-write");
216 Ok(())
217 }
218
219 fn read_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<Option<VerifiedBlock>>> {
220 let keys = refs
221 .iter()
222 .map(|r| (r.round, r.author, r.digest))
223 .collect::<Vec<_>>();
224 let serialized = self.blocks.multi_get(keys)?;
225 let mut blocks = vec![];
226 for (key, serialized) in refs.iter().zip_debug_eq(serialized) {
227 if let Some(serialized) = serialized {
228 let signed_block: SignedBlock =
229 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
230 let block = VerifiedBlock::new_verified(signed_block, serialized);
232 assert_eq!(*key, block.reference());
234 blocks.push(Some(block));
235 } else {
236 blocks.push(None);
237 }
238 }
239 Ok(blocks)
240 }
241
242 fn contains_blocks(&self, refs: &[BlockRef]) -> ConsensusResult<Vec<bool>> {
243 let refs = refs
244 .iter()
245 .map(|r| (r.round, r.author, r.digest))
246 .collect::<Vec<_>>();
247 let exist = self.blocks.multi_contains_keys(refs)?;
248 Ok(exist)
249 }
250
251 fn scan_blocks_by_author(
252 &self,
253 author: AuthorityIndex,
254 start_round: Round,
255 ) -> ConsensusResult<Vec<VerifiedBlock>> {
256 self.scan_blocks_by_author_in_range(author, start_round, Round::MAX, usize::MAX)
257 }
258
259 fn scan_blocks_by_author_in_range(
260 &self,
261 author: AuthorityIndex,
262 start_round: Round,
263 end_round: Round,
264 limit: usize,
265 ) -> ConsensusResult<Vec<VerifiedBlock>> {
266 let mut refs = vec![];
267 for kv in self.digests_by_authorities.safe_range_iter((
268 Included((author, start_round, BlockDigest::MIN)),
269 Excluded((author, end_round, BlockDigest::MIN)),
270 )) {
271 let ((author, round, digest), _) = kv?;
272 refs.push(BlockRef::new(round, author, digest));
273 if refs.len() >= limit {
274 break;
275 }
276 }
277 let results = self.read_blocks(refs.as_slice())?;
278 let mut blocks = Vec::with_capacity(refs.len());
279 for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
280 blocks.push(
281 block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
282 );
283 }
284 Ok(blocks)
285 }
286
287 fn scan_last_blocks_by_author(
291 &self,
292 author: AuthorityIndex,
293 num_of_rounds: u64,
294 before_round: Option<Round>,
295 ) -> ConsensusResult<Vec<VerifiedBlock>> {
296 let before_round = before_round.unwrap_or(Round::MAX);
297 let mut refs = VecDeque::new();
298 for kv in self
299 .digests_by_authorities
300 .reversed_safe_iter_with_bounds(
301 Some((author, Round::MIN, BlockDigest::MIN)),
302 Some((author, before_round, BlockDigest::MAX)),
303 )?
304 .take(num_of_rounds as usize)
305 {
306 let ((author, round, digest), _) = kv?;
307 refs.push_front(BlockRef::new(round, author, digest));
308 }
309 let refs_slice = refs.make_contiguous();
310 let results = self.read_blocks(refs_slice)?;
311 let mut blocks = vec![];
312 for (r, block) in refs.into_iter().zip_debug_eq(results.into_iter()) {
313 blocks.push(
314 block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)),
315 );
316 }
317 Ok(blocks)
318 }
319
320 fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
321 let Some(result) = self
322 .commits
323 .reversed_safe_iter_with_bounds(None, None)?
324 .next()
325 else {
326 return Ok(None);
327 };
328 let ((_index, digest), serialized) = result?;
329 let commit = TrustedCommit::new_trusted(
330 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
331 serialized,
332 );
333 assert_eq!(commit.digest(), digest);
334 Ok(Some(commit))
335 }
336
337 fn scan_commits(&self, range: CommitRange) -> ConsensusResult<Vec<TrustedCommit>> {
338 let mut commits = vec![];
339 for result in self.commits.safe_range_iter((
340 Included((range.start(), CommitDigest::MIN)),
341 Included((range.end(), CommitDigest::MAX)),
342 )) {
343 let ((_index, digest), serialized) = result?;
344 let commit = TrustedCommit::new_trusted(
345 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedCommit)?,
346 serialized,
347 );
348 assert_eq!(commit.digest(), digest);
349 commits.push(commit);
350 }
351 Ok(commits)
352 }
353
354 fn read_commit_votes(&self, commit_index: CommitIndex) -> ConsensusResult<Vec<BlockRef>> {
355 let mut votes = Vec::new();
356 for vote in self.commit_votes.safe_range_iter((
357 Included((commit_index, CommitDigest::MIN, BlockRef::MIN)),
358 Included((commit_index, CommitDigest::MAX, BlockRef::MAX)),
359 )) {
360 let ((_, _, block_ref), _) = vote?;
361 votes.push(block_ref);
362 }
363 Ok(votes)
364 }
365
366 fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
367 let Some(result) = self
368 .commit_info
369 .reversed_safe_iter_with_bounds(None, None)?
370 .next()
371 else {
372 return Ok(None);
373 };
374 let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
375 Ok(Some((CommitRef::new(key.0, key.1), commit_info)))
376 }
377
378 fn read_last_finalized_commit(&self) -> ConsensusResult<Option<CommitRef>> {
379 let Some(result) = self
380 .finalized_commits
381 .reversed_safe_iter_with_bounds(None, None)?
382 .next()
383 else {
384 return Ok(None);
385 };
386 let ((index, digest), _) = result.map_err(ConsensusError::RocksDBFailure)?;
387 Ok(Some(CommitRef::new(index, digest)))
388 }
389
390 fn read_rejected_transactions(
391 &self,
392 commit_ref: CommitRef,
393 ) -> ConsensusResult<Option<BTreeMap<BlockRef, Vec<TransactionIndex>>>> {
394 let result = self
395 .finalized_commits
396 .get(&(commit_ref.index, commit_ref.digest))?;
397 Ok(result)
398 }
399}