consensus_core/
linearizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
8use itertools::Itertools;
9use parking_lot::RwLock;
10
11use crate::{
12    block::{BlockAPI, VerifiedBlock},
13    commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
14    context::Context,
15    dag_state::DagState,
16};
17
18/// The `StorageAPI` trait provides an interface for the block store and has been
19/// mostly introduced for allowing to inject the test store in `DagBuilder`.
20pub(crate) trait BlockStoreAPI {
21    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;
22
23    fn gc_round(&self) -> Round;
24
25    fn set_committed(&mut self, block_ref: &BlockRef) -> bool;
26
27    fn is_committed(&self, block_ref: &BlockRef) -> bool;
28}
29
30impl BlockStoreAPI
31    for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
32{
33    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
34        DagState::get_blocks(self, refs)
35    }
36
37    fn gc_round(&self) -> Round {
38        DagState::gc_round(self)
39    }
40
41    fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
42        DagState::set_committed(self, block_ref)
43    }
44
45    fn is_committed(&self, block_ref: &BlockRef) -> bool {
46        DagState::is_committed(self, block_ref)
47    }
48}
49
50/// Expand a committed sequence of leader into a sequence of sub-dags.
51#[derive(Clone)]
52pub struct Linearizer {
53    /// In memory block store representing the dag state
54    context: Arc<Context>,
55    dag_state: Arc<RwLock<DagState>>,
56}
57
58impl Linearizer {
59    pub fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
60        Self { context, dag_state }
61    }
62
63    /// Collect the sub-dag and the corresponding commit from a specific leader excluding any duplicates or
64    /// blocks that have already been committed (within previous sub-dags).
65    fn collect_sub_dag_and_commit(
66        &mut self,
67        leader_block: VerifiedBlock,
68    ) -> (CommittedSubDag, TrustedCommit) {
69        let _s = self
70            .context
71            .metrics
72            .node_metrics
73            .scope_processing_time
74            .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
75            .start_timer();
76
77        // Grab latest commit state from dag state
78        let mut dag_state = self.dag_state.write();
79        let last_commit_index = dag_state.last_commit_index();
80        let last_commit_digest = dag_state.last_commit_digest();
81        let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
82
83        // Now linearize the sub-dag starting from the leader block
84        let to_commit = Self::linearize_sub_dag(leader_block.clone(), &mut dag_state);
85
86        let timestamp_ms = Self::calculate_commit_timestamp(
87            &self.context,
88            &mut dag_state,
89            &leader_block,
90            last_commit_timestamp_ms,
91        );
92
93        drop(dag_state);
94
95        // Create the Commit.
96        let commit = Commit::new(
97            last_commit_index + 1,
98            last_commit_digest,
99            timestamp_ms,
100            leader_block.reference(),
101            to_commit
102                .iter()
103                .map(|block| block.reference())
104                .collect::<Vec<_>>(),
105        );
106        let serialized = commit
107            .serialize()
108            .unwrap_or_else(|e| panic!("Failed to serialize commit: {}", e));
109        let commit = TrustedCommit::new_trusted(commit, serialized);
110
111        // Create the corresponding committed sub dag
112        let sub_dag = CommittedSubDag::new(
113            leader_block.reference(),
114            to_commit,
115            timestamp_ms,
116            commit.reference(),
117        );
118
119        (sub_dag, commit)
120    }
121
122    /// Calculates the commit's timestamp. The timestamp will be calculated as the median of leader's parents (leader.round - 1)
123    /// timestamps by stake. To ensure that commit timestamp monotonicity is respected it is compared against the `last_commit_timestamp_ms`
124    /// and the maximum of the two is returned.
125    pub(crate) fn calculate_commit_timestamp(
126        context: &Context,
127        dag_state: &mut impl BlockStoreAPI,
128        leader_block: &VerifiedBlock,
129        last_commit_timestamp_ms: BlockTimestampMs,
130    ) -> BlockTimestampMs {
131        let timestamp_ms = {
132            // Select leaders' parent blocks.
133            let block_refs = leader_block
134                .ancestors()
135                .iter()
136                .filter(|block_ref| block_ref.round == leader_block.round() - 1)
137                .cloned()
138                .collect::<Vec<_>>();
139            // Get the blocks from dag state which should not fail.
140            let blocks = dag_state
141                .get_blocks(&block_refs)
142                .into_iter()
143                .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
144            median_timestamp_by_stake(context, blocks).unwrap_or_else(|e| {
145                panic!(
146                    "Cannot compute median timestamp for leader block {:?} ancestors: {}",
147                    leader_block, e
148                )
149            })
150        };
151
152        // Always make sure that commit timestamps are monotonic, so override if necessary.
153        timestamp_ms.max(last_commit_timestamp_ms)
154    }
155
156    pub(crate) fn linearize_sub_dag(
157        leader_block: VerifiedBlock,
158        dag_state: &mut impl BlockStoreAPI,
159    ) -> Vec<VerifiedBlock> {
160        // The GC round here is calculated based on the last committed round of the leader block. The algorithm will attempt to
161        // commit blocks up to this GC round. Once this commit has been processed and written to DagState, then gc round will update
162        // and on the processing of the next commit we'll have it already updated, so no need to do any gc_round recalculations here.
163        // We just use whatever is currently in DagState.
164        let gc_round: Round = dag_state.gc_round();
165        let leader_block_ref = leader_block.reference();
166        let mut buffer = vec![leader_block];
167        let mut to_commit = Vec::new();
168
169        // Perform the recursion without stopping at the highest round round that has been committed per authority. Instead it will
170        // allow to commit blocks that are lower than the highest committed round for an authority but higher than gc_round.
171        assert!(
172            dag_state.set_committed(&leader_block_ref),
173            "Leader block with reference {:?} attempted to be committed twice",
174            leader_block_ref
175        );
176
177        while let Some(x) = buffer.pop() {
178            to_commit.push(x.clone());
179
180            let ancestors: Vec<VerifiedBlock> = dag_state
181                .get_blocks(
182                    &x.ancestors()
183                        .iter()
184                        .copied()
185                        .filter(|ancestor| {
186                            ancestor.round > gc_round && !dag_state.is_committed(ancestor)
187                        })
188                        .collect::<Vec<_>>(),
189                )
190                .into_iter()
191                .map(|ancestor_opt| {
192                    ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
193                })
194                .collect();
195
196            for ancestor in ancestors {
197                buffer.push(ancestor.clone());
198                assert!(
199                    dag_state.set_committed(&ancestor.reference()),
200                    "Block with reference {:?} attempted to be committed twice",
201                    ancestor.reference()
202                );
203            }
204        }
205
206        // The above code should have not yielded any blocks that are <= gc_round, but just to make sure that we'll never
207        // commit anything that should be garbage collected we attempt to prune here as well.
208        assert!(
209            to_commit.iter().all(|block| block.round() > gc_round),
210            "No blocks <= {gc_round} should be committed. Leader round {}, blocks {to_commit:?}.",
211            leader_block_ref
212        );
213
214        // Sort the blocks of the sub-dag blocks
215        sort_sub_dag_blocks(&mut to_commit);
216
217        to_commit
218    }
219
220    // This function should be called whenever a new commit is observed. This will
221    // iterate over the sequence of committed leaders and produce a list of committed
222    // sub-dags.
223    pub fn handle_commit(&mut self, committed_leaders: Vec<VerifiedBlock>) -> Vec<CommittedSubDag> {
224        if committed_leaders.is_empty() {
225            return vec![];
226        }
227
228        let mut committed_sub_dags = vec![];
229        for leader_block in committed_leaders {
230            // Collect the sub-dag generated using each of these leaders and the corresponding commit.
231            let (sub_dag, commit) = self.collect_sub_dag_and_commit(leader_block);
232
233            self.update_blocks_pruned_metric(&sub_dag);
234
235            // Buffer commit in dag state for persistence later.
236            // This also updates the last committed rounds.
237            self.dag_state.write().add_commit(commit.clone());
238
239            committed_sub_dags.push(sub_dag);
240        }
241
242        committed_sub_dags
243    }
244
245    // Try to measure the number of blocks that get pruned due to GC. This is not very accurate, but it can give us a good enough idea.
246    // We consider a block as pruned when it is an ancestor of a block that has been committed as part of the provided `sub_dag`, but
247    // it has not been committed as part of previous commits. Right now we measure this via checking that highest committed round for the authority
248    // as we don't an efficient look up functionality to check if a block has been committed or not.
249    fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
250        let (last_committed_rounds, gc_round) = {
251            let dag_state = self.dag_state.read();
252            (dag_state.last_committed_rounds(), dag_state.gc_round())
253        };
254
255        for block_ref in sub_dag
256            .blocks
257            .iter()
258            .flat_map(|block| block.ancestors())
259            .filter(
260                |ancestor_ref| {
261                    ancestor_ref.round <= gc_round
262                        && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
263                }, // If the last committed round is the same as the pruned block's round, then we know for sure that it has been committed and it doesn't count here
264                   // as pruned block.
265            )
266            .unique()
267        {
268            let hostname = &self.context.committee.authority(block_ref.author).hostname;
269
270            // If the last committed round from this authority is lower than the pruned ancestor in question, then we know for sure that it has not been committed.
271            let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
272                &[hostname, "uncommitted"]
273            } else {
274                // If last committed round is higher for this authority, then we don't really know it's status, but we know that there is a higher committed block from this authority.
275                &[hostname, "higher_committed"]
276            };
277
278            self.context
279                .metrics
280                .node_metrics
281                .blocks_pruned_on_commit
282                .with_label_values(label_values)
283                .inc();
284        }
285    }
286}
287
288/// Computes the median timestamp of the blocks weighted by the stake of their authorities.
289/// This function assumes each block comes from a different authority of the same round.
290/// Error is returned if no blocks are provided or total stake is less than quorum threshold.
291pub(crate) fn median_timestamp_by_stake(
292    context: &Context,
293    blocks: impl Iterator<Item = VerifiedBlock>,
294) -> Result<BlockTimestampMs, String> {
295    let mut total_stake = 0;
296    let mut timestamps = vec![];
297    for block in blocks {
298        let stake = context.committee.authority(block.author()).stake;
299        timestamps.push((block.timestamp_ms(), stake));
300        total_stake += stake;
301    }
302
303    if timestamps.is_empty() {
304        return Err("No blocks provided".to_string());
305    }
306    if total_stake < context.committee.quorum_threshold() {
307        return Err(format!(
308            "Total stake {} < quorum threshold {}",
309            total_stake,
310            context.committee.quorum_threshold()
311        ));
312    }
313
314    Ok(median_timestamps_by_stake_inner(timestamps, total_stake))
315}
316
317fn median_timestamps_by_stake_inner(
318    mut timestamps: Vec<(BlockTimestampMs, Stake)>,
319    total_stake: Stake,
320) -> BlockTimestampMs {
321    timestamps.sort_by_key(|(ts, _)| *ts);
322
323    let mut cumulative_stake = 0;
324    for (ts, stake) in &timestamps {
325        cumulative_stake += stake;
326        if cumulative_stake > total_stake / 2 {
327            return *ts;
328        }
329    }
330
331    timestamps.last().unwrap().0
332}
333
334#[cfg(test)]
335mod tests {
336    use consensus_config::AuthorityIndex;
337    use rstest::rstest;
338
339    use super::*;
340    use crate::{
341        CommitIndex, TestBlock,
342        commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
343        context::Context,
344        leader_schedule::{LeaderSchedule, LeaderSwapTable},
345        storage::mem_store::MemStore,
346        test_dag_builder::DagBuilder,
347        test_dag_parser::parse_dag,
348    };
349
350    #[rstest]
351    #[tokio::test]
352    async fn test_handle_commit() {
353        telemetry_subscribers::init_for_testing();
354        let num_authorities = 4;
355        let (context, _keys) = Context::new_for_test(num_authorities);
356        let context = Arc::new(context);
357
358        let dag_state = Arc::new(RwLock::new(DagState::new(
359            context.clone(),
360            Arc::new(MemStore::new()),
361        )));
362        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
363
364        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
365        let num_rounds: u32 = 10;
366        let mut dag_builder = DagBuilder::new(context.clone());
367        dag_builder
368            .layers(1..=num_rounds)
369            .build()
370            .persist_layers(dag_state.clone());
371
372        let leaders = dag_builder
373            .leader_blocks(1..=num_rounds)
374            .into_iter()
375            .map(Option::unwrap)
376            .collect::<Vec<_>>();
377
378        let commits = linearizer.handle_commit(leaders.clone());
379        for (idx, subdag) in commits.into_iter().enumerate() {
380            tracing::info!("{subdag:?}");
381            assert_eq!(subdag.leader, leaders[idx].reference());
382
383            let expected_ts = {
384                let block_refs = leaders[idx]
385                    .ancestors()
386                    .iter()
387                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
388                    .cloned()
389                    .collect::<Vec<_>>();
390                let blocks = dag_state
391                    .read()
392                    .get_blocks(&block_refs)
393                    .into_iter()
394                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
395
396                median_timestamp_by_stake(&context, blocks).unwrap()
397            };
398            assert_eq!(subdag.timestamp_ms, expected_ts);
399
400            if idx == 0 {
401                // First subdag includes the leader block only
402                assert_eq!(subdag.blocks.len(), 1);
403            } else {
404                // Every subdag after will be missing the leader block from the previous
405                // committed subdag
406                assert_eq!(subdag.blocks.len(), num_authorities);
407            }
408            for block in subdag.blocks.iter() {
409                assert!(block.round() <= leaders[idx].round());
410            }
411            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
412        }
413    }
414
415    #[rstest]
416    #[tokio::test]
417    async fn test_handle_already_committed() {
418        telemetry_subscribers::init_for_testing();
419        let num_authorities = 4;
420        let (context, _) = Context::new_for_test(num_authorities);
421        let context = Arc::new(context);
422
423        let dag_state = Arc::new(RwLock::new(DagState::new(
424            context.clone(),
425            Arc::new(MemStore::new()),
426        )));
427        let leader_schedule = Arc::new(LeaderSchedule::new(
428            context.clone(),
429            LeaderSwapTable::default(),
430        ));
431        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
432        let wave_length = DEFAULT_WAVE_LENGTH;
433
434        let leader_round_wave_1 = 3;
435        let leader_round_wave_2 = leader_round_wave_1 + wave_length;
436
437        // Build a Dag from round 1..=6
438        let mut dag_builder = DagBuilder::new(context.clone());
439        dag_builder.layers(1..=leader_round_wave_2).build();
440
441        // Now retrieve all the blocks up to round leader_round_wave_1 - 1
442        // And then only the leader of round leader_round_wave_1
443        // Also store those to DagState
444        let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
445        blocks.push(
446            dag_builder
447                .leader_block(leader_round_wave_1)
448                .expect("Leader block should have been found"),
449        );
450        dag_state.write().accept_blocks(blocks.clone());
451
452        let first_leader = dag_builder
453            .leader_block(leader_round_wave_1)
454            .expect("Wave 1 leader round block should exist");
455        let mut last_commit_index = 1;
456        let first_commit_data = TrustedCommit::new_for_test(
457            last_commit_index,
458            CommitDigest::MIN,
459            0,
460            first_leader.reference(),
461            blocks.iter().map(|block| block.reference()).collect(),
462        );
463        dag_state.write().add_commit(first_commit_data);
464
465        // Mark the blocks as committed in DagState. This will allow to correctly detect the committed blocks when the new linearizer logic is enabled.
466        for block in blocks.iter() {
467            dag_state.write().set_committed(&block.reference());
468        }
469
470        // Now take all the blocks from round `leader_round_wave_1` up to round `leader_round_wave_2-1`
471        let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
472        // Filter out leader block of round `leader_round_wave_1`
473        blocks.retain(|block| {
474            !(block.round() == leader_round_wave_1
475                && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
476        });
477        // Add the leader block of round `leader_round_wave_2`
478        blocks.push(
479            dag_builder
480                .leader_block(leader_round_wave_2)
481                .expect("Leader block should have been found"),
482        );
483        // Write them in dag state
484        dag_state.write().accept_blocks(blocks.clone());
485
486        let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();
487
488        // Now get the latest leader which is the leader round of wave 2
489        let leader = dag_builder
490            .leader_block(leader_round_wave_2)
491            .expect("Leader block should exist");
492
493        last_commit_index += 1;
494        let expected_second_commit = TrustedCommit::new_for_test(
495            last_commit_index,
496            CommitDigest::MIN,
497            0,
498            leader.reference(),
499            blocks.clone(),
500        );
501
502        let commit = linearizer.handle_commit(vec![leader.clone()]);
503        assert_eq!(commit.len(), 1);
504
505        let subdag = &commit[0];
506        tracing::info!("{subdag:?}");
507        assert_eq!(subdag.leader, leader.reference());
508        assert_eq!(subdag.commit_ref.index, expected_second_commit.index());
509
510        let expected_ts = median_timestamp_by_stake(
511            &context,
512            subdag.blocks.iter().filter_map(|block| {
513                if block.round() == subdag.leader.round - 1 {
514                    Some(block.clone())
515                } else {
516                    None
517                }
518            }),
519        )
520        .unwrap();
521        assert_eq!(subdag.timestamp_ms, expected_ts);
522
523        // Using the same sorting as used in CommittedSubDag::sort
524        blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
525        assert_eq!(
526            subdag
527                .blocks
528                .clone()
529                .into_iter()
530                .map(|b| b.reference())
531                .collect::<Vec<_>>(),
532            blocks
533        );
534        for block in subdag.blocks.iter() {
535            assert!(block.round() <= expected_second_commit.leader().round);
536        }
537    }
538
539    /// This test will run the linearizer with gc_depth = 3 and make
540    /// sure that for the exact same DAG the linearizer will commit different blocks according to the rules.
541    #[tokio::test]
542    async fn test_handle_commit_with_gc_simple() {
543        telemetry_subscribers::init_for_testing();
544
545        const GC_DEPTH: u32 = 3;
546
547        let num_authorities = 4;
548        let (mut context, _keys) = Context::new_for_test(num_authorities);
549        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
550
551        let context = Arc::new(context);
552        let dag_state = Arc::new(RwLock::new(DagState::new(
553            context.clone(),
554            Arc::new(MemStore::new()),
555        )));
556        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
557
558        // Authorities of index 0->2 will always creates blocks that see each other, but until round 5 they won't see the blocks of authority 3.
559        // For authority 3 we create blocks that connect to all the other authorities.
560        // On round 5 we finally make the other authorities see the blocks of authority 3.
561        // Practically we "simulate" here a long chain created by authority 3 that is visible in round 5, but due to GC blocks of only round >=2 will
562        // be committed, when GC is enabled. When GC is disabled all blocks will be committed for rounds >= 1.
563        let dag_str = "DAG {
564                Round 0 : { 4 },
565                Round 1 : { * },
566                Round 2 : {
567                    A -> [-D1],
568                    B -> [-D1],
569                    C -> [-D1],
570                    D -> [*],
571                },
572                Round 3 : {
573                    A -> [-D2],
574                    B -> [-D2],
575                    C -> [-D2],
576                },
577                Round 4 : { 
578                    A -> [-D3],
579                    B -> [-D3],
580                    C -> [-D3],
581                    D -> [A3, B3, C3, D2],
582                },
583                Round 5 : { * },
584            }";
585
586        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
587        dag_builder.print();
588        dag_builder.persist_all_blocks(dag_state.clone());
589
590        let leaders = dag_builder
591            .leader_blocks(1..=6)
592            .into_iter()
593            .flatten()
594            .collect::<Vec<_>>();
595
596        let commits = linearizer.handle_commit(leaders.clone());
597        for (idx, subdag) in commits.into_iter().enumerate() {
598            tracing::info!("{subdag:?}");
599            assert_eq!(subdag.leader, leaders[idx].reference());
600
601            let expected_ts = {
602                let block_refs = leaders[idx]
603                    .ancestors()
604                    .iter()
605                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
606                    .cloned()
607                    .collect::<Vec<_>>();
608                let blocks = dag_state
609                    .read()
610                    .get_blocks(&block_refs)
611                    .into_iter()
612                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
613
614                median_timestamp_by_stake(&context, blocks).unwrap()
615            };
616            assert_eq!(subdag.timestamp_ms, expected_ts);
617
618            if idx == 0 {
619                // First subdag includes the leader block only
620                assert_eq!(subdag.blocks.len(), 1);
621            } else if idx == 1 {
622                assert_eq!(subdag.blocks.len(), 3);
623            } else if idx == 2 {
624                // We commit:
625                // * 1 block on round 4, the leader block
626                // * 3 blocks on round 3, as no commit happened on round 3 since the leader was missing
627                // * 2 blocks on round 2, again as no commit happened on round 3, we commit the "sub dag" of leader of round 3, which will be another 2 blocks
628                assert_eq!(subdag.blocks.len(), 6);
629            } else {
630                // Now it's going to be the first time that a leader will see the blocks of authority 3 and will attempt to commit
631                // the long chain. However, due to GC it will only commit blocks of round > 1. That's because it will commit blocks
632                // up to previous leader's round (round = 4) minus the gc_depth = 3, so that will be gc_round = 4 - 3 = 1. So we expect
633                // to see on the sub dag committed blocks of round >= 2.
634                assert_eq!(subdag.blocks.len(), 5);
635
636                assert!(
637                    subdag.blocks.iter().all(|block| block.round() >= 2),
638                    "Found blocks that are of round < 2."
639                );
640
641                // Also ensure that gc_round has advanced with the latest committed leader
642                assert_eq!(dag_state.read().gc_round(), subdag.leader.round - GC_DEPTH);
643            }
644            for block in subdag.blocks.iter() {
645                assert!(block.round() <= leaders[idx].round());
646            }
647            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
648        }
649    }
650
651    #[tokio::test]
652    async fn test_handle_commit_below_highest_committed_round() {
653        telemetry_subscribers::init_for_testing();
654
655        const GC_DEPTH: u32 = 3;
656
657        let num_authorities = 4;
658        let (mut context, _keys) = Context::new_for_test(num_authorities);
659        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
660
661        let context = Arc::new(context);
662        let dag_state = Arc::new(RwLock::new(DagState::new(
663            context.clone(),
664            Arc::new(MemStore::new()),
665        )));
666        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
667
668        // Authority D will create an "orphaned" block on round 1 as it won't reference to it on the block of round 2. Similar, no other authority will reference to it on round 2.
669        // Then on round 3 the authorities A, B & C will link to block D1. Once the DAG gets committed we should see the block D1 getting committed as well. Normally ,as block D2 would
670        // have been committed first block D1 should be ommitted. With the new logic this is no longer true.
671        let dag_str = "DAG {
672                Round 0 : { 4 },
673                Round 1 : { * },
674                Round 2 : {
675                    A -> [-D1],
676                    B -> [-D1],
677                    C -> [-D1],
678                    D -> [-D1],
679                },
680                Round 3 : {
681                    A -> [A2, B2, C2, D1],
682                    B -> [A2, B2, C2, D1],
683                    C -> [A2, B2, C2, D1],
684                    D -> [A2, B2, C2, D2]
685                },
686                Round 4 : { * },
687            }";
688
689        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
690        dag_builder.print();
691        dag_builder.persist_all_blocks(dag_state.clone());
692
693        let leaders = dag_builder
694            .leader_blocks(1..=4)
695            .into_iter()
696            .flatten()
697            .collect::<Vec<_>>();
698
699        let commits = linearizer.handle_commit(leaders.clone());
700        for (idx, subdag) in commits.into_iter().enumerate() {
701            tracing::info!("{subdag:?}");
702            assert_eq!(subdag.leader, leaders[idx].reference());
703
704            let expected_ts = {
705                let block_refs = leaders[idx]
706                    .ancestors()
707                    .iter()
708                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
709                    .cloned()
710                    .collect::<Vec<_>>();
711                let blocks = dag_state
712                    .read()
713                    .get_blocks(&block_refs)
714                    .into_iter()
715                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
716
717                median_timestamp_by_stake(&context, blocks).unwrap()
718            };
719            assert_eq!(subdag.timestamp_ms, expected_ts);
720
721            if idx == 0 {
722                // First subdag includes the leader block only B1
723                assert_eq!(subdag.blocks.len(), 1);
724            } else if idx == 1 {
725                // We commit:
726                // * 1 block on round 2, the leader block C2
727                // * 2 blocks on round 1, A1, C1
728                assert_eq!(subdag.blocks.len(), 3);
729            } else if idx == 2 {
730                // We commit:
731                // * 1 block on round 3, the leader block D3
732                // * 3 blocks on round 2, A2, B2, D2
733                assert_eq!(subdag.blocks.len(), 4);
734
735                assert!(
736                    subdag.blocks.iter().any(|block| block.round() == 2
737                        && block.author() == AuthorityIndex::new_for_test(3)),
738                    "Block D2 should have been committed."
739                );
740            } else if idx == 3 {
741                // We commit:
742                // * 1 block on round 4, the leader block A4
743                // * 3 blocks on round 3, A3, B3, C3
744                // * 1 block of round 1, D1
745                assert_eq!(subdag.blocks.len(), 5);
746                assert!(
747                    subdag.blocks.iter().any(|block| block.round() == 1
748                        && block.author() == AuthorityIndex::new_for_test(3)),
749                    "Block D1 should have been committed."
750                );
751            } else {
752                panic!("Unexpected subdag with index {:?}", idx);
753            }
754
755            for block in subdag.blocks.iter() {
756                assert!(block.round() <= leaders[idx].round());
757            }
758            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
759        }
760    }
761
762    #[rstest]
763    #[case(3_000, 3_000, 6_000)]
764    #[tokio::test]
765    async fn test_calculate_commit_timestamp(
766        #[case] timestamp_1: u64,
767        #[case] timestamp_2: u64,
768        #[case] timestamp_3: u64,
769    ) {
770        // GIVEN
771        telemetry_subscribers::init_for_testing();
772
773        let num_authorities = 4;
774        let (context, _keys) = Context::new_for_test(num_authorities);
775
776        let context = Arc::new(context);
777        let store = Arc::new(MemStore::new());
778        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
779        let mut dag_state = dag_state.write();
780
781        let ancestors = vec![
782            VerifiedBlock::new_for_test(TestBlock::new(4, 0).set_timestamp_ms(1_000).build()),
783            VerifiedBlock::new_for_test(TestBlock::new(4, 1).set_timestamp_ms(2_000).build()),
784            VerifiedBlock::new_for_test(TestBlock::new(4, 2).set_timestamp_ms(3_000).build()),
785            VerifiedBlock::new_for_test(TestBlock::new(4, 3).set_timestamp_ms(4_000).build()),
786        ];
787
788        let leader_block = VerifiedBlock::new_for_test(
789            TestBlock::new(5, 0)
790                .set_timestamp_ms(5_000)
791                .set_ancestors(
792                    ancestors
793                        .iter()
794                        .map(|block| block.reference())
795                        .collect::<Vec<_>>(),
796                )
797                .build(),
798        );
799
800        for block in &ancestors {
801            dag_state.accept_block(block.clone());
802        }
803
804        let last_commit_timestamp_ms = 0;
805
806        // WHEN
807        let timestamp = Linearizer::calculate_commit_timestamp(
808            &context,
809            &mut dag_state,
810            &leader_block,
811            last_commit_timestamp_ms,
812        );
813        assert_eq!(timestamp, timestamp_1);
814
815        // AND skip the block of authority 0 and round 4.
816        let leader_block = VerifiedBlock::new_for_test(
817            TestBlock::new(5, 0)
818                .set_timestamp_ms(5_000)
819                .set_ancestors(
820                    ancestors
821                        .iter()
822                        .skip(1)
823                        .map(|block| block.reference())
824                        .collect::<Vec<_>>(),
825                )
826                .build(),
827        );
828
829        let timestamp = Linearizer::calculate_commit_timestamp(
830            &context,
831            &mut dag_state,
832            &leader_block,
833            last_commit_timestamp_ms,
834        );
835        assert_eq!(timestamp, timestamp_2);
836
837        // AND set the `last_commit_timestamp_ms` to 6_000
838        let last_commit_timestamp_ms = 6_000;
839        let timestamp = Linearizer::calculate_commit_timestamp(
840            &context,
841            &mut dag_state,
842            &leader_block,
843            last_commit_timestamp_ms,
844        );
845        assert_eq!(timestamp, timestamp_3);
846
847        // AND there is only one ancestor block to commit
848        let (context, _) = Context::new_for_test(1);
849        let leader_block = VerifiedBlock::new_for_test(
850            TestBlock::new(5, 0)
851                .set_timestamp_ms(5_000)
852                .set_ancestors(
853                    ancestors
854                        .iter()
855                        .take(1)
856                        .map(|block| block.reference())
857                        .collect::<Vec<_>>(),
858                )
859                .build(),
860        );
861        let last_commit_timestamp_ms = 0;
862        let timestamp = Linearizer::calculate_commit_timestamp(
863            &context,
864            &mut dag_state,
865            &leader_block,
866            last_commit_timestamp_ms,
867        );
868        assert_eq!(timestamp, 1_000);
869    }
870
871    #[test]
872    fn test_median_timestamps_by_stake() {
873        // One total stake.
874        let timestamps = vec![(1_000, 1)];
875        assert_eq!(median_timestamps_by_stake_inner(timestamps, 1), 1_000);
876
877        // Odd number of total stakes.
878        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1)];
879        assert_eq!(median_timestamps_by_stake_inner(timestamps, 3), 2_000);
880
881        // Even number of total stakes.
882        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1)];
883        assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);
884
885        // Even number of total stakes, different order.
886        let timestamps = vec![(4_000, 1), (3_000, 1), (1_000, 1), (2_000, 1)];
887        assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);
888
889        // Unequal stakes.
890        let timestamps = vec![(2_000, 2), (4_000, 2), (1_000, 3), (3_000, 3)];
891        assert_eq!(median_timestamps_by_stake_inner(timestamps, 10), 3_000);
892
893        // Unequal stakes.
894        let timestamps = vec![
895            (500, 2),
896            (4_000, 2),
897            (2_500, 3),
898            (1_000, 5),
899            (3_000, 3),
900            (2_000, 4),
901        ];
902        assert_eq!(median_timestamps_by_stake_inner(timestamps, 19), 2_000);
903
904        // One authority dominates.
905        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1), (5_000, 10)];
906        assert_eq!(median_timestamps_by_stake_inner(timestamps, 14), 5_000);
907    }
908
909    #[tokio::test]
910    async fn test_median_timestamps_by_stake_errors() {
911        let num_authorities = 4;
912        let (context, _keys) = Context::new_for_test(num_authorities);
913        let context = Arc::new(context);
914
915        // No blocks provided
916        let err = median_timestamp_by_stake(&context, vec![].into_iter()).unwrap_err();
917        assert_eq!(err, "No blocks provided");
918
919        // Blocks provided but total stake is less than quorum threshold
920        let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
921        let err = median_timestamp_by_stake(&context, vec![block].into_iter()).unwrap_err();
922        assert_eq!(err, "Total stake 1 < quorum threshold 3");
923    }
924}