1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#![allow(clippy::mutable_key_type)]
use crate::{CertificateDigest, Round};
use crypto::PublicKey;
use std::{collections::HashMap, ops::RangeInclusive};
use store::{
rocks::{DBMap, TypedStoreError},
traits::Map,
};
use tokio::sync::mpsc;
pub type SequenceNumber = u64;
pub type ShutdownToken = mpsc::Sender<()>;
pub type StoreResult<T> = Result<T, TypedStoreError>;
pub struct ConsensusStore {
last_committed: DBMap<PublicKey, Round>,
sequence: DBMap<SequenceNumber, CertificateDigest>,
}
impl ConsensusStore {
pub fn new(
last_committed: DBMap<PublicKey, Round>,
sequence: DBMap<SequenceNumber, CertificateDigest>,
) -> Self {
Self {
last_committed,
sequence,
}
}
pub fn clear(&self) -> StoreResult<()> {
self.last_committed.clear()?;
self.sequence.clear()?;
Ok(())
}
pub fn write_consensus_state(
&self,
last_committed: &HashMap<PublicKey, Round>,
consensus_index: &SequenceNumber,
certificate_id: &CertificateDigest,
) -> Result<(), TypedStoreError> {
let mut write_batch = self.last_committed.batch();
write_batch = write_batch.insert_batch(&self.last_committed, last_committed.iter())?;
write_batch = write_batch.insert_batch(
&self.sequence,
std::iter::once((consensus_index, certificate_id)),
)?;
write_batch.write()
}
pub fn read_last_committed(&self) -> HashMap<PublicKey, Round> {
self.last_committed.iter().collect()
}
pub fn read_sequenced_certificates(
&self,
missing: &RangeInclusive<SequenceNumber>,
) -> StoreResult<Vec<Option<CertificateDigest>>> {
Ok(self
.sequence
.iter()
.skip_to(missing.start())?
.take_while(|(index, _)| index <= missing.end())
.map(|(_, digest)| Some(digest))
.collect())
}
pub fn read_last_consensus_index(&self) -> StoreResult<SequenceNumber> {
Ok(self
.sequence
.keys()
.skip_prior_to(&SequenceNumber::MAX)?
.next()
.unwrap_or_default())
}
}