consensus_core/
linearizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
8use itertools::Itertools;
9use parking_lot::RwLock;
10
11use crate::{
12    block::{BlockAPI, VerifiedBlock},
13    commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
14    context::Context,
15    dag_state::DagState,
16};
17
18/// The `StorageAPI` trait provides an interface for the block store and has been
19/// mostly introduced for allowing to inject the test store in `DagBuilder`.
20pub(crate) trait BlockStoreAPI {
21    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;
22
23    fn gc_round(&self) -> Round;
24
25    fn set_committed(&mut self, block_ref: &BlockRef) -> bool;
26
27    fn is_committed(&self, block_ref: &BlockRef) -> bool;
28}
29
30impl BlockStoreAPI
31    for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
32{
33    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
34        DagState::get_blocks(self, refs)
35    }
36
37    fn gc_round(&self) -> Round {
38        DagState::gc_round(self)
39    }
40
41    fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
42        DagState::set_committed(self, block_ref)
43    }
44
45    fn is_committed(&self, block_ref: &BlockRef) -> bool {
46        DagState::is_committed(self, block_ref)
47    }
48}
49
50/// Expand a committed sequence of leader into a sequence of sub-dags.
51#[derive(Clone)]
52pub struct Linearizer {
53    /// In memory block store representing the dag state
54    context: Arc<Context>,
55    dag_state: Arc<RwLock<DagState>>,
56}
57
58impl Linearizer {
59    pub fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
60        Self { context, dag_state }
61    }
62
63    /// Collect the sub-dag and the corresponding commit from a specific leader excluding any duplicates or
64    /// blocks that have already been committed (within previous sub-dags).
65    fn collect_sub_dag_and_commit(
66        &mut self,
67        leader_block: VerifiedBlock,
68    ) -> (CommittedSubDag, TrustedCommit) {
69        let _s = self
70            .context
71            .metrics
72            .node_metrics
73            .scope_processing_time
74            .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
75            .start_timer();
76
77        // Grab latest commit state from dag state
78        let mut dag_state = self.dag_state.write();
79        let last_commit_index = dag_state.last_commit_index();
80        let last_commit_digest = dag_state.last_commit_digest();
81        let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
82
83        // Now linearize the sub-dag starting from the leader block
84        let to_commit = Self::linearize_sub_dag(leader_block.clone(), &mut dag_state);
85
86        let timestamp_ms = Self::calculate_commit_timestamp(
87            &self.context,
88            &mut dag_state,
89            &leader_block,
90            last_commit_timestamp_ms,
91        );
92
93        drop(dag_state);
94
95        // Create the Commit.
96        let commit = Commit::new(
97            last_commit_index + 1,
98            last_commit_digest,
99            timestamp_ms,
100            leader_block.reference(),
101            to_commit
102                .iter()
103                .map(|block| block.reference())
104                .collect::<Vec<_>>(),
105        );
106        let serialized = commit
107            .serialize()
108            .unwrap_or_else(|e| panic!("Failed to serialize commit: {}", e));
109        let commit = TrustedCommit::new_trusted(commit, serialized);
110
111        // Create the corresponding committed sub dag
112        let sub_dag = CommittedSubDag::new(
113            leader_block.reference(),
114            to_commit,
115            timestamp_ms,
116            commit.reference(),
117        );
118
119        (sub_dag, commit)
120    }
121
122    /// Calculates the commit's timestamp. The timestamp will be calculated as the median of leader's parents (leader.round - 1)
123    /// timestamps by stake. To ensure that commit timestamp monotonicity is respected it is compared against the `last_commit_timestamp_ms`
124    /// and the maximum of the two is returned.
125    pub(crate) fn calculate_commit_timestamp(
126        context: &Context,
127        dag_state: &mut impl BlockStoreAPI,
128        leader_block: &VerifiedBlock,
129        last_commit_timestamp_ms: BlockTimestampMs,
130    ) -> BlockTimestampMs {
131        let timestamp_ms = {
132            // Select leaders' parent blocks.
133            let block_refs = leader_block
134                .ancestors()
135                .iter()
136                .filter(|block_ref| block_ref.round == leader_block.round() - 1)
137                .cloned()
138                .collect::<Vec<_>>();
139            // Get the blocks from dag state which should not fail.
140            let blocks = dag_state
141                .get_blocks(&block_refs)
142                .into_iter()
143                .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
144            median_timestamp_by_stake(context, blocks).unwrap_or_else(|e| {
145                panic!(
146                    "Cannot compute median timestamp for leader block {:?} ancestors: {}",
147                    leader_block, e
148                )
149            })
150        };
151
152        // Always make sure that commit timestamps are monotonic, so override if necessary.
153        timestamp_ms.max(last_commit_timestamp_ms)
154    }
155
156    pub(crate) fn linearize_sub_dag(
157        leader_block: VerifiedBlock,
158        dag_state: &mut impl BlockStoreAPI,
159    ) -> Vec<VerifiedBlock> {
160        // The GC round here is calculated based on the last committed round of the leader block. The algorithm will attempt to
161        // commit blocks up to this GC round. Once this commit has been processed and written to DagState, then gc round will update
162        // and on the processing of the next commit we'll have it already updated, so no need to do any gc_round recalculations here.
163        // We just use whatever is currently in DagState.
164        let gc_round: Round = dag_state.gc_round();
165        let leader_block_ref = leader_block.reference();
166        let mut buffer = vec![leader_block];
167        let mut to_commit = Vec::new();
168
169        // Perform the recursion without stopping at the highest round round that has been committed per authority. Instead it will
170        // allow to commit blocks that are lower than the highest committed round for an authority but higher than gc_round.
171        assert!(
172            dag_state.set_committed(&leader_block_ref),
173            "Leader block with reference {:?} attempted to be committed twice",
174            leader_block_ref
175        );
176
177        while let Some(x) = buffer.pop() {
178            to_commit.push(x.clone());
179
180            let ancestors: Vec<VerifiedBlock> = dag_state
181                .get_blocks(
182                    &x.ancestors()
183                        .iter()
184                        .copied()
185                        .filter(|ancestor| {
186                            ancestor.round > gc_round && !dag_state.is_committed(ancestor)
187                        })
188                        .collect::<Vec<_>>(),
189                )
190                .into_iter()
191                .map(|ancestor_opt| {
192                    ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
193                })
194                .collect();
195
196            for ancestor in ancestors {
197                buffer.push(ancestor.clone());
198                assert!(
199                    dag_state.set_committed(&ancestor.reference()),
200                    "Block with reference {:?} attempted to be committed twice",
201                    ancestor.reference()
202                );
203            }
204        }
205
206        // The above code should have not yielded any blocks that are <= gc_round, but just to make sure that we'll never
207        // commit anything that should be garbage collected we attempt to prune here as well.
208        assert!(
209            to_commit.iter().all(|block| block.round() > gc_round),
210            "No blocks <= {gc_round} should be committed. Leader round {}, blocks {to_commit:?}.",
211            leader_block_ref
212        );
213
214        // Sort the blocks of the sub-dag blocks
215        sort_sub_dag_blocks(&mut to_commit);
216
217        to_commit
218    }
219
220    // This function should be called whenever a new commit is observed. This will
221    // iterate over the sequence of committed leaders and produce a list of committed
222    // sub-dags.
223    pub fn handle_commit(&mut self, committed_leaders: Vec<VerifiedBlock>) -> Vec<CommittedSubDag> {
224        if committed_leaders.is_empty() {
225            return vec![];
226        }
227
228        let mut committed_sub_dags = vec![];
229        for leader_block in committed_leaders {
230            // Collect the sub-dag generated using each of these leaders and the corresponding commit.
231            let (sub_dag, commit) = self.collect_sub_dag_and_commit(leader_block);
232
233            self.update_blocks_pruned_metric(&sub_dag);
234
235            // Buffer commit in dag state for persistence later.
236            // This also updates the last committed rounds.
237            self.dag_state.write().add_commit(commit.clone());
238
239            committed_sub_dags.push(sub_dag);
240        }
241
242        committed_sub_dags
243    }
244
245    // Try to measure the number of blocks that get pruned due to GC. This is not very accurate, but it can give us a good enough idea.
246    // We consider a block as pruned when it is an ancestor of a block that has been committed as part of the provided `sub_dag`, but
247    // it has not been committed as part of previous commits. Right now we measure this via checking that highest committed round for the authority
248    // as we don't an efficient look up functionality to check if a block has been committed or not.
249    fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
250        let (last_committed_rounds, gc_round) = {
251            let dag_state = self.dag_state.read();
252            (dag_state.last_committed_rounds(), dag_state.gc_round())
253        };
254
255        for block_ref in sub_dag
256            .blocks
257            .iter()
258            .flat_map(|block| block.ancestors())
259            .filter(
260                |ancestor_ref| {
261                    ancestor_ref.round <= gc_round
262                        && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
263                }, // If the last committed round is the same as the pruned block's round, then we know for sure that it has been committed and it doesn't count here
264                   // as pruned block.
265            )
266            .unique()
267        {
268            let hostname = &self.context.committee.authority(block_ref.author).hostname;
269
270            // If the last committed round from this authority is lower than the pruned ancestor in question, then we know for sure that it has not been committed.
271            let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
272                &[hostname, "uncommitted"]
273            } else {
274                // If last committed round is higher for this authority, then we don't really know it's status, but we know that there is a higher committed block from this authority.
275                &[hostname, "higher_committed"]
276            };
277
278            self.context
279                .metrics
280                .node_metrics
281                .blocks_pruned_on_commit
282                .with_label_values(label_values)
283                .inc();
284        }
285    }
286}
287
288/// Computes the median timestamp of the blocks weighted by the stake of their authorities.
289/// This function assumes each block comes from a different authority of the same round.
290/// Error is returned if no blocks are provided or total stake is less than quorum threshold.
291pub(crate) fn median_timestamp_by_stake(
292    context: &Context,
293    blocks: impl Iterator<Item = VerifiedBlock>,
294) -> Result<BlockTimestampMs, String> {
295    let mut total_stake = 0;
296    let mut timestamps = vec![];
297    for block in blocks {
298        let stake = context.committee.authority(block.author()).stake;
299        timestamps.push((block.timestamp_ms(), stake));
300        total_stake += stake;
301    }
302
303    if timestamps.is_empty() {
304        return Err("No blocks provided".to_string());
305    }
306    if total_stake < context.committee.quorum_threshold() {
307        return Err(format!(
308            "Total stake {} < quorum threshold {}",
309            total_stake,
310            context.committee.quorum_threshold()
311        )
312        .to_string());
313    }
314
315    Ok(median_timestamps_by_stake_inner(timestamps, total_stake))
316}
317
318fn median_timestamps_by_stake_inner(
319    mut timestamps: Vec<(BlockTimestampMs, Stake)>,
320    total_stake: Stake,
321) -> BlockTimestampMs {
322    timestamps.sort_by_key(|(ts, _)| *ts);
323
324    let mut cumulative_stake = 0;
325    for (ts, stake) in &timestamps {
326        cumulative_stake += stake;
327        if cumulative_stake > total_stake / 2 {
328            return *ts;
329        }
330    }
331
332    timestamps.last().unwrap().0
333}
334
335#[cfg(test)]
336mod tests {
337    use consensus_config::AuthorityIndex;
338    use rstest::rstest;
339
340    use super::*;
341    use crate::{
342        CommitIndex, TestBlock,
343        commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
344        context::Context,
345        leader_schedule::{LeaderSchedule, LeaderSwapTable},
346        storage::mem_store::MemStore,
347        test_dag_builder::DagBuilder,
348        test_dag_parser::parse_dag,
349    };
350
351    #[rstest]
352    #[tokio::test]
353    async fn test_handle_commit() {
354        telemetry_subscribers::init_for_testing();
355        let num_authorities = 4;
356        let (context, _keys) = Context::new_for_test(num_authorities);
357        let context = Arc::new(context);
358
359        let dag_state = Arc::new(RwLock::new(DagState::new(
360            context.clone(),
361            Arc::new(MemStore::new()),
362        )));
363        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
364
365        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
366        let num_rounds: u32 = 10;
367        let mut dag_builder = DagBuilder::new(context.clone());
368        dag_builder
369            .layers(1..=num_rounds)
370            .build()
371            .persist_layers(dag_state.clone());
372
373        let leaders = dag_builder
374            .leader_blocks(1..=num_rounds)
375            .into_iter()
376            .map(Option::unwrap)
377            .collect::<Vec<_>>();
378
379        let commits = linearizer.handle_commit(leaders.clone());
380        for (idx, subdag) in commits.into_iter().enumerate() {
381            tracing::info!("{subdag:?}");
382            assert_eq!(subdag.leader, leaders[idx].reference());
383
384            let expected_ts = {
385                let block_refs = leaders[idx]
386                    .ancestors()
387                    .iter()
388                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
389                    .cloned()
390                    .collect::<Vec<_>>();
391                let blocks = dag_state
392                    .read()
393                    .get_blocks(&block_refs)
394                    .into_iter()
395                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
396
397                median_timestamp_by_stake(&context, blocks).unwrap()
398            };
399            assert_eq!(subdag.timestamp_ms, expected_ts);
400
401            if idx == 0 {
402                // First subdag includes the leader block only
403                assert_eq!(subdag.blocks.len(), 1);
404            } else {
405                // Every subdag after will be missing the leader block from the previous
406                // committed subdag
407                assert_eq!(subdag.blocks.len(), num_authorities);
408            }
409            for block in subdag.blocks.iter() {
410                assert!(block.round() <= leaders[idx].round());
411            }
412            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
413        }
414    }
415
416    #[rstest]
417    #[tokio::test]
418    async fn test_handle_already_committed() {
419        telemetry_subscribers::init_for_testing();
420        let num_authorities = 4;
421        let (context, _) = Context::new_for_test(num_authorities);
422        let context = Arc::new(context);
423
424        let dag_state = Arc::new(RwLock::new(DagState::new(
425            context.clone(),
426            Arc::new(MemStore::new()),
427        )));
428        let leader_schedule = Arc::new(LeaderSchedule::new(
429            context.clone(),
430            LeaderSwapTable::default(),
431        ));
432        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
433        let wave_length = DEFAULT_WAVE_LENGTH;
434
435        let leader_round_wave_1 = 3;
436        let leader_round_wave_2 = leader_round_wave_1 + wave_length;
437
438        // Build a Dag from round 1..=6
439        let mut dag_builder = DagBuilder::new(context.clone());
440        dag_builder.layers(1..=leader_round_wave_2).build();
441
442        // Now retrieve all the blocks up to round leader_round_wave_1 - 1
443        // And then only the leader of round leader_round_wave_1
444        // Also store those to DagState
445        let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
446        blocks.push(
447            dag_builder
448                .leader_block(leader_round_wave_1)
449                .expect("Leader block should have been found"),
450        );
451        dag_state.write().accept_blocks(blocks.clone());
452
453        let first_leader = dag_builder
454            .leader_block(leader_round_wave_1)
455            .expect("Wave 1 leader round block should exist");
456        let mut last_commit_index = 1;
457        let first_commit_data = TrustedCommit::new_for_test(
458            last_commit_index,
459            CommitDigest::MIN,
460            0,
461            first_leader.reference(),
462            blocks.iter().map(|block| block.reference()).collect(),
463        );
464        dag_state.write().add_commit(first_commit_data);
465
466        // Mark the blocks as committed in DagState. This will allow to correctly detect the committed blocks when the new linearizer logic is enabled.
467        for block in blocks.iter() {
468            dag_state.write().set_committed(&block.reference());
469        }
470
471        // Now take all the blocks from round `leader_round_wave_1` up to round `leader_round_wave_2-1`
472        let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
473        // Filter out leader block of round `leader_round_wave_1`
474        blocks.retain(|block| {
475            !(block.round() == leader_round_wave_1
476                && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
477        });
478        // Add the leader block of round `leader_round_wave_2`
479        blocks.push(
480            dag_builder
481                .leader_block(leader_round_wave_2)
482                .expect("Leader block should have been found"),
483        );
484        // Write them in dag state
485        dag_state.write().accept_blocks(blocks.clone());
486
487        let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();
488
489        // Now get the latest leader which is the leader round of wave 2
490        let leader = dag_builder
491            .leader_block(leader_round_wave_2)
492            .expect("Leader block should exist");
493
494        last_commit_index += 1;
495        let expected_second_commit = TrustedCommit::new_for_test(
496            last_commit_index,
497            CommitDigest::MIN,
498            0,
499            leader.reference(),
500            blocks.clone(),
501        );
502
503        let commit = linearizer.handle_commit(vec![leader.clone()]);
504        assert_eq!(commit.len(), 1);
505
506        let subdag = &commit[0];
507        tracing::info!("{subdag:?}");
508        assert_eq!(subdag.leader, leader.reference());
509        assert_eq!(subdag.commit_ref.index, expected_second_commit.index());
510
511        let expected_ts = median_timestamp_by_stake(
512            &context,
513            subdag.blocks.iter().filter_map(|block| {
514                if block.round() == subdag.leader.round - 1 {
515                    Some(block.clone())
516                } else {
517                    None
518                }
519            }),
520        )
521        .unwrap();
522        assert_eq!(subdag.timestamp_ms, expected_ts);
523
524        // Using the same sorting as used in CommittedSubDag::sort
525        blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
526        assert_eq!(
527            subdag
528                .blocks
529                .clone()
530                .into_iter()
531                .map(|b| b.reference())
532                .collect::<Vec<_>>(),
533            blocks
534        );
535        for block in subdag.blocks.iter() {
536            assert!(block.round() <= expected_second_commit.leader().round);
537        }
538    }
539
540    /// This test will run the linearizer with gc_depth = 3 and make
541    /// sure that for the exact same DAG the linearizer will commit different blocks according to the rules.
542    #[tokio::test]
543    async fn test_handle_commit_with_gc_simple() {
544        telemetry_subscribers::init_for_testing();
545
546        const GC_DEPTH: u32 = 3;
547
548        let num_authorities = 4;
549        let (mut context, _keys) = Context::new_for_test(num_authorities);
550        context
551            .protocol_config
552            .set_consensus_gc_depth_for_testing(GC_DEPTH);
553
554        let context = Arc::new(context);
555        let dag_state = Arc::new(RwLock::new(DagState::new(
556            context.clone(),
557            Arc::new(MemStore::new()),
558        )));
559        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
560
561        // Authorities of index 0->2 will always creates blocks that see each other, but until round 5 they won't see the blocks of authority 3.
562        // For authority 3 we create blocks that connect to all the other authorities.
563        // On round 5 we finally make the other authorities see the blocks of authority 3.
564        // Practically we "simulate" here a long chain created by authority 3 that is visible in round 5, but due to GC blocks of only round >=2 will
565        // be committed, when GC is enabled. When GC is disabled all blocks will be committed for rounds >= 1.
566        let dag_str = "DAG {
567                Round 0 : { 4 },
568                Round 1 : { * },
569                Round 2 : {
570                    A -> [-D1],
571                    B -> [-D1],
572                    C -> [-D1],
573                    D -> [*],
574                },
575                Round 3 : {
576                    A -> [-D2],
577                    B -> [-D2],
578                    C -> [-D2],
579                },
580                Round 4 : { 
581                    A -> [-D3],
582                    B -> [-D3],
583                    C -> [-D3],
584                    D -> [A3, B3, C3, D2],
585                },
586                Round 5 : { * },
587            }";
588
589        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
590        dag_builder.print();
591        dag_builder.persist_all_blocks(dag_state.clone());
592
593        let leaders = dag_builder
594            .leader_blocks(1..=6)
595            .into_iter()
596            .flatten()
597            .collect::<Vec<_>>();
598
599        let commits = linearizer.handle_commit(leaders.clone());
600        for (idx, subdag) in commits.into_iter().enumerate() {
601            tracing::info!("{subdag:?}");
602            assert_eq!(subdag.leader, leaders[idx].reference());
603
604            let expected_ts = {
605                let block_refs = leaders[idx]
606                    .ancestors()
607                    .iter()
608                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
609                    .cloned()
610                    .collect::<Vec<_>>();
611                let blocks = dag_state
612                    .read()
613                    .get_blocks(&block_refs)
614                    .into_iter()
615                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
616
617                median_timestamp_by_stake(&context, blocks).unwrap()
618            };
619            assert_eq!(subdag.timestamp_ms, expected_ts);
620
621            if idx == 0 {
622                // First subdag includes the leader block only
623                assert_eq!(subdag.blocks.len(), 1);
624            } else if idx == 1 {
625                assert_eq!(subdag.blocks.len(), 3);
626            } else if idx == 2 {
627                // We commit:
628                // * 1 block on round 4, the leader block
629                // * 3 blocks on round 3, as no commit happened on round 3 since the leader was missing
630                // * 2 blocks on round 2, again as no commit happened on round 3, we commit the "sub dag" of leader of round 3, which will be another 2 blocks
631                assert_eq!(subdag.blocks.len(), 6);
632            } else {
633                // Now it's going to be the first time that a leader will see the blocks of authority 3 and will attempt to commit
634                // the long chain. However, due to GC it will only commit blocks of round > 1. That's because it will commit blocks
635                // up to previous leader's round (round = 4) minus the gc_depth = 3, so that will be gc_round = 4 - 3 = 1. So we expect
636                // to see on the sub dag committed blocks of round >= 2.
637                assert_eq!(subdag.blocks.len(), 5);
638
639                assert!(
640                    subdag.blocks.iter().all(|block| block.round() >= 2),
641                    "Found blocks that are of round < 2."
642                );
643
644                // Also ensure that gc_round has advanced with the latest committed leader
645                assert_eq!(dag_state.read().gc_round(), subdag.leader.round - GC_DEPTH);
646            }
647            for block in subdag.blocks.iter() {
648                assert!(block.round() <= leaders[idx].round());
649            }
650            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
651        }
652    }
653
654    #[tokio::test]
655    async fn test_handle_commit_below_highest_committed_round() {
656        telemetry_subscribers::init_for_testing();
657
658        const GC_DEPTH: u32 = 3;
659
660        let num_authorities = 4;
661        let (mut context, _keys) = Context::new_for_test(num_authorities);
662        context
663            .protocol_config
664            .set_consensus_gc_depth_for_testing(GC_DEPTH);
665
666        let context = Arc::new(context);
667        let dag_state = Arc::new(RwLock::new(DagState::new(
668            context.clone(),
669            Arc::new(MemStore::new()),
670        )));
671        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
672
673        // Authority D will create an "orphaned" block on round 1 as it won't reference to it on the block of round 2. Similar, no other authority will reference to it on round 2.
674        // Then on round 3 the authorities A, B & C will link to block D1. Once the DAG gets committed we should see the block D1 getting committed as well. Normally ,as block D2 would
675        // have been committed first block D1 should be ommitted. With the new logic this is no longer true.
676        let dag_str = "DAG {
677                Round 0 : { 4 },
678                Round 1 : { * },
679                Round 2 : {
680                    A -> [-D1],
681                    B -> [-D1],
682                    C -> [-D1],
683                    D -> [-D1],
684                },
685                Round 3 : {
686                    A -> [A2, B2, C2, D1],
687                    B -> [A2, B2, C2, D1],
688                    C -> [A2, B2, C2, D1],
689                    D -> [A2, B2, C2, D2]
690                },
691                Round 4 : { * },
692            }";
693
694        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
695        dag_builder.print();
696        dag_builder.persist_all_blocks(dag_state.clone());
697
698        let leaders = dag_builder
699            .leader_blocks(1..=4)
700            .into_iter()
701            .flatten()
702            .collect::<Vec<_>>();
703
704        let commits = linearizer.handle_commit(leaders.clone());
705        for (idx, subdag) in commits.into_iter().enumerate() {
706            tracing::info!("{subdag:?}");
707            assert_eq!(subdag.leader, leaders[idx].reference());
708
709            let expected_ts = {
710                let block_refs = leaders[idx]
711                    .ancestors()
712                    .iter()
713                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
714                    .cloned()
715                    .collect::<Vec<_>>();
716                let blocks = dag_state
717                    .read()
718                    .get_blocks(&block_refs)
719                    .into_iter()
720                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
721
722                median_timestamp_by_stake(&context, blocks).unwrap()
723            };
724            assert_eq!(subdag.timestamp_ms, expected_ts);
725
726            if idx == 0 {
727                // First subdag includes the leader block only B1
728                assert_eq!(subdag.blocks.len(), 1);
729            } else if idx == 1 {
730                // We commit:
731                // * 1 block on round 2, the leader block C2
732                // * 2 blocks on round 1, A1, C1
733                assert_eq!(subdag.blocks.len(), 3);
734            } else if idx == 2 {
735                // We commit:
736                // * 1 block on round 3, the leader block D3
737                // * 3 blocks on round 2, A2, B2, D2
738                assert_eq!(subdag.blocks.len(), 4);
739
740                assert!(
741                    subdag.blocks.iter().any(|block| block.round() == 2
742                        && block.author() == AuthorityIndex::new_for_test(3)),
743                    "Block D2 should have been committed."
744                );
745            } else if idx == 3 {
746                // We commit:
747                // * 1 block on round 4, the leader block A4
748                // * 3 blocks on round 3, A3, B3, C3
749                // * 1 block of round 1, D1
750                assert_eq!(subdag.blocks.len(), 5);
751                assert!(
752                    subdag.blocks.iter().any(|block| block.round() == 1
753                        && block.author() == AuthorityIndex::new_for_test(3)),
754                    "Block D1 should have been committed."
755                );
756            } else {
757                panic!("Unexpected subdag with index {:?}", idx);
758            }
759
760            for block in subdag.blocks.iter() {
761                assert!(block.round() <= leaders[idx].round());
762            }
763            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
764        }
765    }
766
767    #[rstest]
768    #[case(3_000, 3_000, 6_000)]
769    #[tokio::test]
770    async fn test_calculate_commit_timestamp(
771        #[case] timestamp_1: u64,
772        #[case] timestamp_2: u64,
773        #[case] timestamp_3: u64,
774    ) {
775        // GIVEN
776        telemetry_subscribers::init_for_testing();
777
778        let num_authorities = 4;
779        let (context, _keys) = Context::new_for_test(num_authorities);
780
781        let context = Arc::new(context);
782        let store = Arc::new(MemStore::new());
783        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
784        let mut dag_state = dag_state.write();
785
786        let ancestors = vec![
787            VerifiedBlock::new_for_test(TestBlock::new(4, 0).set_timestamp_ms(1_000).build()),
788            VerifiedBlock::new_for_test(TestBlock::new(4, 1).set_timestamp_ms(2_000).build()),
789            VerifiedBlock::new_for_test(TestBlock::new(4, 2).set_timestamp_ms(3_000).build()),
790            VerifiedBlock::new_for_test(TestBlock::new(4, 3).set_timestamp_ms(4_000).build()),
791        ];
792
793        let leader_block = VerifiedBlock::new_for_test(
794            TestBlock::new(5, 0)
795                .set_timestamp_ms(5_000)
796                .set_ancestors(
797                    ancestors
798                        .iter()
799                        .map(|block| block.reference())
800                        .collect::<Vec<_>>(),
801                )
802                .build(),
803        );
804
805        for block in &ancestors {
806            dag_state.accept_block(block.clone());
807        }
808
809        let last_commit_timestamp_ms = 0;
810
811        // WHEN
812        let timestamp = Linearizer::calculate_commit_timestamp(
813            &context,
814            &mut dag_state,
815            &leader_block,
816            last_commit_timestamp_ms,
817        );
818        assert_eq!(timestamp, timestamp_1);
819
820        // AND skip the block of authority 0 and round 4.
821        let leader_block = VerifiedBlock::new_for_test(
822            TestBlock::new(5, 0)
823                .set_timestamp_ms(5_000)
824                .set_ancestors(
825                    ancestors
826                        .iter()
827                        .skip(1)
828                        .map(|block| block.reference())
829                        .collect::<Vec<_>>(),
830                )
831                .build(),
832        );
833
834        let timestamp = Linearizer::calculate_commit_timestamp(
835            &context,
836            &mut dag_state,
837            &leader_block,
838            last_commit_timestamp_ms,
839        );
840        assert_eq!(timestamp, timestamp_2);
841
842        // AND set the `last_commit_timestamp_ms` to 6_000
843        let last_commit_timestamp_ms = 6_000;
844        let timestamp = Linearizer::calculate_commit_timestamp(
845            &context,
846            &mut dag_state,
847            &leader_block,
848            last_commit_timestamp_ms,
849        );
850        assert_eq!(timestamp, timestamp_3);
851
852        // AND there is only one ancestor block to commit
853        let (context, _) = Context::new_for_test(1);
854        let leader_block = VerifiedBlock::new_for_test(
855            TestBlock::new(5, 0)
856                .set_timestamp_ms(5_000)
857                .set_ancestors(
858                    ancestors
859                        .iter()
860                        .take(1)
861                        .map(|block| block.reference())
862                        .collect::<Vec<_>>(),
863                )
864                .build(),
865        );
866        let last_commit_timestamp_ms = 0;
867        let timestamp = Linearizer::calculate_commit_timestamp(
868            &context,
869            &mut dag_state,
870            &leader_block,
871            last_commit_timestamp_ms,
872        );
873        assert_eq!(timestamp, 1_000);
874    }
875
876    #[test]
877    fn test_median_timestamps_by_stake() {
878        // One total stake.
879        let timestamps = vec![(1_000, 1)];
880        assert_eq!(median_timestamps_by_stake_inner(timestamps, 1), 1_000);
881
882        // Odd number of total stakes.
883        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1)];
884        assert_eq!(median_timestamps_by_stake_inner(timestamps, 3), 2_000);
885
886        // Even number of total stakes.
887        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1)];
888        assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);
889
890        // Even number of total stakes, different order.
891        let timestamps = vec![(4_000, 1), (3_000, 1), (1_000, 1), (2_000, 1)];
892        assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);
893
894        // Unequal stakes.
895        let timestamps = vec![(2_000, 2), (4_000, 2), (1_000, 3), (3_000, 3)];
896        assert_eq!(median_timestamps_by_stake_inner(timestamps, 10), 3_000);
897
898        // Unequal stakes.
899        let timestamps = vec![
900            (500, 2),
901            (4_000, 2),
902            (2_500, 3),
903            (1_000, 5),
904            (3_000, 3),
905            (2_000, 4),
906        ];
907        assert_eq!(median_timestamps_by_stake_inner(timestamps, 19), 2_000);
908
909        // One authority dominates.
910        let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1), (5_000, 10)];
911        assert_eq!(median_timestamps_by_stake_inner(timestamps, 14), 5_000);
912    }
913
914    #[tokio::test]
915    async fn test_median_timestamps_by_stake_errors() {
916        let num_authorities = 4;
917        let (context, _keys) = Context::new_for_test(num_authorities);
918        let context = Arc::new(context);
919
920        // No blocks provided
921        let err = median_timestamp_by_stake(&context, vec![].into_iter()).unwrap_err();
922        assert_eq!(err, "No blocks provided");
923
924        // Blocks provided but total stake is less than quorum threshold
925        let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
926        let err = median_timestamp_by_stake(&context, vec![block].into_iter()).unwrap_err();
927        assert_eq!(err, "Total stake 1 < quorum threshold 3");
928    }
929}