1use std::sync::Arc;
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
8use itertools::Itertools;
9use parking_lot::RwLock;
10
11use crate::{
12 block::{BlockAPI, VerifiedBlock},
13 commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
14 context::Context,
15 dag_state::DagState,
16};
17
18pub(crate) trait BlockStoreAPI {
21 fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;
22
23 fn gc_round(&self) -> Round;
24
25 fn set_committed(&mut self, block_ref: &BlockRef) -> bool;
26
27 fn is_committed(&self, block_ref: &BlockRef) -> bool;
28}
29
30impl BlockStoreAPI
31 for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
32{
33 fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
34 DagState::get_blocks(self, refs)
35 }
36
37 fn gc_round(&self) -> Round {
38 DagState::gc_round(self)
39 }
40
41 fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
42 DagState::set_committed(self, block_ref)
43 }
44
45 fn is_committed(&self, block_ref: &BlockRef) -> bool {
46 DagState::is_committed(self, block_ref)
47 }
48}
49
50#[derive(Clone)]
52pub struct Linearizer {
53 context: Arc<Context>,
55 dag_state: Arc<RwLock<DagState>>,
56}
57
58impl Linearizer {
59 pub fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
60 Self { context, dag_state }
61 }
62
63 fn collect_sub_dag_and_commit(
66 &mut self,
67 leader_block: VerifiedBlock,
68 ) -> (CommittedSubDag, TrustedCommit) {
69 let _s = self
70 .context
71 .metrics
72 .node_metrics
73 .scope_processing_time
74 .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
75 .start_timer();
76
77 let mut dag_state = self.dag_state.write();
79 let last_commit_index = dag_state.last_commit_index();
80 let last_commit_digest = dag_state.last_commit_digest();
81 let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
82
83 let to_commit = Self::linearize_sub_dag(leader_block.clone(), &mut dag_state);
85
86 let timestamp_ms = Self::calculate_commit_timestamp(
87 &self.context,
88 &mut dag_state,
89 &leader_block,
90 last_commit_timestamp_ms,
91 );
92
93 drop(dag_state);
94
95 let commit = Commit::new(
97 last_commit_index + 1,
98 last_commit_digest,
99 timestamp_ms,
100 leader_block.reference(),
101 to_commit
102 .iter()
103 .map(|block| block.reference())
104 .collect::<Vec<_>>(),
105 );
106 let serialized = commit
107 .serialize()
108 .unwrap_or_else(|e| panic!("Failed to serialize commit: {}", e));
109 let commit = TrustedCommit::new_trusted(commit, serialized);
110
111 let sub_dag = CommittedSubDag::new(
113 leader_block.reference(),
114 to_commit,
115 timestamp_ms,
116 commit.reference(),
117 );
118
119 (sub_dag, commit)
120 }
121
122 pub(crate) fn calculate_commit_timestamp(
126 context: &Context,
127 dag_state: &mut impl BlockStoreAPI,
128 leader_block: &VerifiedBlock,
129 last_commit_timestamp_ms: BlockTimestampMs,
130 ) -> BlockTimestampMs {
131 let timestamp_ms = {
132 let block_refs = leader_block
134 .ancestors()
135 .iter()
136 .filter(|block_ref| block_ref.round == leader_block.round() - 1)
137 .cloned()
138 .collect::<Vec<_>>();
139 let blocks = dag_state
141 .get_blocks(&block_refs)
142 .into_iter()
143 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
144 median_timestamp_by_stake(context, blocks).unwrap_or_else(|e| {
145 panic!(
146 "Cannot compute median timestamp for leader block {:?} ancestors: {}",
147 leader_block, e
148 )
149 })
150 };
151
152 timestamp_ms.max(last_commit_timestamp_ms)
154 }
155
156 pub(crate) fn linearize_sub_dag(
157 leader_block: VerifiedBlock,
158 dag_state: &mut impl BlockStoreAPI,
159 ) -> Vec<VerifiedBlock> {
160 let gc_round: Round = dag_state.gc_round();
165 let leader_block_ref = leader_block.reference();
166 let mut buffer = vec![leader_block];
167 let mut to_commit = Vec::new();
168
169 assert!(
172 dag_state.set_committed(&leader_block_ref),
173 "Leader block with reference {:?} attempted to be committed twice",
174 leader_block_ref
175 );
176
177 while let Some(x) = buffer.pop() {
178 to_commit.push(x.clone());
179
180 let ancestors: Vec<VerifiedBlock> = dag_state
181 .get_blocks(
182 &x.ancestors()
183 .iter()
184 .copied()
185 .filter(|ancestor| {
186 ancestor.round > gc_round && !dag_state.is_committed(ancestor)
187 })
188 .collect::<Vec<_>>(),
189 )
190 .into_iter()
191 .map(|ancestor_opt| {
192 ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
193 })
194 .collect();
195
196 for ancestor in ancestors {
197 buffer.push(ancestor.clone());
198 assert!(
199 dag_state.set_committed(&ancestor.reference()),
200 "Block with reference {:?} attempted to be committed twice",
201 ancestor.reference()
202 );
203 }
204 }
205
206 assert!(
209 to_commit.iter().all(|block| block.round() > gc_round),
210 "No blocks <= {gc_round} should be committed. Leader round {}, blocks {to_commit:?}.",
211 leader_block_ref
212 );
213
214 sort_sub_dag_blocks(&mut to_commit);
216
217 to_commit
218 }
219
220 pub fn handle_commit(&mut self, committed_leaders: Vec<VerifiedBlock>) -> Vec<CommittedSubDag> {
224 if committed_leaders.is_empty() {
225 return vec![];
226 }
227
228 let mut committed_sub_dags = vec![];
229 for leader_block in committed_leaders {
230 let (sub_dag, commit) = self.collect_sub_dag_and_commit(leader_block);
232
233 self.update_blocks_pruned_metric(&sub_dag);
234
235 self.dag_state.write().add_commit(commit.clone());
238
239 committed_sub_dags.push(sub_dag);
240 }
241
242 committed_sub_dags
243 }
244
245 fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
250 let (last_committed_rounds, gc_round) = {
251 let dag_state = self.dag_state.read();
252 (dag_state.last_committed_rounds(), dag_state.gc_round())
253 };
254
255 for block_ref in sub_dag
256 .blocks
257 .iter()
258 .flat_map(|block| block.ancestors())
259 .filter(
260 |ancestor_ref| {
261 ancestor_ref.round <= gc_round
262 && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
263 }, )
266 .unique()
267 {
268 let hostname = &self.context.committee.authority(block_ref.author).hostname;
269
270 let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
272 &[hostname, "uncommitted"]
273 } else {
274 &[hostname, "higher_committed"]
276 };
277
278 self.context
279 .metrics
280 .node_metrics
281 .blocks_pruned_on_commit
282 .with_label_values(label_values)
283 .inc();
284 }
285 }
286}
287
288pub(crate) fn median_timestamp_by_stake(
292 context: &Context,
293 blocks: impl Iterator<Item = VerifiedBlock>,
294) -> Result<BlockTimestampMs, String> {
295 let mut total_stake = 0;
296 let mut timestamps = vec![];
297 for block in blocks {
298 let stake = context.committee.authority(block.author()).stake;
299 timestamps.push((block.timestamp_ms(), stake));
300 total_stake += stake;
301 }
302
303 if timestamps.is_empty() {
304 return Err("No blocks provided".to_string());
305 }
306 if total_stake < context.committee.quorum_threshold() {
307 return Err(format!(
308 "Total stake {} < quorum threshold {}",
309 total_stake,
310 context.committee.quorum_threshold()
311 ));
312 }
313
314 Ok(median_timestamps_by_stake_inner(timestamps, total_stake))
315}
316
317fn median_timestamps_by_stake_inner(
318 mut timestamps: Vec<(BlockTimestampMs, Stake)>,
319 total_stake: Stake,
320) -> BlockTimestampMs {
321 timestamps.sort_by_key(|(ts, _)| *ts);
322
323 let mut cumulative_stake = 0;
324 for (ts, stake) in ×tamps {
325 cumulative_stake += stake;
326 if cumulative_stake > total_stake / 2 {
327 return *ts;
328 }
329 }
330
331 timestamps.last().unwrap().0
332}
333
334#[cfg(test)]
335mod tests {
336 use consensus_config::AuthorityIndex;
337 use rstest::rstest;
338
339 use super::*;
340 use crate::{
341 CommitIndex, TestBlock,
342 commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
343 context::Context,
344 leader_schedule::{LeaderSchedule, LeaderSwapTable},
345 storage::mem_store::MemStore,
346 test_dag_builder::DagBuilder,
347 test_dag_parser::parse_dag,
348 };
349
350 #[rstest]
351 #[tokio::test]
352 async fn test_handle_commit() {
353 telemetry_subscribers::init_for_testing();
354 let num_authorities = 4;
355 let (context, _keys) = Context::new_for_test(num_authorities);
356 let context = Arc::new(context);
357
358 let dag_state = Arc::new(RwLock::new(DagState::new(
359 context.clone(),
360 Arc::new(MemStore::new()),
361 )));
362 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
363
364 let num_rounds: u32 = 10;
366 let mut dag_builder = DagBuilder::new(context.clone());
367 dag_builder
368 .layers(1..=num_rounds)
369 .build()
370 .persist_layers(dag_state.clone());
371
372 let leaders = dag_builder
373 .leader_blocks(1..=num_rounds)
374 .into_iter()
375 .map(Option::unwrap)
376 .collect::<Vec<_>>();
377
378 let commits = linearizer.handle_commit(leaders.clone());
379 for (idx, subdag) in commits.into_iter().enumerate() {
380 tracing::info!("{subdag:?}");
381 assert_eq!(subdag.leader, leaders[idx].reference());
382
383 let expected_ts = {
384 let block_refs = leaders[idx]
385 .ancestors()
386 .iter()
387 .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
388 .cloned()
389 .collect::<Vec<_>>();
390 let blocks = dag_state
391 .read()
392 .get_blocks(&block_refs)
393 .into_iter()
394 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
395
396 median_timestamp_by_stake(&context, blocks).unwrap()
397 };
398 assert_eq!(subdag.timestamp_ms, expected_ts);
399
400 if idx == 0 {
401 assert_eq!(subdag.blocks.len(), 1);
403 } else {
404 assert_eq!(subdag.blocks.len(), num_authorities);
407 }
408 for block in subdag.blocks.iter() {
409 assert!(block.round() <= leaders[idx].round());
410 }
411 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
412 }
413 }
414
415 #[rstest]
416 #[tokio::test]
417 async fn test_handle_already_committed() {
418 telemetry_subscribers::init_for_testing();
419 let num_authorities = 4;
420 let (context, _) = Context::new_for_test(num_authorities);
421 let context = Arc::new(context);
422
423 let dag_state = Arc::new(RwLock::new(DagState::new(
424 context.clone(),
425 Arc::new(MemStore::new()),
426 )));
427 let leader_schedule = Arc::new(LeaderSchedule::new(
428 context.clone(),
429 LeaderSwapTable::default(),
430 ));
431 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
432 let wave_length = DEFAULT_WAVE_LENGTH;
433
434 let leader_round_wave_1 = 3;
435 let leader_round_wave_2 = leader_round_wave_1 + wave_length;
436
437 let mut dag_builder = DagBuilder::new(context.clone());
439 dag_builder.layers(1..=leader_round_wave_2).build();
440
441 let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
445 blocks.push(
446 dag_builder
447 .leader_block(leader_round_wave_1)
448 .expect("Leader block should have been found"),
449 );
450 dag_state.write().accept_blocks(blocks.clone());
451
452 let first_leader = dag_builder
453 .leader_block(leader_round_wave_1)
454 .expect("Wave 1 leader round block should exist");
455 let mut last_commit_index = 1;
456 let first_commit_data = TrustedCommit::new_for_test(
457 last_commit_index,
458 CommitDigest::MIN,
459 0,
460 first_leader.reference(),
461 blocks.iter().map(|block| block.reference()).collect(),
462 );
463 dag_state.write().add_commit(first_commit_data);
464
465 for block in blocks.iter() {
467 dag_state.write().set_committed(&block.reference());
468 }
469
470 let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
472 blocks.retain(|block| {
474 !(block.round() == leader_round_wave_1
475 && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
476 });
477 blocks.push(
479 dag_builder
480 .leader_block(leader_round_wave_2)
481 .expect("Leader block should have been found"),
482 );
483 dag_state.write().accept_blocks(blocks.clone());
485
486 let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();
487
488 let leader = dag_builder
490 .leader_block(leader_round_wave_2)
491 .expect("Leader block should exist");
492
493 last_commit_index += 1;
494 let expected_second_commit = TrustedCommit::new_for_test(
495 last_commit_index,
496 CommitDigest::MIN,
497 0,
498 leader.reference(),
499 blocks.clone(),
500 );
501
502 let commit = linearizer.handle_commit(vec![leader.clone()]);
503 assert_eq!(commit.len(), 1);
504
505 let subdag = &commit[0];
506 tracing::info!("{subdag:?}");
507 assert_eq!(subdag.leader, leader.reference());
508 assert_eq!(subdag.commit_ref.index, expected_second_commit.index());
509
510 let expected_ts = median_timestamp_by_stake(
511 &context,
512 subdag.blocks.iter().filter_map(|block| {
513 if block.round() == subdag.leader.round - 1 {
514 Some(block.clone())
515 } else {
516 None
517 }
518 }),
519 )
520 .unwrap();
521 assert_eq!(subdag.timestamp_ms, expected_ts);
522
523 blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
525 assert_eq!(
526 subdag
527 .blocks
528 .clone()
529 .into_iter()
530 .map(|b| b.reference())
531 .collect::<Vec<_>>(),
532 blocks
533 );
534 for block in subdag.blocks.iter() {
535 assert!(block.round() <= expected_second_commit.leader().round);
536 }
537 }
538
539 #[tokio::test]
542 async fn test_handle_commit_with_gc_simple() {
543 telemetry_subscribers::init_for_testing();
544
545 const GC_DEPTH: u32 = 3;
546
547 let num_authorities = 4;
548 let (mut context, _keys) = Context::new_for_test(num_authorities);
549 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
550
551 let context = Arc::new(context);
552 let dag_state = Arc::new(RwLock::new(DagState::new(
553 context.clone(),
554 Arc::new(MemStore::new()),
555 )));
556 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
557
558 let dag_str = "DAG {
564 Round 0 : { 4 },
565 Round 1 : { * },
566 Round 2 : {
567 A -> [-D1],
568 B -> [-D1],
569 C -> [-D1],
570 D -> [*],
571 },
572 Round 3 : {
573 A -> [-D2],
574 B -> [-D2],
575 C -> [-D2],
576 },
577 Round 4 : {
578 A -> [-D3],
579 B -> [-D3],
580 C -> [-D3],
581 D -> [A3, B3, C3, D2],
582 },
583 Round 5 : { * },
584 }";
585
586 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
587 dag_builder.print();
588 dag_builder.persist_all_blocks(dag_state.clone());
589
590 let leaders = dag_builder
591 .leader_blocks(1..=6)
592 .into_iter()
593 .flatten()
594 .collect::<Vec<_>>();
595
596 let commits = linearizer.handle_commit(leaders.clone());
597 for (idx, subdag) in commits.into_iter().enumerate() {
598 tracing::info!("{subdag:?}");
599 assert_eq!(subdag.leader, leaders[idx].reference());
600
601 let expected_ts = {
602 let block_refs = leaders[idx]
603 .ancestors()
604 .iter()
605 .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
606 .cloned()
607 .collect::<Vec<_>>();
608 let blocks = dag_state
609 .read()
610 .get_blocks(&block_refs)
611 .into_iter()
612 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
613
614 median_timestamp_by_stake(&context, blocks).unwrap()
615 };
616 assert_eq!(subdag.timestamp_ms, expected_ts);
617
618 if idx == 0 {
619 assert_eq!(subdag.blocks.len(), 1);
621 } else if idx == 1 {
622 assert_eq!(subdag.blocks.len(), 3);
623 } else if idx == 2 {
624 assert_eq!(subdag.blocks.len(), 6);
629 } else {
630 assert_eq!(subdag.blocks.len(), 5);
635
636 assert!(
637 subdag.blocks.iter().all(|block| block.round() >= 2),
638 "Found blocks that are of round < 2."
639 );
640
641 assert_eq!(dag_state.read().gc_round(), subdag.leader.round - GC_DEPTH);
643 }
644 for block in subdag.blocks.iter() {
645 assert!(block.round() <= leaders[idx].round());
646 }
647 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
648 }
649 }
650
651 #[tokio::test]
652 async fn test_handle_commit_below_highest_committed_round() {
653 telemetry_subscribers::init_for_testing();
654
655 const GC_DEPTH: u32 = 3;
656
657 let num_authorities = 4;
658 let (mut context, _keys) = Context::new_for_test(num_authorities);
659 context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
660
661 let context = Arc::new(context);
662 let dag_state = Arc::new(RwLock::new(DagState::new(
663 context.clone(),
664 Arc::new(MemStore::new()),
665 )));
666 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
667
668 let dag_str = "DAG {
672 Round 0 : { 4 },
673 Round 1 : { * },
674 Round 2 : {
675 A -> [-D1],
676 B -> [-D1],
677 C -> [-D1],
678 D -> [-D1],
679 },
680 Round 3 : {
681 A -> [A2, B2, C2, D1],
682 B -> [A2, B2, C2, D1],
683 C -> [A2, B2, C2, D1],
684 D -> [A2, B2, C2, D2]
685 },
686 Round 4 : { * },
687 }";
688
689 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
690 dag_builder.print();
691 dag_builder.persist_all_blocks(dag_state.clone());
692
693 let leaders = dag_builder
694 .leader_blocks(1..=4)
695 .into_iter()
696 .flatten()
697 .collect::<Vec<_>>();
698
699 let commits = linearizer.handle_commit(leaders.clone());
700 for (idx, subdag) in commits.into_iter().enumerate() {
701 tracing::info!("{subdag:?}");
702 assert_eq!(subdag.leader, leaders[idx].reference());
703
704 let expected_ts = {
705 let block_refs = leaders[idx]
706 .ancestors()
707 .iter()
708 .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
709 .cloned()
710 .collect::<Vec<_>>();
711 let blocks = dag_state
712 .read()
713 .get_blocks(&block_refs)
714 .into_iter()
715 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
716
717 median_timestamp_by_stake(&context, blocks).unwrap()
718 };
719 assert_eq!(subdag.timestamp_ms, expected_ts);
720
721 if idx == 0 {
722 assert_eq!(subdag.blocks.len(), 1);
724 } else if idx == 1 {
725 assert_eq!(subdag.blocks.len(), 3);
729 } else if idx == 2 {
730 assert_eq!(subdag.blocks.len(), 4);
734
735 assert!(
736 subdag.blocks.iter().any(|block| block.round() == 2
737 && block.author() == AuthorityIndex::new_for_test(3)),
738 "Block D2 should have been committed."
739 );
740 } else if idx == 3 {
741 assert_eq!(subdag.blocks.len(), 5);
746 assert!(
747 subdag.blocks.iter().any(|block| block.round() == 1
748 && block.author() == AuthorityIndex::new_for_test(3)),
749 "Block D1 should have been committed."
750 );
751 } else {
752 panic!("Unexpected subdag with index {:?}", idx);
753 }
754
755 for block in subdag.blocks.iter() {
756 assert!(block.round() <= leaders[idx].round());
757 }
758 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
759 }
760 }
761
762 #[rstest]
763 #[case(3_000, 3_000, 6_000)]
764 #[tokio::test]
765 async fn test_calculate_commit_timestamp(
766 #[case] timestamp_1: u64,
767 #[case] timestamp_2: u64,
768 #[case] timestamp_3: u64,
769 ) {
770 telemetry_subscribers::init_for_testing();
772
773 let num_authorities = 4;
774 let (context, _keys) = Context::new_for_test(num_authorities);
775
776 let context = Arc::new(context);
777 let store = Arc::new(MemStore::new());
778 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
779 let mut dag_state = dag_state.write();
780
781 let ancestors = vec![
782 VerifiedBlock::new_for_test(TestBlock::new(4, 0).set_timestamp_ms(1_000).build()),
783 VerifiedBlock::new_for_test(TestBlock::new(4, 1).set_timestamp_ms(2_000).build()),
784 VerifiedBlock::new_for_test(TestBlock::new(4, 2).set_timestamp_ms(3_000).build()),
785 VerifiedBlock::new_for_test(TestBlock::new(4, 3).set_timestamp_ms(4_000).build()),
786 ];
787
788 let leader_block = VerifiedBlock::new_for_test(
789 TestBlock::new(5, 0)
790 .set_timestamp_ms(5_000)
791 .set_ancestors(
792 ancestors
793 .iter()
794 .map(|block| block.reference())
795 .collect::<Vec<_>>(),
796 )
797 .build(),
798 );
799
800 for block in &ancestors {
801 dag_state.accept_block(block.clone());
802 }
803
804 let last_commit_timestamp_ms = 0;
805
806 let timestamp = Linearizer::calculate_commit_timestamp(
808 &context,
809 &mut dag_state,
810 &leader_block,
811 last_commit_timestamp_ms,
812 );
813 assert_eq!(timestamp, timestamp_1);
814
815 let leader_block = VerifiedBlock::new_for_test(
817 TestBlock::new(5, 0)
818 .set_timestamp_ms(5_000)
819 .set_ancestors(
820 ancestors
821 .iter()
822 .skip(1)
823 .map(|block| block.reference())
824 .collect::<Vec<_>>(),
825 )
826 .build(),
827 );
828
829 let timestamp = Linearizer::calculate_commit_timestamp(
830 &context,
831 &mut dag_state,
832 &leader_block,
833 last_commit_timestamp_ms,
834 );
835 assert_eq!(timestamp, timestamp_2);
836
837 let last_commit_timestamp_ms = 6_000;
839 let timestamp = Linearizer::calculate_commit_timestamp(
840 &context,
841 &mut dag_state,
842 &leader_block,
843 last_commit_timestamp_ms,
844 );
845 assert_eq!(timestamp, timestamp_3);
846
847 let (context, _) = Context::new_for_test(1);
849 let leader_block = VerifiedBlock::new_for_test(
850 TestBlock::new(5, 0)
851 .set_timestamp_ms(5_000)
852 .set_ancestors(
853 ancestors
854 .iter()
855 .take(1)
856 .map(|block| block.reference())
857 .collect::<Vec<_>>(),
858 )
859 .build(),
860 );
861 let last_commit_timestamp_ms = 0;
862 let timestamp = Linearizer::calculate_commit_timestamp(
863 &context,
864 &mut dag_state,
865 &leader_block,
866 last_commit_timestamp_ms,
867 );
868 assert_eq!(timestamp, 1_000);
869 }
870
871 #[test]
872 fn test_median_timestamps_by_stake() {
873 let timestamps = vec![(1_000, 1)];
875 assert_eq!(median_timestamps_by_stake_inner(timestamps, 1), 1_000);
876
877 let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1)];
879 assert_eq!(median_timestamps_by_stake_inner(timestamps, 3), 2_000);
880
881 let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1)];
883 assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);
884
885 let timestamps = vec![(4_000, 1), (3_000, 1), (1_000, 1), (2_000, 1)];
887 assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);
888
889 let timestamps = vec![(2_000, 2), (4_000, 2), (1_000, 3), (3_000, 3)];
891 assert_eq!(median_timestamps_by_stake_inner(timestamps, 10), 3_000);
892
893 let timestamps = vec![
895 (500, 2),
896 (4_000, 2),
897 (2_500, 3),
898 (1_000, 5),
899 (3_000, 3),
900 (2_000, 4),
901 ];
902 assert_eq!(median_timestamps_by_stake_inner(timestamps, 19), 2_000);
903
904 let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1), (5_000, 10)];
906 assert_eq!(median_timestamps_by_stake_inner(timestamps, 14), 5_000);
907 }
908
909 #[tokio::test]
910 async fn test_median_timestamps_by_stake_errors() {
911 let num_authorities = 4;
912 let (context, _keys) = Context::new_for_test(num_authorities);
913 let context = Arc::new(context);
914
915 let err = median_timestamp_by_stake(&context, vec![].into_iter()).unwrap_err();
917 assert_eq!(err, "No blocks provided");
918
919 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
921 let err = median_timestamp_by_stake(&context, vec![block].into_iter()).unwrap_err();
922 assert_eq!(err, "Total stake 1 < quorum threshold 3");
923 }
924}