1use std::sync::Arc;
5
6use consensus_config::{AuthorityIndex, Stake};
7use parking_lot::RwLock;
8use tracing::{debug, info};
9
10use crate::{
11 context::Context, dag_state::DagState, leader_scoring::ReputationScores,
12 round_tracker::QuorumRound,
13};
14
15#[derive(Debug, Clone, Copy, Eq, PartialEq)]
16pub(crate) enum AncestorState {
17 Include,
18 Exclude(u64),
20}
21
22#[derive(Clone)]
23struct AncestorInfo {
24 state: AncestorState,
25 lock_until_round: u32,
28}
29
30impl AncestorInfo {
31 fn new() -> Self {
32 Self {
33 state: AncestorState::Include,
34 lock_until_round: 0,
35 }
36 }
37
38 fn is_locked(&self, current_clock_round: u32) -> bool {
39 self.lock_until_round >= current_clock_round
40 }
41
42 fn set_lock(&mut self, lock_until_round: u32) {
43 self.lock_until_round = lock_until_round;
44 }
45}
46
47#[derive(Debug)]
48struct StateTransition {
49 authority_id: AuthorityIndex,
50 score: u64,
52 stake: u64,
54 high_quorum_round: u32,
57}
58
59pub(crate) struct AncestorStateManager {
60 context: Arc<Context>,
61 dag_state: Arc<RwLock<DagState>>,
62 state_map: Vec<AncestorInfo>,
63 excluded_nodes_stake_threshold: u64,
64 total_excluded_stake: Stake,
67 pub(crate) propagation_scores: ReputationScores,
70}
71
72impl AncestorStateManager {
73 #[cfg(not(test))]
76 const STATE_LOCK_CLOCK_ROUNDS: u32 = 450;
77 #[cfg(test)]
78 const STATE_LOCK_CLOCK_ROUNDS: u32 = 5;
79
80 const SCORE_EXCLUSION_THRESHOLD_PERCENTAGE: u64 = 20;
82
83 pub(crate) fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
84 let state_map = vec![AncestorInfo::new(); context.committee.size()];
85
86 let excluded_nodes_stake_threshold_percentage =
89 2 * context.protocol_config.bad_nodes_stake_threshold() / 3;
90
91 let excluded_nodes_stake_threshold = (excluded_nodes_stake_threshold_percentage
92 * context.committee.total_stake())
93 / 100 as Stake;
94
95 Self {
96 context,
97 dag_state,
98 state_map,
99 excluded_nodes_stake_threshold,
100 total_excluded_stake: 0,
102 propagation_scores: ReputationScores::default(),
103 }
104 }
105
106 pub(crate) fn set_propagation_scores(&mut self, scores: ReputationScores) {
107 self.propagation_scores = scores;
108 }
109
110 pub(crate) fn get_ancestor_states(&self) -> Vec<AncestorState> {
111 self.state_map.iter().map(|info| info.state).collect()
112 }
113
114 pub(crate) fn update_all_ancestors_state(&mut self, accepted_quorum_rounds: &[QuorumRound]) {
116 let network_high_quorum_round =
120 self.calculate_network_high_quorum_round(accepted_quorum_rounds);
121
122 let current_clock_round = self.dag_state.read().threshold_clock_round();
123 let low_score_threshold = (self.propagation_scores.highest_score()
124 * Self::SCORE_EXCLUSION_THRESHOLD_PERCENTAGE)
125 / 100;
126
127 debug!(
128 "Updating all ancestor state at round {current_clock_round} using network high quorum round of {network_high_quorum_round}, low score threshold of {low_score_threshold}, and exclude stake threshold of {}",
129 self.excluded_nodes_stake_threshold
130 );
131
132 let mut exclude_to_include = Vec::new();
136 let mut include_to_exclude = Vec::new();
137
138 for (idx, score) in self
142 .propagation_scores
143 .scores_per_authority
144 .iter()
145 .enumerate()
146 {
147 let authority_id = self
148 .context
149 .committee
150 .to_authority_index(idx)
151 .expect("Index should be valid");
152 let ancestor_info = &self.state_map[idx];
153 let (_low, authority_high_quorum_round) = accepted_quorum_rounds[idx];
154 let stake = self.context.committee.authority(authority_id).stake;
155
156 if ancestor_info.is_locked(current_clock_round) {
158 continue;
159 }
160
161 match ancestor_info.state {
162 AncestorState::Include => {
163 if *score <= low_score_threshold {
164 include_to_exclude.push(StateTransition {
165 authority_id,
166 score: *score,
167 stake,
168 high_quorum_round: authority_high_quorum_round,
169 });
170 }
171 }
172 AncestorState::Exclude(_) => {
173 if *score > low_score_threshold
174 || authority_high_quorum_round >= network_high_quorum_round
175 {
176 exclude_to_include.push(StateTransition {
177 authority_id,
178 score: *score,
179 stake,
180 high_quorum_round: authority_high_quorum_round,
181 });
182 }
183 }
184 }
185 }
186
187 for transition in exclude_to_include {
190 self.apply_state_change(transition, AncestorState::Include, current_clock_round);
191 }
192
193 include_to_exclude.sort_by_key(|t| t.score);
196
197 for transition in include_to_exclude {
201 if self.total_excluded_stake + transition.stake <= self.excluded_nodes_stake_threshold {
205 let new_state = AncestorState::Exclude(transition.score);
206 self.apply_state_change(transition, new_state, current_clock_round);
207 } else {
208 info!(
209 "Authority {} would have moved to {:?} state with score {} & quorum_round {} but we would have exceeded total excluded stake threshold. current_excluded_stake {} + authority_stake {} > exclude_stake_threshold {}",
210 transition.authority_id,
211 AncestorState::Exclude(transition.score),
212 transition.score,
213 transition.high_quorum_round,
214 self.total_excluded_stake,
215 transition.stake,
216 self.excluded_nodes_stake_threshold
217 );
218 }
219 }
220 }
221
222 fn apply_state_change(
223 &mut self,
224 transition: StateTransition,
225 new_state: AncestorState,
226 current_clock_round: u32,
227 ) {
228 let block_hostname = &self
229 .context
230 .committee
231 .authority(transition.authority_id)
232 .hostname;
233 let ancestor_info = &mut self.state_map[transition.authority_id.value()];
234
235 match (ancestor_info.state, new_state) {
236 (AncestorState::Exclude(_), AncestorState::Include) => {
237 self.total_excluded_stake = self.total_excluded_stake
238 .checked_sub(transition.stake)
239 .expect("total_excluded_stake underflow - trying to subtract more stake than we're tracking as excluded");
240 }
241 (AncestorState::Include, AncestorState::Exclude(_)) => {
242 self.total_excluded_stake += transition.stake;
243 }
244 _ => {
245 panic!("Calls to this function should only be made for state transition.")
246 }
247 }
248
249 ancestor_info.state = new_state;
250 let lock_until_round = current_clock_round + Self::STATE_LOCK_CLOCK_ROUNDS;
251 ancestor_info.set_lock(lock_until_round);
252
253 info!(
254 "Authority {} moved to {new_state:?} state with score {} & quorum_round {} and locked until round {lock_until_round}. Total excluded stake: {}",
255 transition.authority_id,
256 transition.score,
257 transition.high_quorum_round,
258 self.total_excluded_stake
259 );
260
261 self.context
262 .metrics
263 .node_metrics
264 .ancestor_state_change_by_authority
265 .with_label_values(&[
266 block_hostname.as_str(),
267 match new_state {
268 AncestorState::Include => "include",
269 AncestorState::Exclude(_) => "exclude",
270 },
271 ])
272 .inc();
273 }
274
275 fn calculate_network_high_quorum_round(&self, accepted_quorum_rounds: &[QuorumRound]) -> u32 {
283 let committee = &self.context.committee;
284
285 let mut high_quorum_rounds_with_stake = accepted_quorum_rounds
286 .iter()
287 .zip(committee.authorities())
288 .map(|((_low, high), (_, authority))| (*high, authority.stake))
289 .collect::<Vec<_>>();
290 high_quorum_rounds_with_stake.sort();
291
292 let mut total_stake = 0;
293 let mut network_high_quorum_round = 0;
294
295 for (round, stake) in high_quorum_rounds_with_stake.iter() {
296 total_stake += stake;
297 if total_stake >= self.context.committee.quorum_threshold() {
298 network_high_quorum_round = *round;
299 break;
300 }
301 }
302
303 network_high_quorum_round
304 }
305}
306
307#[cfg(test)]
308mod test {
309 use super::*;
310 use crate::{
311 leader_scoring::ReputationScores, storage::mem_store::MemStore,
312 test_dag_builder::DagBuilder,
313 };
314
315 #[tokio::test]
316 async fn test_calculate_network_high_accepted_quorum_round() {
317 telemetry_subscribers::init_for_testing();
318
319 let (context, _key_pairs) = Context::new_for_test(4);
320 let context = Arc::new(context);
321 let store = Arc::new(MemStore::new());
322 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
323
324 let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
325 let mut ancestor_state_manager =
326 AncestorStateManager::new(context.clone(), dag_state.clone());
327 ancestor_state_manager.set_propagation_scores(scores);
328
329 let network_high_quorum_round =
332 ancestor_state_manager.calculate_network_high_quorum_round(&[]);
333 assert_eq!(network_high_quorum_round, 0);
334
335 let accepted_quorum_rounds = vec![(50, 229), (175, 229), (179, 229), (179, 300)];
336
337 let network_high_quorum_round =
338 ancestor_state_manager.calculate_network_high_quorum_round(&accepted_quorum_rounds);
339 assert_eq!(network_high_quorum_round, 229);
340 }
341
342 #[tokio::test]
349 async fn test_update_all_ancestor_state_using_accepted_rounds() {
350 telemetry_subscribers::init_for_testing();
351 let (mut context, _key_pairs) = Context::new_for_test(5);
352 context
353 .protocol_config
354 .set_bad_nodes_stake_threshold_for_testing(33);
355 let context = Arc::new(context);
356 let store = Arc::new(MemStore::new());
357 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
358 let mut dag_builder = DagBuilder::new(context.clone());
359
360 let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3, 4]);
361 let mut ancestor_state_manager = AncestorStateManager::new(context, dag_state.clone());
362 ancestor_state_manager.set_propagation_scores(scores);
363
364 let accepted_quorum_rounds =
365 vec![(225, 229), (225, 229), (229, 300), (229, 300), (229, 300)];
366 ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
367
368 let state_map = ancestor_state_manager.get_ancestor_states();
371 for state in state_map.iter() {
372 assert_eq!(*state, AncestorState::Include);
373 }
374
375 let scores = ReputationScores::new((1..=300).into(), vec![10, 9, 100, 100, 100]);
376 ancestor_state_manager.set_propagation_scores(scores);
377 ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
378
379 let state_map = ancestor_state_manager.get_ancestor_states();
385 for (authority, state) in state_map.iter().enumerate() {
386 if authority == 1 {
387 assert_eq!(*state, AncestorState::Exclude(9));
388 } else {
389 assert_eq!(*state, AncestorState::Include);
390 }
391 }
392
393 ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
394
395 let state_map = ancestor_state_manager.get_ancestor_states();
398 for (authority, state) in state_map.iter().enumerate() {
399 if authority == 1 {
400 assert_eq!(*state, AncestorState::Exclude(9));
401 } else {
402 assert_eq!(*state, AncestorState::Include);
403 }
404 }
405
406 dag_builder.layers(1..=6).build();
409 let blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
410 dag_state.write().accept_blocks(blocks);
411
412 let accepted_quorum_rounds =
413 vec![(225, 229), (229, 300), (229, 300), (229, 300), (229, 300)];
414 ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
415
416 let state_map = ancestor_state_manager.get_ancestor_states();
420 for (authority, state) in state_map.iter().enumerate() {
421 if authority == 0 {
422 assert_eq!(*state, AncestorState::Exclude(10));
423 } else {
424 assert_eq!(*state, AncestorState::Include);
425 }
426 }
427
428 let accepted_quorum_rounds =
429 vec![(229, 300), (229, 300), (229, 300), (229, 300), (229, 300)];
430 ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
431
432 let state_map = ancestor_state_manager.get_ancestor_states();
436
437 for (authority, state) in state_map.iter().enumerate() {
438 if authority == 0 {
439 assert_eq!(*state, AncestorState::Exclude(10));
440 } else {
441 assert_eq!(*state, AncestorState::Include);
442 }
443 }
444
445 dag_builder.layers(7..=12).build();
447 let blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
448 dag_state.write().accept_blocks(blocks);
449
450 let scores = ReputationScores::new((1..=300).into(), vec![10, 100, 100, 100, 100]);
451 ancestor_state_manager.set_propagation_scores(scores);
452 ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
453
454 let state_map = ancestor_state_manager.get_ancestor_states();
457 for state in state_map.iter() {
458 assert_eq!(*state, AncestorState::Include);
459 }
460 }
461}