1use std::{collections::BTreeMap, sync::Arc};
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, Round, TransactionIndex};
8use mysten_common::ZipDebugEqIteratorExt;
9use parking_lot::RwLock;
10use tracing::info;
11
12use crate::{
13 BlockAPI as _, VerifiedBlock,
14 block::{BlockTransactionVotes, GENESIS_ROUND},
15 block_verifier::BlockVerifier,
16 context::Context,
17 dag_state::DagState,
18 stake_aggregator::{QuorumThreshold, StakeAggregator},
19};
20
21#[derive(Clone)]
31pub struct TransactionVoteTracker {
32 vote_tracker_state: Arc<RwLock<VoteTrackerState>>,
34 block_verifier: Arc<dyn BlockVerifier>,
36 dag_state: Arc<RwLock<DagState>>,
38}
39
40impl TransactionVoteTracker {
41 pub fn new(
42 context: Arc<Context>,
43 block_verifier: Arc<dyn BlockVerifier>,
44 dag_state: Arc<RwLock<DagState>>,
45 ) -> Self {
46 Self {
47 vote_tracker_state: Arc::new(RwLock::new(VoteTrackerState::new(context))),
48 block_verifier,
49 dag_state,
50 }
51 }
52
53 pub(crate) fn recover_blocks_after_round(&self, after_round: Round) {
58 let context = self.vote_tracker_state.read().context.clone();
59 if !context.protocol_config.transaction_voting_enabled() {
60 info!("Skipping vote tracker recovery in non-mysticeti fast path mode");
61 return;
62 }
63
64 let store = self.dag_state.read().store().clone();
65
66 let recovery_start_round = after_round + 1;
67 info!(
68 "Recovering vote tracker state from round {}",
69 recovery_start_round,
70 );
71
72 let authorities = context
73 .committee
74 .authorities()
75 .map(|(index, _)| index)
76 .collect::<Vec<_>>();
77 for authority_index in authorities {
78 let blocks = store
79 .scan_blocks_by_author(authority_index, recovery_start_round)
80 .unwrap();
81 info!(
82 "Recovered and voting on {} blocks from authority {} {}",
83 blocks.len(),
84 authority_index,
85 context.committee.authority(authority_index).hostname
86 );
87 self.recover_and_vote_on_blocks(blocks);
88 }
89 }
90
91 pub(crate) fn recover_and_vote_on_blocks(&self, blocks: Vec<VerifiedBlock>) {
99 let context = self.vote_tracker_state.read().context.clone();
100 let should_vote_blocks = {
101 let dag_state = self.dag_state.read();
102 let gc_round = dag_state.gc_round();
103 blocks
104 .iter()
105 .map(|b| b.round() > gc_round && !dag_state.has_been_included(&b.reference()))
107 .collect::<Vec<_>>()
108 };
109 let voted_blocks = blocks
110 .into_iter()
111 .zip_debug_eq(should_vote_blocks)
112 .map(|(b, should_vote)| {
113 if !should_vote {
114 (b, vec![])
117 } else {
118 let reject_transaction_votes =
121 self.block_verifier.vote(&b).unwrap_or_else(|e| {
122 panic!(
123 "Failed to vote on block {} (own_index={}) during recovery: {}",
124 b.reference(),
125 context.own_index,
126 e
127 )
128 });
129 (b, reject_transaction_votes)
130 }
131 })
132 .collect::<Vec<_>>();
133 self.vote_tracker_state
134 .write()
135 .add_voted_blocks(voted_blocks);
136 }
137
138 pub fn add_voted_blocks(&self, voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>) {
140 self.vote_tracker_state
141 .write()
142 .add_voted_blocks(voted_blocks);
143 }
144
145 pub(crate) fn get_own_votes(&self, block_refs: Vec<BlockRef>) -> Vec<BlockTransactionVotes> {
147 let mut votes = vec![];
148 let vote_tracker_state = self.vote_tracker_state.read();
149 for block_ref in block_refs {
150 if block_ref.round <= vote_tracker_state.gc_round {
151 continue;
152 }
153 let vote_info = vote_tracker_state.votes.get(&block_ref).unwrap_or_else(|| {
154 panic!(
155 "Ancestor block {} not found in vote tracker state",
156 block_ref
157 )
158 });
159 if !vote_info.own_reject_txn_votes.is_empty() {
160 votes.push(BlockTransactionVotes {
161 block_ref,
162 rejects: vote_info.own_reject_txn_votes.clone(),
163 });
164 }
165 }
166 votes
167 }
168
169 pub(crate) fn get_reject_votes(
173 &self,
174 block_ref: &BlockRef,
175 ) -> Option<Vec<(TransactionIndex, Stake)>> {
176 let accumulated_reject_votes = self
177 .vote_tracker_state
178 .read()
179 .votes
180 .get(block_ref)?
181 .reject_txn_votes
182 .iter()
183 .map(|(idx, stake_agg)| (*idx, stake_agg.stake()))
184 .collect::<Vec<_>>();
185 Some(accumulated_reject_votes)
186 }
187
188 pub(crate) fn run_gc(&self, gc_round: Round) {
196 let dag_state_gc_round = self.dag_state.read().gc_round();
197 assert!(
198 gc_round <= dag_state_gc_round,
199 "TransactionVoteTracker cannot GC higher than DagState GC round ({} > {})",
200 gc_round,
201 dag_state_gc_round
202 );
203 self.vote_tracker_state.write().update_gc_round(gc_round);
204 }
205}
206
207struct VoteTrackerState {
211 context: Arc<Context>,
212
213 votes: BTreeMap<BlockRef, VoteInfo>,
216
217 gc_round: Round,
219}
220
221impl VoteTrackerState {
222 fn new(context: Arc<Context>) -> Self {
223 Self {
224 context,
225 votes: BTreeMap::new(),
226 gc_round: GENESIS_ROUND,
227 }
228 }
229
230 fn add_voted_blocks(&mut self, voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>) {
231 for (voted_block, reject_txn_votes) in voted_blocks {
232 self.add_voted_block(voted_block, reject_txn_votes);
233 }
234 }
235
236 fn add_voted_block(
237 &mut self,
238 voted_block: VerifiedBlock,
239 reject_txn_votes: Vec<TransactionIndex>,
240 ) {
241 if voted_block.round() <= self.gc_round {
242 return;
244 }
245
246 let peer_hostname = &self
248 .context
249 .committee
250 .authority(voted_block.author())
251 .hostname;
252 self.context
253 .metrics
254 .node_metrics
255 .certifier_own_reject_votes
256 .with_label_values(&[peer_hostname])
257 .inc_by(reject_txn_votes.len() as u64);
258
259 let vote_info = self.votes.entry(voted_block.reference()).or_default();
261 if vote_info.block.is_some() {
262 return;
264 }
265 vote_info.block = Some(voted_block.clone());
266 vote_info.own_reject_txn_votes = reject_txn_votes;
267
268 for block_votes in voted_block.transaction_votes() {
270 if block_votes.block_ref.round <= self.gc_round {
271 continue;
273 }
274 let vote_info = self.votes.entry(block_votes.block_ref).or_default();
275 for reject in &block_votes.rejects {
276 vote_info
277 .reject_txn_votes
278 .entry(*reject)
279 .or_default()
280 .add_unique(voted_block.author(), &self.context.committee);
281 }
282 }
283 }
284
285 fn update_gc_round(&mut self, gc_round: Round) {
287 self.gc_round = gc_round;
288 while let Some((block_ref, _)) = self.votes.first_key_value() {
289 if block_ref.round <= self.gc_round {
290 self.votes.pop_first();
291 } else {
292 break;
293 }
294 }
295
296 self.context
297 .metrics
298 .node_metrics
299 .certifier_gc_round
300 .set(self.gc_round as i64);
301 }
302}
303
304#[derive(Default)]
307struct VoteInfo {
308 block: Option<VerifiedBlock>,
311 own_reject_txn_votes: Vec<TransactionIndex>,
315 reject_txn_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>>,
317}
318
319#[cfg(test)]
320mod test {
321 use std::sync::Arc;
322
323 use consensus_config::{AuthorityIndex, Parameters};
324
325 use crate::{
326 TestBlock, Transaction, VerifiedBlock, block::BlockTransactionVotes, context::Context,
327 metrics::test_metrics,
328 };
329
330 use super::*;
331
332 #[tokio::test]
334 async fn test_reject_vote_tracking() {
335 telemetry_subscribers::init_for_testing();
336 let (committee, _keypairs) =
337 consensus_config::local_committee_and_keys(0, vec![1, 2, 3, 4]);
338 let temp_dir = tempfile::TempDir::new().unwrap();
339 let context = Arc::new(Context::new(
340 0,
341 Some(AuthorityIndex::new_for_test(0)),
342 committee,
343 Parameters {
344 db_path: temp_dir.path().to_path_buf(),
345 ..Default::default()
346 },
347 consensus_config::ConsensusProtocolConfig::for_testing(),
348 test_metrics(),
349 Arc::new(crate::Clock::default()),
350 ));
351
352 let transactions = vec![Transaction::new(vec![0u8; 16]); 4];
353
354 let round_1_blocks: Vec<VerifiedBlock> = (0..4)
356 .map(|author| {
357 VerifiedBlock::new_for_test(
358 TestBlock::new(1, author)
359 .set_transactions(transactions.clone())
360 .build(),
361 )
362 })
363 .collect();
364
365 let mut state = VoteTrackerState::new(context.clone());
370 state.add_voted_blocks(vec![
371 (round_1_blocks[0].clone(), vec![0]),
372 (round_1_blocks[1].clone(), vec![1, 2]),
373 (round_1_blocks[2].clone(), vec![]),
374 (round_1_blocks[3].clone(), vec![]),
375 ]);
376
377 let vote_info_0 = state.votes.get(&round_1_blocks[0].reference()).unwrap();
379 assert_eq!(vote_info_0.own_reject_txn_votes, vec![0]);
380 let vote_info_1 = state.votes.get(&round_1_blocks[1].reference()).unwrap();
381 assert_eq!(vote_info_1.own_reject_txn_votes, vec![1, 2]);
382 let vote_info_2 = state.votes.get(&round_1_blocks[2].reference()).unwrap();
383 assert!(vote_info_2.own_reject_txn_votes.is_empty());
384
385 assert!(vote_info_0.reject_txn_votes.is_empty());
387 assert!(vote_info_1.reject_txn_votes.is_empty());
388
389 let ancestors: Vec<BlockRef> = round_1_blocks.iter().map(|b| b.reference()).collect();
391
392 let block_r2_a0 = VerifiedBlock::new_for_test(
394 TestBlock::new(2, 0)
395 .set_ancestors_raw(ancestors.clone())
396 .set_transactions(transactions.clone())
397 .set_transaction_votes(vec![
398 BlockTransactionVotes {
399 block_ref: round_1_blocks[0].reference(),
400 rejects: vec![0],
401 },
402 BlockTransactionVotes {
403 block_ref: round_1_blocks[1].reference(),
404 rejects: vec![1],
405 },
406 ])
407 .build(),
408 );
409
410 let block_r2_a1 = VerifiedBlock::new_for_test(
412 TestBlock::new(2, 1)
413 .set_ancestors_raw(ancestors.clone())
414 .set_transactions(transactions.clone())
415 .set_transaction_votes(vec![
416 BlockTransactionVotes {
417 block_ref: round_1_blocks[0].reference(),
418 rejects: vec![0],
419 },
420 BlockTransactionVotes {
421 block_ref: round_1_blocks[1].reference(),
422 rejects: vec![1, 2],
423 },
424 ])
425 .build(),
426 );
427
428 let block_r2_a2 = VerifiedBlock::new_for_test(
430 TestBlock::new(2, 2)
431 .set_ancestors_raw(ancestors.clone())
432 .set_transactions(transactions.clone())
433 .set_transaction_votes(vec![BlockTransactionVotes {
434 block_ref: round_1_blocks[1].reference(),
435 rejects: vec![2],
436 }])
437 .build(),
438 );
439
440 state.add_voted_blocks(vec![
441 (block_r2_a0, vec![]),
442 (block_r2_a1, vec![]),
443 (block_r2_a2, vec![]),
444 ]);
445
446 let reject_votes_0 = &state
449 .votes
450 .get(&round_1_blocks[0].reference())
451 .unwrap()
452 .reject_txn_votes;
453 assert_eq!(reject_votes_0.len(), 1);
454 assert_eq!(reject_votes_0.get(&0).unwrap().stake(), 3);
455
456 let reject_votes_1 = &state
460 .votes
461 .get(&round_1_blocks[1].reference())
462 .unwrap()
463 .reject_txn_votes;
464 assert_eq!(reject_votes_1.len(), 2);
465 assert_eq!(reject_votes_1.get(&1).unwrap().stake(), 3);
466 assert_eq!(reject_votes_1.get(&2).unwrap().stake(), 5);
467
468 let reject_votes_2 = &state
470 .votes
471 .get(&round_1_blocks[2].reference())
472 .unwrap()
473 .reject_txn_votes;
474 assert!(reject_votes_2.is_empty());
475 }
476}