1use std::sync::Arc;
5
6use consensus_config::{AuthorityIndex, Stake};
7use parking_lot::RwLock;
8use tracing::{debug, info};
9
10use crate::{
11 context::Context, dag_state::DagState, leader_scoring::ReputationScores,
12 round_tracker::QuorumRound,
13};
14
/// Inclusion state that `AncestorStateManager` tracks for each authority.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum AncestorState {
    /// Initial state of every authority (see `AncestorInfo::new`).
    Include,
    /// The authority is excluded; the payload is the propagation score it had
    /// at the moment it was moved into this state.
    Exclude(u64),
}
21
22#[derive(Clone)]
23struct AncestorInfo {
24 state: AncestorState,
25 lock_until_round: u32,
28}
29
30impl AncestorInfo {
31 fn new() -> Self {
32 Self {
33 state: AncestorState::Include,
34 lock_until_round: 0,
35 }
36 }
37
38 fn is_locked(&self, current_clock_round: u32) -> bool {
39 self.lock_until_round >= current_clock_round
40 }
41
42 fn set_lock(&mut self, lock_until_round: u32) {
43 self.lock_until_round = lock_until_round;
44 }
45}
46
47#[derive(Debug)]
48struct StateTransition {
49 authority_id: AuthorityIndex,
50 score: u64,
52 stake: u64,
54 high_quorum_round: u32,
57}
58
59pub(crate) struct AncestorStateManager {
60 context: Arc<Context>,
61 dag_state: Arc<RwLock<DagState>>,
62 state_map: Vec<AncestorInfo>,
63 excluded_nodes_stake_threshold: u64,
64 total_excluded_stake: Stake,
67 pub(crate) propagation_scores: ReputationScores,
70}
71
72impl AncestorStateManager {
73 #[cfg(not(test))]
76 const STATE_LOCK_CLOCK_ROUNDS: u32 = 450;
77 #[cfg(test)]
78 const STATE_LOCK_CLOCK_ROUNDS: u32 = 5;
79
80 const SCORE_EXCLUSION_THRESHOLD_PERCENTAGE: u64 = 20;
82
83 pub(crate) fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
84 let state_map = vec![AncestorInfo::new(); context.committee.size()];
85
86 let excluded_nodes_stake_threshold_percentage = 2 * context
89 .protocol_config
90 .consensus_bad_nodes_stake_threshold()
91 / 3;
92
93 let excluded_nodes_stake_threshold = (excluded_nodes_stake_threshold_percentage
94 * context.committee.total_stake())
95 / 100 as Stake;
96
97 Self {
98 context,
99 dag_state,
100 state_map,
101 excluded_nodes_stake_threshold,
102 total_excluded_stake: 0,
104 propagation_scores: ReputationScores::default(),
105 }
106 }
107
108 pub(crate) fn set_propagation_scores(&mut self, scores: ReputationScores) {
109 self.propagation_scores = scores;
110 }
111
112 pub(crate) fn get_ancestor_states(&self) -> Vec<AncestorState> {
113 self.state_map.iter().map(|info| info.state).collect()
114 }
115
116 pub(crate) fn update_all_ancestors_state(&mut self, accepted_quorum_rounds: &[QuorumRound]) {
118 let network_high_quorum_round =
122 self.calculate_network_high_quorum_round(accepted_quorum_rounds);
123
124 let current_clock_round = self.dag_state.read().threshold_clock_round();
125 let low_score_threshold = (self.propagation_scores.highest_score()
126 * Self::SCORE_EXCLUSION_THRESHOLD_PERCENTAGE)
127 / 100;
128
129 debug!(
130 "Updating all ancestor state at round {current_clock_round} using network high quorum round of {network_high_quorum_round}, low score threshold of {low_score_threshold}, and exclude stake threshold of {}",
131 self.excluded_nodes_stake_threshold
132 );
133
134 let mut exclude_to_include = Vec::new();
138 let mut include_to_exclude = Vec::new();
139
140 for (idx, score) in self
144 .propagation_scores
145 .scores_per_authority
146 .iter()
147 .enumerate()
148 {
149 let authority_id = self
150 .context
151 .committee
152 .to_authority_index(idx)
153 .expect("Index should be valid");
154 let ancestor_info = &self.state_map[idx];
155 let (_low, authority_high_quorum_round) = accepted_quorum_rounds[idx];
156 let stake = self.context.committee.authority(authority_id).stake;
157
158 if ancestor_info.is_locked(current_clock_round) {
160 continue;
161 }
162
163 match ancestor_info.state {
164 AncestorState::Include => {
165 if *score <= low_score_threshold {
166 include_to_exclude.push(StateTransition {
167 authority_id,
168 score: *score,
169 stake,
170 high_quorum_round: authority_high_quorum_round,
171 });
172 }
173 }
174 AncestorState::Exclude(_) => {
175 if *score > low_score_threshold
176 || authority_high_quorum_round >= network_high_quorum_round
177 {
178 exclude_to_include.push(StateTransition {
179 authority_id,
180 score: *score,
181 stake,
182 high_quorum_round: authority_high_quorum_round,
183 });
184 }
185 }
186 }
187 }
188
189 for transition in exclude_to_include {
192 self.apply_state_change(transition, AncestorState::Include, current_clock_round);
193 }
194
195 include_to_exclude.sort_by_key(|t| t.score);
198
199 for transition in include_to_exclude {
203 if self.total_excluded_stake + transition.stake <= self.excluded_nodes_stake_threshold {
207 let new_state = AncestorState::Exclude(transition.score);
208 self.apply_state_change(transition, new_state, current_clock_round);
209 } else {
210 info!(
211 "Authority {} would have moved to {:?} state with score {} & quorum_round {} but we would have exceeded total excluded stake threshold. current_excluded_stake {} + authority_stake {} > exclude_stake_threshold {}",
212 transition.authority_id,
213 AncestorState::Exclude(transition.score),
214 transition.score,
215 transition.high_quorum_round,
216 self.total_excluded_stake,
217 transition.stake,
218 self.excluded_nodes_stake_threshold
219 );
220 }
221 }
222 }
223
224 fn apply_state_change(
225 &mut self,
226 transition: StateTransition,
227 new_state: AncestorState,
228 current_clock_round: u32,
229 ) {
230 let block_hostname = &self
231 .context
232 .committee
233 .authority(transition.authority_id)
234 .hostname;
235 let ancestor_info = &mut self.state_map[transition.authority_id.value()];
236
237 match (ancestor_info.state, new_state) {
238 (AncestorState::Exclude(_), AncestorState::Include) => {
239 self.total_excluded_stake = self.total_excluded_stake
240 .checked_sub(transition.stake)
241 .expect("total_excluded_stake underflow - trying to subtract more stake than we're tracking as excluded");
242 }
243 (AncestorState::Include, AncestorState::Exclude(_)) => {
244 self.total_excluded_stake += transition.stake;
245 }
246 _ => {
247 panic!("Calls to this function should only be made for state transition.")
248 }
249 }
250
251 ancestor_info.state = new_state;
252 let lock_until_round = current_clock_round + Self::STATE_LOCK_CLOCK_ROUNDS;
253 ancestor_info.set_lock(lock_until_round);
254
255 info!(
256 "Authority {} moved to {new_state:?} state with score {} & quorum_round {} and locked until round {lock_until_round}. Total excluded stake: {}",
257 transition.authority_id,
258 transition.score,
259 transition.high_quorum_round,
260 self.total_excluded_stake
261 );
262
263 self.context
264 .metrics
265 .node_metrics
266 .ancestor_state_change_by_authority
267 .with_label_values(&[
268 block_hostname,
269 match new_state {
270 AncestorState::Include => "include",
271 AncestorState::Exclude(_) => "exclude",
272 },
273 ])
274 .inc();
275 }
276
277 fn calculate_network_high_quorum_round(&self, accepted_quorum_rounds: &[QuorumRound]) -> u32 {
285 let committee = &self.context.committee;
286
287 let mut high_quorum_rounds_with_stake = accepted_quorum_rounds
288 .iter()
289 .zip(committee.authorities())
290 .map(|((_low, high), (_, authority))| (*high, authority.stake))
291 .collect::<Vec<_>>();
292 high_quorum_rounds_with_stake.sort();
293
294 let mut total_stake = 0;
295 let mut network_high_quorum_round = 0;
296
297 for (round, stake) in high_quorum_rounds_with_stake.iter() {
298 total_stake += stake;
299 if total_stake >= self.context.committee.quorum_threshold() {
300 network_high_quorum_round = *round;
301 break;
302 }
303 }
304
305 network_high_quorum_round
306 }
307}
308
#[cfg(test)]
mod test {
    use super::*;
    use crate::{
        leader_scoring::ReputationScores, storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
    };

