1use std::sync::Arc;
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
8use itertools::Itertools;
9use parking_lot::RwLock;
10
11use crate::{
12 block::{BlockAPI, VerifiedBlock},
13 commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
14 context::Context,
15 dag_state::DagState,
16};
17
/// Minimal block-store interface the linearizer needs: block lookup, the
/// garbage-collection round, and per-block "committed" bookkeeping.
pub(crate) trait BlockStoreAPI {
    /// Looks up the blocks referenced by `refs`. Each returned entry is an
    /// `Option`; callers in this file treat `None` as a missing block.
    // NOTE(review): presumably the result is positionally aligned with `refs` — confirm.
    fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;

    /// Returns the current garbage-collection round. The linearizer skips
    /// ancestors whose round is not strictly greater than this value.
    fn gc_round(&self) -> Round;

    /// Marks `block_ref` as committed and returns a flag. The linearizer
    /// asserts that the flag is `true` and reports a double commit otherwise.
    // NOTE(review): presumably `false` means the block was already committed — confirm.
    fn set_committed(&mut self, block_ref: &BlockRef) -> bool;

    /// Returns whether `block_ref` has already been marked as committed.
    fn is_committed(&self, block_ref: &BlockRef) -> bool;
}
29
30impl BlockStoreAPI
31 for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
32{
33 fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
34 DagState::get_blocks(self, refs)
35 }
36
37 fn gc_round(&self) -> Round {
38 DagState::gc_round(self)
39 }
40
41 fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
42 DagState::set_committed(self, block_ref)
43 }
44
45 fn is_committed(&self, block_ref: &BlockRef) -> bool {
46 DagState::is_committed(self, block_ref)
47 }
48}
49
/// Turns committed leader blocks into committed sub-dags, reading from and
/// writing to the shared `DagState` while doing so.
#[derive(Clone)]
pub struct Linearizer {
    /// Shared node context; in this file it supplies metrics and committee lookups.
    context: Arc<Context>,
    /// Shared, lock-protected DAG state from which blocks and commit data are
    /// read and into which new commits are recorded.
    dag_state: Arc<RwLock<DagState>>,
}
57
58impl Linearizer {
59 pub fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
60 Self { context, dag_state }
61 }
62
63 fn collect_sub_dag_and_commit(
66 &mut self,
67 leader_block: VerifiedBlock,
68 ) -> (CommittedSubDag, TrustedCommit) {
69 let _s = self
70 .context
71 .metrics
72 .node_metrics
73 .scope_processing_time
74 .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
75 .start_timer();
76
77 let mut dag_state = self.dag_state.write();
79 let last_commit_index = dag_state.last_commit_index();
80 let last_commit_digest = dag_state.last_commit_digest();
81 let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
82
83 let to_commit = Self::linearize_sub_dag(leader_block.clone(), &mut dag_state);
85
86 let timestamp_ms = Self::calculate_commit_timestamp(
87 &self.context,
88 &mut dag_state,
89 &leader_block,
90 last_commit_timestamp_ms,
91 );
92
93 drop(dag_state);
94
95 let commit = Commit::new(
97 last_commit_index + 1,
98 last_commit_digest,
99 timestamp_ms,
100 leader_block.reference(),
101 to_commit
102 .iter()
103 .map(|block| block.reference())
104 .collect::<Vec<_>>(),
105 );
106 let serialized = commit
107 .serialize()
108 .unwrap_or_else(|e| panic!("Failed to serialize commit: {}", e));
109 let commit = TrustedCommit::new_trusted(commit, serialized);
110
111 let sub_dag = CommittedSubDag::new(
113 leader_block.reference(),
114 to_commit,
115 timestamp_ms,
116 commit.reference(),
117 );
118
119 (sub_dag, commit)
120 }
121
122 pub(crate) fn calculate_commit_timestamp(
126 context: &Context,
127 dag_state: &mut impl BlockStoreAPI,
128 leader_block: &VerifiedBlock,
129 last_commit_timestamp_ms: BlockTimestampMs,
130 ) -> BlockTimestampMs {
131 let timestamp_ms = {
132 let block_refs = leader_block
134 .ancestors()
135 .iter()
136 .filter(|block_ref| block_ref.round == leader_block.round() - 1)
137 .cloned()
138 .collect::<Vec<_>>();
139 let blocks = dag_state
141 .get_blocks(&block_refs)
142 .into_iter()
143 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
144 median_timestamp_by_stake(context, blocks).unwrap_or_else(|e| {
145 panic!(
146 "Cannot compute median timestamp for leader block {:?} ancestors: {}",
147 leader_block, e
148 )
149 })
150 };
151
152 timestamp_ms.max(last_commit_timestamp_ms)
154 }
155
156 pub(crate) fn linearize_sub_dag(
157 leader_block: VerifiedBlock,
158 dag_state: &mut impl BlockStoreAPI,
159 ) -> Vec<VerifiedBlock> {
160 let gc_round: Round = dag_state.gc_round();
165 let leader_block_ref = leader_block.reference();
166 let mut buffer = vec![leader_block];
167 let mut to_commit = Vec::new();
168
169 assert!(
172 dag_state.set_committed(&leader_block_ref),
173 "Leader block with reference {:?} attempted to be committed twice",
174 leader_block_ref
175 );
176
177 while let Some(x) = buffer.pop() {
178 to_commit.push(x.clone());
179
180 let ancestors: Vec<VerifiedBlock> = dag_state
181 .get_blocks(
182 &x.ancestors()
183 .iter()
184 .copied()
185 .filter(|ancestor| {
186 ancestor.round > gc_round && !dag_state.is_committed(ancestor)
187 })
188 .collect::<Vec<_>>(),
189 )
190 .into_iter()
191 .map(|ancestor_opt| {
192 ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
193 })
194 .collect();
195
196 for ancestor in ancestors {
197 buffer.push(ancestor.clone());
198 assert!(
199 dag_state.set_committed(&ancestor.reference()),
200 "Block with reference {:?} attempted to be committed twice",
201 ancestor.reference()
202 );
203 }
204 }
205
206 assert!(
209 to_commit.iter().all(|block| block.round() > gc_round),
210 "No blocks <= {gc_round} should be committed. Leader round {}, blocks {to_commit:?}.",
211 leader_block_ref
212 );
213
214 sort_sub_dag_blocks(&mut to_commit);
216
217 to_commit
218 }
219
220 pub fn handle_commit(&mut self, committed_leaders: Vec<VerifiedBlock>) -> Vec<CommittedSubDag> {
224 if committed_leaders.is_empty() {
225 return vec![];
226 }
227
228 let mut committed_sub_dags = vec![];
229 for leader_block in committed_leaders {
230 let (sub_dag, commit) = self.collect_sub_dag_and_commit(leader_block);
232
233 self.update_blocks_pruned_metric(&sub_dag);
234
235 self.dag_state.write().add_commit(commit.clone());
238
239 committed_sub_dags.push(sub_dag);
240 }
241
242 committed_sub_dags
243 }
244
245 fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
250 let (last_committed_rounds, gc_round) = {
251 let dag_state = self.dag_state.read();
252 (dag_state.last_committed_rounds(), dag_state.gc_round())
253 };
254
255 for block_ref in sub_dag
256 .blocks
257 .iter()
258 .flat_map(|block| block.ancestors())
259 .filter(
260 |ancestor_ref| {
261 ancestor_ref.round <= gc_round
262 && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
263 }, )
266 .unique()
267 {
268 let hostname = &self.context.committee.authority(block_ref.author).hostname;
269
270 let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
272 &[hostname, "uncommitted"]
273 } else {
274 &[hostname, "higher_committed"]
276 };
277
278 self.context
279 .metrics
280 .node_metrics
281 .blocks_pruned_on_commit
282 .with_label_values(label_values)
283 .inc();
284 }
285 }
286}
287
288pub(crate) fn median_timestamp_by_stake(
292 context: &Context,
293 blocks: impl Iterator<Item = VerifiedBlock>,
294) -> Result<BlockTimestampMs, String> {
295 let mut total_stake = 0;
296 let mut timestamps = vec![];
297 for block in blocks {
298 let stake = context.committee.authority(block.author()).stake;
299 timestamps.push((block.timestamp_ms(), stake));
300 total_stake += stake;
301 }
302
303 if timestamps.is_empty() {
304 return Err("No blocks provided".to_string());
305 }
306 if total_stake < context.committee.quorum_threshold() {
307 return Err(format!(
308 "Total stake {} < quorum threshold {}",
309 total_stake,
310 context.committee.quorum_threshold()
311 )
312 .to_string());
313 }
314
315 Ok(median_timestamps_by_stake_inner(timestamps, total_stake))
316}
317
318fn median_timestamps_by_stake_inner(
319 mut timestamps: Vec<(BlockTimestampMs, Stake)>,
320 total_stake: Stake,
321) -> BlockTimestampMs {
322 timestamps.sort_by_key(|(ts, _)| *ts);
323
324 let mut cumulative_stake = 0;
325 for (ts, stake) in ×tamps {
326 cumulative_stake += stake;
327 if cumulative_stake > total_stake / 2 {
328 return *ts;
329 }
330 }
331
332 timestamps.last().unwrap().0
333}
334
335#[cfg(test)]
336mod tests {
337 use consensus_config::AuthorityIndex;
338 use rstest::rstest;
339
340 use super::*;
341 use crate::{
342 CommitIndex, TestBlock,
343 commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
344 context::Context,
345 leader_schedule::{LeaderSchedule, LeaderSwapTable},
346 storage::mem_store::MemStore,
347 test_dag_builder::DagBuilder,
348 test_dag_parser::parse_dag,
349 };
350
351 #[rstest]
352 #[tokio::test]
353 async fn test_handle_commit() {
354 telemetry_subscribers::init_for_testing();
355 let num_authorities = 4;
356 let (context, _keys) = Context::new_for_test(num_authorities);
357 let context = Arc::new(context);
358
359 let dag_state = Arc::new(RwLock::new(DagState::new(
360 context.clone(),
361 Arc::new(MemStore::new()),
362 )));
363 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
364
365 let num_rounds: u32 = 10;
367 let mut dag_builder = DagBuilder::new(context.clone());
368 dag_builder
369 .layers(1..=num_rounds)
370 .build()
371 .persist_layers(dag_state.clone());
372
373 let leaders = dag_builder
374 .leader_blocks(1..=num_rounds)
375 .into_iter()
376 .map(Option::unwrap)
377 .collect::<Vec<_>>();
378
379 let commits = linearizer.handle_commit(leaders.clone());
380 for (idx, subdag) in commits.into_iter().enumerate() {
381 tracing::info!("{subdag:?}");
382 assert_eq!(subdag.leader, leaders[idx].reference());
383
384 let expected_ts = {
385 let block_refs = leaders[idx]
386 .ancestors()
387 .iter()
388 .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
389 .cloned()
390 .collect::<Vec<_>>();
391 let blocks = dag_state
392 .read()
393 .get_blocks(&block_refs)
394 .into_iter()
395 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
396
397 median_timestamp_by_stake(&context, blocks).unwrap()
398 };
399 assert_eq!(subdag.timestamp_ms, expected_ts);
400
401 if idx == 0 {
402 assert_eq!(subdag.blocks.len(), 1);
404 } else {
405 assert_eq!(subdag.blocks.len(), num_authorities);
408 }
409 for block in subdag.blocks.iter() {
410 assert!(block.round() <= leaders[idx].round());
411 }
412 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
413 }
414 }
415
/// Checks that blocks already marked as committed by an earlier commit are
/// left out of the sub-dag produced for a later leader.
#[rstest]
#[tokio::test]
async fn test_handle_already_committed() {
    telemetry_subscribers::init_for_testing();
    let num_authorities = 4;
    let (context, _) = Context::new_for_test(num_authorities);
    let context = Arc::new(context);

    let dag_state = Arc::new(RwLock::new(DagState::new(
        context.clone(),
        Arc::new(MemStore::new()),
    )));
    let leader_schedule = Arc::new(LeaderSchedule::new(
        context.clone(),
        LeaderSwapTable::default(),
    ));
    let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
    let wave_length = DEFAULT_WAVE_LENGTH;

    let leader_round_wave_1 = 3;
    let leader_round_wave_2 = leader_round_wave_1 + wave_length;

    // Build a DAG up to the second wave's leader round. Blocks are added to
    // the dag state manually below rather than persisted by the builder.
    let mut dag_builder = DagBuilder::new(context.clone());
    dag_builder.layers(1..=leader_round_wave_2).build();

    // First batch: all blocks below the first leader round plus the first
    // leader block itself.
    let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
    blocks.push(
        dag_builder
            .leader_block(leader_round_wave_1)
            .expect("Leader block should have been found"),
    );
    dag_state.write().accept_blocks(blocks.clone());

    // Record a first commit covering that batch directly in the dag state.
    let first_leader = dag_builder
        .leader_block(leader_round_wave_1)
        .expect("Wave 1 leader round block should exist");
    let mut last_commit_index = 1;
    let first_commit_data = TrustedCommit::new_for_test(
        last_commit_index,
        CommitDigest::MIN,
        0,
        first_leader.reference(),
        blocks.iter().map(|block| block.reference()).collect(),
    );
    dag_state.write().add_commit(first_commit_data);

    // Mark every block of the first batch as committed.
    for block in blocks.iter() {
        dag_state.write().set_committed(&block.reference());
    }

    // Second batch: the remaining rounds below the second leader round,
    // without the first leader block (already accepted above), plus the
    // second leader block.
    let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
    blocks.retain(|block| {
        !(block.round() == leader_round_wave_1
            && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
    });
    blocks.push(
        dag_builder
            .leader_block(leader_round_wave_2)
            .expect("Leader block should have been found"),
    );
    dag_state.write().accept_blocks(blocks.clone());

    // From here on only the references of the second batch are needed.
    let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();

    let leader = dag_builder
        .leader_block(leader_round_wave_2)
        .expect("Leader block should exist");

    last_commit_index += 1;
    let expected_second_commit = TrustedCommit::new_for_test(
        last_commit_index,
        CommitDigest::MIN,
        0,
        leader.reference(),
        blocks.clone(),
    );

    let commit = linearizer.handle_commit(vec![leader.clone()]);
    assert_eq!(commit.len(), 1);

    let subdag = &commit[0];
    tracing::info!("{subdag:?}");
    assert_eq!(subdag.leader, leader.reference());
    assert_eq!(subdag.commit_ref.index, expected_second_commit.index());

    // The expected timestamp is the stake-weighted median over the sub-dag's
    // blocks from the round right below the leader.
    let expected_ts = median_timestamp_by_stake(
        &context,
        subdag.blocks.iter().filter_map(|block| {
            if block.round() == subdag.leader.round - 1 {
                Some(block.clone())
            } else {
                None
            }
        }),
    )
    .unwrap();
    assert_eq!(subdag.timestamp_ms, expected_ts);

    // Order the expected references by round, then author, before comparing
    // them with the sub-dag's blocks.
    blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
    assert_eq!(
        subdag
            .blocks
            .clone()
            .into_iter()
            .map(|b| b.reference())
            .collect::<Vec<_>>(),
        blocks
    );
    for block in subdag.blocks.iter() {
        assert!(block.round() <= expected_second_commit.leader().round);
    }
}
539
540 #[tokio::test]
543 async fn test_handle_commit_with_gc_simple() {
544 telemetry_subscribers::init_for_testing();
545
546 const GC_DEPTH: u32 = 3;
547
548 let num_authorities = 4;
549 let (mut context, _keys) = Context::new_for_test(num_authorities);
550 context
551 .protocol_config
552 .set_consensus_gc_depth_for_testing(GC_DEPTH);
553
554 let context = Arc::new(context);
555 let dag_state = Arc::new(RwLock::new(DagState::new(
556 context.clone(),
557 Arc::new(MemStore::new()),
558 )));
559 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
560
561 let dag_str = "DAG {
567 Round 0 : { 4 },
568 Round 1 : { * },
569 Round 2 : {
570 A -> [-D1],
571 B -> [-D1],
572 C -> [-D1],
573 D -> [*],
574 },
575 Round 3 : {
576 A -> [-D2],
577 B -> [-D2],
578 C -> [-D2],
579 },
580 Round 4 : {
581 A -> [-D3],
582 B -> [-D3],
583 C -> [-D3],
584 D -> [A3, B3, C3, D2],
585 },
586 Round 5 : { * },
587 }";
588
589 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
590 dag_builder.print();
591 dag_builder.persist_all_blocks(dag_state.clone());
592
593 let leaders = dag_builder
594 .leader_blocks(1..=6)
595 .into_iter()
596 .flatten()
597 .collect::<Vec<_>>();
598
599 let commits = linearizer.handle_commit(leaders.clone());
600 for (idx, subdag) in commits.into_iter().enumerate() {
601 tracing::info!("{subdag:?}");
602 assert_eq!(subdag.leader, leaders[idx].reference());
603
604 let expected_ts = {
605 let block_refs = leaders[idx]
606 .ancestors()
607 .iter()
608 .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
609 .cloned()
610 .collect::<Vec<_>>();
611 let blocks = dag_state
612 .read()
613 .get_blocks(&block_refs)
614 .into_iter()
615 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
616
617 median_timestamp_by_stake(&context, blocks).unwrap()
618 };
619 assert_eq!(subdag.timestamp_ms, expected_ts);
620
621 if idx == 0 {
622 assert_eq!(subdag.blocks.len(), 1);
624 } else if idx == 1 {
625 assert_eq!(subdag.blocks.len(), 3);
626 } else if idx == 2 {
627 assert_eq!(subdag.blocks.len(), 6);
632 } else {
633 assert_eq!(subdag.blocks.len(), 5);
638
639 assert!(
640 subdag.blocks.iter().all(|block| block.round() >= 2),
641 "Found blocks that are of round < 2."
642 );
643
644 assert_eq!(dag_state.read().gc_round(), subdag.leader.round - GC_DEPTH);
646 }
647 for block in subdag.blocks.iter() {
648 assert!(block.round() <= leaders[idx].round());
649 }
650 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
651 }
652 }
653
654 #[tokio::test]
655 async fn test_handle_commit_below_highest_committed_round() {
656 telemetry_subscribers::init_for_testing();
657
658 const GC_DEPTH: u32 = 3;
659
660 let num_authorities = 4;
661 let (mut context, _keys) = Context::new_for_test(num_authorities);
662 context
663 .protocol_config
664 .set_consensus_gc_depth_for_testing(GC_DEPTH);
665
666 let context = Arc::new(context);
667 let dag_state = Arc::new(RwLock::new(DagState::new(
668 context.clone(),
669 Arc::new(MemStore::new()),
670 )));
671 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
672
673 let dag_str = "DAG {
677 Round 0 : { 4 },
678 Round 1 : { * },
679 Round 2 : {
680 A -> [-D1],
681 B -> [-D1],
682 C -> [-D1],
683 D -> [-D1],
684 },
685 Round 3 : {
686 A -> [A2, B2, C2, D1],
687 B -> [A2, B2, C2, D1],
688 C -> [A2, B2, C2, D1],
689 D -> [A2, B2, C2, D2]
690 },
691 Round 4 : { * },
692 }";
693
694 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
695 dag_builder.print();
696 dag_builder.persist_all_blocks(dag_state.clone());
697
698 let leaders = dag_builder
699 .leader_blocks(1..=4)
700 .into_iter()
701 .flatten()
702 .collect::<Vec<_>>();
703
704 let commits = linearizer.handle_commit(leaders.clone());
705 for (idx, subdag) in commits.into_iter().enumerate() {
706 tracing::info!("{subdag:?}");
707 assert_eq!(subdag.leader, leaders[idx].reference());
708
709 let expected_ts = {
710 let block_refs = leaders[idx]
711 .ancestors()
712 .iter()
713 .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
714 .cloned()
715 .collect::<Vec<_>>();
716 let blocks = dag_state
717 .read()
718 .get_blocks(&block_refs)
719 .into_iter()
720 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
721
722 median_timestamp_by_stake(&context, blocks).unwrap()
723 };
724 assert_eq!(subdag.timestamp_ms, expected_ts);
725
726 if idx == 0 {
727 assert_eq!(subdag.blocks.len(), 1);
729 } else if idx == 1 {
730 assert_eq!(subdag.blocks.len(), 3);
734 } else if idx == 2 {
735 assert_eq!(subdag.blocks.len(), 4);
739
740 assert!(
741 subdag.blocks.iter().any(|block| block.round() == 2
742 && block.author() == AuthorityIndex::new_for_test(3)),
743 "Block D2 should have been committed."
744 );
745 } else if idx == 3 {
746 assert_eq!(subdag.blocks.len(), 5);
751 assert!(
752 subdag.blocks.iter().any(|block| block.round() == 1
753 && block.author() == AuthorityIndex::new_for_test(3)),
754 "Block D1 should have been committed."
755 );
756 } else {
757 panic!("Unexpected subdag with index {:?}", idx);
758 }
759
760 for block in subdag.blocks.iter() {
761 assert!(block.round() <= leaders[idx].round());
762 }
763 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
764 }
765 }
766
767 #[rstest]
768 #[case(3_000, 3_000, 6_000)]
769 #[tokio::test]
770 async fn test_calculate_commit_timestamp(
771 #[case] timestamp_1: u64,
772 #[case] timestamp_2: u64,
773 #[case] timestamp_3: u64,
774 ) {
775 telemetry_subscribers::init_for_testing();
777
778 let num_authorities = 4;
779 let (context, _keys) = Context::new_for_test(num_authorities);
780
781 let context = Arc::new(context);
782 let store = Arc::new(MemStore::new());
783 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
784 let mut dag_state = dag_state.write();
785
786 let ancestors = vec![
787 VerifiedBlock::new_for_test(TestBlock::new(4, 0).set_timestamp_ms(1_000).build()),
788 VerifiedBlock::new_for_test(TestBlock::new(4, 1).set_timestamp_ms(2_000).build()),
789 VerifiedBlock::new_for_test(TestBlock::new(4, 2).set_timestamp_ms(3_000).build()),
790 VerifiedBlock::new_for_test(TestBlock::new(4, 3).set_timestamp_ms(4_000).build()),
791 ];
792
793 let leader_block = VerifiedBlock::new_for_test(
794 TestBlock::new(5, 0)
795 .set_timestamp_ms(5_000)
796 .set_ancestors(
797 ancestors
798 .iter()
799 .map(|block| block.reference())
800 .collect::<Vec<_>>(),
801 )
802 .build(),
803 );
804
805 for block in &ancestors {
806 dag_state.accept_block(block.clone());
807 }
808
809 let last_commit_timestamp_ms = 0;
810
811 let timestamp = Linearizer::calculate_commit_timestamp(
813 &context,
814 &mut dag_state,
815 &leader_block,
816 last_commit_timestamp_ms,
817 );
818 assert_eq!(timestamp, timestamp_1);
819
820 let leader_block = VerifiedBlock::new_for_test(
822 TestBlock::new(5, 0)
823 .set_timestamp_ms(5_000)
824 .set_ancestors(
825 ancestors
826 .iter()
827 .skip(1)
828 .map(|block| block.reference())
829 .collect::<Vec<_>>(),
830 )
831 .build(),
832 );
833
834 let timestamp = Linearizer::calculate_commit_timestamp(
835 &context,
836 &mut dag_state,
837 &leader_block,
838 last_commit_timestamp_ms,
839 );
840 assert_eq!(timestamp, timestamp_2);
841
842 let last_commit_timestamp_ms = 6_000;
844 let timestamp = Linearizer::calculate_commit_timestamp(
845 &context,
846 &mut dag_state,
847 &leader_block,
848 last_commit_timestamp_ms,
849 );
850 assert_eq!(timestamp, timestamp_3);
851
852 let (context, _) = Context::new_for_test(1);
854 let leader_block = VerifiedBlock::new_for_test(
855 TestBlock::new(5, 0)
856 .set_timestamp_ms(5_000)
857 .set_ancestors(
858 ancestors
859 .iter()
860 .take(1)
861 .map(|block| block.reference())
862 .collect::<Vec<_>>(),
863 )
864 .build(),
865 );
866 let last_commit_timestamp_ms = 0;
867 let timestamp = Linearizer::calculate_commit_timestamp(
868 &context,
869 &mut dag_state,
870 &leader_block,
871 last_commit_timestamp_ms,
872 );
873 assert_eq!(timestamp, 1_000);
874 }
875
/// Table-driven check of `median_timestamps_by_stake_inner`; each row is
/// `(timestamps with stake, total stake, expected median)`.
#[test]
fn test_median_timestamps_by_stake() {
    let cases: Vec<(Vec<(BlockTimestampMs, Stake)>, Stake, BlockTimestampMs)> = vec![
        // Single entry.
        (vec![(1_000, 1)], 1, 1_000),
        // Odd number of equally weighted entries.
        (vec![(1_000, 1), (2_000, 1), (3_000, 1)], 3, 2_000),
        // Even number of equally weighted entries.
        (
            vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1)],
            4,
            3_000,
        ),
        // Same entries, unsorted.
        (
            vec![(4_000, 1), (3_000, 1), (1_000, 1), (2_000, 1)],
            4,
            3_000,
        ),
        // Unsorted entries with different stakes.
        (
            vec![(2_000, 2), (4_000, 2), (1_000, 3), (3_000, 3)],
            10,
            3_000,
        ),
        (
            vec![
                (500, 2),
                (4_000, 2),
                (2_500, 3),
                (1_000, 5),
                (3_000, 3),
                (2_000, 4),
            ],
            19,
            2_000,
        ),
        // One entry holds most of the stake.
        (
            vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1), (5_000, 10)],
            14,
            5_000,
        ),
    ];

    for (timestamps, total_stake, expected) in cases {
        assert_eq!(
            median_timestamps_by_stake_inner(timestamps, total_stake),
            expected
        );
    }
}
913
914 #[tokio::test]
915 async fn test_median_timestamps_by_stake_errors() {
916 let num_authorities = 4;
917 let (context, _keys) = Context::new_for_test(num_authorities);
918 let context = Arc::new(context);
919
920 let err = median_timestamp_by_stake(&context, vec![].into_iter()).unwrap_err();
922 assert_eq!(err, "No blocks provided");
923
924 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
926 let err = median_timestamp_by_stake(&context, vec![block].into_iter()).unwrap_err();
927 assert_eq!(err, "Total stake 1 < quorum threshold 3");
928 }
929}