consensus_core/
linearizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
8use itertools::Itertools;
9use parking_lot::RwLock;
10
11use crate::{
12    block::{BlockAPI, VerifiedBlock},
13    commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
14    context::Context,
15    dag_state::DagState,
16};
17
18/// The `StorageAPI` trait provides an interface for the block store and has been
19/// mostly introduced for allowing to inject the test store in `DagBuilder`.
20pub(crate) trait BlockStoreAPI {
21    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;
22
23    fn gc_round(&self) -> Round;
24
25    fn set_committed(&mut self, block_ref: &BlockRef) -> bool;
26
27    fn is_committed(&self, block_ref: &BlockRef) -> bool;
28}
29
30impl BlockStoreAPI
31    for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
32{
33    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
34        DagState::get_blocks(self, refs)
35    }
36
37    fn gc_round(&self) -> Round {
38        DagState::gc_round(self)
39    }
40
41    fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
42        DagState::set_committed(self, block_ref)
43    }
44
45    fn is_committed(&self, block_ref: &BlockRef) -> bool {
46        DagState::is_committed(self, block_ref)
47    }
48}
49
50/// Expand a committed sequence of leader into a sequence of sub-dags.
51#[derive(Clone)]
52pub struct Linearizer {
53    /// In memory block store representing the dag state
54    context: Arc<Context>,
55    dag_state: Arc<RwLock<DagState>>,
56}
57
58impl Linearizer {
59    pub fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
60        Self { context, dag_state }
61    }
62
63    /// Collect the sub-dag and the corresponding commit from a specific leader excluding any duplicates or
64    /// blocks that have already been committed (within previous sub-dags).
65    fn collect_sub_dag_and_commit(
66        &mut self,
67        leader_block: VerifiedBlock,
68    ) -> (CommittedSubDag, TrustedCommit) {
69        let _s = self
70            .context
71            .metrics
72            .node_metrics
73            .scope_processing_time
74            .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
75            .start_timer();
76
77        // Grab latest commit state from dag state
78        let mut dag_state = self.dag_state.write();
79        let last_commit_index = dag_state.last_commit_index();
80        let last_commit_digest = dag_state.last_commit_digest();
81        let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
82
83        // Now linearize the sub-dag starting from the leader block
84        let to_commit = Self::linearize_sub_dag(leader_block.clone(), &mut dag_state);
85
86        let timestamp_ms = Self::calculate_commit_timestamp(
87            &self.context,
88            &mut dag_state,
89            &leader_block,
90            last_commit_timestamp_ms,
91        );
92
93        drop(dag_state);
94
95        // Create the Commit.
96        let commit = Commit::new(
97            last_commit_index + 1,
98            last_commit_digest,
99            timestamp_ms,
100            leader_block.reference(),
101            to_commit
102                .iter()
103                .map(|block| block.reference())
104                .collect::<Vec<_>>(),
105        );
106        let serialized = commit
107            .serialize()
108            .unwrap_or_else(|e| panic!("Failed to serialize commit: {}", e));
109        let commit = TrustedCommit::new_trusted(commit, serialized);
110
111        // Create the corresponding committed sub dag
112        let sub_dag = CommittedSubDag::new(
113            leader_block.reference(),
114            to_commit,
115            timestamp_ms,
116            commit.reference(),
117            self.context
118                .protocol_config
119                .consensus_always_accept_system_transactions(),
120        );
121
122        (sub_dag, commit)
123    }
124
125    /// Calculates the commit's timestamp. The timestamp will be calculated as the median of leader's parents (leader.round - 1)
126    /// timestamps by stake. To ensure that commit timestamp monotonicity is respected it is compared against the `last_commit_timestamp_ms`
127    /// and the maximum of the two is returned.
128    pub(crate) fn calculate_commit_timestamp(
129        context: &Context,
130        dag_state: &mut impl BlockStoreAPI,
131        leader_block: &VerifiedBlock,
132        last_commit_timestamp_ms: BlockTimestampMs,
133    ) -> BlockTimestampMs {
134        let timestamp_ms = {
135            // Select leaders' parent blocks.
136            let block_refs = leader_block
137                .ancestors()
138                .iter()
139                .filter(|block_ref| block_ref.round == leader_block.round() - 1)
140                .cloned()
141                .collect::<Vec<_>>();
142            // Get the blocks from dag state which should not fail.
143            let blocks = dag_state
144                .get_blocks(&block_refs)
145                .into_iter()
146                .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
147            median_timestamp_by_stake(context, blocks).unwrap_or_else(|e| {
148                panic!(
149                    "Cannot compute median timestamp for leader block {:?} ancestors: {}",
150                    leader_block, e
151                )
152            })
153        };
154
155        // Always make sure that commit timestamps are monotonic, so override if necessary.
156        timestamp_ms.max(last_commit_timestamp_ms)
157    }
158
159    pub(crate) fn linearize_sub_dag(
160        leader_block: VerifiedBlock,
161        dag_state: &mut impl BlockStoreAPI,
162    ) -> Vec<VerifiedBlock> {
163        // The GC round here is calculated based on the last committed round of the leader block. The algorithm will attempt to
164        // commit blocks up to this GC round. Once this commit has been processed and written to DagState, then gc round will update
165        // and on the processing of the next commit we'll have it already updated, so no need to do any gc_round recalculations here.
166        // We just use whatever is currently in DagState.
167        let gc_round: Round = dag_state.gc_round();
168        let leader_block_ref = leader_block.reference();
169        let mut buffer = vec![leader_block];
170        let mut to_commit = Vec::new();
171
172        // Perform the recursion without stopping at the highest round round that has been committed per authority. Instead it will
173        // allow to commit blocks that are lower than the highest committed round for an authority but higher than gc_round.
174        assert!(
175            dag_state.set_committed(&leader_block_ref),
176            "Leader block with reference {:?} attempted to be committed twice",
177            leader_block_ref
178        );
179
180        while let Some(x) = buffer.pop() {
181            to_commit.push(x.clone());
182
183            let ancestors: Vec<VerifiedBlock> = dag_state
184                .get_blocks(
185                    &x.ancestors()
186                        .iter()
187                        .copied()
188                        .filter(|ancestor| {
189                            ancestor.round > gc_round && !dag_state.is_committed(ancestor)
190                        })
191                        .collect::<Vec<_>>(),
192                )
193                .into_iter()
194                .map(|ancestor_opt| {
195                    ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
196                })
197                .collect();
198
199            for ancestor in ancestors {
200                buffer.push(ancestor.clone());
201                assert!(
202                    dag_state.set_committed(&ancestor.reference()),
203                    "Block with reference {:?} attempted to be committed twice",
204                    ancestor.reference()
205                );
206            }
207        }
208
209        // The above code should have not yielded any blocks that are <= gc_round, but just to make sure that we'll never
210        // commit anything that should be garbage collected we attempt to prune here as well.
211        assert!(
212            to_commit.iter().all(|block| block.round() > gc_round),
213            "No blocks <= {gc_round} should be committed. Leader round {}, blocks {to_commit:?}.",
214            leader_block_ref
215        );
216
217        // Sort the blocks of the sub-dag blocks
218        sort_sub_dag_blocks(&mut to_commit);
219
220        to_commit
221    }
222
223    // This function should be called whenever a new commit is observed. This will
224    // iterate over the sequence of committed leaders and produce a list of committed
225    // sub-dags.
226    pub fn handle_commit(&mut self, committed_leaders: Vec<VerifiedBlock>) -> Vec<CommittedSubDag> {
227        if committed_leaders.is_empty() {
228            return vec![];
229        }
230
231        let mut committed_sub_dags = vec![];
232        for leader_block in committed_leaders {
233            // Collect the sub-dag generated using each of these leaders and the corresponding commit.
234            let (sub_dag, commit) = self.collect_sub_dag_and_commit(leader_block);
235
236            self.update_blocks_pruned_metric(&sub_dag);
237
238            // Buffer commit in dag state for persistence later.
239            // This also updates the last committed rounds.
240            self.dag_state.write().add_commit(commit.clone());
241
242            committed_sub_dags.push(sub_dag);
243        }
244
245        committed_sub_dags
246    }
247
248    // Try to measure the number of blocks that get pruned due to GC. This is not very accurate, but it can give us a good enough idea.
249    // We consider a block as pruned when it is an ancestor of a block that has been committed as part of the provided `sub_dag`, but
250    // it has not been committed as part of previous commits. Right now we measure this via checking that highest committed round for the authority
251    // as we don't an efficient look up functionality to check if a block has been committed or not.
252    fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
253        let (last_committed_rounds, gc_round) = {
254            let dag_state = self.dag_state.read();
255            (dag_state.last_committed_rounds(), dag_state.gc_round())
256        };
257
258        for block_ref in sub_dag
259            .blocks
260            .iter()
261            .flat_map(|block| block.ancestors())
262            .filter(
263                |ancestor_ref| {
264                    ancestor_ref.round <= gc_round
265                        && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
266                }, // If the last committed round is the same as the pruned block's round, then we know for sure that it has been committed and it doesn't count here
267                   // as pruned block.
268            )
269            .unique()
270        {
271            let hostname = &self.context.committee.authority(block_ref.author).hostname;
272
273            // If the last committed round from this authority is lower than the pruned ancestor in question, then we know for sure that it has not been committed.
274            let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
275                &[hostname, "uncommitted"]
276            } else {
277                // If last committed round is higher for this authority, then we don't really know it's status, but we know that there is a higher committed block from this authority.
278                &[hostname, "higher_committed"]
279            };
280
281            self.context
282                .metrics
283                .node_metrics
284                .blocks_pruned_on_commit
285                .with_label_values(label_values)
286                .inc();
287        }
288    }
289}
290
291/// Computes the median timestamp of the blocks weighted by the stake of their authorities.
292/// This function assumes each block comes from a different authority of the same round.
293/// Error is returned if no blocks are provided or total stake is less than quorum threshold.
294pub(crate) fn median_timestamp_by_stake(
295    context: &Context,
296    blocks: impl Iterator<Item = VerifiedBlock>,
297) -> Result<BlockTimestampMs, String> {
298    let mut total_stake = 0;
299    let mut timestamps = vec![];
300    for block in blocks {
301        let stake = context.committee.authority(block.author()).stake;
302        timestamps.push((block.timestamp_ms(), stake));
303        total_stake += stake;
304    }
305
306    if timestamps.is_empty() {
307        return Err("No blocks provided".to_string());
308    }
309    if total_stake < context.committee.quorum_threshold() {
310        return Err(format!(
311            "Total stake {} < quorum threshold {}",
312            total_stake,
313            context.committee.quorum_threshold()
314        ));
315    }
316
317    Ok(median_timestamps_by_stake_inner(timestamps, total_stake))
318}
319
320fn median_timestamps_by_stake_inner(
321    mut timestamps: Vec<(BlockTimestampMs, Stake)>,
322    total_stake: Stake,
323) -> BlockTimestampMs {
324    timestamps.sort_by_key(|(ts, _)| *ts);
325
326    let mut cumulative_stake = 0;
327    for (ts, stake) in &timestamps {
328        cumulative_stake += stake;
329        if cumulative_stake > total_stake / 2 {
330            return *ts;
331        }
332    }
333
334    timestamps.last().unwrap().0
335}
336
337#[cfg(test)]
338mod tests {
339    use consensus_config::AuthorityIndex;
340    use rstest::rstest;
341
342    use super::*;
343    use crate::{
344        CommitIndex, TestBlock,
345        commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
346        context::Context,
347        leader_schedule::{LeaderSchedule, LeaderSwapTable},
348        storage::mem_store::MemStore,
349        test_dag_builder::DagBuilder,
350        test_dag_parser::parse_dag,
351    };
352
353    #[rstest]
354    #[tokio::test]
355    async fn test_handle_commit() {
356        telemetry_subscribers::init_for_testing();
357        let num_authorities = 4;
358        let (context, _keys) = Context::new_for_test(num_authorities);
359        let context = Arc::new(context);
360
361        let dag_state = Arc::new(RwLock::new(DagState::new(
362            context.clone(),
363            Arc::new(MemStore::new()),
364        )));
365        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
366
367        // Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
368        let num_rounds: u32 = 10;
369        let mut dag_builder = DagBuilder::new(context.clone());
370        dag_builder
371            .layers(1..=num_rounds)
372            .build()
373            .persist_layers(dag_state.clone());
374
375        let leaders = dag_builder
376            .leader_blocks(1..=num_rounds)
377            .into_iter()
378            .map(Option::unwrap)
379            .collect::<Vec<_>>();
380
381        let commits = linearizer.handle_commit(leaders.clone());
382        for (idx, subdag) in commits.into_iter().enumerate() {
383            tracing::info!("{subdag:?}");
384            assert_eq!(subdag.leader, leaders[idx].reference());
385
386            let expected_ts = {
387                let block_refs = leaders[idx]
388                    .ancestors()
389                    .iter()
390                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
391                    .cloned()
392                    .collect::<Vec<_>>();
393                let blocks = dag_state
394                    .read()
395                    .get_blocks(&block_refs)
396                    .into_iter()
397                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
398
399                median_timestamp_by_stake(&context, blocks).unwrap()
400            };
401            assert_eq!(subdag.timestamp_ms, expected_ts);
402
403            if idx == 0 {
404                // First subdag includes the leader block only
405                assert_eq!(subdag.blocks.len(), 1);
406            } else {
407                // Every subdag after will be missing the leader block from the previous
408                // committed subdag
409                assert_eq!(subdag.blocks.len(), num_authorities);
410            }
411            for block in subdag.blocks.iter() {
412                assert!(block.round() <= leaders[idx].round());
413            }
414            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
415        }
416    }
417
418    #[rstest]
419    #[tokio::test]
420    async fn test_handle_already_committed() {
421        telemetry_subscribers::init_for_testing();
422        let num_authorities = 4;
423        let (context, _) = Context::new_for_test(num_authorities);
424        let context = Arc::new(context);
425
426        let dag_state = Arc::new(RwLock::new(DagState::new(
427            context.clone(),
428            Arc::new(MemStore::new()),
429        )));
430        let leader_schedule = Arc::new(LeaderSchedule::new(
431            context.clone(),
432            LeaderSwapTable::default(),
433        ));
434        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
435        let wave_length = DEFAULT_WAVE_LENGTH;
436
437        let leader_round_wave_1 = 3;
438        let leader_round_wave_2 = leader_round_wave_1 + wave_length;
439
440        // Build a Dag from round 1..=6
441        let mut dag_builder = DagBuilder::new(context.clone());
442        dag_builder.layers(1..=leader_round_wave_2).build();
443
444        // Now retrieve all the blocks up to round leader_round_wave_1 - 1
445        // And then only the leader of round leader_round_wave_1
446        // Also store those to DagState
447        let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
448        blocks.push(
449            dag_builder
450                .leader_block(leader_round_wave_1)
451                .expect("Leader block should have been found"),
452        );
453        dag_state.write().accept_blocks(blocks.clone());
454
455        let first_leader = dag_builder
456            .leader_block(leader_round_wave_1)
457            .expect("Wave 1 leader round block should exist");
458        let mut last_commit_index = 1;
459        let first_commit_data = TrustedCommit::new_for_test(
460            last_commit_index,
461            CommitDigest::MIN,
462            0,
463            first_leader.reference(),
464            blocks.iter().map(|block| block.reference()).collect(),
465        );
466        dag_state.write().add_commit(first_commit_data);
467
468        // Mark the blocks as committed in DagState. This will allow to correctly detect the committed blocks when the new linearizer logic is enabled.
469        for block in blocks.iter() {
470            dag_state.write().set_committed(&block.reference());
471        }
472
473        // Now take all the blocks from round `leader_round_wave_1` up to round `leader_round_wave_2-1`
474        let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
475        // Filter out leader block of round `leader_round_wave_1`
476        blocks.retain(|block| {
477            !(block.round() == leader_round_wave_1
478                && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
479        });
480        // Add the leader block of round `leader_round_wave_2`
481        blocks.push(
482            dag_builder
483                .leader_block(leader_round_wave_2)
484                .expect("Leader block should have been found"),
485        );
486        // Write them in dag state
487        dag_state.write().accept_blocks(blocks.clone());
488
489        let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();
490
491        // Now get the latest leader which is the leader round of wave 2
492        let leader = dag_builder
493            .leader_block(leader_round_wave_2)
494            .expect("Leader block should exist");
495
496        last_commit_index += 1;
497        let expected_second_commit = TrustedCommit::new_for_test(
498            last_commit_index,
499            CommitDigest::MIN,
500            0,
501            leader.reference(),
502            blocks.clone(),
503        );
504
505        let commit = linearizer.handle_commit(vec![leader.clone()]);
506        assert_eq!(commit.len(), 1);
507
508        let subdag = &commit[0];
509        tracing::info!("{subdag:?}");
510        assert_eq!(subdag.leader, leader.reference());
511        assert_eq!(subdag.commit_ref.index, expected_second_commit.index());
512
513        let expected_ts = median_timestamp_by_stake(
514            &context,
515            subdag.blocks.iter().filter_map(|block| {
516                if block.round() == subdag.leader.round - 1 {
517                    Some(block.clone())
518                } else {
519                    None
520                }
521            }),
522        )
523        .unwrap();
524        assert_eq!(subdag.timestamp_ms, expected_ts);
525
526        // Using the same sorting as used in CommittedSubDag::sort
527        blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
528        assert_eq!(
529            subdag
530                .blocks
531                .clone()
532                .into_iter()
533                .map(|b| b.reference())
534                .collect::<Vec<_>>(),
535            blocks
536        );
537        for block in subdag.blocks.iter() {
538            assert!(block.round() <= expected_second_commit.leader().round);
539        }
540    }
541
542    /// This test will run the linearizer with gc_depth = 3 and make
543    /// sure that for the exact same DAG the linearizer will commit different blocks according to the rules.
544    #[tokio::test]
545    async fn test_handle_commit_with_gc_simple() {
546        telemetry_subscribers::init_for_testing();
547
548        const GC_DEPTH: u32 = 3;
549
550        let num_authorities = 4;
551        let (mut context, _keys) = Context::new_for_test(num_authorities);
552        context
553            .protocol_config
554            .set_consensus_gc_depth_for_testing(GC_DEPTH);
555
556        let context = Arc::new(context);
557        let dag_state = Arc::new(RwLock::new(DagState::new(
558            context.clone(),
559            Arc::new(MemStore::new()),
560        )));
561        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
562
563        // Authorities of index 0->2 will always creates blocks that see each other, but until round 5 they won't see the blocks of authority 3.
564        // For authority 3 we create blocks that connect to all the other authorities.
565        // On round 5 we finally make the other authorities see the blocks of authority 3.
566        // Practically we "simulate" here a long chain created by authority 3 that is visible in round 5, but due to GC blocks of only round >=2 will
567        // be committed, when GC is enabled. When GC is disabled all blocks will be committed for rounds >= 1.
568        let dag_str = "DAG {
569                Round 0 : { 4 },
570                Round 1 : { * },
571                Round 2 : {
572                    A -> [-D1],
573                    B -> [-D1],
574                    C -> [-D1],
575                    D -> [*],
576                },
577                Round 3 : {
578                    A -> [-D2],
579                    B -> [-D2],
580                    C -> [-D2],
581                },
582                Round 4 : { 
583                    A -> [-D3],
584                    B -> [-D3],
585                    C -> [-D3],
586                    D -> [A3, B3, C3, D2],
587                },
588                Round 5 : { * },
589            }";
590
591        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
592        dag_builder.print();
593        dag_builder.persist_all_blocks(dag_state.clone());
594
595        let leaders = dag_builder
596            .leader_blocks(1..=6)
597            .into_iter()
598            .flatten()
599            .collect::<Vec<_>>();
600
601        let commits = linearizer.handle_commit(leaders.clone());
602        for (idx, subdag) in commits.into_iter().enumerate() {
603            tracing::info!("{subdag:?}");
604            assert_eq!(subdag.leader, leaders[idx].reference());
605
606            let expected_ts = {
607                let block_refs = leaders[idx]
608                    .ancestors()
609                    .iter()
610                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
611                    .cloned()
612                    .collect::<Vec<_>>();
613                let blocks = dag_state
614                    .read()
615                    .get_blocks(&block_refs)
616                    .into_iter()
617                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
618
619                median_timestamp_by_stake(&context, blocks).unwrap()
620            };
621            assert_eq!(subdag.timestamp_ms, expected_ts);
622
623            if idx == 0 {
624                // First subdag includes the leader block only
625                assert_eq!(subdag.blocks.len(), 1);
626            } else if idx == 1 {
627                assert_eq!(subdag.blocks.len(), 3);
628            } else if idx == 2 {
629                // We commit:
630                // * 1 block on round 4, the leader block
631                // * 3 blocks on round 3, as no commit happened on round 3 since the leader was missing
632                // * 2 blocks on round 2, again as no commit happened on round 3, we commit the "sub dag" of leader of round 3, which will be another 2 blocks
633                assert_eq!(subdag.blocks.len(), 6);
634            } else {
635                // Now it's going to be the first time that a leader will see the blocks of authority 3 and will attempt to commit
636                // the long chain. However, due to GC it will only commit blocks of round > 1. That's because it will commit blocks
637                // up to previous leader's round (round = 4) minus the gc_depth = 3, so that will be gc_round = 4 - 3 = 1. So we expect
638                // to see on the sub dag committed blocks of round >= 2.
639                assert_eq!(subdag.blocks.len(), 5);
640
641                assert!(
642                    subdag.blocks.iter().all(|block| block.round() >= 2),
643                    "Found blocks that are of round < 2."
644                );
645
646                // Also ensure that gc_round has advanced with the latest committed leader
647                assert_eq!(dag_state.read().gc_round(), subdag.leader.round - GC_DEPTH);
648            }
649            for block in subdag.blocks.iter() {
650                assert!(block.round() <= leaders[idx].round());
651            }
652            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
653        }
654    }
655
656    #[tokio::test]
657    async fn test_handle_commit_below_highest_committed_round() {
658        telemetry_subscribers::init_for_testing();
659
660        const GC_DEPTH: u32 = 3;
661
662        let num_authorities = 4;
663        let (mut context, _keys) = Context::new_for_test(num_authorities);
664        context
665            .protocol_config
666            .set_consensus_gc_depth_for_testing(GC_DEPTH);
667
668        let context = Arc::new(context);
669        let dag_state = Arc::new(RwLock::new(DagState::new(
670            context.clone(),
671            Arc::new(MemStore::new()),
672        )));
673        let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
674
675        // Authority D will create an "orphaned" block on round 1 as it won't reference to it on the block of round 2. Similar, no other authority will reference to it on round 2.
676        // Then on round 3 the authorities A, B & C will link to block D1. Once the DAG gets committed we should see the block D1 getting committed as well. Normally ,as block D2 would
677        // have been committed first block D1 should be ommitted. With the new logic this is no longer true.
678        let dag_str = "DAG {
679                Round 0 : { 4 },
680                Round 1 : { * },
681                Round 2 : {
682                    A -> [-D1],
683                    B -> [-D1],
684                    C -> [-D1],
685                    D -> [-D1],
686                },
687                Round 3 : {
688                    A -> [A2, B2, C2, D1],
689                    B -> [A2, B2, C2, D1],
690                    C -> [A2, B2, C2, D1],
691                    D -> [A2, B2, C2, D2]
692                },
693                Round 4 : { * },
694            }";
695
696        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
697        dag_builder.print();
698        dag_builder.persist_all_blocks(dag_state.clone());
699
700        let leaders = dag_builder
701            .leader_blocks(1..=4)
702            .into_iter()
703            .flatten()
704            .collect::<Vec<_>>();
705
706        let commits = linearizer.handle_commit(leaders.clone());
707        for (idx, subdag) in commits.into_iter().enumerate() {
708            tracing::info!("{subdag:?}");
709            assert_eq!(subdag.leader, leaders[idx].reference());
710
711            let expected_ts = {
712                let block_refs = leaders[idx]
713                    .ancestors()
714                    .iter()
715                    .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
716                    .cloned()
717                    .collect::<Vec<_>>();
718                let blocks = dag_state
719                    .read()
720                    .get_blocks(&block_refs)
721                    .into_iter()
722                    .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
723
724                median_timestamp_by_stake(&context, blocks).unwrap()
725            };
726            assert_eq!(subdag.timestamp_ms, expected_ts);
727
728            if idx == 0 {
729                // First subdag includes the leader block only B1
730                assert_eq!(subdag.blocks.len(), 1);
731            } else if idx == 1 {
732                // We commit:
733                // * 1 block on round 2, the leader block C2
734                // * 2 blocks on round 1, A1, C1
735                assert_eq!(subdag.blocks.len(), 3);
736            } else if idx == 2 {
737                // We commit:
738                // * 1 block on round 3, the leader block D3
739                // * 3 blocks on round 2, A2, B2, D2
740                assert_eq!(subdag.blocks.len(), 4);
741
742                assert!(
743                    subdag.blocks.iter().any(|block| block.round() == 2
744                        && block.author() == AuthorityIndex::new_for_test(3)),
745                    "Block D2 should have been committed."
746                );
747            } else if idx == 3 {
748                // We commit:
749                // * 1 block on round 4, the leader block A4
750                // * 3 blocks on round 3, A3, B3, C3
751                // * 1 block of round 1, D1
752                assert_eq!(subdag.blocks.len(), 5);
753                assert!(
754                    subdag.blocks.iter().any(|block| block.round() == 1
755                        && block.author() == AuthorityIndex::new_for_test(3)),
756                    "Block D1 should have been committed."
757                );
758            } else {
759                panic!("Unexpected subdag with index {:?}", idx);
760            }
761
762            for block in subdag.blocks.iter() {
763                assert!(block.round() <= leaders[idx].round());
764            }
765            assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
766        }
767    }
768
769    #[rstest]
770    #[case(3_000, 3_000, 6_000)]
771    #[tokio::test]
772    async fn test_calculate_commit_timestamp(
773        #[case] timestamp_1: u64,
774        #[case] timestamp_2: u64,
775        #[case] timestamp_3: u64,
776    ) {
777        // GIVEN
778        telemetry_subscribers::init_for_testing();
779
780        let num_authorities = 4;
781        let (context, _keys) = Context::new_for_test(num_authorities);
782
783        let context = Arc::new(context);
784        let store = Arc::new(MemStore::new());
785        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
786        let mut dag_state = dag_state.write();
787
788        let ancestors = vec![
789            VerifiedBlock::new_for_test(TestBlock::new(4, 0).set_timestamp_ms(1_000).build()),
790            VerifiedBlock::new_for_test(TestBlock::new(4, 1).set_timestamp_ms(2_000).build()),
791            VerifiedBlock::new_for_test(TestBlock::new(4, 2).set_timestamp_ms(3_000).build()),
792            VerifiedBlock::new_for_test(TestBlock::new(4, 3).set_timestamp_ms(4_000).build()),
793        ];
794
795        let leader_block = VerifiedBlock::new_for_test(
796            TestBlock::new(5, 0)
797                .set_timestamp_ms(5_000)
798                .set_ancestors(
799                    ancestors
800                        .iter()
801                        .map(|block| block.reference())
802                        .collect::<Vec<_>>(),
803                )
804                .build(),
805        );
806
807        for block in &ancestors {
808            dag_state.accept_block(block.clone());
809        }
810
811        let last_commit_timestamp_ms = 0;
812
813        // WHEN
814        let timestamp = Linearizer::calculate_commit_timestamp(
815            &context,
816            &mut dag_state,
817            &leader_block,
818            last_commit_timestamp_ms,
819        );
820        assert_eq!(timestamp, timestamp_1);
821
822        // AND skip the block of authority 0 and round 4.
823        let leader_block = VerifiedBlock::new_for_test(
824            TestBlock::new(5, 0)
825                .set_timestamp_ms(5_000)
826                .set_ancestors(
827                    ancestors
828                        .iter()
829                        .skip(1)
830                        .map(|block| block.reference())
831                        .collect::<Vec<_>>(),
832                )
833                .build(),
834        );
835
836        let timestamp = Linearizer::calculate_commit_timestamp(
837            &context,
838            &mut dag_state,
839            &leader_block,
840            last_commit_timestamp_ms,
841        );
842        assert_eq!(timestamp, timestamp_2);
843
844        // AND set the `last_commit_timestamp_ms` to 6_000
845        let last_commit_timestamp_ms = 6_000;
846        let timestamp = Linearizer::calculate_commit_timestamp(
847            &context,
848            &mut dag_state,
849            &leader_block,
850            last_commit_timestamp_ms,
851        );
852        assert_eq!(timestamp, timestamp_3);
853
854        // AND there is only one ancestor block to commit
855        let (context, _) = Context::new_for_test(1);
856        let leader_block = VerifiedBlock::new_for_test(
857            TestBlock::new(5, 0)
858                .set_timestamp_ms(5_000)
859                .set_ancestors(
860                    ancestors
861                        .iter()
862                        .take(1)
863                        .map(|block| block.reference())
864                        .collect::<Vec<_>>(),
865                )
866                .build(),
867        );
868        let last_commit_timestamp_ms = 0;
869        let timestamp = Linearizer::calculate_commit_timestamp(
870            &context,
871            &mut dag_state,
872            &leader_block,
873            last_commit_timestamp_ms,
874        );
875        assert_eq!(timestamp, 1_000);
876    }
877
/// Runs `median_timestamps_by_stake_inner` over a table of `(timestamp, stake)`
/// inputs and compares each result with the expected timestamp.
#[test]
fn test_median_timestamps_by_stake() {
    // Each row: (description, (timestamp, stake) pairs, total stake, expected result).
    let cases = [
        ("one total stake", vec![(1_000, 1)], 1, 1_000),
        (
            "odd number of total stakes",
            vec![(1_000, 1), (2_000, 1), (3_000, 1)],
            3,
            2_000,
        ),
        (
            "even number of total stakes",
            vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1)],
            4,
            3_000,
        ),
        (
            "even number of total stakes, different order",
            vec![(4_000, 1), (3_000, 1), (1_000, 1), (2_000, 1)],
            4,
            3_000,
        ),
        (
            "unequal stakes, four entries",
            vec![(2_000, 2), (4_000, 2), (1_000, 3), (3_000, 3)],
            10,
            3_000,
        ),
        (
            "unequal stakes, six entries",
            vec![
                (500, 2),
                (4_000, 2),
                (2_500, 3),
                (1_000, 5),
                (3_000, 3),
                (2_000, 4),
            ],
            19,
            2_000,
        ),
        (
            "one authority dominates",
            vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1), (5_000, 10)],
            14,
            5_000,
        ),
    ];

    // Rows are evaluated in declaration order; the description only shows up on failure.
    for (description, timestamps, total_stake, expected) in cases {
        assert_eq!(
            median_timestamps_by_stake_inner(timestamps, total_stake),
            expected,
            "unexpected result for case: {description}"
        );
    }
}
915
916    #[tokio::test]
917    async fn test_median_timestamps_by_stake_errors() {
918        let num_authorities = 4;
919        let (context, _keys) = Context::new_for_test(num_authorities);
920        let context = Arc::new(context);
921
922        // No blocks provided
923        let err = median_timestamp_by_stake(&context, vec![].into_iter()).unwrap_err();
924        assert_eq!(err, "No blocks provided");
925
926        // Blocks provided but total stake is less than quorum threshold
927        let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
928        let err = median_timestamp_by_stake(&context, vec![block].into_iter()).unwrap_err();
929        assert_eq!(err, "Total stake 1 < quorum threshold 3");
930    }
931}