consensus_core/
core.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
    vec,
};

use consensus_config::ProtocolKeyPair;
#[cfg(test)]
use consensus_config::{AuthorityIndex, Stake, local_committee_and_keys};
use consensus_types::block::{BlockRef, Round};
use itertools::Itertools as _;
#[cfg(test)]
use mysten_metrics::monitored_mpsc::UnboundedReceiver;
use mysten_metrics::monitored_scope;
use parking_lot::RwLock;
use sui_macros::fail_point;
use tokio::sync::{broadcast, watch};
use tracing::{debug, info, trace, warn};

#[cfg(test)]
use crate::{
    CommitConsumerArgs, TransactionClient,
    block_verifier::NoopBlockVerifier,
    storage::{Store, WriteBatch, mem_store::MemStore},
};
use crate::{
    ancestor::AncestorStateManager,
    block::{BlockAPI, ExtendedBlock, GENESIS_ROUND, Slot, VerifiedBlock},
    block_manager::BlockManager,
    commit::{
        CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
    },
    commit_observer::CommitObserver,
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    proposer::{Proposer, ValidatorProposer},
    round_tracker::RoundTracker,
    transaction::TransactionConsumer,
    transaction_vote_tracker::TransactionVoteTracker,
    universal_committer::{
        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
    },
};

pub(crate) struct Core {
    context: Arc<Context>,
    /// The block manager is responsible for keeping track of DAG dependencies when processing new blocks,
    /// accepting them or suspending them if their causal history is missing.
    block_manager: BlockManager,
    /// Used to make commit decisions for leader blocks in the dag.
    committer: Arc<UniversalCommitter>,
    /// The last new round for which core has sent out a signal.
    last_signaled_round: Round,
    /// The last decided leader returned from the universal committer. Important to note
    /// that this does not signify that the leader has been persisted yet, as it still has
    /// to go through CommitObserver and have the commit persisted in the store. On recovery/restart
    /// the last_decided_leader will be set to the last commit leader in dag state.
    last_decided_leader: Slot,
    /// The consensus leader schedule to be used to resolve the leader for a
    /// given round.
    leader_schedule: Arc<LeaderSchedule>,
    /// The commit observer is responsible for observing the commits and collecting
    /// + sending subdags over the consensus output channel.
    commit_observer: CommitObserver,
    /// Sender of outgoing signals from Core.
    signals: CoreSignals,
    /// Keeps track of the state of the DAG, including blocks, commits and last committed rounds.
    dag_state: Arc<RwLock<DagState>>,
    /// Block proposal engine for Validator nodes only.
    /// Validators have a proposer to create blocks; Observers have None (they only receive blocks).
    proposer: Option<Box<dyn Proposer>>,
}
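
// A rough sketch of the flow the methods below implement for incoming blocks
// (a summary of the call graph, not a prescribed sequence):
//
//   add_blocks() -> accept_blocks() -> try_commit() -> try_propose() -> try_signal_new_round()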

impl Core {
    /// Creates a new Core instance for a validator node that participates in consensus.
    pub(crate) fn new_validator(
        context: Arc<Context>,
        leader_schedule: Arc<LeaderSchedule>,
        transaction_consumer: TransactionConsumer,
        transaction_vote_tracker: TransactionVoteTracker,
        block_manager: BlockManager,
        commit_observer: CommitObserver,
        signals: CoreSignals,
        block_signer: ProtocolKeyPair,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
        round_tracker: Arc<RwLock<RoundTracker>>,
    ) -> Self {
        let last_decided_leader = dag_state.read().last_commit_leader();
        let number_of_leaders = context.protocol_config.num_leaders_per_round().unwrap_or(1);
        let committer = Arc::new(
            UniversalCommitterBuilder::new(
                context.clone(),
                leader_schedule.clone(),
                dag_state.clone(),
            )
            .with_number_of_leaders(number_of_leaders)
            .with_pipeline(true)
            .build(),
        );

        let last_proposed_block = dag_state
            .read()
            .get_last_proposed_block()
            .expect("A block should have been returned");
        let last_signaled_round = last_proposed_block.round();

        // Recover the last included ancestor rounds based on the last proposed block. This allows
        // the next block proposal to use ancestor blocks of higher rounds, and avoids re-including
        // blocks that have already been included in the last (or an earlier) block proposal.
        // This is only strongly guaranteed for a quorum of ancestors. It is still possible to re-include
        // a block from an authority which hadn't been added as part of the last proposal, since its
        // latest included ancestor is not accurately captured here. This is considered a minor deficiency,
        // and it mostly matters just for this next proposal, without any actual penalty to performance
        // or block proposal.
        let mut last_included_ancestors = vec![None; context.committee.size()];
        for ancestor in last_proposed_block.ancestors() {
            last_included_ancestors[ancestor.author] = Some(*ancestor);
        }
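        // E.g. (hypothetical committee of 4): if the last proposed block referenced
        // ancestors from authorities 0, 1 and 3 only, the vector becomes
        // [Some(ref_0), Some(ref_1), None, Some(ref_3)], so authority 2's latest
        // included ancestor stays unknown until a later proposal includes one.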

        let last_known_proposed_round = if sync_last_known_own_block {
            None
        } else {
            // If the sync is disabled, then practically no restriction should be imposed.
            Some(0)
        };
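        // Note: `Some(0)` effectively disables the restriction, since every proposable
        // round is `> 0`, while `None` means the value must first be learned via
        // `set_last_known_proposed_round` before proposing (see that method below).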

        let propagation_scores = leader_schedule
            .leader_swap_table
            .read()
            .reputation_scores
            .clone();
        let mut ancestor_state_manager =
            AncestorStateManager::new(context.clone(), dag_state.clone());
        ancestor_state_manager.set_propagation_scores(propagation_scores);

        // Create the ValidatorProposer
        let proposer = Some(Box::new(ValidatorProposer::new(
            dag_state.clone(),
            context.clone(),
            transaction_consumer,
            transaction_vote_tracker.clone(),
            block_signer,
            last_known_proposed_round,
            ancestor_state_manager,
            round_tracker.clone(),
            committer.clone(),
        )) as Box<dyn Proposer>);

        Self {
            context,
            last_signaled_round,
            last_decided_leader,
            leader_schedule,
            block_manager,
            committer,
            commit_observer,
            signals,
            dag_state,
            proposer,
        }
        .recover_validator()
    }

    /// Creates a new Core instance for an observer node that only processes blocks.
    pub(crate) fn new_observer(
        context: Arc<Context>,
        leader_schedule: Arc<LeaderSchedule>,
        block_manager: BlockManager,
        commit_observer: CommitObserver,
        signals: CoreSignals,
        dag_state: Arc<RwLock<DagState>>,
    ) -> Self {
        let last_decided_leader = dag_state.read().last_commit_leader();
        let number_of_leaders = context.protocol_config.num_leaders_per_round().unwrap_or(1);
        let committer = Arc::new(
            UniversalCommitterBuilder::new(
                context.clone(),
                leader_schedule.clone(),
                dag_state.clone(),
            )
            .with_number_of_leaders(number_of_leaders)
            .with_pipeline(true)
            .build(),
        );

        // For observer nodes, consider the last signaled round to be the latest threshold clock round.
        let last_signaled_round = dag_state.read().threshold_clock_round();

        Self {
            context,
            last_signaled_round,
            last_decided_leader,
            leader_schedule,
            block_manager,
            committer,
            commit_observer,
            signals,
            dag_state,
            proposer: None,
        }
        .recover_observer()
    }

    fn recover_observer(mut self) -> Self {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::recover_observer"])
            .start_timer();

        // Try to commit, since commits may not have run after the last storage write.
        self.try_commit(vec![]).unwrap();

        self.try_signal_new_round();

        self
    }

    fn recover_validator(mut self) -> Self {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::recover_validator"])
            .start_timer();

        // Try to commit and propose, since they may not have run after the last storage write.
        self.try_commit(vec![]).unwrap();

        let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
        {
            last_proposed_block
        } else {
            let proposer = self
                .proposer
                .as_ref()
                .expect("Validator must have proposer");
            let last_proposed_block = proposer.last_proposed_block();

            if proposer.should_propose() {
                assert!(
                    last_proposed_block.round() > GENESIS_ROUND,
                    "At minimum a block of round higher than genesis should have been produced during recovery"
                );
            }

            // If no new block was proposed, just re-broadcast the last proposed one to ensure liveness.
            self.signals
                .new_block(ExtendedBlock {
                    block: last_proposed_block.clone(),
                    excluded_ancestors: vec![],
                })
                .unwrap();
            last_proposed_block
        };

        // Try to set up leader timeout if needed.
        // This needs to be called after try_commit() and try_propose(), which may
        // have advanced the threshold clock round.
        self.try_signal_new_round();

        info!(
            "Core recovery for validator completed with last proposed block {:?}",
            last_proposed_block
        );

        self
    }

    /// Calls `BlockManager::try_accept_blocks` and broadcasts each accepted block to any active
    /// observer subscribers. Returns the accepted blocks alongside any missing block refs.
    fn accept_blocks(
        &mut self,
        blocks: Vec<VerifiedBlock>,
    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
        let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);
        for block in &accepted_blocks {
            tracing::trace!(
                "{} Core accepted round {}, author {} at timestamp: {}",
                self.context.own_index,
                block.round(),
                block.author(),
                block.timestamp_ms()
            );
            self.signals.new_accepted_block(block.clone());
        }
        (accepted_blocks, missing_block_refs)
    }

    /// Calls `BlockManager::try_accept_committed_blocks` and broadcasts each accepted block to any
    /// active observer subscribers. Returns all accepted blocks.
    fn accept_committed_blocks(&mut self, blocks: Vec<VerifiedBlock>) -> Vec<VerifiedBlock> {
        let accepted_blocks = self.block_manager.try_accept_committed_blocks(blocks);
        for block in &accepted_blocks {
            self.signals.new_accepted_block(block.clone());
        }
        accepted_blocks
    }

    /// Processes the provided blocks and accepts them, when possible, if their causal history exists.
    /// Returns the references of ancestors that are missing their blocks.
    #[tracing::instrument(skip_all)]
    pub(crate) fn add_blocks(
        &mut self,
        blocks: Vec<VerifiedBlock>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_blocks");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::add_blocks"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_add_blocks_batch_size
            .observe(blocks.len() as f64);

        let (accepted_blocks, missing_block_refs) = self.accept_blocks(blocks);

        if !accepted_blocks.is_empty() {
            trace!(
                "Accepted blocks: {}",
                accepted_blocks
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(",")
            );

            // Try to commit the new blocks if possible.
            self.try_commit(vec![])?;

            // Try to propose now since there are new blocks accepted.
            self.try_propose(false)?;

            // Now set up leader timeout if needed.
            // This needs to be called after try_commit() and try_propose(), which may
            // have advanced the threshold clock round.
            self.try_signal_new_round();
        };

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }

        Ok(missing_block_refs)
    }
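
    // Note: the missing block refs returned by `add_blocks` are the hook for block
    // synchronization (an assumption, consistent with `check_block_refs` and
    // `get_missing_blocks` below): a synchronizer fetches those blocks from peers
    // and feeds them back in through `add_blocks`.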

    // Adds the certified commits that have been synced via the commit syncer. The commit info is used to skip running the decision
    // rule and immediately commit the corresponding leaders and sub dags. Note that no block acceptance happens here; it happens
    // internally in the `try_commit` method, which ensures that each time only the blocks corresponding to the certified commits
    // that are about to be committed are accepted.
    #[tracing::instrument(skip_all)]
    pub(crate) fn add_certified_commits(
        &mut self,
        certified_commits: CertifiedCommits,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_certified_commits");

        let votes = certified_commits.votes().to_vec();
        let commits = self
            .filter_new_commits(certified_commits.commits().to_vec())
            .expect("Certified commits validation failed");

        // Try to accept the certified commit votes.
        // Even if they may not be part of a future commit, these blocks are useful for certifying
        // commits when helping peers sync commits.
        let (_, missing_block_refs) = self.accept_blocks(votes);

        // Try to commit the new blocks. Take into account the trusted commits that have been provided.
        self.try_commit(commits)?;

        // Try to propose now since there are new blocks accepted.
        self.try_propose(false)?;

        // Now set up leader timeout if needed.
        // This needs to be called after try_commit() and try_propose(), which may
        // have advanced the threshold clock round.
        self.try_signal_new_round();

        Ok(missing_block_refs)
    }

    /// Checks if the provided block refs have been accepted. If not, the missing block refs are kept for synchronization.
    /// Returns the references of missing blocks among the input blocks.
    pub(crate) fn check_block_refs(
        &mut self,
        block_refs: Vec<BlockRef>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::check_block_refs");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::check_block_refs"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_check_block_refs_batch_size
            .observe(block_refs.len() as f64);

        // Try to find them via the block manager.
        let missing_block_refs = self.block_manager.try_find_blocks(block_refs);

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }
        Ok(missing_block_refs)
    }

    /// If needed, signals a new clock round and sets up leader timeout.
    fn try_signal_new_round(&mut self) {
        // Signal only when the threshold clock round is more advanced than the last signaled round.
        //
        // NOTE: a signal is still sent even when a block has been proposed at the new round.
        // We can consider changing this in the future.
        let new_clock_round = self.dag_state.read().threshold_clock_round();
        if new_clock_round <= self.last_signaled_round {
            return;
        }
        // Then send a signal to set up leader timeout.
        self.signals.new_round(new_clock_round);
        self.last_signaled_round = new_clock_round;

        // Report the threshold clock round.
        self.context
            .metrics
            .node_metrics
            .threshold_clock_round
            .set(new_clock_round as i64);
    }

    /// Creates a new block for the dictated round. This is used when a leader timeout occurs,
    /// when either the min or max timeout expires. When `force = true`, checks such as the
    /// existence of the previous round's leader are skipped.
    pub(crate) fn new_block(
        &mut self,
        round: Round,
        force: bool,
    ) -> ConsensusResult<Option<VerifiedBlock>> {
        let _scope = monitored_scope("Core::new_block");
        if let Some(last_round) = self.last_proposed_round()
            && last_round < round
        {
            self.context
                .metrics
                .node_metrics
                .leader_timeout_total
                .with_label_values(&[&format!("{force}")])
                .inc();
            let result = self.try_propose(force);
            // The threshold clock round may have advanced, so a signal needs to be sent.
            self.try_signal_new_round();
            return result;
        }
        Ok(None)
    }

    /// Keeps only the certified commits that have a commit index > last commit index.
    /// It also ensures that the first commit in the list is the next one in line, otherwise an error is returned.
    fn filter_new_commits(
        &mut self,
        commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CertifiedCommit>> {
        // Filter out the commits that have already been committed locally, and keep only those above the last committed index.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let commits = commits
            .iter()
            .filter(|commit| {
                if commit.index() > last_commit_index {
                    true
                } else {
                    tracing::debug!(
                        "Skip commit for index {} as it is already committed with last commit index {}",
                        commit.index(),
                        last_commit_index
                    );
                    false
                }
            })
            .cloned()
            .collect::<Vec<_>>();

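        // E.g. (hypothetical indices): with last_commit_index == 10, an input of
        // [9, 11, 12] is filtered down to [11, 12] and accepted, since 11 == 10 + 1;
        // an input of [12, 13] is rejected below because commit 11 is missing.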
        // Make sure that the first commit we find is the next one in line and that there is no gap.
        if let Some(commit) = commits.first()
            && commit.index() != last_commit_index + 1
        {
            return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
                expected_commit_index: last_commit_index + 1,
                commit_index: commit.index(),
            });
        }

        Ok(commits)
    }

    // Attempts to create a new block, persist it and propose it to all peers.
    // When force is true, ignores whether the leader from the last round exists among the ancestors
    // and whether the minimum round delay has passed.
    fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
        if let Some(proposer) = &mut self.proposer
            && let Some(extended_block) = proposer.try_new_block(force)
        {
            self.signals.new_block(extended_block.clone())?;
            self.signals
                .new_accepted_block(extended_block.block.clone());

            fail_point!("consensus-after-propose");

            // The new block may help commit.
            self.try_commit(vec![])?;
            return Ok(Some(extended_block.block));
        }
        Ok(None)
    }

    /// Runs the commit rule to attempt to commit additional blocks from the DAG. If any `certified_commits` are provided,
    /// it will attempt to commit those first before trying to commit any further leaders.
    fn try_commit(
        &mut self,
        mut certified_commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::try_commit"])
            .start_timer();

        let mut certified_commits_map = BTreeMap::new();
        for c in &certified_commits {
            certified_commits_map.insert(c.index(), c.reference());
        }
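        // The map is consulted after the commit loop as a sanity check: locally
        // linearized sub dags must match the certified commit references by index.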

        if !certified_commits.is_empty() {
            info!(
                "Processing synced commits: {:?}",
                certified_commits
                    .iter()
                    .map(|c| (c.index(), c.leader()))
                    .collect::<Vec<_>>()
            );
        }

        let mut committed_sub_dags = Vec::new();
        // TODO: Add optimization to abort early without quorum for a round.
        loop {
            // LeaderSchedule has a limit to how many sequenced leaders can be committed
            // before a change is triggered. Calling into the leader schedule returns how
            // many commits remain until the next leader change. We will loop back and
            // recalculate any discarded leaders with the new schedule.
            let mut commits_until_update = self
                .leader_schedule
                .commits_until_leader_schedule_update(self.dag_state.clone());

            if commits_until_update == 0 {
                let last_commit_index = self.dag_state.read().last_commit_index();

                tracing::info!(
                    "Leader schedule change triggered at commit index {last_commit_index}"
                );

                self.leader_schedule
                    .update_leader_schedule_v2(&self.dag_state);

                let propagation_scores = self
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores
                    .clone();
                if let Some(proposer) = &mut self.proposer {
                    proposer.set_propagation_scores(propagation_scores);
                }

                commits_until_update = self
                    .leader_schedule
                    .commits_until_leader_schedule_update(self.dag_state.clone());

                fail_point!("consensus-after-leader-schedule-change");
            }
            assert!(commits_until_update > 0);

            // If there are certified commits to process, find out which leaders and commits from them
            // are decided and use them as the next commits.
            let (certified_leaders, decided_certified_commits): (
                Vec<DecidedLeader>,
                Vec<CertifiedCommit>,
            ) = self
                .try_select_certified_leaders(&mut certified_commits, commits_until_update)
                .into_iter()
                .unzip();

            // Only accept blocks for the certified commits that we are certain to sequence.
            // This ensures that only blocks corresponding to committed certified commits are flushed to disk.
            // Blocks from non-committed certified commits will not be flushed, preventing issues during crash-recovery.
            // This avoids scenarios where accepting and flushing blocks of non-committed certified commits could lead to
            // premature commit rule execution. Due to GC, this could cause a panic if the commit rule tries to access
            // missing causal history from blocks of certified commits.
            let blocks = decided_certified_commits
                .iter()
                .flat_map(|c| c.blocks())
                .cloned()
                .collect::<Vec<_>>();
            self.accept_committed_blocks(blocks);

            // If there is no certified commit to process, run the decision rule.
            let (decided_leaders, local) = if certified_leaders.is_empty() {
                // TODO: limit commits by commits_until_update for efficiency, which may be needed when leader schedule length is reduced.
                let mut decided_leaders = self.committer.try_decide(self.last_decided_leader);
                // Truncate the decided leaders to fit the commit schedule limit.
                if decided_leaders.len() >= commits_until_update {
                    let _ = decided_leaders.split_off(commits_until_update);
                }
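                // E.g. (hypothetical counts): with 5 decided leaders and
                // commits_until_update == 2, only the first 2 are kept here; the
                // rest are re-decided on the next loop iteration, after the
                // leader schedule change.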
                (decided_leaders, true)
            } else {
                (certified_leaders, false)
            };

            // If the decided leaders list is empty, just break the loop.
            let Some(last_decided) = decided_leaders.last().cloned() else {
                break;
            };

            self.last_decided_leader = last_decided.slot();
            self.context
                .metrics
                .node_metrics
                .last_decided_leader_round
                .set(self.last_decided_leader.round as i64);

            let sequenced_leaders = decided_leaders
                .into_iter()
                .filter_map(|leader| leader.into_committed_block())
                .collect::<Vec<_>>();
            // It's possible to reach this point when the decided leaders are all "Skip" decisions.
            // In that case there is no leader to commit, so break the loop.
            if sequenced_leaders.is_empty() {
                break;
            }
            tracing::info!(
                "Committing {} leaders: {}; {} commits before next leader schedule change",
                sequenced_leaders.len(),
                sequenced_leaders
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(","),
                commits_until_update,
            );

            // TODO: refcount subdags
            let subdags = self
                .commit_observer
                .handle_commit(sequenced_leaders, local)?;

            // Try to unsuspend blocks if gc_round has advanced.
            self.block_manager
                .try_unsuspend_blocks_for_latest_gc_round();

            committed_sub_dags.extend(subdags);

            fail_point!("consensus-after-handle-commit");
        }

        // Sanity check: for commits that have been linearized using the certified commits, ensure that the same sub dag has been committed.
        for sub_dag in &committed_sub_dags {
            if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
                assert_eq!(
                    commit_ref, sub_dag.commit_ref,
                    "Certified commit has different reference than the committed sub dag"
                );
            }
        }

        // Notify about our own committed blocks.
        let committed_block_refs = committed_sub_dags
            .iter()
            .flat_map(|sub_dag| sub_dag.blocks.iter())
            .filter_map(|block| {
                (block.author() == self.context.own_index).then_some(block.reference())
            })
            .collect::<Vec<_>>();
        if let Some(proposer) = &self.proposer {
            proposer.notify_own_blocks_committed(
                committed_block_refs,
                self.dag_state.read().gc_round(),
            );
        }

        Ok(committed_sub_dags)
    }

    pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
        let _scope = monitored_scope("Core::get_missing_blocks");
        self.block_manager.missing_blocks()
    }

    /// Sets the delay by round for propagating blocks to a quorum.
    pub(crate) fn set_propagation_delay(&mut self, delay: Round) {
        info!("Propagation round delay set to: {delay}");
        if let Some(proposer) = &mut self.proposer {
            proposer.set_propagation_delay(delay);
        }
    }

    /// Sets the min propose round for the proposer, allowing blocks to be proposed only for round
    /// numbers `> last_known_proposed_round`. At the moment the method may only be called once;
    /// attempting to call it multiple times leads to a panic.
    pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
        if let Some(proposer) = &mut self.proposer {
            if proposer.get_last_known_proposed_round().is_some() {
                panic!(
                    "Should not attempt to set the last known proposed round if that has been already set"
                );
            }
            proposer.set_last_known_proposed_round(round);
            info!("Last known proposed round set to {round}");
        }
    }

    /// Returns true if the node should propose blocks.
    /// Observers always return false since they don't have a proposer.
    pub(crate) fn should_propose(&self) -> bool {
        self.proposer
            .as_ref()
            .map(|p| p.should_propose())
            .unwrap_or(false)
    }

    /// Returns the last proposed round, or None if this is an observer node.
    pub(crate) fn last_proposed_round(&self) -> Option<Round> {
        self.proposer.as_ref().map(|p| p.last_proposed_round())
    }

    // Tries to select a prefix of certified commits to be committed next, respecting the `limit`.
    // Panics if the provided `limit` is zero.
    // The function returns a list of certified leaders and certified commits. If an empty vector is
    // returned, there are no certified commits to be committed: the input `certified_commits` is
    // either empty or all of the certified commits have already been committed.
    #[tracing::instrument(skip_all)]
    fn try_select_certified_leaders(
        &mut self,
        certified_commits: &mut Vec<CertifiedCommit>,
        limit: usize,
    ) -> Vec<(DecidedLeader, CertifiedCommit)> {
        assert!(limit > 0, "limit should be greater than 0");
        if certified_commits.is_empty() {
            return vec![];
        }

        let to_commit = if certified_commits.len() >= limit {
            // We keep only the number of leaders dictated by the `limit`.
            certified_commits.drain(..limit).collect::<Vec<_>>()
        } else {
            // Otherwise just take all of them and leave `certified_commits` empty.
            std::mem::take(certified_commits)
        };
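        // E.g. (hypothetical counts): with 5 certified commits and limit == 3, the
        // first 3 are drained here and 2 remain in `certified_commits` for the next
        // call; with limit == 8, all 5 are taken and `certified_commits` is emptied.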

        tracing::debug!(
            "Selected {} certified leaders: {}",
            to_commit.len(),
            to_commit.iter().map(|c| c.leader().to_string()).join(",")
        );

        to_commit
            .into_iter()
            .map(|commit| {
                let leader = commit.blocks().last().expect("Certified commit should have at least one block");
                assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
                // There is no knowledge of direct commit with certified commits, so an indirect commit is assumed.
                let leader = DecidedLeader::Commit(leader.clone(), /* direct */ false);
                UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
                (leader, commit)
            })
            .collect::<Vec<_>>()
    }

    /// Helper method for tests to get the last proposed block from the proposer.
    #[cfg(test)]
    pub(crate) fn last_proposed_block(&self) -> VerifiedBlock {
        self.proposer
            .as_ref()
            .expect("Proposer should be present")
            .last_proposed_block()
    }

    /// Helper method for tests to get the round tracker from the proposer.
    /// Panics for observer nodes, which have no proposer.
    #[cfg(test)]
    pub(crate) fn round_tracker_for_tests(&self) -> Arc<RwLock<RoundTracker>> {
        self.proposer
            .as_ref()
            .expect("Proposer should be present")
            .round_tracker_for_tests()
    }
}

/// Senders of signals from Core, for outputs and events (e.g. new block produced).
pub(crate) struct CoreSignals {
    tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
    tx_accepted_block_broadcast: broadcast::Sender<VerifiedBlock>,
    new_round_sender: watch::Sender<Round>,
    context: Arc<Context>,
}

impl CoreSignals {
    pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
        // Blocks buffered in the broadcast channel should be roughly equal to those cached in dag state,
        // since the underlying blocks are ref counted, so a lower buffer here will not reduce memory
        // usage significantly.
        let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
            context.parameters.dag_state_cached_rounds as usize,
        );
        let (tx_accepted_block_broadcast, rx_accepted_block_broadcast) =
            broadcast::channel::<VerifiedBlock>(
                2 * context.parameters.dag_state_cached_rounds as usize * context.committee.size(),
            );
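        // E.g. with hypothetical parameters (dag_state_cached_rounds == 500 and a
        // committee of 4), the accepted-block channel buffers up to 2 * 500 * 4 = 4000 blocks.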
        let (new_round_sender, new_round_receiver) = watch::channel(0);

        let me = Self {
            tx_block_broadcast,
            tx_accepted_block_broadcast,
            new_round_sender,
            context,
        };

        let receivers = CoreSignalsReceivers {
            rx_block_broadcast,
            rx_accepted_block_broadcast,
            new_round_receiver,
        };

        (me, receivers)
    }

    /// Sends a signal to all the waiters that a new block has been produced. Returns Ok(()) if the
    /// block reached even one subscriber (or broadcasting was unnecessary), and an error otherwise.
    pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
        // When there is only one authority in the committee, it is unnecessary to broadcast
        // the block, which would fail anyway without subscribers to the signal.
        if self.context.committee.size() > 1 {
            if extended_block.block.round() == GENESIS_ROUND {
                debug!("Ignoring broadcasting genesis block to peers");
                return Ok(());
            }

            if let Err(err) = self.tx_block_broadcast.send(extended_block) {
                warn!("Couldn't broadcast the block to any receiver: {err}");
                return Err(ConsensusError::Shutdown);
            }
        } else {
            debug!(
                "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
            );
        }
        Ok(())
    }

    /// Broadcasts a block that has been accepted into the local DAG to any active observer
    /// subscribers. Unlike `new_block()`, this covers blocks from all authorities, not just
    /// own proposals.
    /// Silently drops the send when there are no active observer subscribers.
    pub(crate) fn new_accepted_block(&self, block: VerifiedBlock) {
        // Ignoring send errors here: it is normal for there to be no observers subscribed.
        let _ = self.tx_accepted_block_broadcast.send(block);
    }

    /// Sends a signal that the threshold clock has advanced to a new round. The `round_number`
    /// is the round to which the threshold clock has advanced.
    pub(crate) fn new_round(&mut self, round_number: Round) {
        let _ = self.new_round_sender.send_replace(round_number);
    }
}

/// Receivers of signals from Core.
/// Intentionally un-clonable. Components should only subscribe to the channels they need.
pub(crate) struct CoreSignalsReceivers {
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    rx_accepted_block_broadcast: broadcast::Receiver<VerifiedBlock>,
    new_round_receiver: watch::Receiver<Round>,
}

impl CoreSignalsReceivers {
    pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
        self.rx_block_broadcast.resubscribe()
    }

    pub(crate) fn accepted_block_broadcast_receiver(&self) -> broadcast::Receiver<VerifiedBlock> {
        self.rx_accepted_block_broadcast.resubscribe()
    }

    pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
        self.new_round_receiver.clone()
    }
}
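
// A minimal consumption sketch for the receivers above (an assumed usage pattern;
// the real subscribers live elsewhere in the crate):
//
//   let mut rx_blocks = receivers.block_broadcast_receiver();
//   let mut rx_rounds = receivers.new_round_receiver();
//   loop {
//       tokio::select! {
//           Ok(extended_block) = rx_blocks.recv() => { /* broadcast to peers */ }
//           Ok(()) = rx_rounds.changed() => {
//               let round = *rx_rounds.borrow_and_update();
//               /* (re)arm the leader timeout for `round` */
//           }
//       }
//   }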
917
918/// Creates cores for the specified number of authorities for their corresponding stakes. The method returns the
919/// cores and their respective signal receivers are returned in `AuthorityIndex` order asc.
920#[cfg(test)]
921pub(crate) async fn create_cores(
922    context: Context,
923    authorities: Vec<Stake>,
924) -> Vec<CoreTestFixture> {
925    let mut cores = Vec::new();
926
927    for index in 0..authorities.len() {
928        let own_index = AuthorityIndex::new_for_test(index as u32);
929        let core =
930            CoreTestFixture::new(context.clone(), authorities.clone(), own_index, false).await;
931        cores.push(core);
932    }
933    cores
934}
935
936#[cfg(test)]
937pub(crate) struct CoreTestFixture {
938    pub(crate) core: Core,
939    pub(crate) transaction_vote_tracker: TransactionVoteTracker,
940    pub(crate) signal_receivers: CoreSignalsReceivers,
941    pub(crate) block_receiver: broadcast::Receiver<ExtendedBlock>,
942    pub(crate) _commit_output_receiver: UnboundedReceiver<CommittedSubDag>,
943    pub(crate) dag_state: Arc<RwLock<DagState>>,
944    pub(crate) store: Arc<MemStore>,
945    pub(crate) transaction_client: TransactionClient,
946}
947
948#[cfg(test)]
949impl CoreTestFixture {
950    async fn new(
951        context: Context,
952        authorities: Vec<Stake>,
953        own_index: AuthorityIndex,
954        sync_last_known_own_block: bool,
955    ) -> Self {
956        Self::new_with_prepopulated_blocks(
957            context,
958            authorities,
959            own_index,
960            sync_last_known_own_block,
961            vec![],
962        )
963        .await
964    }
965
966    /// Variant of `new` that writes `prepopulated_blocks` into the store before
967    /// constructing `DagState` and `Core`. Used by tests that recover from a
968    /// pre-existing store state.
969    async fn new_with_prepopulated_blocks(
970        context: Context,
971        authorities: Vec<Stake>,
972        own_index: AuthorityIndex,
973        sync_last_known_own_block: bool,
974        prepopulated_blocks: Vec<VerifiedBlock>,
975    ) -> Self {
976        let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
977        let mut context = context.clone();
978        context = context
979            .with_committee(committee)
980            .with_authority_index(own_index);
981        context
982            .protocol_config
983            .set_bad_nodes_stake_threshold_for_testing(33);
984
985        let context = Arc::new(context);
986        let store = Arc::new(MemStore::new());
987        if !prepopulated_blocks.is_empty() {
988            store
989                .write(WriteBatch::default().blocks(prepopulated_blocks))
990                .expect("Storage error");
991        }
992        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
993
994        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
995        let leader_schedule = Arc::new(
996            LeaderSchedule::from_store(context.clone(), dag_state.clone())
997                .with_num_commits_per_schedule(10),
998        );
999        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1000        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1001        let transaction_vote_tracker = TransactionVoteTracker::new(
1002            context.clone(),
1003            Arc::new(NoopBlockVerifier {}),
1004            dag_state.clone(),
1005        );
1006        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1007        // Need at least one subscriber to the block broadcast channel.
1008        let block_receiver = signal_receivers.block_broadcast_receiver();
1009
1010        let (commit_consumer, commit_output_receiver) = CommitConsumerArgs::new(0, 0);
1011        let commit_observer = CommitObserver::new(
1012            context.clone(),
1013            commit_consumer,
1014            dag_state.clone(),
1015            transaction_vote_tracker.clone(),
1016        )
1017        .await;
1018
1019        let block_signer = signers.remove(own_index.value()).1;
1020
1021        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1022        let core = Core::new_validator(
1023            context.clone(),
1024            leader_schedule,
1025            transaction_consumer,
1026            transaction_vote_tracker.clone(),
1027            block_manager,
1028            commit_observer,
1029            signals,
1030            block_signer,
1031            dag_state.clone(),
1032            sync_last_known_own_block,
1033            round_tracker.clone(),
1034        );
1035
1036        Self {
1037            core,
1038            transaction_vote_tracker,
1039            signal_receivers,
1040            block_receiver,
1041            _commit_output_receiver: commit_output_receiver,
1042            dag_state,
1043            store,
1044            transaction_client,
1045        }
1046    }
1047
1048    pub(crate) fn add_blocks(
1049        &mut self,
1050        blocks: Vec<VerifiedBlock>,
1051    ) -> ConsensusResult<BTreeSet<BlockRef>> {
1052        self.transaction_vote_tracker
1053            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
1054        self.core.add_blocks(blocks)
1055    }
1056}
1057
1058#[cfg(test)]
1059mod test {
1060    use std::{collections::BTreeSet, iter, time::Duration};
1061
1062    use consensus_config::{AuthorityIndex, Parameters};
1063    use consensus_types::block::BlockTimestampMs;
1064    use futures::{StreamExt, stream::FuturesUnordered};
1065    use tokio::time::sleep;
1066
1067    use super::*;
1068    use crate::{
1069        CommitConsumerArgs, CommitIndex,
1070        block::{TestBlock, genesis_blocks},
1071        block_verifier::NoopBlockVerifier,
1072        commit::CommitAPI,
1073        leader_scoring::ReputationScores,
1074        storage::{Store, WriteBatch, mem_store::MemStore},
1075        test_dag_builder::DagBuilder,
1076        test_dag_parser::parse_dag,
1077        transaction::{BlockStatus, TransactionClient},
1078    };
1079
1080    /// Recover Core and continue proposing from the last round which forms a quorum.
1081    #[tokio::test]
1082    async fn test_core_recover_from_store_for_full_round() {
1083        telemetry_subscribers::init_for_testing();
1084        let (context, mut key_pairs) = Context::new_for_test(4);
1085        let context = Arc::new(context);
1086        let store = Arc::new(MemStore::new());
1087        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1088        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1089        let mut block_status_subscriptions = FuturesUnordered::new();
1090
1091        // Create test blocks for all the authorities for 4 rounds and populate them in store
1092        let mut last_round_blocks = genesis_blocks(&context);
1093        let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
1094        for round in 1..=4 {
1095            let mut this_round_blocks = Vec::new();
1096            for (index, _authority) in context.committee.authorities() {
1097                let block = VerifiedBlock::new_for_test(
1098                    TestBlock::new(round, index.value() as u32)
1099                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1100                        .build(),
1101                );
1102
1103                // If it's round 1, that one will be committed later on, and it's our "own" block, then subscribe to listen for the block status.
1104                if round == 1 && index == context.own_index {
1105                    let subscription =
1106                        transaction_consumer.subscribe_for_block_status_testing(block.reference());
1107                    block_status_subscriptions.push(subscription);
1108                }
1109
1110                this_round_blocks.push(block);
1111            }
1112            all_blocks.extend(this_round_blocks.clone());
1113            last_round_blocks = this_round_blocks;
1114        }
1115        // write them in store
1116        store
1117            .write(WriteBatch::default().blocks(all_blocks))
1118            .expect("Storage error");
1119
1120        // create dag state after all blocks have been written to store
1121        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1122        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1123        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1124            context.clone(),
1125            dag_state.clone(),
1126        ));
1127        let transaction_vote_tracker = TransactionVoteTracker::new(
1128            context.clone(),
1129            Arc::new(NoopBlockVerifier {}),
1130            dag_state.clone(),
1131        );
1132
1133        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
1134        let commit_observer = CommitObserver::new(
1135            context.clone(),
1136            commit_consumer,
1137            dag_state.clone(),
1138            transaction_vote_tracker.clone(),
1139        )
1140        .await;
1141
1142        // Check no commits have been persisted to dag_state or store.
1143        let last_commit = store.read_last_commit().unwrap();
1144        assert!(last_commit.is_none());
1145        assert_eq!(dag_state.read().last_commit_index(), 0);
1146
1147        // Now spin up core
1148        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1149        let transaction_vote_tracker = TransactionVoteTracker::new(
1150            context.clone(),
1151            Arc::new(NoopBlockVerifier {}),
1152            dag_state.clone(),
1153        );
1154        transaction_vote_tracker.recover_blocks_after_round(dag_state.read().gc_round());
1155        // Need at least one subscriber to the block broadcast channel.
1156        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1157        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1158        let _core = Core::new_validator(
1159            context.clone(),
1160            leader_schedule,
1161            transaction_consumer,
1162            transaction_vote_tracker.clone(),
1163            block_manager,
1164            commit_observer,
1165            signals,
1166            key_pairs.remove(context.own_index.value()).1,
1167            dag_state.clone(),
1168            false,
1169            round_tracker,
1170        );
1171
1172        // New round should be 5
1173        let mut new_round = signal_receivers.new_round_receiver();
1174        assert_eq!(*new_round.borrow_and_update(), 5);
1175
1176        // Block for round 5 should have been proposed.
1177        let proposed_block = block_receiver
1178            .recv()
1179            .await
1180            .expect("A block should have been created");
1181        assert_eq!(proposed_block.block.round(), 5);
1182        let ancestors = proposed_block.block.ancestors();
1183
1184        // Only ancestors of round 4 should be included.
1185        assert_eq!(ancestors.len(), 4);
1186        for ancestor in ancestors {
1187            assert_eq!(ancestor.round, 4);
1188        }
1189
1190        // Flush the DAG state to storage.
1191        dag_state.write().flush();
1192
1193        // There were no commits prior to the core starting up but there was completed
1194        // rounds up to and including round 4. So we should commit leaders in round 1 & 2
1195        // as soon as the new block for round 5 is proposed.
1196        let last_commit = store
1197            .read_last_commit()
1198            .unwrap()
1199            .expect("last commit should be set");
1200        assert_eq!(last_commit.index(), 2);
1201        assert_eq!(dag_state.read().last_commit_index(), 2);
1202        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1203        assert_eq!(all_stored_commits.len(), 2);
1204
1205        // And ensure that our "own" block 1 sent to TransactionConsumer as notification alongside with gc_round
1206        while let Some(result) = block_status_subscriptions.next().await {
1207            let status = result.unwrap();
1208            assert!(matches!(status, BlockStatus::Sequenced(_)));
1209        }
1210    }
1211
1212    /// Recover Core and continue proposing when having a partial last round which doesn't form a quorum and we haven't
1213    /// proposed for that round yet.
1214    #[tokio::test]
1215    async fn test_core_recover_from_store_for_partial_round() {
1216        telemetry_subscribers::init_for_testing();
1217
1218        let (context, _) = Context::new_for_test(4);
1219
1220        // Create test blocks for all authorities except our's (index = 0).
1221        let mut last_round_blocks = genesis_blocks(&context);
1222        let mut all_blocks = last_round_blocks.clone();
1223        for round in 1..=4 {
1224            let mut this_round_blocks = Vec::new();
1225
1226            // For round 4 only produce f+1 blocks. Skip our validator 0 and that of position 1 from creating blocks.
1227            let authorities_to_skip = if round == 4 {
1228                context.committee.validity_threshold() as usize
1229            } else {
1230                // otherwise always skip creating a block for our authority
1231                1
1232            };
1233
1234            for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
1235                let block = TestBlock::new(round, index.value() as u32)
1236                    .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1237                    .build();
1238                this_round_blocks.push(VerifiedBlock::new_for_test(block));
1239            }
1240            all_blocks.extend(this_round_blocks.clone());
1241            last_round_blocks = this_round_blocks;
1242        }
1243
1244        let mut fixture = CoreTestFixture::new_with_prepopulated_blocks(
1245            context,
1246            vec![1, 1, 1, 1],
1247            AuthorityIndex::new_for_test(0),
1248            false,
1249            all_blocks,
1250        )
1251        .await;
1252
1253        // Clock round should have advanced to 5 during recovery because
1254        // a quorum has formed in round 4.
1255        let mut new_round = fixture.signal_receivers.new_round_receiver();
1256        assert_eq!(*new_round.borrow_and_update(), 5);
1257
1258        // During recovery, round 4 block should have been proposed.
1259        let proposed_block = fixture
1260            .block_receiver
1261            .recv()
1262            .await
1263            .expect("A block should have been created");
1264        assert_eq!(proposed_block.block.round(), 4);
1265        let ancestors = proposed_block.block.ancestors();
1266
1267        assert_eq!(ancestors.len(), 4);
1268        let own_index = fixture.core.context.own_index;
1269        for ancestor in ancestors {
1270            if ancestor.author == own_index {
1271                assert_eq!(ancestor.round, 0);
1272            } else {
1273                assert_eq!(ancestor.round, 3);
1274            }
1275        }
1276
1277        // Run commit rule.
1278        fixture.core.try_commit(vec![]).ok();
1279
1280        // Flush the DAG state to storage.
1281        fixture.core.dag_state.write().flush();
1282
1283        // There were no commits prior to the core starting up but there was completed
1284        // rounds up to round 4. So we should commit leaders in round 1 & 2 as soon
1285        // as the new block for round 4 is proposed.
1286        let last_commit = fixture
1287            .store
1288            .read_last_commit()
1289            .unwrap()
1290            .expect("last commit should be set");
1291        assert_eq!(last_commit.index(), 2);
1292        assert_eq!(fixture.dag_state.read().last_commit_index(), 2);
1293        let all_stored_commits = fixture
1294            .store
1295            .scan_commits((0..=CommitIndex::MAX).into())
1296            .unwrap();
1297        assert_eq!(all_stored_commits.len(), 2);
1298    }
1299
1300    #[tokio::test]
1301    async fn test_core_propose_after_genesis() {
1302        telemetry_subscribers::init_for_testing();
1303        let (mut context, _) = Context::new_for_test(4);
1304        context
1305            .protocol_config
1306            .set_max_transaction_size_bytes_for_testing(2_000);
1307        context
1308            .protocol_config
1309            .set_max_transactions_in_block_bytes_for_testing(2_000);
1310
1311        let mut fixture = CoreTestFixture::new(
1312            context,
1313            vec![1, 1, 1, 1],
1314            AuthorityIndex::new_for_test(0),
1315            false,
1316        )
1317        .await;
1318
1319        // Send some transactions
1320        let mut total = 0;
1321        let mut index = 0;
1322        loop {
1323            let transaction =
1324                bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
1325            total += transaction.len();
1326            index += 1;
1327            let _w = fixture
1328                .transaction_client
1329                .submit_no_wait(vec![transaction])
1330                .await
1331                .unwrap();
1332
1333            // Stop once the total size of submitted transactions reaches 1KB.
1334            if total >= 1_000 {
1335                break;
1336            }
1337        }
1338
1339        // A new block should have been created and received via the block receiver.
1340        let extended_block = fixture
1341            .block_receiver
1342            .recv()
1343            .await
1344            .expect("A new block should have been created");
1345
1346        // A new block was created; assert its details.
1347        assert_eq!(extended_block.block.round(), 1);
1348        assert_eq!(extended_block.block.author().value(), 0);
1349        assert_eq!(extended_block.block.ancestors().len(), 4);
1350
1351        let mut total = 0;
1352        for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
1353            total += transaction.data().len() as u64;
1354            let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
1355            assert_eq!(format!("Transaction {i}"), transaction);
1356        }
1357        assert!(
1358            total
1359                <= fixture
1360                    .core
1361                    .context
1362                    .protocol_config
1363                    .max_transactions_in_block_bytes()
1364        );
1365
1366        // Genesis blocks should be referenced as ancestors.
1367        let all_genesis = genesis_blocks(&fixture.core.context);
1368
1369        for ancestor in extended_block.block.ancestors() {
1370            all_genesis
1371                .iter()
1372                .find(|block| block.reference() == *ancestor)
1373                .expect("Block should be found amongst genesis blocks");
1374        }
1375
1376        // Try to propose again; with or without the ignore-leaders check, it will not return any block.
1377        assert!(fixture.core.try_propose(false).unwrap().is_none());
1378        assert!(fixture.core.try_propose(true).unwrap().is_none());
1379
1380        // Flush the DAG state to storage.
1381        fixture.dag_state.write().flush();
1382
1383        // Check no commits have been persisted to dag_state & store
1384        let last_commit = fixture.store.read_last_commit().unwrap();
1385        assert!(last_commit.is_none());
1386        assert_eq!(fixture.dag_state.read().last_commit_index(), 0);
1387    }
1388
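    // Tests that Core proposes a new block only once a quorum for the previous round has been
    // received, and that the proposal carries the expected ancestors and transaction votes.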
1389    #[tokio::test]
1390    async fn test_core_propose_once_receiving_a_quorum() {
1391        telemetry_subscribers::init_for_testing();
1392        let (context, _key_pairs) = Context::new_for_test(4);
1393        let mut core_fixture = CoreTestFixture::new(
1394            context.clone(),
1395            vec![1, 1, 1, 1],
1396            AuthorityIndex::new_for_test(0),
1397            false,
1398        )
1399        .await;
1400        let transaction_vote_tracker = &core_fixture.transaction_vote_tracker;
1401        let store = &core_fixture.store;
1402        let dag_state = &core_fixture.dag_state;
1403        let core = &mut core_fixture.core;
1404
1405        let mut expected_ancestors = BTreeSet::new();
1406
1407        // Adding one block now will trigger the creation of a new block for round 1.
1408        let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
1409        expected_ancestors.insert(block_1.reference());
1410        // Wait for min round delay to allow blocks to be proposed.
1411        sleep(context.parameters.min_round_delay).await;
1412        // add blocks to trigger proposal.
1413        transaction_vote_tracker.add_voted_blocks(vec![(block_1.clone(), vec![])]);
1414        _ = core.add_blocks(vec![block_1]);
1415
1416        assert_eq!(core.last_proposed_round(), Some(1));
1417        expected_ancestors.insert(core.last_proposed_block().reference());
1418        // Attempt to create a block; none will be produced.
1419        assert!(core.try_propose(false).unwrap().is_none());
1420
1421        // Adding another block now forms a quorum for round 1, so a block at round 2 will be proposed.
1422        let block_2 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
1423        expected_ancestors.insert(block_2.reference());
1424        // Wait for min round delay to allow blocks to be proposed.
1425        sleep(context.parameters.min_round_delay).await;
1426        // add blocks to trigger proposal.
1427        transaction_vote_tracker.add_voted_blocks(vec![(block_2.clone(), vec![1, 4])]);
1428        _ = core.add_blocks(vec![block_2.clone()]);
1429
1430        assert_eq!(core.last_proposed_round(), Some(2));
1431
1432        let proposed_block = core.last_proposed_block();
1433        assert_eq!(proposed_block.round(), 2);
1434        assert_eq!(proposed_block.author(), context.own_index);
1435        assert_eq!(proposed_block.ancestors().len(), 3);
1436        let ancestors = proposed_block.ancestors();
1437        let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
1438        assert_eq!(ancestors, expected_ancestors);
1439
1440        let transaction_votes = proposed_block.transaction_votes();
1441        assert_eq!(transaction_votes.len(), 1);
1442        let transaction_vote = transaction_votes.first().unwrap();
1443        assert_eq!(transaction_vote.block_ref, block_2.reference());
1444        assert_eq!(transaction_vote.rejects, vec![1, 4]);
1445
1446        // Flush the DAG state to storage.
1447        dag_state.write().flush();
1448
1449        // Check no commits have been persisted to dag_state & store
1450        let last_commit = store.read_last_commit().unwrap();
1451        assert!(last_commit.is_none());
1452        assert_eq!(dag_state.read().last_commit_index(), 0);
1453    }
1454
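    // Tests that recovering Core on a pre-built DAG with gc depth 2 produces commits and
    // notifies block status subscribers with Sequenced or GarbageCollected accordingly.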
1455    #[tokio::test]
1456    async fn test_commit_and_notify_for_block_status() {
1457        telemetry_subscribers::init_for_testing();
1458        let (mut context, mut key_pairs) = Context::new_for_test(4);
1459        const GC_DEPTH: u32 = 2;
1460
1461        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
1462
1463        let context = Arc::new(context);
1464
1465        let store = Arc::new(MemStore::new());
1466        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1467        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1468        let mut block_status_subscriptions = FuturesUnordered::new();
1469
1470        let dag_str = "DAG {
1471            Round 0 : { 4 },
1472            Round 1 : { * },
1473            Round 2 : { * },
1474            Round 3 : {
1475                A -> [*],
1476                B -> [-A2],
1477                C -> [-A2],
1478                D -> [-A2],
1479            },
1480            Round 4 : {
1481                B -> [-A3],
1482                C -> [-A3],
1483                D -> [-A3],
1484            },
1485            Round 5 : {
1486                A -> [A3, B4, C4, D4],
1487                B -> [*],
1488                C -> [*],
1489                D -> [*],
1490            },
1491            Round 6 : { * },
1492            Round 7 : { * },
1493            Round 8 : { * },
1494        }";
1495
1496        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1497        dag_builder.print();
1498
1499        // Subscribe to all created "own" blocks. We know that for our node (A) we'll be able to commit up to round 5.
1500        for block in dag_builder.blocks(1..=5) {
1501            if block.author() == context.own_index {
1502                let subscription =
1503                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
1504                block_status_subscriptions.push(subscription);
1505            }
1506        }
1507
1508        // write them in store
1509        store
1510            .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
1511            .expect("Storage error");
1512
1513        // create dag state after all blocks have been written to store
1514        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1515        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1516        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1517            context.clone(),
1518            dag_state.clone(),
1519        ));
1520        let transaction_vote_tracker = TransactionVoteTracker::new(
1521            context.clone(),
1522            Arc::new(NoopBlockVerifier {}),
1523            dag_state.clone(),
1524        );
1525
1526        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
1527        let commit_observer = CommitObserver::new(
1528            context.clone(),
1529            commit_consumer,
1530            dag_state.clone(),
1531            transaction_vote_tracker.clone(),
1532        )
1533        .await;
1534
1535        // Flush the DAG state to storage.
1536        dag_state.write().flush();
1537
1538        // Check no commits have been persisted to dag_state or store.
1539        let last_commit = store.read_last_commit().unwrap();
1540        assert!(last_commit.is_none());
1541        assert_eq!(dag_state.read().last_commit_index(), 0);
1542
1543        // Now recover Core and other components.
1544        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1545        let transaction_vote_tracker = TransactionVoteTracker::new(
1546            context.clone(),
1547            Arc::new(NoopBlockVerifier {}),
1548            dag_state.clone(),
1549        );
1550        transaction_vote_tracker.recover_blocks_after_round(dag_state.read().gc_round());
1551        // Need at least one subscriber to the block broadcast channel.
1552        let _block_receiver = signal_receivers.block_broadcast_receiver();
1553        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1554        let _core = Core::new_validator(
1555            context.clone(),
1556            leader_schedule,
1557            transaction_consumer,
1558            transaction_vote_tracker,
1559            block_manager,
1560            commit_observer,
1561            signals,
1562            key_pairs.remove(context.own_index.value()).1,
1563            dag_state.clone(),
1564            false,
1565            round_tracker,
1566        );
1567
1568        // Flush the DAG state to storage.
1569        dag_state.write().flush();
1570
1571        let last_commit = store
1572            .read_last_commit()
1573            .unwrap()
1574            .expect("last commit should be set");
1575
1576        assert_eq!(last_commit.index(), 5);
1577
1578        while let Some(result) = block_status_subscriptions.next().await {
1579            let status = result.unwrap();
1580
1581            match status {
1582                BlockStatus::Sequenced(block_ref) => {
1583                    assert!(block_ref.round == 1 || block_ref.round == 5);
1584                }
1585                BlockStatus::GarbageCollected(block_ref) => {
1586                    assert!(block_ref.round == 2 || block_ref.round == 3);
1587                }
1588            }
1589        }
1590    }
1591
1592    // Tests that the threshold clock advances when blocks get unsuspended due to GC, and that newly created blocks are always higher
1593    // than the last advanced gc round.
1594    #[tokio::test]
1595    async fn test_multiple_commits_advance_threshold_clock() {
1596        telemetry_subscribers::init_for_testing();
1597        let (mut context, _) = Context::new_for_test(4);
1598        const GC_DEPTH: u32 = 2;
1599
1600        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
1601
1602        // In round 1 we produce the block for authority D, but we do not link it until round 6. This makes round 6 unable to be processed
1603        // until the leader of round 3 is committed, at which point round 1 gets garbage collected.
1604        // Then we add more rounds so we can trigger a commit for the leader of round 9, which will move the gc round to 7.
1605        let dag_str = "DAG {
1606            Round 0 : { 4 },
1607            Round 1 : { * },
1608            Round 2 : {
1609                B -> [-D1],
1610                C -> [-D1],
1611                D -> [-D1],
1612            },
1613            Round 3 : {
1614                B -> [*],
1615                C -> [*],
1616                D -> [*],
1617            },
1618            Round 4 : {
1619                A -> [*],
1620                B -> [*],
1621                C -> [*],
1622                D -> [*],
1623            },
1624            Round 5 : {
1625                A -> [*],
1626                B -> [*],
1627                C -> [*],
1628                D -> [*],
1629            },
1630            Round 6 : {
1631                B -> [A5, B5, C5, D1],
1632                C -> [A5, B5, C5, D1],
1633                D -> [A5, B5, C5, D1],
1634            },
1635            Round 7 : {
1636                B -> [*],
1637                C -> [*],
1638                D -> [*],
1639            },
1640            Round 8 : {
1641                B -> [*],
1642                C -> [*],
1643                D -> [*],
1644            },
1645            Round 9 : {
1646                B -> [*],
1647                C -> [*],
1648                D -> [*],
1649            },
1650            Round 10 : {
1651                B -> [*],
1652                C -> [*],
1653                D -> [*],
1654            },
1655            Round 11 : {
1656                B -> [*],
1657                C -> [*],
1658                D -> [*],
1659            },
1660        }";
1661
1662        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
1663        dag_builder.print();
1664
1665        let mut fixture = CoreTestFixture::new(
1666            context,
1667            vec![1, 1, 1, 1],
1668            AuthorityIndex::new_for_test(0),
1669            true,
1670        )
1671        .await;
1672
1673        // Check no commits have been persisted to dag_state or store.
1674        let last_commit = fixture.store.read_last_commit().unwrap();
1675        assert!(last_commit.is_none());
1676        assert_eq!(fixture.dag_state.read().last_commit_index(), 0);
1677
1678        // We set the last known round to 4 so we avoid creating new blocks until then; otherwise Core would crash, as the pre-built DAG
1679        // already contains blocks for this authority.
1680        fixture.core.set_last_known_proposed_round(4);
1681
1682        // We add all the blocks except D1. The only ones we can immediately accept are the ones up to round 5, as they don't depend on D1. The rest of the blocks
1683        // do have a causal dependency on D1, so they can't be processed until the leader of round 3 gets committed and the gc round moves to 1. That will make all the blocks that depend on D1 get accepted.
1684        // However, our threshold clock is now at round 6, as the last quorum we managed to process was round 5.
1685        // As commits happen, blocks of later rounds get accepted and more leaders get committed. Eventually the leader of round 9 gets committed and gc moves to 9 - 2 = 7.
1686        // If our node attempted to produce a block for threshold clock round 6, the acceptance checks would fail, as gc has moved far past this round.
1687        let mut all_blocks = dag_builder.blocks(1..=11);
1688        all_blocks.sort_by_key(|b| b.round());
1689        // Register votes for every block - including the one withheld from Core -
1690        // so transaction_vote_tracker's state matches the full DAG.
1691        fixture
1692            .transaction_vote_tracker
1693            .add_voted_blocks(all_blocks.iter().map(|b| (b.clone(), vec![])).collect());
1694        let blocks: Vec<VerifiedBlock> = all_blocks
1695            .into_iter()
1696            .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
1697            .collect();
1698        fixture.core.add_blocks(blocks).expect("Should not fail");
1699
1700        assert_eq!(fixture.core.last_proposed_round(), Some(12));
1701    }
1702
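    // Tests that Core does not propose until set_last_known_proposed_round is called, and then
    // proposes on top of its own last known block.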
1703    #[tokio::test]
1704    async fn test_core_set_min_propose_round() {
1705        telemetry_subscribers::init_for_testing();
1706        let (context, _) = Context::new_for_test(4);
1707        let context = context.with_parameters(Parameters {
1708            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
1709            ..Default::default()
1710        });
1711        let mut fixture = CoreTestFixture::new(
1712            context,
1713            vec![1, 1, 1, 1],
1714            AuthorityIndex::new_for_test(0),
1715            true,
1716        )
1717        .await;
1718
1719        // No new block should have been produced
1720        assert_eq!(
1721            fixture.core.last_proposed_round(),
1722            Some(GENESIS_ROUND),
1723            "No block should have been created other than genesis"
1724        );
1725
1726        // Trying to explicitly propose a block will not produce anything
1727        assert!(fixture.core.try_propose(true).unwrap().is_none());
1728
1729        // Create blocks for the whole network, even for "our" node, in order to replicate an "amnesia" recovery.
1730        let mut builder = DagBuilder::new(fixture.core.context.clone());
1731        builder.layers(1..=10).build();
1732
1733        let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
1734
1735        // Process all the blocks
1736        assert!(fixture.add_blocks(blocks).unwrap().is_empty());
1737
1738        fixture
1739            .core
1740            .round_tracker_for_tests()
1741            .write()
1742            .update_from_probe(
1743                vec![
1744                    vec![10, 10, 10, 10],
1745                    vec![10, 10, 10, 10],
1746                    vec![10, 10, 10, 10],
1747                    vec![10, 10, 10, 10],
1748                ],
1749                vec![
1750                    vec![10, 10, 10, 10],
1751                    vec![10, 10, 10, 10],
1752                    vec![10, 10, 10, 10],
1753                    vec![10, 10, 10, 10],
1754                ],
1755            );
1756
1757        // Try to propose - no block should be produced.
1758        assert!(fixture.core.try_propose(true).unwrap().is_none());
1759
1760        // Now set the last known proposed round, which is the highest round for which the network
1761        // has informed us that we proposed a block.
1762        fixture.core.set_last_known_proposed_round(10);
1763
1764        let block = fixture.core.try_propose(true).expect("No error").unwrap();
1765        assert_eq!(block.round(), 11);
1766        assert_eq!(block.ancestors().len(), 4);
1767
1768        let our_ancestor_included = block.ancestors()[0];
1769        assert_eq!(our_ancestor_included.author, fixture.core.context.own_index);
1770        assert_eq!(our_ancestor_included.round, 10);
1771    }
1772
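    // Tests that when the leader of a round is missing, blocks for the next round are produced
    // only via the leader timeout path (new_block with force = true).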
1773    #[tokio::test(flavor = "current_thread", start_paused = true)]
1774    async fn test_core_try_new_block_leader_timeout() {
1775        telemetry_subscribers::init_for_testing();
1776
1777        // Since we run the test with start_paused = true, any time-dependent operations using Tokio's time
1778        // facilities, such as tokio::time::sleep or tokio::time::Instant, will not advance. So practically each
1779        // Core's clock may have been initialised with a different value, but it never advances.
1780        // To ensure that blocks won't get rejected by cores, we need to manually wait for the time
1781        // diff before processing them. By calling `tokio::time::sleep` we also implicitly advance the
1782        // tokio clock.
1783        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
1784            // Simulate the time wait before processing a block to ensure that block.timestamp <= now
1785            let now = context.clock.timestamp_utc_ms();
1786            let max_timestamp = blocks
1787                .iter()
1788                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
1789                .map(|block| block.timestamp_ms())
1790                .unwrap_or(0);
1791
1792            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
1793            sleep(wait_time).await;
1794        }
1795
1796        let (context, _) = Context::new_for_test(4);
1797        // Create the cores for all authorities
1798        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]).await;
1799
1800        // Create blocks for rounds 1..=3 from all Cores except the last Core of authority 3, so we miss its block. As
1801        // it will be the leader of round 3, no one will be able to progress to round 4 unless we explicitly trigger
1802        // the block creation.
1804        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
1805
1806        // Now iterate over a few rounds and ensure the corresponding signals are created while the network advances
1807        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
1808        for round in 1..=3 {
1809            let mut this_round_blocks = Vec::new();
1810
1811            for core_fixture in cores.iter_mut() {
1812                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
1813
1814                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
1815
1816                // Only when round > 1 and using non-genesis parents.
1817                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
1818                    assert_eq!(round - 1, r);
1819                    if core_fixture.core.last_proposed_round() == Some(r) {
1820                        // Force propose new block regardless of min round delay.
1821                        core_fixture
1822                            .core
1823                            .try_propose(true)
1824                            .unwrap()
1825                            .unwrap_or_else(|| {
1826                                panic!("Block should have been proposed for round {}", round)
1827                            });
1828                    }
1829                }
1830
1831                assert_eq!(core_fixture.core.last_proposed_round(), Some(round));
1832
1833                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
1834            }
1835
1836            last_round_blocks = this_round_blocks;
1837        }
1838
1839        // Try to create the blocks for round 4 by calling the try_propose() method. No block should be created as the
1840        // leader - authority 3 - hasn't proposed any block.
1841        for core_fixture in cores.iter_mut() {
1842            wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
1843
1844            core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
1845            assert!(core_fixture.core.try_propose(false).unwrap().is_none());
1846        }
1847
1848        // Now try to create the blocks for round 4 via the leader timeout method which should
1849        // ignore any leader checks or min round delay.
1850        for core_fixture in cores.iter_mut() {
1851            assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
1852            assert_eq!(core_fixture.core.last_proposed_round(), Some(4));
1853
1854            // Flush the DAG state to storage.
1855            core_fixture.dag_state.write().flush();
1856
1857            // Check commits have been persisted to store
1858            let last_commit = core_fixture
1859                .store
1860                .read_last_commit()
1861                .unwrap()
1862                .expect("last commit should be set");
1863            // There is 1 leader round with rounds completed up to and including
1864            // round 4.
1865            assert_eq!(last_commit.index(), 1);
1866            let all_stored_commits = core_fixture
1867                .store
1868                .scan_commits((0..=CommitIndex::MAX).into())
1869                .unwrap();
1870            assert_eq!(all_stored_commits.len(), 1);
1871        }
1872    }
1873
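    // Tests that an authority with no probed rounds stays excluded from other authorities'
    // ancestors, while it still includes its own blocks.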
1874    #[tokio::test(flavor = "current_thread", start_paused = true)]
1875    async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
1876        telemetry_subscribers::init_for_testing();
1877
1878        // Since we run the test with start_paused = true, any time-dependent operations using Tokio's time
1879        // facilities, such as tokio::time::sleep or tokio::time::Instant, will not advance. So practically each
1880        // Core's clock may have been initialised with a different value, but it never advances.
1881        // To ensure that blocks won't get rejected by cores, we need to manually wait for the time
1882        // diff before processing them. By calling `tokio::time::sleep` we also implicitly advance the
1883        // tokio clock.
1884        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
1885            // Simulate the time wait before processing a block to ensure that block.timestamp <= now
1886            let now = context.clock.timestamp_utc_ms();
1887            let max_timestamp = blocks
1888                .iter()
1889                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
1890                .map(|block| block.timestamp_ms())
1891                .unwrap_or(0);
1892
1893            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
1894            sleep(wait_time).await;
1895        }
1896
1897        let (mut context, _) = Context::new_for_test(5);
1898        context
1899            .protocol_config
1900            .set_bad_nodes_stake_threshold_for_testing(33);
1901
1902        // Create the cores for all authorities
1903        let mut all_cores = create_cores(context, vec![1, 1, 1, 1, 1]).await;
1904        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
1905
1906        // Create blocks for rounds 1..=30 from all Cores except last Core of authority 4.
1907        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
1908        for round in 1..=30 {
1909            let mut this_round_blocks = Vec::new();
1910
1911            for core_fixture in cores.iter_mut() {
1912                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
1913
1914                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
1915
1916                core_fixture
1917                    .core
1918                    .round_tracker_for_tests()
1919                    .write()
1920                    .update_from_probe(
1921                        vec![
1922                            vec![round, round, round, round, 0],
1923                            vec![round, round, round, round, 0],
1924                            vec![round, round, round, round, 0],
1925                            vec![round, round, round, round, 0],
1926                            vec![0, 0, 0, 0, 0],
1927                        ],
1928                        vec![
1929                            vec![round, round, round, round, 0],
1930                            vec![round, round, round, round, 0],
1931                            vec![round, round, round, round, 0],
1932                            vec![round, round, round, round, 0],
1933                            vec![0, 0, 0, 0, 0],
1934                        ],
1935                    );
1936
1937                // Only when round > 1 and using non-genesis parents.
1938                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
1939                    assert_eq!(round - 1, r);
1940                    if core_fixture.core.last_proposed_round() == Some(r) {
1941                        // Force propose new block regardless of min round delay.
1942                        core_fixture
1943                            .core
1944                            .try_propose(true)
1945                            .unwrap()
1946                            .unwrap_or_else(|| {
1947                                panic!("Block should have been proposed for round {}", round)
1948                            });
1949                    }
1950                }
1951
1952                assert_eq!(core_fixture.core.last_proposed_round(), Some(round));
1953
1954                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
1955            }
1956
1957            last_round_blocks = this_round_blocks;
1958        }
1959
1960        // Now produce blocks for all Cores
1961        for round in 31..=40 {
1962            let mut this_round_blocks = Vec::new();
1963
1964            for core_fixture in all_cores.iter_mut() {
1965                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
1966
1967                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
1968
1969                // Don't update probed rounds for authority 4 so it will remain
1970                // excluded
1971                core_fixture
1972                    .core
1973                    .round_tracker_for_tests()
1974                    .write()
1975                    .update_from_probe(
1976                        vec![
1977                            vec![round, round, round, round, 0],
1978                            vec![round, round, round, round, 0],
1979                            vec![round, round, round, round, 0],
1980                            vec![round, round, round, round, 0],
1981                            vec![0, 0, 0, 0, 0],
1982                        ],
1983                        vec![
1984                            vec![round, round, round, round, 0],
1985                            vec![round, round, round, round, 0],
1986                            vec![round, round, round, round, 0],
1987                            vec![round, round, round, round, 0],
1988                            vec![0, 0, 0, 0, 0],
1989                        ],
1990                    );
1991
1992                // Only when round > 1 and using non-genesis parents.
1993                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
1994                    assert_eq!(round - 1, r);
1995                    if core_fixture.core.last_proposed_round() == Some(r) {
1996                        // Force propose new block regardless of min round delay.
1997                        core_fixture
1998                            .core
1999                            .try_propose(true)
2000                            .unwrap()
2001                            .unwrap_or_else(|| {
2002                                panic!("Block should have been proposed for round {}", round)
2003                            });
2004                    }
2005                }
2006
2007                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2008
2009                for block in this_round_blocks.iter() {
2010                    if block.author() != AuthorityIndex::new_for_test(4) {
2011                        // Assert created blocks include only 4 ancestors per block, as one
2012                        // should be excluded.
2013                        assert_eq!(block.ancestors().len(), 4);
2014                    } else {
2015                        // Authority 4 is the low scoring authority, so it will still include
2016                        // its own blocks.
2017                        assert_eq!(block.ancestors().len(), 5);
2018                    }
2019                }
2020            }
2021
2022            last_round_blocks = this_round_blocks;
2023        }
2024    }
2025
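    // Tests smart ancestor selection: an EXCLUDE authority is omitted from proposals, smart
    // select can defer proposing, a leader timeout forces a proposal, and round tracker updates
    // unlock the excluded authority again.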
2026    #[tokio::test]
2027    async fn test_smart_ancestor_selection() {
2028        telemetry_subscribers::init_for_testing();
2029        let (context, _) = Context::new_for_test(7);
2030        let context = context.with_parameters(Parameters {
2031            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2032            ..Default::default()
2033        });
2034        let mut fixture =
2035            CoreTestFixture::new(context, vec![1; 7], AuthorityIndex::new_for_test(0), true).await;
2036        let min_round_delay = fixture.core.context.parameters.min_round_delay;
2037
2038        // No new block should have been produced
2039        assert_eq!(
2040            fixture.core.last_proposed_round(),
2041            Some(GENESIS_ROUND),
2042            "No block should have been created other than genesis"
2043        );
2044
2045        // Trying to explicitly propose a block will not produce anything
2046        assert!(fixture.core.try_propose(true).unwrap().is_none());
2047
2048        // Create blocks for the whole network but not for authority 1
2049        let mut builder = DagBuilder::new(fixture.core.context.clone());
2050        builder
2051            .layers(1..=12)
2052            .authorities(vec![AuthorityIndex::new_for_test(1)])
2053            .skip_block()
2054            .build();
2055        let blocks = builder.blocks(1..=12);
2056        // Process all the blocks
2057        assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2058        fixture.core.set_last_known_proposed_round(12);
2059
2060        fixture
2061            .core
2062            .round_tracker_for_tests()
2063            .write()
2064            .update_from_probe(
2065                vec![
2066                    vec![12, 12, 12, 12, 12, 12, 12],
2067                    vec![0, 0, 0, 0, 0, 0, 0],
2068                    vec![12, 12, 12, 12, 12, 12, 12],
2069                    vec![12, 12, 12, 12, 12, 12, 12],
2070                    vec![12, 12, 12, 12, 12, 12, 12],
2071                    vec![12, 12, 12, 12, 12, 12, 12],
2072                    vec![12, 12, 12, 12, 12, 12, 12],
2073                ],
2074                vec![
2075                    vec![12, 12, 12, 12, 12, 12, 12],
2076                    vec![0, 0, 0, 0, 0, 0, 0],
2077                    vec![12, 12, 12, 12, 12, 12, 12],
2078                    vec![12, 12, 12, 12, 12, 12, 12],
2079                    vec![12, 12, 12, 12, 12, 12, 12],
2080                    vec![12, 12, 12, 12, 12, 12, 12],
2081                    vec![12, 12, 12, 12, 12, 12, 12],
2082                ],
2083            );
2084
2085        let block = fixture.core.try_propose(true).expect("No error").unwrap();
2086        assert_eq!(block.round(), 13);
2087        assert_eq!(block.ancestors().len(), 7);
2088
2089        // Build blocks for the rest of the network, other than our own index
2090        builder
2091            .layers(13..=14)
2092            .authorities(vec![AuthorityIndex::new_for_test(0)])
2093            .skip_block()
2094            .build();
2095        let blocks = builder.blocks(13..=14);
2096        assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2097
2098        // We have now triggered a leader schedule change, so we should have
2099        // one EXCLUDE authority (1) when we go to select ancestors for the next proposal
2100        let block = fixture.core.try_propose(true).expect("No error").unwrap();
2101        assert_eq!(block.round(), 15);
2102        assert_eq!(block.ancestors().len(), 6);
2103
2104        // Build blocks for a quorum of the network including the EXCLUDE authority (1),
2105        // which will trigger smart select, and we will not propose a block
2106        let round_14_ancestors = builder.last_ancestors.clone();
2107        builder
2108            .layer(15)
2109            .authorities(vec![
2110                AuthorityIndex::new_for_test(0),
2111                AuthorityIndex::new_for_test(5),
2112                AuthorityIndex::new_for_test(6),
2113            ])
2114            .skip_block()
2115            .build();
2116        let blocks = builder.blocks(15..=15);
2117        let authority_1_excluded_block_reference = blocks
2118            .iter()
2119            .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2120            .unwrap()
2121            .reference();
2122        // Wait for min round delay to allow blocks to be proposed.
2123        sleep(min_round_delay).await;
2124        // Smart select should be triggered and no block should be proposed.
2125        assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2126        assert_eq!(fixture.core.last_proposed_block().round(), 15);
2127
2128        builder
2129            .layer(15)
2130            .authorities(vec![
2131                AuthorityIndex::new_for_test(0),
2132                AuthorityIndex::new_for_test(1),
2133                AuthorityIndex::new_for_test(2),
2134                AuthorityIndex::new_for_test(3),
2135                AuthorityIndex::new_for_test(4),
2136            ])
2137            .skip_block()
2138            .override_last_ancestors(round_14_ancestors)
2139            .build();
2140        let blocks = builder.blocks(15..=15);
2141        let round_15_ancestors: Vec<BlockRef> = blocks
2142            .iter()
2143            .filter(|block| block.round() == 15)
2144            .map(|block| block.reference())
2145            .collect();
2146        let included_block_references = iter::once(&fixture.core.last_proposed_block())
2147            .chain(blocks.iter())
2148            .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2149            .map(|block| block.reference())
2150            .collect::<Vec<_>>();
2151
2152        // Have enough ancestor blocks to propose now.
2153        assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2154        assert_eq!(fixture.core.last_proposed_block().round(), 16);
2155
2156        // Check that a new block has been proposed & signaled.
2157        let extended_block = loop {
2158            let extended_block =
2159                tokio::time::timeout(Duration::from_secs(1), fixture.block_receiver.recv())
2160                    .await
2161                    .unwrap()
2162                    .unwrap();
2163            if extended_block.block.round() == 16 {
2164                break extended_block;
2165            }
2166        };
2167        assert_eq!(extended_block.block.round(), 16);
2168        assert_eq!(
2169            extended_block.block.author(),
2170            fixture.core.context.own_index
2171        );
2172        assert_eq!(extended_block.block.ancestors().len(), 6);
2173        assert_eq!(extended_block.block.ancestors(), included_block_references);
2174        assert_eq!(extended_block.excluded_ancestors.len(), 1);
2175        assert_eq!(
2176            extended_block.excluded_ancestors[0],
2177            authority_1_excluded_block_reference
2178        );
2179
2180        // Build blocks for a quorum of the network including the EXCLUDE ancestor,
2181        // which will trigger smart select, and we will not propose a block.
2182        // This time we will force a proposal by hitting the leader timeout, which
2183        // should cause us to include this EXCLUDE ancestor.
2184        builder
2185            .layer(16)
2186            .authorities(vec![
2187                AuthorityIndex::new_for_test(0),
2188                AuthorityIndex::new_for_test(5),
2189                AuthorityIndex::new_for_test(6),
2190            ])
2191            .skip_block()
2192            .override_last_ancestors(round_15_ancestors)
2193            .build();
2194        let blocks = builder.blocks(16..=16);
2195        // Wait for leader timeout to force blocks to be proposed.
2196        sleep(min_round_delay).await;
2197        // Smart select should be triggered and no block should be proposed.
2198        assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2199        assert_eq!(fixture.core.last_proposed_block().round(), 16);
2200
2201        // Simulate a leader timeout and force a proposal, which will include
2202        // one EXCLUDE ancestor when we select ancestors for the next proposal
2203        let block = fixture.core.try_propose(true).expect("No error").unwrap();
2204        assert_eq!(block.round(), 17);
2205        assert_eq!(block.ancestors().len(), 5);
2206
2207        // Check that a new block has been proposed & signaled.
2208        let extended_block =
2209            tokio::time::timeout(Duration::from_secs(1), fixture.block_receiver.recv())
2210                .await
2211                .unwrap()
2212                .unwrap();
2213        assert_eq!(extended_block.block.round(), 17);
2214        assert_eq!(
2215            extended_block.block.author(),
2216            fixture.core.context.own_index
2217        );
2218        assert_eq!(extended_block.block.ancestors().len(), 5);
2219        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2220
2221        // The excluded authority is locked until round 20; simulate enough rounds to
2222        // unlock it
2223        builder
2224            .layers(17..=22)
2225            .authorities(vec![AuthorityIndex::new_for_test(0)])
2226            .skip_block()
2227            .build();
2228        let blocks = builder.blocks(17..=22);
2229
2230        // Simulate updating received and accepted rounds from the prober.
2231        // New quorum rounds can then be computed, which will unlock
2232        // the excluded authority (1). We should then be able to create a new
2233        // layer of blocks, which will all be included as ancestors of the
2234        // next proposal.
2235        fixture
2236            .core
2237            .round_tracker_for_tests()
2238            .write()
2239            .update_from_probe(
2240                vec![
2241                    vec![22, 22, 22, 22, 22, 22, 22],
2242                    vec![22, 22, 22, 22, 22, 22, 22],
2243                    vec![22, 22, 22, 22, 22, 22, 22],
2244                    vec![22, 22, 22, 22, 22, 22, 22],
2245                    vec![22, 22, 22, 22, 22, 22, 22],
2246                    vec![22, 22, 22, 22, 22, 22, 22],
2247                    vec![22, 22, 22, 22, 22, 22, 22],
2248                ],
2249                vec![
2250                    vec![22, 22, 22, 22, 22, 22, 22],
2251                    vec![22, 22, 22, 22, 22, 22, 22],
2252                    vec![22, 22, 22, 22, 22, 22, 22],
2253                    vec![22, 22, 22, 22, 22, 22, 22],
2254                    vec![22, 22, 22, 22, 22, 22, 22],
2255                    vec![22, 22, 22, 22, 22, 22, 22],
2256                    vec![22, 22, 22, 22, 22, 22, 22],
2257                ],
2258            );
2259
2260        let own_index = fixture.core.context.own_index;
2261        let included_block_references = iter::once(&fixture.core.last_proposed_block())
2262            .chain(blocks.iter())
2263            .filter(|block| block.round() == 22 || block.author() == own_index)
2264            .map(|block| block.reference())
2265            .collect::<Vec<_>>();
2266
2267        // Have enough ancestor blocks to propose now.
2268        sleep(min_round_delay).await;
2269        assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2270        assert_eq!(fixture.core.last_proposed_block().round(), 23);
2271
2272        // Check that a new block has been proposed & signaled.
2273        let extended_block =
2274            tokio::time::timeout(Duration::from_secs(1), fixture.block_receiver.recv())
2275                .await
2276                .unwrap()
2277                .unwrap();
2278        assert_eq!(extended_block.block.round(), 23);
2279        assert_eq!(
2280            extended_block.block.author(),
2281            fixture.core.context.own_index
2282        );
2283        assert_eq!(extended_block.block.ancestors().len(), 7);
2284        assert_eq!(extended_block.block.ancestors(), included_block_references);
2285        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2286    }
2287
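    // Tests that the excluded ancestors attached to an ExtendedBlock are capped, so not all
    // equivocating blocks are broadcast to the rest of the network.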
2288    #[tokio::test]
2289    async fn test_excluded_ancestor_limit() {
2290        telemetry_subscribers::init_for_testing();
2291        let (context, _) = Context::new_for_test(4);
2292        let context = context.with_parameters(Parameters {
2293            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2294            ..Default::default()
2295        });
2296        let mut fixture = CoreTestFixture::new(
2297            context,
2298            vec![1, 1, 1, 1],
2299            AuthorityIndex::new_for_test(0),
2300            true,
2301        )
2302        .await;
2303
2304        // No new block should have been produced
2305        assert_eq!(
2306            fixture.core.last_proposed_round(),
2307            Some(GENESIS_ROUND),
2308            "No block should have been created other than genesis"
2309        );
2310
2311        // Create blocks for the whole network
2312        let mut builder = DagBuilder::new(fixture.core.context.clone());
2313        builder.layers(1..=3).build();
2314
2315        // This will equivocate 9 blocks for authority 1. They will be excluded from
2316        // the proposal, but because of the configured limit not all of them will be
2317        // included in the ExtendedBlock structure sent to the rest of the network.
2318        builder
2319            .layer(4)
2320            .authorities(vec![AuthorityIndex::new_for_test(1)])
2321            .equivocate(9)
2322            .build();
2323        let blocks = builder.blocks(1..=4);
2324
2325        // Process all the blocks
2326        assert!(fixture.add_blocks(blocks).unwrap().is_empty());
2327        fixture.core.set_last_known_proposed_round(3);
2328
2329        let block = fixture.core.try_propose(true).expect("No error").unwrap();
2330        assert_eq!(block.round(), 5);
2331        assert_eq!(block.ancestors().len(), 4);
2332
2333        // Check that a new block has been proposed & signaled.
2334        let extended_block =
2335            tokio::time::timeout(Duration::from_secs(1), fixture.block_receiver.recv())
2336                .await
2337                .unwrap()
2338                .unwrap();
2339        assert_eq!(extended_block.block.round(), 5);
2340        assert_eq!(
2341            extended_block.block.author(),
2342            fixture.core.context.own_index
2343        );
2344        assert_eq!(extended_block.block.ancestors().len(), 4);
2345        assert_eq!(extended_block.excluded_ancestors.len(), 8);
2346    }
2347
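    // Tests that a large propagation delay (own low quorum round far behind the last proposed
    // round) disables proposing, and that proposing resumes once the delay clears.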
2348    #[tokio::test]
2349    async fn test_core_set_propagation_delay_per_authority() {
2350        telemetry_subscribers::init_for_testing();
2351        let (context, _) = Context::new_for_test(4);
2352        let mut fixture = CoreTestFixture::new(
2353            context,
2354            vec![1, 1, 1, 1],
2355            AuthorityIndex::new_for_test(0),
2356            false,
2357        )
2358        .await;
2359
2360        // Use a large propagation delay to disable proposing.
2361        // This is done by accepting an own block at round 1000 into dag state and
2362        // then simulating an update of the round tracker's received rounds from a probe,
2363        // where the low quorum round for our own index should be calculated as round 0.
2364        let test_block = VerifiedBlock::new_for_test(TestBlock::new(1000, 0).build());
2365        fixture
2366            .transaction_vote_tracker
2367            .add_voted_blocks(vec![(test_block.clone(), vec![])]);
2368        // Force accepting the block to dag state because its causal history is incomplete.
2369        fixture.dag_state.write().accept_block(test_block);
2370
2371        fixture
2372            .core
2373            .round_tracker_for_tests()
2374            .write()
2375            .update_from_probe(
2376                vec![
2377                    vec![0, 0, 0, 0],
2378                    vec![0, 0, 0, 0],
2379                    vec![0, 0, 0, 0],
2380                    vec![0, 0, 0, 0],
2381                ],
2382                vec![
2383                    vec![0, 0, 0, 0],
2384                    vec![0, 0, 0, 0],
2385                    vec![0, 0, 0, 0],
2386                    vec![0, 0, 0, 0],
2387                ],
2388            );
2389
2390        // There is no proposal even with forced proposing.
2391        assert!(fixture.core.try_propose(true).unwrap().is_none());
2392
2393        // Let Core know there is no propagation delay.
2394        // This is done by simulating an update of the round tracker's received rounds from a probe,
2395        // where the low quorum round for our own index should be calculated as round 1000.
2396        fixture
2397            .core
2398            .round_tracker_for_tests()
2399            .write()
2400            .update_from_probe(
2401                vec![
2402                    vec![1000, 1000, 1000, 1000],
2403                    vec![1000, 1000, 1000, 1000],
2404                    vec![1000, 1000, 1000, 1000],
2405                    vec![1000, 1000, 1000, 1000],
2406                ],
2407                vec![
2408                    vec![1000, 1000, 1000, 1000],
2409                    vec![1000, 1000, 1000, 1000],
2410                    vec![1000, 1000, 1000, 1000],
2411                    vec![1000, 1000, 1000, 1000],
2412                ],
2413            );
2414
2415        // Also add the necessary blocks from round 1000 so core will propose for
2416        // round 1001
2417        for author in 1..4 {
2418            let block = VerifiedBlock::new_for_test(TestBlock::new(1000, author).build());
2419            fixture
2420                .transaction_vote_tracker
2421                .add_voted_blocks(vec![(block.clone(), vec![])]);
2422            // Force accepting the block to dag state because its causal history is incomplete.
2423            fixture.dag_state.write().accept_block(block);
2424        }
2425
2426        // Proposing now would succeed.
2427        assert!(fixture.core.try_propose(true).unwrap().is_some());
2428    }
2429
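    // Tests that after enough commits the leader schedule changes: reputation scores are
    // computed and good and bad nodes are marked in the leader swap table.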
2430    #[tokio::test(flavor = "current_thread", start_paused = true)]
2431    async fn test_leader_schedule_change() {
2432        telemetry_subscribers::init_for_testing();
2433        let default_params = Parameters::default();
2434
2435        let (context, _) = Context::new_for_test(4);
2436        // create the cores and their signals for all the authorities
2437        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
2438
2439        // Now iterate over a few rounds and ensure the corresponding signals are created while the network advances
2440        let mut last_round_blocks = Vec::new();
2441        for round in 1..=30 {
2442            let mut this_round_blocks = Vec::new();
2443
2444            // Wait for min round delay to allow blocks to be proposed.
2445            sleep(default_params.min_round_delay).await;
2446
2447            for core_fixture in &mut cores {
2448                // add the blocks from last round
2449                // this will trigger a block creation for the round and a signal should be emitted
2450                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2451
2452                core_fixture
2453                    .core
2454                    .round_tracker_for_tests()
2455                    .write()
2456                    .update_from_probe(
2457                        vec![
2458                            vec![round, round, round, round],
2459                            vec![round, round, round, round],
2460                            vec![round, round, round, round],
2461                            vec![round, round, round, round],
2462                        ],
2463                        vec![
2464                            vec![round, round, round, round],
2465                            vec![round, round, round, round],
2466                            vec![round, round, round, round],
2467                            vec![round, round, round, round],
2468                        ],
2469                    );
2470
2471                // A "new round" signal should be received given that all the blocks of the previous round have been processed
2472                let new_round = receive(
2473                    Duration::from_secs(1),
2474                    core_fixture.signal_receivers.new_round_receiver(),
2475                )
2476                .await;
2477                assert_eq!(new_round, round);
2478
2479                // Check that a new block has been proposed.
2480                let extended_block = tokio::time::timeout(
2481                    Duration::from_secs(1),
2482                    core_fixture.block_receiver.recv(),
2483                )
2484                .await
2485                .unwrap()
2486                .unwrap();
2487                assert_eq!(extended_block.block.round(), round);
2488                assert_eq!(
2489                    extended_block.block.author(),
2490                    core_fixture.core.context.own_index
2491                );
2492
2493                // append the new block to this round blocks
2494                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2495
2496                let block = core_fixture.core.last_proposed_block();
2497
2498                // ensure that produced block is referring to the blocks of last_round
2499                assert_eq!(
2500                    block.ancestors().len(),
2501                    core_fixture.core.context.committee.size()
2502                );
2503                for ancestor in block.ancestors() {
2504                    if block.round() > 1 {
2505                        // don't bother with round 1 block which just contains the genesis blocks.
2506                        assert!(
2507                            last_round_blocks
2508                                .iter()
2509                                .any(|block| block.reference() == *ancestor),
2510                            "Reference from previous round should be added"
2511                        );
2512                    }
2513                }
2514            }
2515
2516            last_round_blocks = this_round_blocks;
2517        }
2518
2519        for core_fixture in cores {
2520            // Flush the DAG state to storage.
2521            core_fixture.dag_state.write().flush();
2522
2523            // Check commits have been persisted to store
2524            let last_commit = core_fixture
2525                .store
2526                .read_last_commit()
2527                .unwrap()
2528                .expect("last commit should be set");
2529            // There are 28 leader rounds with rounds completed up to and including
2530            // round 29. Round 30 blocks will only include their own blocks, so the
2531            // 28th leader will not be committed.
2532            assert_eq!(last_commit.index(), 27);
2533            let all_stored_commits = core_fixture
2534                .store
2535                .scan_commits((0..=CommitIndex::MAX).into())
2536                .unwrap();
2537            assert_eq!(all_stored_commits.len(), 27);
2538            assert_eq!(
2539                core_fixture
2540                    .core
2541                    .leader_schedule
2542                    .leader_swap_table
2543                    .read()
2544                    .bad_nodes
2545                    .len(),
2546                1
2547            );
2548            assert_eq!(
2549                core_fixture
2550                    .core
2551                    .leader_schedule
2552                    .leader_swap_table
2553                    .read()
2554                    .good_nodes
2555                    .len(),
2556                1
2557            );
2558            let expected_reputation_scores =
2559                ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
2560            assert_eq!(
2561                core_fixture
2562                    .core
2563                    .leader_schedule
2564                    .leader_swap_table
2565                    .read()
2566                    .reputation_scores,
2567                expected_reputation_scores
2568            );
2569        }
2570    }
2571
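    // Tests filter_new_commits: certified commits at or below the last committed index are
    // filtered out, newer ones are kept, and a gap after the last committed index is an error.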
2572    #[tokio::test]
2573    async fn test_filter_new_commits() {
2574        telemetry_subscribers::init_for_testing();
2575
2576        let (context, _key_pairs) = Context::new_for_test(4);
2577        let context = context.with_parameters(Parameters {
2578            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2579            ..Default::default()
2580        });
2581
2582        let authority_index = AuthorityIndex::new_for_test(0);
2583        let core = CoreTestFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
2584        let mut core = core.core;
2585
2586        // No new block should have been produced
2587        assert_eq!(
2588            core.last_proposed_round(),
2589            Some(GENESIS_ROUND),
2590            "No block should have been created other than genesis"
2591        );
2592
2593        // create a DAG of 12 rounds
2594        let mut dag_builder = DagBuilder::new(core.context.clone());
2595        dag_builder.layers(1..=12).build();
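        // layers(1..=12).build() produces a fully connected DAG: every authority
        // proposes a block in every round, referencing all blocks of the previous
        // round.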
2596
2597        // Store all blocks up to round 6, which should be enough to decide up to leader 4
2598        dag_builder.print();
2599        let blocks = dag_builder.blocks(1..=6);
2600        core.dag_state.write().accept_blocks(blocks);
2601
2602        // Get all the committed sub dags up to round 10
2603        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
2604
2605        // Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
2606        let committed_sub_dags = core.try_commit(vec![]).unwrap();
2607
2608        // We should have committed up to round 4
2609        assert_eq!(committed_sub_dags.len(), 4);
2610
2611        // Now validate the certified commits. We'll try 3 different scenarios:
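        // filter_new_commits should drop certified commits at or below the last
        // locally committed index, and error if the remaining commits do not start
        // exactly at last_committed_index + 1.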
2612        println!("Case 1. Provide certified commits that are all before the last committed round.");
2613
2614        // Highest certified commit should be for leader of round 4.
2615        let certified_commits = sub_dags_and_commits
2616            .iter()
2617            .take(4)
2618            .map(|(_, c)| c)
2619            .cloned()
2620            .collect::<Vec<_>>();
2621        assert!(
2622            certified_commits.last().unwrap().index()
2623                <= committed_sub_dags.last().unwrap().commit_ref.index,
2624            "Highest certified commit should be no newer than the highest committed index."
2625        );
2626
2627        let certified_commits = core.filter_new_commits(certified_commits).unwrap();
2628
2629        // No commits should be processed
2630        assert!(certified_commits.is_empty());
2631
2632        println!("Case 2. Provide certified commits where only the last one is after the last committed round.");
2633
2634        // Provide certified commits 1..=5; the highest is for the leader of round 5.
2635        let certified_commits = sub_dags_and_commits
2636            .iter()
2637            .take(5)
2638            .map(|(_, c)| c.clone())
2639            .collect::<Vec<_>>();
2640
2641        let certified_commits = core.filter_new_commits(certified_commits.clone()).unwrap();
2642
2643        // The certified commit of index 5 should be processed.
2644        assert_eq!(certified_commits.len(), 1);
2645        assert_eq!(certified_commits.first().unwrap().reference().index, 5);
2646
2647        println!(
2648            "Case 3. Provide certified commits where the first certified commit index is not the last_committed_index + 1."
2649        );
2650
2651        // Provide only certified commit 6, skipping the expected next commit 5.
2652        let certified_commits = sub_dags_and_commits
2653            .iter()
2654            .skip(5)
2655            .take(1)
2656            .map(|(_, c)| c.clone())
2657            .collect::<Vec<_>>();
2658
2659        let err = core
2660            .filter_new_commits(certified_commits.clone())
2661            .unwrap_err();
2662        match err {
2663            ConsensusError::UnexpectedCertifiedCommitIndex {
2664                expected_commit_index: 5,
2665                commit_index: 6,
2666            } => (),
2667            _ => panic!("Unexpected error: {:?}", err),
2668        }
2669    }
2670
2671    #[tokio::test]
2672    async fn test_add_certified_commits() {
2673        telemetry_subscribers::init_for_testing();
2674
2675        let (context, _key_pairs) = Context::new_for_test(4);
2676        let context = context.with_parameters(Parameters {
2677            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2678            ..Default::default()
2679        });
2680
2681        let authority_index = AuthorityIndex::new_for_test(0);
2682        let core = CoreTestFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
2683        let store = core.store.clone();
2684        let mut core = core.core;
2685
2686        // No new block should have been produced
2687        assert_eq!(
2688            core.last_proposed_round(),
2689            Some(GENESIS_ROUND),
2690            "No block should have been created other than genesis"
2691        );
2692
2693        // create a DAG of 12 rounds
2694        let mut dag_builder = DagBuilder::new(core.context.clone());
2695        dag_builder.layers(1..=12).build();
2696
2697        // Store all blocks up to round 6, which should be enough to decide up to leader 4
2698        dag_builder.print();
2699        let blocks = dag_builder.blocks(1..=6);
2700        core.dag_state.write().accept_blocks(blocks);
2701
2702        // Get all the committed sub dags up to round 10
2703        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
2704
2705        // Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
2706        let committed_sub_dags = core.try_commit(vec![]).unwrap();
2707
2708        // We should have committed up to round 4
2709        assert_eq!(committed_sub_dags.len(), 4);
2710
2711        // Flush the DAG state to storage.
2712        core.dag_state.write().flush();
2713
2714        {
2715            println!("Case 1. Provide no certified commits. No commit should happen.");
2716            // Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
2717            let committed_sub_dags = core.try_commit(vec![]).unwrap();
2718            assert!(committed_sub_dags.is_empty());
2719            let last_commit = store
2720                .read_last_commit()
2721                .unwrap()
2722                .expect("Last commit should be set");
2723            assert_eq!(last_commit.reference().index, 4);
2724        }
2725
2726        println!(
2727            "Case 2. Provide certified commits both before and after the last committed round, plus additional blocks so the direct decide rule can run as well."
2728        );
2729
2730        // The commits of leader rounds 5-8 should be committed via the certified commits.
2731        let certified_commits = sub_dags_and_commits
2732            .iter()
2733            .skip(3)
2734            .take(5)
2735            .map(|(_, c)| c.clone())
2736            .collect::<Vec<_>>();
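        // skip(3).take(5) selects certified commits 4..=8. Commit 4 is already
        // committed locally and will be filtered out, leaving 5..=8 to be applied.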
2737
2738        // Now only add the blocks of rounds 7..=12.
2739        let blocks = dag_builder.blocks(7..=12);
2740        core.dag_state.write().accept_blocks(blocks);
2741
2742        // The corresponding blocks of the certified commits should be accepted and stored before linearizing and committing the DAG.
2743        core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
2744            .expect("Should not fail");
2745
2746        // Flush the DAG state to storage.
2747        core.dag_state.write().flush();
2748
2749        let commits = store.scan_commits((6..=10).into()).unwrap();
2750
2751        // We expect all the sub dags up to leader round 10 to be committed.
2752        assert_eq!(commits.len(), 5);
2753
2754        for i in 6..=10 {
2755            let commit = &commits[i - 6];
2756            assert_eq!(commit.reference().index, i as u32);
2757        }
2758    }
2759
2760    #[tokio::test]
2761    async fn try_commit_with_certified_commits_gced_blocks() {
2762        const GC_DEPTH: u32 = 3;
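        // With a GC depth of 3, blocks that fall 3 or more rounds behind the latest
        // committed leader round are garbage collected, so E's round 1 block below
        // should never make it into a commit.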
2763        telemetry_subscribers::init_for_testing();
2764
2765        let (mut context, mut key_pairs) = Context::new_for_test(5);
2766        context.protocol_config.set_gc_depth_for_testing(GC_DEPTH);
2767        let context = Arc::new(context.with_parameters(Parameters {
2768            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2769            ..Default::default()
2770        }));
2771
2772        let store = Arc::new(MemStore::new());
2773        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2774
2775        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2776        let leader_schedule = Arc::new(
2777            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2778                .with_num_commits_per_schedule(10),
2779        );
2780
2781        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2782        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2783        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2784        let transaction_vote_tracker = TransactionVoteTracker::new(
2785            context.clone(),
2786            Arc::new(NoopBlockVerifier {}),
2787            dag_state.clone(),
2788        );
2789        // Need at least one subscriber to the block broadcast channel.
2790        let _block_receiver = signal_receivers.block_broadcast_receiver();
2791
2792        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
2793        let commit_observer = CommitObserver::new(
2794            context.clone(),
2795            commit_consumer,
2796            dag_state.clone(),
2797            transaction_vote_tracker.clone(),
2798        )
2799        .await;
2800
2801        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2802        let mut core = Core::new_validator(
2803            context.clone(),
2804            leader_schedule,
2805            transaction_consumer,
2806            transaction_vote_tracker.clone(),
2807            block_manager,
2808            commit_observer,
2809            signals,
2810            key_pairs.remove(context.own_index.value()).1,
2811            dag_state.clone(),
2812            true,
2813            round_tracker,
2814        );
2815
2816        // No new block should have been produced
2817        assert_eq!(
2818            core.last_proposed_round(),
2819            Some(GENESIS_ROUND),
2820            "No block should have been created other than genesis"
2821        );
2822
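        // In the DAG DSL below, `*` means a block references all blocks of the
        // previous round, while `[-E1]` means all of them except E1's block. E1 is
        // only referenced again from round 5 onwards, by which point it is older
        // than the GC depth allows.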
2823        let dag_str = "DAG {
2824            Round 0 : { 5 },
2825            Round 1 : { * },
2826            Round 2 : { 
2827                A -> [-E1],
2828                B -> [-E1],
2829                C -> [-E1],
2830                D -> [-E1],
2831            },
2832            Round 3 : {
2833                A -> [*],
2834                B -> [*],
2835                C -> [*],
2836                D -> [*],
2837            },
2838            Round 4 : { 
2839                A -> [*],
2840                B -> [*],
2841                C -> [*],
2842                D -> [*],
2843            },
2844            Round 5 : { 
2845                A -> [*],
2846                B -> [*],
2847                C -> [*],
2848                D -> [*],
2849                E -> [A4, B4, C4, D4, E1]
2850            },
2851            Round 6 : { * },
2852            Round 7 : { * },
2853        }";
2854
2855        let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2856        dag_builder.print();
2857
2858        // Now get all the committed sub dags from the DagBuilder
2859        let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
2860            .get_sub_dag_and_certified_commits(1..=5)
2861            .into_iter()
2862            .unzip();
2863
2864        // Now try to commit up to the latest leader (round = 5) with the provided certified commits. Note that we have not accepted any
2865        // blocks yet; that should happen during the commit process.
2866        let committed_sub_dags = core.try_commit(certified_commits).unwrap();
2867
2868        // We should have committed up to round 4
2869        assert_eq!(committed_sub_dags.len(), 4);
2870        for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
2871            assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
2872
2873            // ensure that the round 1 block from authority E has not been committed
2874            for block in committed_sub_dag.blocks.iter() {
2875                if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(4) {
2876                    panic!("Did not expect to commit block E1");
2877                }
2878            }
2879        }
2880    }
2881
2882    #[tokio::test(flavor = "current_thread", start_paused = true)]
2883    async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
2884        parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
2885    }
2886
2887    #[tokio::test(flavor = "current_thread", start_paused = true)]
2888    async fn test_commit_on_leader_schedule_change_boundary_with_multileader() {
2889        parameterized_test_commit_on_leader_schedule_change_boundary(None).await;
2890    }
2891
2892    async fn parameterized_test_commit_on_leader_schedule_change_boundary(
2893        num_leaders_per_round: Option<usize>,
2894    ) {
2895        telemetry_subscribers::init_for_testing();
2896        let default_params = Parameters::default();
2897
2898        let (mut context, _) = Context::new_for_test(6);
2899        context
2900            .protocol_config
2901            .set_num_leaders_per_round_for_testing(num_leaders_per_round);
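        // Some(1) pins a single leader per round; None keeps the protocol default,
        // which may elect multiple leaders per round.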
2902        // create the cores and their signals for all the authorities
2903        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]).await;
2904
2905        // Now iterate over a few rounds and ensure the corresponding signals are created while the network advances
2906        let mut last_round_blocks: Vec<VerifiedBlock> = Vec::new();
2907        for round in 1..=33 {
2908            let mut this_round_blocks = Vec::new();
2909
2910            // Wait for min round delay to allow blocks to be proposed.
2911            sleep(default_params.min_round_delay).await;
2912
2913            for core_fixture in &mut cores {
2914                // add the blocks from the last round
2915                // this will trigger a block creation for the round and a signal should be emitted
2916                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2917
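                // Simulate a network probe in which every authority reports having
                // received and accepted blocks up to `round` from every peer, so the
                // round tracker sees the whole network as caught up.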
2918                core_fixture
2919                    .core
2920                    .round_tracker_for_tests()
2921                    .write()
2922                    .update_from_probe(
2923                        vec![
2924                            vec![round, round, round, round, round, round],
2925                            vec![round, round, round, round, round, round],
2926                            vec![round, round, round, round, round, round],
2927                            vec![round, round, round, round, round, round],
2928                            vec![round, round, round, round, round, round],
2929                            vec![round, round, round, round, round, round],
2930                        ],
2931                        vec![
2932                            vec![round, round, round, round, round, round],
2933                            vec![round, round, round, round, round, round],
2934                            vec![round, round, round, round, round, round],
2935                            vec![round, round, round, round, round, round],
2936                            vec![round, round, round, round, round, round],
2937                            vec![round, round, round, round, round, round],
2938                        ],
2939                    );
2940
2941                // A "new round" signal should be received given that all the blocks of the previous round have been processed
2942                let new_round = receive(
2943                    Duration::from_secs(1),
2944                    core_fixture.signal_receivers.new_round_receiver(),
2945                )
2946                .await;
2947                assert_eq!(new_round, round);
2948
2949                // Check that a new block has been proposed.
2950                let extended_block = tokio::time::timeout(
2951                    Duration::from_secs(1),
2952                    core_fixture.block_receiver.recv(),
2953                )
2954                .await
2955                .unwrap()
2956                .unwrap();
2957                assert_eq!(extended_block.block.round(), round);
2958                assert_eq!(
2959                    extended_block.block.author(),
2960                    core_fixture.core.context.own_index
2961                );
2962
2963                // append the new block to this round's blocks
2964                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2965
2966                let block = core_fixture.core.last_proposed_block();
2967
2968                // ensure that the produced block references the blocks of last_round
2969                assert_eq!(
2970                    block.ancestors().len(),
2971                    core_fixture.core.context.committee.size()
2972                );
2973                for ancestor in block.ancestors() {
2974                    if block.round() > 1 {
2975                        // don't bother with the round 1 block, which just references the genesis blocks.
2976                        assert!(
2977                            last_round_blocks
2978                                .iter()
2979                                .any(|block| block.reference() == *ancestor),
2980                            "Reference from previous round should be added"
2981                        );
2982                    }
2983                }
2984            }
2985
2986            last_round_blocks = this_round_blocks;
2987        }
2988
2989        for core_fixture in cores {
2990            // There are 31 leader rounds with rounds completed up to and including
2991            // round 32. Round 33 blocks will only include their own blocks, so there
2992            // should only be 30 commits.
2993            // However, on a leader schedule change boundary it is possible for a
2994            // new leader to be selected for the same round if the elected leader
2995            // gets swapped, allowing multiple leaders to be committed at that round.
2996            // This means that with the number of leaders per round explicitly set to
2997            // 1 we will have 30 commits, otherwise 31.
2998            // NOTE: We used 31 leader rounds to specifically trigger the scenario
2999            // where the leader schedule boundary occurred AND we had a swap to a new
3000            // leader for the same round.
3001            let expected_commit_count = match num_leaders_per_round {
3002                Some(1) => 30,
3003                _ => 31,
3004            };
3005
3006            // Flush the DAG state to storage.
3007            core_fixture.dag_state.write().flush();
3008
3009            // Check commits have been persisted to store
3010            let last_commit = core_fixture
3011                .store
3012                .read_last_commit()
3013                .unwrap()
3014                .expect("last commit should be set");
3015            assert_eq!(last_commit.index(), expected_commit_count);
3016            let all_stored_commits = core_fixture
3017                .store
3018                .scan_commits((0..=CommitIndex::MAX).into())
3019                .unwrap();
3020            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
3021            assert_eq!(
3022                core_fixture
3023                    .core
3024                    .leader_schedule
3025                    .leader_swap_table
3026                    .read()
3027                    .bad_nodes
3028                    .len(),
3029                1
3030            );
3031            assert_eq!(
3032                core_fixture
3033                    .core
3034                    .leader_schedule
3035                    .leader_swap_table
3036                    .read()
3037                    .good_nodes
3038                    .len(),
3039                1
3040            );
3041            let expected_reputation_scores =
3042                ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3043            assert_eq!(
3044                core_fixture
3045                    .core
3046                    .leader_schedule
3047                    .leader_swap_table
3048                    .read()
3049                    .reputation_scores,
3050                expected_reputation_scores
3051            );
3052        }
3053    }
3054
3055    #[tokio::test]
3056    async fn test_core_signals() {
3057        telemetry_subscribers::init_for_testing();
3058        let default_params = Parameters::default();
3059
3060        let (context, _) = Context::new_for_test(4);
3061        // create the cores and their signals for all the authorities
3062        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3063
3064        // Now iterate over a few rounds and ensure the corresponding signals are created while the network advances
3065        let mut last_round_blocks = Vec::new();
3066        for round in 1..=10 {
3067            let mut this_round_blocks = Vec::new();
3068
3069            // Wait for min round delay to allow blocks to be proposed.
3070            sleep(default_params.min_round_delay).await;
3071
3072            for core_fixture in &mut cores {
3073                // add the blocks from the last round
3074                // this will trigger a block creation for the round and a signal should be emitted
3075                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3076
3077                core_fixture
3078                    .core
3079                    .round_tracker_for_tests()
3080                    .write()
3081                    .update_from_probe(
3082                        vec![
3083                            vec![round, round, round, round],
3084                            vec![round, round, round, round],
3085                            vec![round, round, round, round],
3086                            vec![round, round, round, round],
3087                        ],
3088                        vec![
3089                            vec![round, round, round, round],
3090                            vec![round, round, round, round],
3091                            vec![round, round, round, round],
3092                            vec![round, round, round, round],
3093                        ],
3094                    );
3095
3096                // A "new round" signal should be received given that all the blocks of the previous round have been processed
3097                let new_round = receive(
3098                    Duration::from_secs(1),
3099                    core_fixture.signal_receivers.new_round_receiver(),
3100                )
3101                .await;
3102                assert_eq!(new_round, round);
3103
3104                // Check that a new block has been proposed.
3105                let extended_block = tokio::time::timeout(
3106                    Duration::from_secs(1),
3107                    core_fixture.block_receiver.recv(),
3108                )
3109                .await
3110                .unwrap()
3111                .unwrap();
3112                assert_eq!(extended_block.block.round(), round);
3113                assert_eq!(
3114                    extended_block.block.author(),
3115                    core_fixture.core.context.own_index
3116                );
3117
3118                // append the new block to this round's blocks
3119                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3120
3121                let block = core_fixture.core.last_proposed_block();
3122
3123                // ensure that the produced block references the blocks of last_round
3124                assert_eq!(
3125                    block.ancestors().len(),
3126                    core_fixture.core.context.committee.size()
3127                );
3128                for ancestor in block.ancestors() {
3129                    if block.round() > 1 {
3130                        // don't bother with the round 1 block, which just references the genesis blocks.
3131                        assert!(
3132                            last_round_blocks
3133                                .iter()
3134                                .any(|block| block.reference() == *ancestor),
3135                            "Reference from previous round should be added"
3136                        );
3137                    }
3138                }
3139            }
3140
3141            last_round_blocks = this_round_blocks;
3142        }
3143
3144        for core_fixture in cores {
3145            // Flush the DAG state to storage.
3146            core_fixture.dag_state.write().flush();
3147            // Check commits have been persisted to store
3148            let last_commit = core_fixture
3149                .store
3150                .read_last_commit()
3151                .unwrap()
3152                .expect("last commit should be set");
3153            // There are 8 leader rounds with rounds completed up to and including
3154            // round 9. Round 10 blocks will only include their own blocks, so the
3155            // 8th leader will not be committed.
3156            assert_eq!(last_commit.index(), 7);
3157            let all_stored_commits = core_fixture
3158                .store
3159                .scan_commits((0..=CommitIndex::MAX).into())
3160                .unwrap();
3161            assert_eq!(all_stored_commits.len(), 7);
3162        }
3163    }
3164
3165    #[tokio::test]
3166    async fn test_core_compress_proposal_references() {
3167        telemetry_subscribers::init_for_testing();
3168        let default_params = Parameters::default();
3169
3170        let (context, _) = Context::new_for_test(4);
3171        // create the cores and their signals for all the authorities
3172        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3173
3174        let mut last_round_blocks = Vec::new();
3175        let mut all_blocks = Vec::new();
3176
3177        let excluded_authority = AuthorityIndex::new_for_test(3);
3178
3179        for round in 1..=10 {
3180            let mut this_round_blocks = Vec::new();
3181
3182            for core_fixture in &mut cores {
3183                // do not produce any block for authority 3
3184                if core_fixture.core.context.own_index == excluded_authority {
3185                    continue;
3186                }
3187
3188                // propose with the remaining cores, covering the case where leader authority 3 is missing
3189                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3190                core_fixture
3191                    .core
3192                    .round_tracker_for_tests()
3193                    .write()
3194                    .update_from_probe(
3195                        vec![
3196                            vec![round, round, round, round],
3197                            vec![round, round, round, round],
3198                            vec![round, round, round, round],
3199                            vec![round, round, round, round],
3200                        ],
3201                        vec![
3202                            vec![round, round, round, round],
3203                            vec![round, round, round, round],
3204                            vec![round, round, round, round],
3205                            vec![round, round, round, round],
3206                        ],
3207                    );
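                // Force a proposal at `round` even if the usual proposal conditions
                // have not been met.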
3208                core_fixture.core.new_block(round, true).unwrap();
3209
3210                let block = core_fixture.core.last_proposed_block();
3211                assert_eq!(block.round(), round);
3212
3213                // append the new block to this round's blocks
3214                this_round_blocks.push(block.clone());
3215            }
3216
3217            last_round_blocks = this_round_blocks.clone();
3218            all_blocks.extend(this_round_blocks);
3219        }
3220
3221        // Now send all the produced blocks to the core of authority 3. It should produce a new block. If no compression
3222        // were applied we would expect all the previous blocks from rounds 0..=10 to be referenced. However, since compression
3223        // is applied, only the last round's (10) blocks should be referenced, plus the authority's own block of round 1.
3224        let core_fixture = &mut cores[excluded_authority];
3225        // Wait for min round delay to allow blocks to be proposed.
3226        sleep(default_params.min_round_delay).await;
3227        // add blocks to trigger proposal.
3228        core_fixture.add_blocks(all_blocks).unwrap();
3229
3230        // Assert that a block has been created for round 11, referencing the round 10 blocks of the other peers and its own
3231        // round 1 block (created after recovery).
3232        let block = core_fixture.core.last_proposed_block();
3233        assert_eq!(block.round(), 11);
3234        assert_eq!(block.ancestors().len(), 4);
3235        for block_ref in block.ancestors() {
3236            if block_ref.author == excluded_authority {
3237                assert_eq!(block_ref.round, 1);
3238            } else {
3239                assert_eq!(block_ref.round, 10);
3240            }
3241        }
3242
3243        // Flush the DAG state to storage.
3244        core_fixture.dag_state.write().flush();
3245
3246        // Check commits have been persisted to store
3247        let last_commit = core_fixture
3248            .store
3249            .read_last_commit()
3250            .unwrap()
3251            .expect("last commit should be set");
3252        // There are 8 leader rounds with rounds completed up to and including
3253        // round 10. However, because no blocks were produced for authority 3,
3254        // 2 leader rounds will be skipped.
3255        assert_eq!(last_commit.index(), 6);
3256        let all_stored_commits = core_fixture
3257            .store
3258            .scan_commits((0..=CommitIndex::MAX).into())
3259            .unwrap();
3260        assert_eq!(all_stored_commits.len(), 6);
3261    }
3262
3263    #[tokio::test]
3264    async fn try_select_certified_leaders() {
3265        // GIVEN
3266        telemetry_subscribers::init_for_testing();
3267
3268        let (context, _) = Context::new_for_test(4);
3269
3270        let authority_index = AuthorityIndex::new_for_test(0);
3271        let core =
3272            CoreTestFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true).await;
3273        let mut core = core.core;
3274
3275        let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
3276        dag_builder.layers(1..=12).build();
3277
3278        let limit = 2;
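        // Only up to `limit` certified leaders should be selected; the commits that
        // were not consumed remain in `certified_commits`.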
3279
3280        let blocks = dag_builder.blocks(1..=12);
3281        core.dag_state.write().accept_blocks(blocks);
3282
3283        // WHEN
3284        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
3285        let mut certified_commits = sub_dags_and_commits
3286            .into_iter()
3287            .map(|(_, commit)| commit)
3288            .collect::<Vec<_>>();
3289
3290        let leaders = core.try_select_certified_leaders(&mut certified_commits, limit);
3291
3292        // THEN
3293        assert_eq!(leaders.len(), 2);
3294        assert_eq!(certified_commits.len(), 2);
3295    }
3296
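    /// Waits up to `timeout` for the watch channel to signal a change and returns a
    /// copy of the latest value.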
3297    pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
3298        tokio::time::timeout(timeout, receiver.changed())
3299            .await
3300            .expect("Timeout while waiting to read from receiver")
3301            .expect("Signal receive channel shouldn't be closed");
3302        *receiver.borrow_and_update()
3303    }
3304}