consensus_core/
transaction_vote_tracker.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, sync::Arc};
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, Round, TransactionIndex};
8use mysten_common::ZipDebugEqIteratorExt;
9use parking_lot::RwLock;
10use tracing::info;
11
12use crate::{
13    BlockAPI as _, VerifiedBlock,
14    block::{BlockTransactionVotes, GENESIS_ROUND},
15    block_verifier::BlockVerifier,
16    context::Context,
17    dag_state::DagState,
18    stake_aggregator::{QuorumThreshold, StakeAggregator},
19};
20
21/// TransactionVoteTracker has the following purposes:
22/// 1. Keeps track of own votes on transactions, and allows the votes to be retrieved
23///    later in core after acceptance of the blocks containing the transactions.
24/// 2. Aggregates reject votes on transactions, and allows the aggregated votes
25///    to be retrieved during post-commit finalization.
26///
27/// A transaction is rejected if a quorum of authorities vote to reject it. When this happens, it is
28/// guaranteed that no validator can observe a certification of the transaction, with <= f malicious
29/// stake.
30#[derive(Clone)]
31pub struct TransactionVoteTracker {
32    // The state of blocks being voted on.
33    vote_tracker_state: Arc<RwLock<VoteTrackerState>>,
34    // Verify transactions during recovery.
35    block_verifier: Arc<dyn BlockVerifier>,
36    // The state of the DAG.
37    dag_state: Arc<RwLock<DagState>>,
38}
39
40impl TransactionVoteTracker {
41    pub fn new(
42        context: Arc<Context>,
43        block_verifier: Arc<dyn BlockVerifier>,
44        dag_state: Arc<RwLock<DagState>>,
45    ) -> Self {
46        Self {
47            vote_tracker_state: Arc::new(RwLock::new(VoteTrackerState::new(context))),
48            block_verifier,
49            dag_state,
50        }
51    }
52
53    /// Recovers all blocks from DB after the given round.
54    ///
55    /// This is useful for initializing the vote tracker state
56    /// for future commits and block proposals.
57    pub(crate) fn recover_blocks_after_round(&self, after_round: Round) {
58        let context = self.vote_tracker_state.read().context.clone();
59        if !context.protocol_config.transaction_voting_enabled() {
60            info!("Skipping vote tracker recovery in non-mysticeti fast path mode");
61            return;
62        }
63
64        let store = self.dag_state.read().store().clone();
65
66        let recovery_start_round = after_round + 1;
67        info!(
68            "Recovering vote tracker state from round {}",
69            recovery_start_round,
70        );
71
72        let authorities = context
73            .committee
74            .authorities()
75            .map(|(index, _)| index)
76            .collect::<Vec<_>>();
77        for authority_index in authorities {
78            let blocks = store
79                .scan_blocks_by_author(authority_index, recovery_start_round)
80                .unwrap();
81            info!(
82                "Recovered and voting on {} blocks from authority {} {}",
83                blocks.len(),
84                authority_index,
85                context.committee.authority(authority_index).hostname
86            );
87            self.recover_and_vote_on_blocks(blocks);
88        }
89    }
90
91    /// Recovers and potentially votes on the given blocks.
92    ///
93    /// Because own votes on blocks are not stored, during recovery it is necessary to vote on
94    /// input blocks that are above GC round and have not been included before, which can be
95    /// included in a future proposed block.
96    ///
97    /// In addition, add_voted_blocks() will eventually process reject votes contained in the input blocks.
98    pub(crate) fn recover_and_vote_on_blocks(&self, blocks: Vec<VerifiedBlock>) {
99        let context = self.vote_tracker_state.read().context.clone();
100        let should_vote_blocks = {
101            let dag_state = self.dag_state.read();
102            let gc_round = dag_state.gc_round();
103            blocks
104                .iter()
105                // Must make sure the block is above GC round before calling has_been_included().
106                .map(|b| b.round() > gc_round && !dag_state.has_been_included(&b.reference()))
107                .collect::<Vec<_>>()
108        };
109        let voted_blocks = blocks
110            .into_iter()
111            .zip_debug_eq(should_vote_blocks)
112            .map(|(b, should_vote)| {
113                if !should_vote {
114                    // Voting is unnecessary for blocks already included in own proposed blocks,
115                    // or outside of local DAG GC bound.
116                    (b, vec![])
117                } else {
118                    // Voting is needed for blocks above GC round and not yet included in own proposed blocks.
119                    // A block proposal can include the input block later and retries own votes on it.
120                    let reject_transaction_votes =
121                        self.block_verifier.vote(&b).unwrap_or_else(|e| {
122                            panic!(
123                                "Failed to vote on block {} (own_index={}) during recovery: {}",
124                                b.reference(),
125                                context.own_index,
126                                e
127                            )
128                        });
129                    (b, reject_transaction_votes)
130                }
131            })
132            .collect::<Vec<_>>();
133        self.vote_tracker_state
134            .write()
135            .add_voted_blocks(voted_blocks);
136    }
137
138    /// Stores own reject votes on input blocks, and aggregates reject votes from the input blocks.
139    pub fn add_voted_blocks(&self, voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>) {
140        self.vote_tracker_state
141            .write()
142            .add_voted_blocks(voted_blocks);
143    }
144
145    /// Retrieves own votes on peer block transactions.
146    pub(crate) fn get_own_votes(&self, block_refs: Vec<BlockRef>) -> Vec<BlockTransactionVotes> {
147        let mut votes = vec![];
148        let vote_tracker_state = self.vote_tracker_state.read();
149        for block_ref in block_refs {
150            if block_ref.round <= vote_tracker_state.gc_round {
151                continue;
152            }
153            let vote_info = vote_tracker_state.votes.get(&block_ref).unwrap_or_else(|| {
154                panic!(
155                    "Ancestor block {} not found in vote tracker state",
156                    block_ref
157                )
158            });
159            if !vote_info.own_reject_txn_votes.is_empty() {
160                votes.push(BlockTransactionVotes {
161                    block_ref,
162                    rejects: vote_info.own_reject_txn_votes.clone(),
163                });
164            }
165        }
166        votes
167    }
168
169    /// Retrieves transactions in the block that have received reject votes, and the total stake of the votes.
170    /// TransactionIndex not included in the output has no reject votes.
171    /// Returns None if no information is found for the block.
172    pub(crate) fn get_reject_votes(
173        &self,
174        block_ref: &BlockRef,
175    ) -> Option<Vec<(TransactionIndex, Stake)>> {
176        let accumulated_reject_votes = self
177            .vote_tracker_state
178            .read()
179            .votes
180            .get(block_ref)?
181            .reject_txn_votes
182            .iter()
183            .map(|(idx, stake_agg)| (*idx, stake_agg.stake()))
184            .collect::<Vec<_>>();
185        Some(accumulated_reject_votes)
186    }
187
188    /// Runs garbage collection on the internal state by removing data for blocks <= gc_round,
189    /// and updates the GC round for the vote tracker.
190    ///
191    /// IMPORTANT: the gc_round used here can trail the latest gc_round from DagState.
192    /// This is because the gc round here is determined by CommitFinalizer, which needs to process
193    /// commits before the latest commit in DagState. Reject votes received by transactions below
194    /// local DAG gc_round may still need to be accessed from CommitFinalizer.
195    pub(crate) fn run_gc(&self, gc_round: Round) {
196        let dag_state_gc_round = self.dag_state.read().gc_round();
197        assert!(
198            gc_round <= dag_state_gc_round,
199            "TransactionVoteTracker cannot GC higher than DagState GC round ({} > {})",
200            gc_round,
201            dag_state_gc_round
202        );
203        self.vote_tracker_state.write().update_gc_round(gc_round);
204    }
205}
206
207/// VoteTrackerState keeps track of votes received by each transaction and block,
208/// and helps determine if votes reach a quorum. Reject votes can start accumulating
209/// even before the target block is received by this authority.
210struct VoteTrackerState {
211    context: Arc<Context>,
212
213    // Maps received blocks' refs to votes on those blocks from other blocks.
214    // Even if a block has no reject votes on its transactions, it still has an entry here.
215    votes: BTreeMap<BlockRef, VoteInfo>,
216
217    // Highest round where blocks are GC'ed.
218    gc_round: Round,
219}
220
221impl VoteTrackerState {
222    fn new(context: Arc<Context>) -> Self {
223        Self {
224            context,
225            votes: BTreeMap::new(),
226            gc_round: GENESIS_ROUND,
227        }
228    }
229
230    fn add_voted_blocks(&mut self, voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)>) {
231        for (voted_block, reject_txn_votes) in voted_blocks {
232            self.add_voted_block(voted_block, reject_txn_votes);
233        }
234    }
235
236    fn add_voted_block(
237        &mut self,
238        voted_block: VerifiedBlock,
239        reject_txn_votes: Vec<TransactionIndex>,
240    ) {
241        if voted_block.round() <= self.gc_round {
242            // Ignore the block and own votes, since they are outside of vote tracker GC bound.
243            return;
244        }
245
246        // Count own reject votes against each peer authority.
247        let peer_hostname = &self
248            .context
249            .committee
250            .authority(voted_block.author())
251            .hostname;
252        self.context
253            .metrics
254            .node_metrics
255            .certifier_own_reject_votes
256            .with_label_values(&[peer_hostname])
257            .inc_by(reject_txn_votes.len() as u64);
258
259        // Initialize the entry for the voted block.
260        let vote_info = self.votes.entry(voted_block.reference()).or_default();
261        if vote_info.block.is_some() {
262            // Input block has already been processed and added to the state.
263            return;
264        }
265        vote_info.block = Some(voted_block.clone());
266        vote_info.own_reject_txn_votes = reject_txn_votes;
267
268        // Update reject votes from the input block.
269        for block_votes in voted_block.transaction_votes() {
270            if block_votes.block_ref.round <= self.gc_round {
271                // Block is outside of GC bound.
272                continue;
273            }
274            let vote_info = self.votes.entry(block_votes.block_ref).or_default();
275            for reject in &block_votes.rejects {
276                vote_info
277                    .reject_txn_votes
278                    .entry(*reject)
279                    .or_default()
280                    .add_unique(voted_block.author(), &self.context.committee);
281            }
282        }
283    }
284
285    /// Updates the GC round and cleans up obsolete internal state.
286    fn update_gc_round(&mut self, gc_round: Round) {
287        self.gc_round = gc_round;
288        while let Some((block_ref, _)) = self.votes.first_key_value() {
289            if block_ref.round <= self.gc_round {
290                self.votes.pop_first();
291            } else {
292                break;
293            }
294        }
295
296        self.context
297            .metrics
298            .node_metrics
299            .certifier_gc_round
300            .set(self.gc_round as i64);
301    }
302}
303
304/// VoteInfo keeps track of votes received for each transaction of this block,
305/// possibly even before the block is received by this authority.
306#[derive(Default)]
307struct VoteInfo {
308    // Content of the block.
309    // None if the blocks has not been received.
310    block: Option<VerifiedBlock>,
311    // Rejection votes by this authority on this block.
312    // This field is written when the block is first received and its transactions are voted on.
313    // It is read from core after the block is accepted.
314    own_reject_txn_votes: Vec<TransactionIndex>,
315    // Accumulates reject votes per transaction in this block.
316    reject_txn_votes: BTreeMap<TransactionIndex, StakeAggregator<QuorumThreshold>>,
317}
318
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use consensus_config::{AuthorityIndex, Parameters};

    use crate::{
        TestBlock, Transaction, VerifiedBlock, block::BlockTransactionVotes, context::Context,
        metrics::test_metrics,
    };

    use super::*;

    /// Checks that own reject votes are stored per block, and that reject votes carried by
    /// later blocks are aggregated by stake per transaction.
    ///
    /// Uses 4 authorities with stakes [1, 2, 3, 4], total 10.
    #[tokio::test]
    async fn test_reject_vote_tracking() {
        telemetry_subscribers::init_for_testing();
        let (committee, _keypairs) =
            consensus_config::local_committee_and_keys(0, vec![1, 2, 3, 4]);
        let temp_dir = tempfile::TempDir::new().unwrap();
        let parameters = Parameters {
            db_path: temp_dir.path().to_path_buf(),
            ..Default::default()
        };
        let context = Arc::new(Context::new(
            0,
            Some(AuthorityIndex::new_for_test(0)),
            committee,
            parameters,
            consensus_config::ConsensusProtocolConfig::for_testing(),
            test_metrics(),
            Arc::new(crate::Clock::default()),
        ));

        let transactions = vec![Transaction::new(vec![0u8; 16]); 4];

        // Round 1: one block per authority, each carrying the same transactions.
        let mut round_1_blocks: Vec<VerifiedBlock> = Vec::new();
        for author in 0..4 {
            let block = TestBlock::new(1, author)
                .set_transactions(transactions.clone())
                .build();
            round_1_blocks.push(VerifiedBlock::new_for_test(block));
        }

        // Own reject votes for the round 1 blocks:
        // - authority 0's block: reject txn 0
        // - authority 1's block: reject txns 1 and 2
        // - authorities 2 and 3: nothing rejected
        let mut state = VoteTrackerState::new(context.clone());
        state.add_voted_blocks(vec![
            (round_1_blocks[0].clone(), vec![0]),
            (round_1_blocks[1].clone(), vec![1, 2]),
            (round_1_blocks[2].clone(), vec![]),
            (round_1_blocks[3].clone(), vec![]),
        ]);

        // Own reject votes must be stored exactly as provided.
        let expected_own_rejects = [vec![0], vec![1, 2], vec![]];
        for (block, expected) in round_1_blocks.iter().zip(expected_own_rejects) {
            let info = state.votes.get(&block.reference()).unwrap();
            assert_eq!(info.own_reject_txn_votes, expected);
        }

        // Nothing has been aggregated yet, since round 1 blocks carry no transaction_votes.
        for block in &round_1_blocks[..2] {
            let info = state.votes.get(&block.reference()).unwrap();
            assert!(info.reject_txn_votes.is_empty());
        }

        // Round 2: authorities 0, 1, 2 create blocks that reject transactions in round 1 blocks.
        let ancestors: Vec<BlockRef> = round_1_blocks.iter().map(|b| b.reference()).collect();
        let block_0_ref = round_1_blocks[0].reference();
        let block_1_ref = round_1_blocks[1].reference();

        // Builds a round 2 block from `author` that carries the given reject votes.
        let rejecting_block = |author, votes: Vec<BlockTransactionVotes>| {
            VerifiedBlock::new_for_test(
                TestBlock::new(2, author)
                    .set_ancestors_raw(ancestors.clone())
                    .set_transactions(transactions.clone())
                    .set_transaction_votes(votes)
                    .build(),
            )
        };

        // Authority 0 (stake 1) rejects txn 0 of block[0] and txn 1 of block[1].
        let block_r2_a0 = rejecting_block(
            0,
            vec![
                BlockTransactionVotes {
                    block_ref: block_0_ref,
                    rejects: vec![0],
                },
                BlockTransactionVotes {
                    block_ref: block_1_ref,
                    rejects: vec![1],
                },
            ],
        );

        // Authority 1 (stake 2) rejects txn 0 of block[0] and txns 1,2 of block[1].
        let block_r2_a1 = rejecting_block(
            1,
            vec![
                BlockTransactionVotes {
                    block_ref: block_0_ref,
                    rejects: vec![0],
                },
                BlockTransactionVotes {
                    block_ref: block_1_ref,
                    rejects: vec![1, 2],
                },
            ],
        );

        // Authority 2 (stake 3) rejects txn 2 of block[1] only.
        let block_r2_a2 = rejecting_block(
            2,
            vec![BlockTransactionVotes {
                block_ref: block_1_ref,
                rejects: vec![2],
            }],
        );

        state.add_voted_blocks(vec![
            (block_r2_a0, vec![]),
            (block_r2_a1, vec![]),
            (block_r2_a2, vec![]),
        ]);

        // Lists (transaction index, aggregated reject stake) for a block, ordered by index.
        let aggregated = |block: &VerifiedBlock| -> Vec<(TransactionIndex, Stake)> {
            state
                .votes
                .get(&block.reference())
                .unwrap()
                .reject_txn_votes
                .iter()
                .map(|(idx, stake_agg)| (*idx, stake_agg.stake()))
                .collect()
        };

        // block[0]:
        // txn 0: authority 0 (stake 1) + authority 1 (stake 2) = 3
        assert_eq!(aggregated(&round_1_blocks[0]), vec![(0, 3)]);

        // block[1]:
        // txn 1: authority 0 (stake 1) + authority 1 (stake 2) = 3
        // txn 2: authority 1 (stake 2) + authority 2 (stake 3) = 5
        assert_eq!(aggregated(&round_1_blocks[1]), vec![(1, 3), (2, 5)]);

        // block[2] has no reject votes from others.
        assert!(aggregated(&round_1_blocks[2]).is_empty());
    }
}