1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use crate::consensus::{ConsensusState, Dag};
use config::Committee;
use std::collections::HashSet;
use tracing::debug;
use types::{Certificate, CertificateDigest, Round};
pub fn order_leaders<'a, LeaderElector>(
committee: &Committee,
leader: &Certificate,
state: &'a ConsensusState,
get_leader: LeaderElector,
) -> Vec<Certificate>
where
LeaderElector: Fn(&Committee, Round, &'a Dag) -> Option<&'a (CertificateDigest, Certificate)>,
{
let mut to_commit = vec![leader.clone()];
let mut leader = leader;
for r in (state.last_committed_round + 2..leader.round())
.rev()
.step_by(2)
{
let (_, prev_leader) = match get_leader(committee, r, &state.dag) {
Some(x) => x,
None => continue,
};
if linked(leader, prev_leader, &state.dag) {
to_commit.push(prev_leader.clone());
leader = prev_leader;
}
}
to_commit
}
fn linked(leader: &Certificate, prev_leader: &Certificate, dag: &Dag) -> bool {
let mut parents = vec![leader];
for r in (prev_leader.round()..leader.round()).rev() {
parents = dag
.get(&(r))
.expect("We should have the whole history by now")
.values()
.filter(|(digest, _)| parents.iter().any(|x| x.header.parents.contains(digest)))
.map(|(_, certificate)| certificate)
.collect();
}
parents.contains(&prev_leader)
}
pub fn order_dag(
gc_depth: Round,
leader: &Certificate,
state: &ConsensusState,
) -> Vec<Certificate> {
debug!("Processing sub-dag of {:?}", leader);
let mut ordered = Vec::new();
let mut already_ordered = HashSet::new();
let mut buffer = vec![leader];
while let Some(x) = buffer.pop() {
debug!("Sequencing {:?}", x);
ordered.push(x.clone());
for parent in &x.header.parents {
let (digest, certificate) = match state
.dag
.get(&(x.round() - 1))
.and_then(|x| x.values().find(|(x, _)| x == parent))
{
Some(x) => x,
None => continue, };
let mut skip = already_ordered.contains(&digest);
skip |= state
.last_committed
.get(&certificate.origin())
.map_or_else(|| false, |r| r == &certificate.round());
if !skip {
buffer.push(certificate);
already_ordered.insert(digest);
}
}
}
ordered.retain(|x| x.round() + gc_depth >= state.last_committed_round);
ordered.sort_by_key(|x| x.round());
ordered
}