    /// Asserts that every authority is in the `Include` state, except for the
    /// optional `(authority index, score)` pair, which must be `Exclude(score)`.
    fn assert_ancestor_states(manager: &AncestorStateManager, excluded: Option<(usize, u64)>) {
        for (authority, state) in manager.get_ancestor_states().into_iter().enumerate() {
            let expected = match excluded {
                Some((index, score)) if index == authority => AncestorState::Exclude(score),
                _ => AncestorState::Include,
            };
            assert_eq!(state, expected);
        }
    }

    #[tokio::test]
    async fn test_calculate_network_high_accepted_quorum_round() {
        telemetry_subscribers::init_for_testing();

        let (context, _key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let mut manager = AncestorStateManager::new(context.clone(), dag_state.clone());
        manager.set_propagation_scores(ReputationScores::new(
            (1..=300).into(),
            vec![1, 2, 4, 3],
        ));

        // With no quorum rounds at all the result falls back to 0.
        assert_eq!(manager.calculate_network_high_quorum_round(&[]), 0);

        // Three of the four authorities report a high round of 229.
        let accepted_quorum_rounds = vec![(50, 229), (175, 229), (179, 229), (179, 300)];
        assert_eq!(
            manager.calculate_network_high_quorum_round(&accepted_quorum_rounds),
            229
        );
    }

    #[tokio::test]
    async fn test_update_all_ancestor_state_using_accepted_rounds() {
        telemetry_subscribers::init_for_testing();
        let (mut context, _key_pairs) = Context::new_for_test(5);
        context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let mut dag_builder = DagBuilder::new(context.clone());

        let mut manager = AncestorStateManager::new(context, dag_state.clone());

        let two_lagging = vec![(225, 229), (225, 229), (229, 300), (229, 300), (229, 300)];
        let one_lagging = vec![(225, 229), (229, 300), (229, 300), (229, 300), (229, 300)];
        let none_lagging = vec![(229, 300), (229, 300), (229, 300), (229, 300), (229, 300)];

        // Highest score is 4, so the low-score threshold is 0 and nobody is excluded.
        manager.set_propagation_scores(ReputationScores::new(
            (1..=300).into(),
            vec![1, 2, 4, 3, 4],
        ));
        manager.update_all_ancestors_state(&two_lagging);
        assert_ancestor_states(&manager, None);

        // Authorities 0 and 1 are both at or below the threshold of 20, but only
        // the lowest-scoring one (authority 1) ends up excluded.
        manager.set_propagation_scores(ReputationScores::new(
            (1..=300).into(),
            vec![10, 9, 100, 100, 100],
        ));
        manager.update_all_ancestors_state(&two_lagging);
        assert_ancestor_states(&manager, Some((1, 9)));

        // A repeated update with the same inputs leaves the states unchanged.
        manager.update_all_ancestors_state(&two_lagging);
        assert_ancestor_states(&manager, Some((1, 9)));

        // Accept blocks for rounds 1..=6 before the next update.
        dag_builder.layers(1..=6).build();
        let blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
        dag_state.write().accept_blocks(blocks);

        // Authority 1 now reports a high round of 300 and is re-included, after
        // which authority 0 (score 10) is excluded in its place.
        manager.update_all_ancestors_state(&one_lagging);
        assert_ancestor_states(&manager, Some((0, 10)));

        // Authority 0 also reports 300 now, yet its state does not change in this update.
        manager.update_all_ancestors_state(&none_lagging);
        assert_ancestor_states(&manager, Some((0, 10)));

        // Accept blocks for rounds 7..=12 (all built blocks are handed over again).
        dag_builder.layers(7..=12).build();
        let blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
        dag_state.write().accept_blocks(blocks);

        // Authority 0 is re-included and everyone ends up in the Include state.
        manager.set_propagation_scores(ReputationScores::new(
            (1..=300).into(),
            vec![10, 100, 100, 100, 100],
        ));
        manager.update_all_ancestors_state(&none_lagging);
        assert_ancestor_states(&manager, None);
    }
}