1use std::sync::Arc;
5
6use consensus_config::Stake;
7use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
8use itertools::Itertools;
9use parking_lot::RwLock;
10
11use crate::{
12 block::{BlockAPI, VerifiedBlock},
13 commit::{Commit, CommittedSubDag, TrustedCommit, sort_sub_dag_blocks},
14 context::Context,
15 dag_state::DagState,
16};
17
18pub(crate) trait BlockStoreAPI {
21 fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>>;
22
23 fn gc_round(&self) -> Round;
24
25 fn set_committed(&mut self, block_ref: &BlockRef) -> bool;
26
27 fn is_committed(&self, block_ref: &BlockRef) -> bool;
28}
29
30impl BlockStoreAPI
31 for parking_lot::lock_api::RwLockWriteGuard<'_, parking_lot::RawRwLock, DagState>
32{
33 fn get_blocks(&self, refs: &[BlockRef]) -> Vec<Option<VerifiedBlock>> {
34 DagState::get_blocks(self, refs)
35 }
36
37 fn gc_round(&self) -> Round {
38 DagState::gc_round(self)
39 }
40
41 fn set_committed(&mut self, block_ref: &BlockRef) -> bool {
42 DagState::set_committed(self, block_ref)
43 }
44
45 fn is_committed(&self, block_ref: &BlockRef) -> bool {
46 DagState::is_committed(self, block_ref)
47 }
48}
49
50#[derive(Clone)]
52pub struct Linearizer {
53 context: Arc<Context>,
55 dag_state: Arc<RwLock<DagState>>,
56}
57
58impl Linearizer {
59 pub fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
60 Self { context, dag_state }
61 }
62
63 fn collect_sub_dag_and_commit(
66 &mut self,
67 leader_block: VerifiedBlock,
68 ) -> (CommittedSubDag, TrustedCommit) {
69 let _s = self
70 .context
71 .metrics
72 .node_metrics
73 .scope_processing_time
74 .with_label_values(&["Linearizer::collect_sub_dag_and_commit"])
75 .start_timer();
76
77 let mut dag_state = self.dag_state.write();
79 let last_commit_index = dag_state.last_commit_index();
80 let last_commit_digest = dag_state.last_commit_digest();
81 let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
82
83 let to_commit = Self::linearize_sub_dag(leader_block.clone(), &mut dag_state);
85
86 let timestamp_ms = Self::calculate_commit_timestamp(
87 &self.context,
88 &mut dag_state,
89 &leader_block,
90 last_commit_timestamp_ms,
91 );
92
93 drop(dag_state);
94
95 let commit = Commit::new(
97 last_commit_index + 1,
98 last_commit_digest,
99 timestamp_ms,
100 leader_block.reference(),
101 to_commit
102 .iter()
103 .map(|block| block.reference())
104 .collect::<Vec<_>>(),
105 );
106 let serialized = commit
107 .serialize()
108 .unwrap_or_else(|e| panic!("Failed to serialize commit: {}", e));
109 let commit = TrustedCommit::new_trusted(commit, serialized);
110
111 let sub_dag = CommittedSubDag::new(
113 leader_block.reference(),
114 to_commit,
115 timestamp_ms,
116 commit.reference(),
117 self.context
118 .protocol_config
119 .consensus_always_accept_system_transactions(),
120 );
121
122 (sub_dag, commit)
123 }
124
125 pub(crate) fn calculate_commit_timestamp(
129 context: &Context,
130 dag_state: &mut impl BlockStoreAPI,
131 leader_block: &VerifiedBlock,
132 last_commit_timestamp_ms: BlockTimestampMs,
133 ) -> BlockTimestampMs {
134 let timestamp_ms = {
135 let block_refs = leader_block
137 .ancestors()
138 .iter()
139 .filter(|block_ref| block_ref.round == leader_block.round() - 1)
140 .cloned()
141 .collect::<Vec<_>>();
142 let blocks = dag_state
144 .get_blocks(&block_refs)
145 .into_iter()
146 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
147 median_timestamp_by_stake(context, blocks).unwrap_or_else(|e| {
148 panic!(
149 "Cannot compute median timestamp for leader block {:?} ancestors: {}",
150 leader_block, e
151 )
152 })
153 };
154
155 timestamp_ms.max(last_commit_timestamp_ms)
157 }
158
159 pub(crate) fn linearize_sub_dag(
160 leader_block: VerifiedBlock,
161 dag_state: &mut impl BlockStoreAPI,
162 ) -> Vec<VerifiedBlock> {
163 let gc_round: Round = dag_state.gc_round();
168 let leader_block_ref = leader_block.reference();
169 let mut buffer = vec![leader_block];
170 let mut to_commit = Vec::new();
171
172 assert!(
175 dag_state.set_committed(&leader_block_ref),
176 "Leader block with reference {:?} attempted to be committed twice",
177 leader_block_ref
178 );
179
180 while let Some(x) = buffer.pop() {
181 to_commit.push(x.clone());
182
183 let ancestors: Vec<VerifiedBlock> = dag_state
184 .get_blocks(
185 &x.ancestors()
186 .iter()
187 .copied()
188 .filter(|ancestor| {
189 ancestor.round > gc_round && !dag_state.is_committed(ancestor)
190 })
191 .collect::<Vec<_>>(),
192 )
193 .into_iter()
194 .map(|ancestor_opt| {
195 ancestor_opt.expect("We should have all uncommitted blocks in dag state.")
196 })
197 .collect();
198
199 for ancestor in ancestors {
200 buffer.push(ancestor.clone());
201 assert!(
202 dag_state.set_committed(&ancestor.reference()),
203 "Block with reference {:?} attempted to be committed twice",
204 ancestor.reference()
205 );
206 }
207 }
208
209 assert!(
212 to_commit.iter().all(|block| block.round() > gc_round),
213 "No blocks <= {gc_round} should be committed. Leader round {}, blocks {to_commit:?}.",
214 leader_block_ref
215 );
216
217 sort_sub_dag_blocks(&mut to_commit);
219
220 to_commit
221 }
222
223 pub fn handle_commit(&mut self, committed_leaders: Vec<VerifiedBlock>) -> Vec<CommittedSubDag> {
227 if committed_leaders.is_empty() {
228 return vec![];
229 }
230
231 let mut committed_sub_dags = vec![];
232 for leader_block in committed_leaders {
233 let (sub_dag, commit) = self.collect_sub_dag_and_commit(leader_block);
235
236 self.update_blocks_pruned_metric(&sub_dag);
237
238 self.dag_state.write().add_commit(commit.clone());
241
242 committed_sub_dags.push(sub_dag);
243 }
244
245 committed_sub_dags
246 }
247
248 fn update_blocks_pruned_metric(&self, sub_dag: &CommittedSubDag) {
253 let (last_committed_rounds, gc_round) = {
254 let dag_state = self.dag_state.read();
255 (dag_state.last_committed_rounds(), dag_state.gc_round())
256 };
257
258 for block_ref in sub_dag
259 .blocks
260 .iter()
261 .flat_map(|block| block.ancestors())
262 .filter(
263 |ancestor_ref| {
264 ancestor_ref.round <= gc_round
265 && last_committed_rounds[ancestor_ref.author] != ancestor_ref.round
266 }, )
269 .unique()
270 {
271 let hostname = &self.context.committee.authority(block_ref.author).hostname;
272
273 let label_values = if last_committed_rounds[block_ref.author] < block_ref.round {
275 &[hostname, "uncommitted"]
276 } else {
277 &[hostname, "higher_committed"]
279 };
280
281 self.context
282 .metrics
283 .node_metrics
284 .blocks_pruned_on_commit
285 .with_label_values(label_values)
286 .inc();
287 }
288 }
289}
290
291pub(crate) fn median_timestamp_by_stake(
295 context: &Context,
296 blocks: impl Iterator<Item = VerifiedBlock>,
297) -> Result<BlockTimestampMs, String> {
298 let mut total_stake = 0;
299 let mut timestamps = vec![];
300 for block in blocks {
301 let stake = context.committee.authority(block.author()).stake;
302 timestamps.push((block.timestamp_ms(), stake));
303 total_stake += stake;
304 }
305
306 if timestamps.is_empty() {
307 return Err("No blocks provided".to_string());
308 }
309 if total_stake < context.committee.quorum_threshold() {
310 return Err(format!(
311 "Total stake {} < quorum threshold {}",
312 total_stake,
313 context.committee.quorum_threshold()
314 ));
315 }
316
317 Ok(median_timestamps_by_stake_inner(timestamps, total_stake))
318}
319
320fn median_timestamps_by_stake_inner(
321 mut timestamps: Vec<(BlockTimestampMs, Stake)>,
322 total_stake: Stake,
323) -> BlockTimestampMs {
324 timestamps.sort_by_key(|(ts, _)| *ts);
325
326 let mut cumulative_stake = 0;
327 for (ts, stake) in ×tamps {
328 cumulative_stake += stake;
329 if cumulative_stake > total_stake / 2 {
330 return *ts;
331 }
332 }
333
334 timestamps.last().unwrap().0
335}
336
337#[cfg(test)]
338mod tests {
339 use consensus_config::AuthorityIndex;
340 use rstest::rstest;
341
342 use super::*;
343 use crate::{
344 CommitIndex, TestBlock,
345 commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
346 context::Context,
347 leader_schedule::{LeaderSchedule, LeaderSwapTable},
348 storage::mem_store::MemStore,
349 test_dag_builder::DagBuilder,
350 test_dag_parser::parse_dag,
351 };
352
353 #[rstest]
354 #[tokio::test]
355 async fn test_handle_commit() {
356 telemetry_subscribers::init_for_testing();
357 let num_authorities = 4;
358 let (context, _keys) = Context::new_for_test(num_authorities);
359 let context = Arc::new(context);
360
361 let dag_state = Arc::new(RwLock::new(DagState::new(
362 context.clone(),
363 Arc::new(MemStore::new()),
364 )));
365 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
366
367 let num_rounds: u32 = 10;
369 let mut dag_builder = DagBuilder::new(context.clone());
370 dag_builder
371 .layers(1..=num_rounds)
372 .build()
373 .persist_layers(dag_state.clone());
374
375 let leaders = dag_builder
376 .leader_blocks(1..=num_rounds)
377 .into_iter()
378 .map(Option::unwrap)
379 .collect::<Vec<_>>();
380
381 let commits = linearizer.handle_commit(leaders.clone());
382 for (idx, subdag) in commits.into_iter().enumerate() {
383 tracing::info!("{subdag:?}");
384 assert_eq!(subdag.leader, leaders[idx].reference());
385
386 let expected_ts = {
387 let block_refs = leaders[idx]
388 .ancestors()
389 .iter()
390 .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
391 .cloned()
392 .collect::<Vec<_>>();
393 let blocks = dag_state
394 .read()
395 .get_blocks(&block_refs)
396 .into_iter()
397 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
398
399 median_timestamp_by_stake(&context, blocks).unwrap()
400 };
401 assert_eq!(subdag.timestamp_ms, expected_ts);
402
403 if idx == 0 {
404 assert_eq!(subdag.blocks.len(), 1);
406 } else {
407 assert_eq!(subdag.blocks.len(), num_authorities);
410 }
411 for block in subdag.blocks.iter() {
412 assert!(block.round() <= leaders[idx].round());
413 }
414 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
415 }
416 }
417
418 #[rstest]
419 #[tokio::test]
420 async fn test_handle_already_committed() {
421 telemetry_subscribers::init_for_testing();
422 let num_authorities = 4;
423 let (context, _) = Context::new_for_test(num_authorities);
424 let context = Arc::new(context);
425
426 let dag_state = Arc::new(RwLock::new(DagState::new(
427 context.clone(),
428 Arc::new(MemStore::new()),
429 )));
430 let leader_schedule = Arc::new(LeaderSchedule::new(
431 context.clone(),
432 LeaderSwapTable::default(),
433 ));
434 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
435 let wave_length = DEFAULT_WAVE_LENGTH;
436
437 let leader_round_wave_1 = 3;
438 let leader_round_wave_2 = leader_round_wave_1 + wave_length;
439
440 let mut dag_builder = DagBuilder::new(context.clone());
442 dag_builder.layers(1..=leader_round_wave_2).build();
443
444 let mut blocks = dag_builder.blocks(0..=leader_round_wave_1 - 1);
448 blocks.push(
449 dag_builder
450 .leader_block(leader_round_wave_1)
451 .expect("Leader block should have been found"),
452 );
453 dag_state.write().accept_blocks(blocks.clone());
454
455 let first_leader = dag_builder
456 .leader_block(leader_round_wave_1)
457 .expect("Wave 1 leader round block should exist");
458 let mut last_commit_index = 1;
459 let first_commit_data = TrustedCommit::new_for_test(
460 last_commit_index,
461 CommitDigest::MIN,
462 0,
463 first_leader.reference(),
464 blocks.iter().map(|block| block.reference()).collect(),
465 );
466 dag_state.write().add_commit(first_commit_data);
467
468 for block in blocks.iter() {
470 dag_state.write().set_committed(&block.reference());
471 }
472
473 let mut blocks = dag_builder.blocks(leader_round_wave_1..=leader_round_wave_2 - 1);
475 blocks.retain(|block| {
477 !(block.round() == leader_round_wave_1
478 && block.author() == leader_schedule.elect_leader(leader_round_wave_1, 0))
479 });
480 blocks.push(
482 dag_builder
483 .leader_block(leader_round_wave_2)
484 .expect("Leader block should have been found"),
485 );
486 dag_state.write().accept_blocks(blocks.clone());
488
489 let mut blocks: Vec<_> = blocks.into_iter().map(|block| block.reference()).collect();
490
491 let leader = dag_builder
493 .leader_block(leader_round_wave_2)
494 .expect("Leader block should exist");
495
496 last_commit_index += 1;
497 let expected_second_commit = TrustedCommit::new_for_test(
498 last_commit_index,
499 CommitDigest::MIN,
500 0,
501 leader.reference(),
502 blocks.clone(),
503 );
504
505 let commit = linearizer.handle_commit(vec![leader.clone()]);
506 assert_eq!(commit.len(), 1);
507
508 let subdag = &commit[0];
509 tracing::info!("{subdag:?}");
510 assert_eq!(subdag.leader, leader.reference());
511 assert_eq!(subdag.commit_ref.index, expected_second_commit.index());
512
513 let expected_ts = median_timestamp_by_stake(
514 &context,
515 subdag.blocks.iter().filter_map(|block| {
516 if block.round() == subdag.leader.round - 1 {
517 Some(block.clone())
518 } else {
519 None
520 }
521 }),
522 )
523 .unwrap();
524 assert_eq!(subdag.timestamp_ms, expected_ts);
525
526 blocks.sort_by(|a, b| a.round.cmp(&b.round).then_with(|| a.author.cmp(&b.author)));
528 assert_eq!(
529 subdag
530 .blocks
531 .clone()
532 .into_iter()
533 .map(|b| b.reference())
534 .collect::<Vec<_>>(),
535 blocks
536 );
537 for block in subdag.blocks.iter() {
538 assert!(block.round() <= expected_second_commit.leader().round);
539 }
540 }
541
542 #[tokio::test]
545 async fn test_handle_commit_with_gc_simple() {
546 telemetry_subscribers::init_for_testing();
547
548 const GC_DEPTH: u32 = 3;
549
550 let num_authorities = 4;
551 let (mut context, _keys) = Context::new_for_test(num_authorities);
552 context
553 .protocol_config
554 .set_consensus_gc_depth_for_testing(GC_DEPTH);
555
556 let context = Arc::new(context);
557 let dag_state = Arc::new(RwLock::new(DagState::new(
558 context.clone(),
559 Arc::new(MemStore::new()),
560 )));
561 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
562
563 let dag_str = "DAG {
569 Round 0 : { 4 },
570 Round 1 : { * },
571 Round 2 : {
572 A -> [-D1],
573 B -> [-D1],
574 C -> [-D1],
575 D -> [*],
576 },
577 Round 3 : {
578 A -> [-D2],
579 B -> [-D2],
580 C -> [-D2],
581 },
582 Round 4 : {
583 A -> [-D3],
584 B -> [-D3],
585 C -> [-D3],
586 D -> [A3, B3, C3, D2],
587 },
588 Round 5 : { * },
589 }";
590
591 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
592 dag_builder.print();
593 dag_builder.persist_all_blocks(dag_state.clone());
594
595 let leaders = dag_builder
596 .leader_blocks(1..=6)
597 .into_iter()
598 .flatten()
599 .collect::<Vec<_>>();
600
601 let commits = linearizer.handle_commit(leaders.clone());
602 for (idx, subdag) in commits.into_iter().enumerate() {
603 tracing::info!("{subdag:?}");
604 assert_eq!(subdag.leader, leaders[idx].reference());
605
606 let expected_ts = {
607 let block_refs = leaders[idx]
608 .ancestors()
609 .iter()
610 .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
611 .cloned()
612 .collect::<Vec<_>>();
613 let blocks = dag_state
614 .read()
615 .get_blocks(&block_refs)
616 .into_iter()
617 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
618
619 median_timestamp_by_stake(&context, blocks).unwrap()
620 };
621 assert_eq!(subdag.timestamp_ms, expected_ts);
622
623 if idx == 0 {
624 assert_eq!(subdag.blocks.len(), 1);
626 } else if idx == 1 {
627 assert_eq!(subdag.blocks.len(), 3);
628 } else if idx == 2 {
629 assert_eq!(subdag.blocks.len(), 6);
634 } else {
635 assert_eq!(subdag.blocks.len(), 5);
640
641 assert!(
642 subdag.blocks.iter().all(|block| block.round() >= 2),
643 "Found blocks that are of round < 2."
644 );
645
646 assert_eq!(dag_state.read().gc_round(), subdag.leader.round - GC_DEPTH);
648 }
649 for block in subdag.blocks.iter() {
650 assert!(block.round() <= leaders[idx].round());
651 }
652 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
653 }
654 }
655
656 #[tokio::test]
657 async fn test_handle_commit_below_highest_committed_round() {
658 telemetry_subscribers::init_for_testing();
659
660 const GC_DEPTH: u32 = 3;
661
662 let num_authorities = 4;
663 let (mut context, _keys) = Context::new_for_test(num_authorities);
664 context
665 .protocol_config
666 .set_consensus_gc_depth_for_testing(GC_DEPTH);
667
668 let context = Arc::new(context);
669 let dag_state = Arc::new(RwLock::new(DagState::new(
670 context.clone(),
671 Arc::new(MemStore::new()),
672 )));
673 let mut linearizer = Linearizer::new(context.clone(), dag_state.clone());
674
675 let dag_str = "DAG {
679 Round 0 : { 4 },
680 Round 1 : { * },
681 Round 2 : {
682 A -> [-D1],
683 B -> [-D1],
684 C -> [-D1],
685 D -> [-D1],
686 },
687 Round 3 : {
688 A -> [A2, B2, C2, D1],
689 B -> [A2, B2, C2, D1],
690 C -> [A2, B2, C2, D1],
691 D -> [A2, B2, C2, D2]
692 },
693 Round 4 : { * },
694 }";
695
696 let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
697 dag_builder.print();
698 dag_builder.persist_all_blocks(dag_state.clone());
699
700 let leaders = dag_builder
701 .leader_blocks(1..=4)
702 .into_iter()
703 .flatten()
704 .collect::<Vec<_>>();
705
706 let commits = linearizer.handle_commit(leaders.clone());
707 for (idx, subdag) in commits.into_iter().enumerate() {
708 tracing::info!("{subdag:?}");
709 assert_eq!(subdag.leader, leaders[idx].reference());
710
711 let expected_ts = {
712 let block_refs = leaders[idx]
713 .ancestors()
714 .iter()
715 .filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
716 .cloned()
717 .collect::<Vec<_>>();
718 let blocks = dag_state
719 .read()
720 .get_blocks(&block_refs)
721 .into_iter()
722 .map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
723
724 median_timestamp_by_stake(&context, blocks).unwrap()
725 };
726 assert_eq!(subdag.timestamp_ms, expected_ts);
727
728 if idx == 0 {
729 assert_eq!(subdag.blocks.len(), 1);
731 } else if idx == 1 {
732 assert_eq!(subdag.blocks.len(), 3);
736 } else if idx == 2 {
737 assert_eq!(subdag.blocks.len(), 4);
741
742 assert!(
743 subdag.blocks.iter().any(|block| block.round() == 2
744 && block.author() == AuthorityIndex::new_for_test(3)),
745 "Block D2 should have been committed."
746 );
747 } else if idx == 3 {
748 assert_eq!(subdag.blocks.len(), 5);
753 assert!(
754 subdag.blocks.iter().any(|block| block.round() == 1
755 && block.author() == AuthorityIndex::new_for_test(3)),
756 "Block D1 should have been committed."
757 );
758 } else {
759 panic!("Unexpected subdag with index {:?}", idx);
760 }
761
762 for block in subdag.blocks.iter() {
763 assert!(block.round() <= leaders[idx].round());
764 }
765 assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
766 }
767 }
768
769 #[rstest]
770 #[case(3_000, 3_000, 6_000)]
771 #[tokio::test]
772 async fn test_calculate_commit_timestamp(
773 #[case] timestamp_1: u64,
774 #[case] timestamp_2: u64,
775 #[case] timestamp_3: u64,
776 ) {
777 telemetry_subscribers::init_for_testing();
779
780 let num_authorities = 4;
781 let (context, _keys) = Context::new_for_test(num_authorities);
782
783 let context = Arc::new(context);
784 let store = Arc::new(MemStore::new());
785 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
786 let mut dag_state = dag_state.write();
787
788 let ancestors = vec![
789 VerifiedBlock::new_for_test(TestBlock::new(4, 0).set_timestamp_ms(1_000).build()),
790 VerifiedBlock::new_for_test(TestBlock::new(4, 1).set_timestamp_ms(2_000).build()),
791 VerifiedBlock::new_for_test(TestBlock::new(4, 2).set_timestamp_ms(3_000).build()),
792 VerifiedBlock::new_for_test(TestBlock::new(4, 3).set_timestamp_ms(4_000).build()),
793 ];
794
795 let leader_block = VerifiedBlock::new_for_test(
796 TestBlock::new(5, 0)
797 .set_timestamp_ms(5_000)
798 .set_ancestors(
799 ancestors
800 .iter()
801 .map(|block| block.reference())
802 .collect::<Vec<_>>(),
803 )
804 .build(),
805 );
806
807 for block in &ancestors {
808 dag_state.accept_block(block.clone());
809 }
810
811 let last_commit_timestamp_ms = 0;
812
813 let timestamp = Linearizer::calculate_commit_timestamp(
815 &context,
816 &mut dag_state,
817 &leader_block,
818 last_commit_timestamp_ms,
819 );
820 assert_eq!(timestamp, timestamp_1);
821
822 let leader_block = VerifiedBlock::new_for_test(
824 TestBlock::new(5, 0)
825 .set_timestamp_ms(5_000)
826 .set_ancestors(
827 ancestors
828 .iter()
829 .skip(1)
830 .map(|block| block.reference())
831 .collect::<Vec<_>>(),
832 )
833 .build(),
834 );
835
836 let timestamp = Linearizer::calculate_commit_timestamp(
837 &context,
838 &mut dag_state,
839 &leader_block,
840 last_commit_timestamp_ms,
841 );
842 assert_eq!(timestamp, timestamp_2);
843
844 let last_commit_timestamp_ms = 6_000;
846 let timestamp = Linearizer::calculate_commit_timestamp(
847 &context,
848 &mut dag_state,
849 &leader_block,
850 last_commit_timestamp_ms,
851 );
852 assert_eq!(timestamp, timestamp_3);
853
854 let (context, _) = Context::new_for_test(1);
856 let leader_block = VerifiedBlock::new_for_test(
857 TestBlock::new(5, 0)
858 .set_timestamp_ms(5_000)
859 .set_ancestors(
860 ancestors
861 .iter()
862 .take(1)
863 .map(|block| block.reference())
864 .collect::<Vec<_>>(),
865 )
866 .build(),
867 );
868 let last_commit_timestamp_ms = 0;
869 let timestamp = Linearizer::calculate_commit_timestamp(
870 &context,
871 &mut dag_state,
872 &leader_block,
873 last_commit_timestamp_ms,
874 );
875 assert_eq!(timestamp, 1_000);
876 }
877
878 #[test]
879 fn test_median_timestamps_by_stake() {
880 let timestamps = vec![(1_000, 1)];
882 assert_eq!(median_timestamps_by_stake_inner(timestamps, 1), 1_000);
883
884 let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1)];
886 assert_eq!(median_timestamps_by_stake_inner(timestamps, 3), 2_000);
887
888 let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1)];
890 assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);
891
892 let timestamps = vec![(4_000, 1), (3_000, 1), (1_000, 1), (2_000, 1)];
894 assert_eq!(median_timestamps_by_stake_inner(timestamps, 4), 3_000);
895
896 let timestamps = vec![(2_000, 2), (4_000, 2), (1_000, 3), (3_000, 3)];
898 assert_eq!(median_timestamps_by_stake_inner(timestamps, 10), 3_000);
899
900 let timestamps = vec![
902 (500, 2),
903 (4_000, 2),
904 (2_500, 3),
905 (1_000, 5),
906 (3_000, 3),
907 (2_000, 4),
908 ];
909 assert_eq!(median_timestamps_by_stake_inner(timestamps, 19), 2_000);
910
911 let timestamps = vec![(1_000, 1), (2_000, 1), (3_000, 1), (4_000, 1), (5_000, 10)];
913 assert_eq!(median_timestamps_by_stake_inner(timestamps, 14), 5_000);
914 }
915
916 #[tokio::test]
917 async fn test_median_timestamps_by_stake_errors() {
918 let num_authorities = 4;
919 let (context, _keys) = Context::new_for_test(num_authorities);
920 let context = Arc::new(context);
921
922 let err = median_timestamp_by_stake(&context, vec![].into_iter()).unwrap_err();
924 assert_eq!(err, "No blocks provided");
925
926 let block = VerifiedBlock::new_for_test(TestBlock::new(5, 0).build());
928 let err = median_timestamp_by_stake(&context, vec![block].into_iter()).unwrap_err();
929 assert_eq!(err, "Total stake 1 < quorum threshold 3");
930 }
931}