consensus_core/core.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    iter,
7    sync::Arc,
8    time::Duration,
9    vec,
10};
11
12use consensus_config::{AuthorityIndex, ProtocolKeyPair};
13#[cfg(test)]
14use consensus_config::{Stake, local_committee_and_keys};
15use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
16use itertools::Itertools as _;
17#[cfg(test)]
18use mysten_metrics::monitored_mpsc::UnboundedReceiver;
19use mysten_metrics::monitored_scope;
20use parking_lot::RwLock;
21use sui_macros::fail_point;
22use tokio::{
23    sync::{broadcast, watch},
24    time::Instant,
25};
26use tracing::{debug, info, trace, warn};
27
28#[cfg(test)]
29use crate::{
30    CommitConsumerArgs, TransactionClient, block::CertifiedBlocksOutput,
31    block_verifier::NoopBlockVerifier, storage::mem_store::MemStore,
32};
33use crate::{
34    ancestor::{AncestorState, AncestorStateManager},
35    block::{
36        Block, BlockAPI, BlockV1, BlockV2, ExtendedBlock, GENESIS_ROUND, SignedBlock, Slot,
37        VerifiedBlock,
38    },
39    block_manager::BlockManager,
40    commit::{
41        CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
42    },
43    commit_observer::CommitObserver,
44    context::Context,
45    dag_state::DagState,
46    error::{ConsensusError, ConsensusResult},
47    leader_schedule::LeaderSchedule,
48    round_tracker::PeerRoundTracker,
49    stake_aggregator::{QuorumThreshold, StakeAggregator},
50    transaction::TransactionConsumer,
51    transaction_certifier::TransactionCertifier,
52    universal_committer::{
53        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
54    },
55};
56
57// Maximum number of commit votes to include in a block.
58// TODO: Move to protocol config, and verify in BlockVerifier.
59const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;
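// Note: this cap is applied where the proposer drains pending commit votes via
// `take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK)` in `try_new_block`. Presumably, votes
// beyond the cap remain queued in `DagState` and get picked up by subsequent proposals
// (an assumption about DagState's queueing behaviour, not verified here).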
60
61pub(crate) struct Core {
62    context: Arc<Context>,
63    /// The consumer to use in order to pull transactions to be included in the next proposals.
64    transaction_consumer: TransactionConsumer,
65    /// This contains the reject votes on transactions which proposed blocks should include.
66    transaction_certifier: TransactionCertifier,
67    /// The block manager, which is responsible for keeping track of DAG dependencies when processing new blocks
68    /// and accepting them, or suspending them if their causal history is missing.
69    block_manager: BlockManager,
70    /// Whether there are subscribers waiting for new blocks proposed by this authority.
71    /// Core stops proposing new blocks when there is no subscriber, because new proposed blocks
72    /// will likely contain only stale info when they propagate to peers.
73    subscriber_exists: bool,
74    /// Estimated delay, in rounds, for propagating blocks to a quorum.
75    /// Because of the nature of TCP and block streaming, propagation delay is expected to be
76    /// 0 in most cases, even when the actual latency of broadcasting blocks is high.
77    /// When this value is higher than the `propagation_delay_stop_proposal_threshold`,
78    /// most likely this validator cannot broadcast blocks to the network at all.
79    /// Core stops proposing new blocks in this case.
80    propagation_delay: Round,
81    /// Used to make commit decisions for leader blocks in the dag.
82    committer: UniversalCommitter,
83    /// The last new round for which core has sent out a signal.
84    last_signaled_round: Round,
85    /// The blocks of the last included ancestors per authority. This vector is used as a
86    /// watermark so that only ancestors of higher rounds are included in the next block proposal.
87    /// By default, it is initialised with `None` values.
88    last_included_ancestors: Vec<Option<BlockRef>>,
89    /// The last decided leader returned from the universal committer. Important to note
90    /// that this does not signify that the leader has been persisted yet as it still has
91    /// to go through CommitObserver and persist the commit in store. On recovery/restart
92    /// the last_decided_leader will be set to the last_commit leader in dag state.
93    last_decided_leader: Slot,
94    /// The consensus leader schedule to be used to resolve the leader for a
95    /// given round.
96    leader_schedule: Arc<LeaderSchedule>,
97    /// The commit observer is responsible for observing the commits and collecting
98    /// and sending subdags over the consensus output channel.
99    commit_observer: CommitObserver,
100    /// Sender of outgoing signals from Core.
101    signals: CoreSignals,
102    /// The keypair to be used for block signing
103    block_signer: ProtocolKeyPair,
104    /// Keeping track of state of the DAG, including blocks, commits and last committed rounds.
105    dag_state: Arc<RwLock<DagState>>,
106    /// The last known round for which the node has proposed. Any new proposal should be for a round greater than this.
107    /// This is currently used to avoid equivocation when a node recovers from amnesia. When the value is `None`, it means that
108    /// the last-block sync mechanism is enabled but hasn't been initialised yet.
109    last_known_proposed_round: Option<Round>,
110    // The ancestor state manager will keep track of the quality of the authorities
111    // based on the distribution of their blocks to the network. It will use this
112    // information to decide whether to include that authority's blocks in the next
113    // proposal or not.
114    ancestor_state_manager: AncestorStateManager,
115    // The round tracker will keep track of the highest received and accepted rounds
116    // from all authorities. It will use this information to periodically calculate the
117    // quorum rounds, which are used across other components to make
118    // decisions about block proposals.
119    round_tracker: Arc<RwLock<PeerRoundTracker>>,
120}
121
122impl Core {
123    pub(crate) fn new(
124        context: Arc<Context>,
125        leader_schedule: Arc<LeaderSchedule>,
126        transaction_consumer: TransactionConsumer,
127        transaction_certifier: TransactionCertifier,
128        block_manager: BlockManager,
129        subscriber_exists: bool,
130        commit_observer: CommitObserver,
131        signals: CoreSignals,
132        block_signer: ProtocolKeyPair,
133        dag_state: Arc<RwLock<DagState>>,
134        sync_last_known_own_block: bool,
135        round_tracker: Arc<RwLock<PeerRoundTracker>>,
136    ) -> Self {
137        let last_decided_leader = dag_state.read().last_commit_leader();
138        let number_of_leaders = context
139            .protocol_config
140            .mysticeti_num_leaders_per_round()
141            .unwrap_or(1);
142        let committer = UniversalCommitterBuilder::new(
143            context.clone(),
144            leader_schedule.clone(),
145            dag_state.clone(),
146        )
147        .with_number_of_leaders(number_of_leaders)
148        .with_pipeline(true)
149        .build();
150
151        let last_proposed_block = dag_state.read().get_last_proposed_block();
152
153        let last_signaled_round = last_proposed_block.round();
154
155        // Recover the last included ancestor rounds based on the last proposed block. This allows
156        // the next block proposal to use ancestor blocks of higher rounds and avoids
157        // re-including blocks that were already included in the last (or an earlier) block proposal.
158        // This is only strongly guaranteed for a quorum of ancestors. It is still possible to re-include
159        // a block from an authority that wasn't part of the last proposal, since its
160        // latest included ancestor is not accurately captured here. This is considered a small deficiency:
161        // it mostly matters for just this next proposal, without any real penalty to performance
162        // or block proposals.
163        let mut last_included_ancestors = vec![None; context.committee.size()];
164        for ancestor in last_proposed_block.ancestors() {
165            last_included_ancestors[ancestor.author] = Some(*ancestor);
166        }
167
168        let min_propose_round = if sync_last_known_own_block {
169            None
170        } else {
171            // if the sync is disabled then we practically don't want to impose any restriction.
172            Some(0)
173        };
174
175        let propagation_scores = leader_schedule
176            .leader_swap_table
177            .read()
178            .reputation_scores
179            .clone();
180        let mut ancestor_state_manager =
181            AncestorStateManager::new(context.clone(), dag_state.clone());
182        ancestor_state_manager.set_propagation_scores(propagation_scores);
183
184        Self {
185            context,
186            last_signaled_round,
187            last_included_ancestors,
188            last_decided_leader,
189            leader_schedule,
190            transaction_consumer,
191            transaction_certifier,
192            block_manager,
193            subscriber_exists,
194            propagation_delay: 0,
195            committer,
196            commit_observer,
197            signals,
198            block_signer,
199            dag_state,
200            last_known_proposed_round: min_propose_round,
201            ancestor_state_manager,
202            round_tracker,
203        }
204        .recover()
205    }
206
207    fn recover(mut self) -> Self {
208        let _s = self
209            .context
210            .metrics
211            .node_metrics
212            .scope_processing_time
213            .with_label_values(&["Core::recover"])
214            .start_timer();
215
216        // Try to commit and propose, since they may not have run after the last storage write.
217        self.try_commit(vec![]).unwrap();
218
219        let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
220        {
221            last_proposed_block
222        } else {
223            let last_proposed_block = self.dag_state.read().get_last_proposed_block();
224
225            if self.should_propose() {
226                assert!(
227                    last_proposed_block.round() > GENESIS_ROUND,
228                    "At minimum a block of round higher than genesis should have been produced during recovery"
229                );
230            }
231
232            // If no new block was proposed, then just re-broadcast the last proposed one to ensure liveness.
233            self.signals
234                .new_block(ExtendedBlock {
235                    block: last_proposed_block.clone(),
236                    excluded_ancestors: vec![],
237                })
238                .unwrap();
239            last_proposed_block
240        };
241
242        // Try to set up leader timeout if needed.
243        // This needs to be called after try_commit() and try_propose(), which may
244        // have advanced the threshold clock round.
245        self.try_signal_new_round();
246
247        info!(
248            "Core recovery completed with last proposed block {:?}",
249            last_proposed_block
250        );
251
252        self
253    }
254
255    /// Processes the provided blocks and accepts them when their causal history exists.
256    /// The method returns:
257    /// - The references of ancestors whose blocks are still missing.
258    #[tracing::instrument(skip_all)]
259    pub(crate) fn add_blocks(
260        &mut self,
261        blocks: Vec<VerifiedBlock>,
262    ) -> ConsensusResult<BTreeSet<BlockRef>> {
263        let _scope = monitored_scope("Core::add_blocks");
264        let _s = self
265            .context
266            .metrics
267            .node_metrics
268            .scope_processing_time
269            .with_label_values(&["Core::add_blocks"])
270            .start_timer();
271        self.context
272            .metrics
273            .node_metrics
274            .core_add_blocks_batch_size
275            .observe(blocks.len() as f64);
276
277        let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);
278
279        if !accepted_blocks.is_empty() {
280            trace!(
281                "Accepted blocks: {}",
282                accepted_blocks
283                    .iter()
284                    .map(|b| b.reference().to_string())
285                    .join(",")
286            );
287
288            // Try to commit the new blocks if possible.
289            self.try_commit(vec![])?;
290
291            // Try to propose now since there are new blocks accepted.
292            self.try_propose(false)?;
293
294            // Now set up leader timeout if needed.
295            // This needs to be called after try_commit() and try_propose(), which may
296            // have advanced the threshold clock round.
297            self.try_signal_new_round();
298        };
299
300        if !missing_block_refs.is_empty() {
301            trace!(
302                "Missing block refs: {}",
303                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
304            );
305        }
306
307        Ok(missing_block_refs)
308    }
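    // A minimal usage sketch (hypothetical caller, not part of this module): verified blocks
    // are fed in, and whatever is reported missing is scheduled for fetching, e.g.:
    //
    //     let missing = core.add_blocks(verified_blocks)?;
    //     if !missing.is_empty() {
    //         // ask the block synchronizer to fetch `missing` from peers
    //     }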
309
310    // Adds the certified commits that have been synced via the commit syncer. The commit info is used to skip running the decision
311    // rule and immediately commit the corresponding leaders and sub dags. Note that no block acceptance happens here; it happens
312    // internally in the `try_commit` method, which ensures that each time only the blocks corresponding to the certified commits that are about to
313    // be committed are accepted.
314    #[tracing::instrument(skip_all)]
315    pub(crate) fn add_certified_commits(
316        &mut self,
317        certified_commits: CertifiedCommits,
318    ) -> ConsensusResult<BTreeSet<BlockRef>> {
319        let _scope = monitored_scope("Core::add_certified_commits");
320
321        let votes = certified_commits.votes().to_vec();
322        let commits = self
323            .filter_new_commits(certified_commits.commits().to_vec())
324            .expect("Certified commits validation failed");
325
326        // Try to accept the certified commit votes.
327        // Even if they may not be part of a future commit, these blocks are useful for certifying
328        // commits when helping peers sync commits.
329        let (_, missing_block_refs) = self.block_manager.try_accept_blocks(votes);
330
331        // Try to commit the new blocks. Take into account the trusted commit that has been provided.
332        self.try_commit(commits)?;
333
334        // Try to propose now since there are new blocks accepted.
335        self.try_propose(false)?;
336
337        // Now set up leader timeout if needed.
338        // This needs to be called after try_commit() and try_propose(), which may
339        // have advanced the threshold clock round.
340        self.try_signal_new_round();
341
342        Ok(missing_block_refs)
343    }
344
345    /// Checks if the provided block refs have been accepted. If not, the missing block refs are kept for synchronization.
346    /// Returns the references of missing blocks among the input blocks.
347    pub(crate) fn check_block_refs(
348        &mut self,
349        block_refs: Vec<BlockRef>,
350    ) -> ConsensusResult<BTreeSet<BlockRef>> {
351        let _scope = monitored_scope("Core::check_block_refs");
352        let _s = self
353            .context
354            .metrics
355            .node_metrics
356            .scope_processing_time
357            .with_label_values(&["Core::check_block_refs"])
358            .start_timer();
359        self.context
360            .metrics
361            .node_metrics
362            .core_check_block_refs_batch_size
363            .observe(block_refs.len() as f64);
364
365        // Try to find them via the block manager
366        let missing_block_refs = self.block_manager.try_find_blocks(block_refs);
367
368        if !missing_block_refs.is_empty() {
369            trace!(
370                "Missing block refs: {}",
371                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
372            );
373        }
374        Ok(missing_block_refs)
375    }
376
377    /// If needed, signals a new clock round and sets up leader timeout.
378    fn try_signal_new_round(&mut self) {
379        // Signal only when the threshold clock round is more advanced than the last signaled round.
380        //
381        // NOTE: a signal is still sent even when a block has been proposed at the new round.
382        // We can consider changing this in the future.
383        let new_clock_round = self.dag_state.read().threshold_clock_round();
384        if new_clock_round <= self.last_signaled_round {
385            return;
386        }
387        // Then send a signal to set up leader timeout.
388        self.signals.new_round(new_clock_round);
389        self.last_signaled_round = new_clock_round;
390
391        // Report the threshold clock round
392        self.context
393            .metrics
394            .node_metrics
395            .threshold_clock_round
396            .set(new_clock_round as i64);
397    }
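    // Note: `signals.new_round` is backed by a `watch::Sender<Round>` (see `CoreSignals` below),
    // so receivers only observe the latest clock round; intermediate rounds may be skipped, which
    // is sufficient for (re)arming the leader timeout.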
398
399    /// Creates a new block for the dictated round. This is used when a leader timeout occurs, either
400    /// when the min or the max timeout expires. When `force = true`, checks such as previous-round
401    /// leader existence are skipped.
402    pub(crate) fn new_block(
403        &mut self,
404        round: Round,
405        force: bool,
406    ) -> ConsensusResult<Option<VerifiedBlock>> {
407        let _scope = monitored_scope("Core::new_block");
408        if self.last_proposed_round() < round {
409            self.context
410                .metrics
411                .node_metrics
412                .leader_timeout_total
413                .with_label_values(&[&format!("{force}")])
414                .inc();
415            let result = self.try_propose(force);
416            // The threshold clock round may have advanced, so a signal needs to be sent.
417            self.try_signal_new_round();
418            return result;
419        }
420        Ok(None)
421    }
422
423    /// Keeps only the certified commits that have a commit index > last commit index.
424    /// It also ensures that the first commit in the list is the next one in line; otherwise it returns an error.
425    fn filter_new_commits(
426        &mut self,
427        commits: Vec<CertifiedCommit>,
428    ) -> ConsensusResult<Vec<CertifiedCommit>> {
429        // Filter out the commits that have already been locally committed and keep only those above the last committed index.
430        let last_commit_index = self.dag_state.read().last_commit_index();
431        let commits = commits
432            .iter()
433            .filter(|commit| {
434                if commit.index() > last_commit_index {
435                    true
436                } else {
437                    tracing::debug!(
438                        "Skip commit for index {} as it is already committed with last commit index {}",
439                        commit.index(),
440                        last_commit_index
441                    );
442                    false
443                }
444            })
445            .cloned()
446            .collect::<Vec<_>>();
447
448        // Make sure that the first commit we find is the next one in line and there is no gap.
449        if let Some(commit) = commits.first()
450            && commit.index() != last_commit_index + 1
451        {
452            return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
453                expected_commit_index: last_commit_index + 1,
454                commit_index: commit.index(),
455            });
456        }
457
458        Ok(commits)
459    }
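    // Illustrative example of the filtering above (hypothetical values): with last_commit_index = 10
    // and incoming certified commits at indices [9, 10, 11, 12], indices 9 and 10 are dropped as
    // already committed. The remaining list starts at 11 == last_commit_index + 1, so the gap check
    // passes; if it instead started at 12, `UnexpectedCertifiedCommitIndex` would be returned.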
460
461    // Attempts to create a new block, persist it and propose it to all peers.
462    // When force is true, the checks for whether the last round's leader exists among the ancestors
463    // and whether the minimum round delay has passed are skipped.
464    fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
465        if !self.should_propose() {
466            return Ok(None);
467        }
468        if let Some(extended_block) = self.try_new_block(force) {
469            self.signals.new_block(extended_block.clone())?;
470
471            fail_point!("consensus-after-propose");
472
473            // The new block may help commit.
474            self.try_commit(vec![])?;
475            return Ok(Some(extended_block.block));
476        }
477        Ok(None)
478    }
479
480    /// Attempts to propose a new block for the next round. If a block has already been proposed for the latest
481    /// or an earlier round, then no block is created and None is returned.
482    fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
483        let _s = self
484            .context
485            .metrics
486            .node_metrics
487            .scope_processing_time
488            .with_label_values(&["Core::try_new_block"])
489            .start_timer();
490
491        // Ensure the new block has a higher round than the last proposed block.
492        let clock_round = {
493            let dag_state = self.dag_state.read();
494            let clock_round = dag_state.threshold_clock_round();
495            if clock_round <= dag_state.get_last_proposed_block().round() {
496                debug!(
497                    "Skipping block proposal for round {} as it is not higher than the last proposed block {}",
498                    clock_round,
499                    dag_state.get_last_proposed_block().round()
500                );
501                return None;
502            }
503            clock_round
504        };
505
506        // There must be a quorum of blocks from the previous round.
507        let quorum_round = clock_round.saturating_sub(1);
508
509        // Create a new block either because we want to "forcefully" propose a block due to a leader timeout,
510        // or because we are actually ready to produce the block (leader exists and min delay has passed).
511        if !force {
512            if !self.leaders_exist(quorum_round) {
513                return None;
514            }
515
516            if Duration::from_millis(
517                self.context
518                    .clock
519                    .timestamp_utc_ms()
520                    .saturating_sub(self.last_proposed_timestamp_ms()),
521            ) < self.context.parameters.min_round_delay
522            {
523                debug!(
524                    "Skipping block proposal for round {} as it is too soon after the last proposed block timestamp {}; min round delay is {}ms",
525                    clock_round,
526                    self.last_proposed_timestamp_ms(),
527                    self.context.parameters.min_round_delay.as_millis(),
528                );
529                return None;
530            }
531        }
532
533        // Determine the ancestors to be included in proposal.
534        let (ancestors, excluded_and_equivocating_ancestors) =
535            self.smart_ancestors_to_propose(clock_round, !force);
536
537        // If we did not find enough good ancestors to propose, continue to wait before proposing.
538        if ancestors.is_empty() {
539            assert!(
540                !force,
541                "Ancestors should have been returned if force is true!"
542            );
543            debug!(
544                "Skipping block proposal for round {} because no good ancestor is found",
545                clock_round,
546            );
547            return None;
548        }
549
550        let excluded_ancestors_limit = self.context.committee.size() * 2;
551        if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
552            debug!(
553                "Dropping {} excluded ancestor(s) during proposal due to size limit",
554                excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
555            );
556        }
557        let excluded_ancestors = excluded_and_equivocating_ancestors
558            .into_iter()
559            .take(excluded_ancestors_limit)
560            .collect();
561
562        // Update the last included ancestor block refs
563        for ancestor in &ancestors {
564            self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
565        }
566
567        let leader_authority = &self
568            .context
569            .committee
570            .authority(self.first_leader(quorum_round))
571            .hostname;
572        self.context
573            .metrics
574            .node_metrics
575            .block_proposal_leader_wait_ms
576            .with_label_values(&[leader_authority])
577            .inc_by(
578                Instant::now()
579                    .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
580                    .as_millis() as u64,
581            );
582        self.context
583            .metrics
584            .node_metrics
585            .block_proposal_leader_wait_count
586            .with_label_values(&[leader_authority])
587            .inc();
588
589        self.context
590            .metrics
591            .node_metrics
592            .proposed_block_ancestors
593            .observe(ancestors.len() as f64);
594        for ancestor in &ancestors {
595            let authority = &self.context.committee.authority(ancestor.author()).hostname;
596            self.context
597                .metrics
598                .node_metrics
599                .proposed_block_ancestors_depth
600                .with_label_values(&[authority])
601                .observe(clock_round.saturating_sub(ancestor.round()).into());
602        }
603
604        let now = self.context.clock.timestamp_utc_ms();
605        ancestors.iter().for_each(|block| {
606            if block.timestamp_ms() > now {
607                trace!("Ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.", block, block.timestamp_ms(), clock_round);
608                let authority = &self.context.committee.authority(block.author()).hostname;
609                self.context
610                    .metrics
611                    .node_metrics
612                    .proposed_block_ancestors_timestamp_drift_ms
613                    .with_label_values(&[authority])
614                    .inc_by(block.timestamp_ms().saturating_sub(now));
615            }
616        });
617
618        // Consume the next transactions to be included. Do not drop the guards yet, as this would acknowledge
619        // the inclusion of the transactions; that is done at the end of the method.
620        let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
621        self.context
622            .metrics
623            .node_metrics
624            .proposed_block_transactions
625            .observe(transactions.len() as f64);
626
627        // Consume the commit votes to be included.
628        let commit_votes = self
629            .dag_state
630            .write()
631            .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);
632
633        let transaction_votes = if self.context.protocol_config.mysticeti_fastpath() {
634            let hard_linked_ancestors = {
635                let mut dag_state = self.dag_state.write();
636                ancestors
637                    .iter()
638                    .flat_map(|ancestor| dag_state.link_causal_history(ancestor.reference()))
639                    .collect()
640            };
641            self.transaction_certifier
642                .get_own_votes(hard_linked_ancestors)
643        } else {
644            vec![]
645        };
646
647        // Create the block and insert to storage.
648        let block = if self.context.protocol_config.mysticeti_fastpath() {
649            Block::V2(BlockV2::new(
650                self.context.committee.epoch(),
651                clock_round,
652                self.context.own_index,
653                now,
654                ancestors.iter().map(|b| b.reference()).collect(),
655                transactions,
656                commit_votes,
657                transaction_votes,
658                vec![],
659            ))
660        } else {
661            Block::V1(BlockV1::new(
662                self.context.committee.epoch(),
663                clock_round,
664                self.context.own_index,
665                now,
666                ancestors.iter().map(|b| b.reference()).collect(),
667                transactions,
668                commit_votes,
669                vec![],
670            ))
671        };
672        let signed_block =
673            SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
674        let serialized = signed_block
675            .serialize()
676            .expect("Block serialization failed.");
677        self.context
678            .metrics
679            .node_metrics
680            .proposed_block_size
681            .observe(serialized.len() as f64);
682        // Own blocks are assumed to be valid.
683        let verified_block = VerifiedBlock::new_verified(signed_block, serialized);
684
685        // Record the interval from last proposal, before accepting the proposed block.
686        let last_proposed_block = self.last_proposed_block();
687        if last_proposed_block.round() > 0 {
688            self.context
689                .metrics
690                .node_metrics
691                .block_proposal_interval
692                .observe(
693                    Duration::from_millis(
694                        verified_block
695                            .timestamp_ms()
696                            .saturating_sub(last_proposed_block.timestamp_ms()),
697                    )
698                    .as_secs_f64(),
699                );
700        }
701
702        // Accept the block into BlockManager and DagState.
703        let (accepted_blocks, missing) = self
704            .block_manager
705            .try_accept_blocks(vec![verified_block.clone()]);
706        assert_eq!(accepted_blocks.len(), 1);
707        assert!(missing.is_empty());
708
709        // The block must be added to the transaction certifier before it is broadcast or added to DagState.
710        // Update proposed state of blocks in local DAG.
711        // TODO(fastpath): move this logic and the logic afterwards to proposed block handler.
712        if self.context.protocol_config.mysticeti_fastpath() {
713            self.transaction_certifier
714                .add_voted_blocks(vec![(verified_block.clone(), vec![])]);
715            self.dag_state
716                .write()
717                .link_causal_history(verified_block.reference());
718        }
719
720        // Ensure the new block and its ancestors are persisted, before broadcasting it.
721        self.dag_state.write().flush();
722
723        // Now acknowledge the transactions for their inclusion in the block.
724        ack_transactions(verified_block.reference());
725
726        info!("Created block {verified_block:?} for round {clock_round}");
727
728        self.context
729            .metrics
730            .node_metrics
731            .proposed_blocks
732            .with_label_values(&[&force.to_string()])
733            .inc();
734
735        let extended_block = ExtendedBlock {
736            block: verified_block,
737            excluded_ancestors,
738        };
739
740        // Update round tracker with our own highest accepted blocks
741        self.round_tracker
742            .write()
743            .update_from_accepted_block(&extended_block);
744
745        Some(extended_block)
746    }
747
748    /// Runs commit rule to attempt to commit additional blocks from the DAG. If any `certified_commits` are provided, then
749    /// it will attempt to commit those first before trying to commit any further leaders.
750    fn try_commit(
751        &mut self,
752        mut certified_commits: Vec<CertifiedCommit>,
753    ) -> ConsensusResult<Vec<CommittedSubDag>> {
754        let _s = self
755            .context
756            .metrics
757            .node_metrics
758            .scope_processing_time
759            .with_label_values(&["Core::try_commit"])
760            .start_timer();
761
762        let mut certified_commits_map = BTreeMap::new();
763        for c in &certified_commits {
764            certified_commits_map.insert(c.index(), c.reference());
765        }
766
767        if !certified_commits.is_empty() {
768            info!(
769                "Processing synced commits: {:?}",
770                certified_commits
771                    .iter()
772                    .map(|c| (c.index(), c.leader()))
773                    .collect::<Vec<_>>()
774            );
775        }
776
777        let mut committed_sub_dags = Vec::new();
778        // TODO: Add optimization to abort early without quorum for a round.
779        loop {
780            // LeaderSchedule has a limit on how many sequenced leaders can be committed
781            // before a change is triggered. Calling into the leader schedule returns how many
782            // commits remain until the next leader change. We will loop back and recalculate
783            // any discarded leaders with the new schedule.
784            let mut commits_until_update = self
785                .leader_schedule
786                .commits_until_leader_schedule_update(self.dag_state.clone());
787
788            if commits_until_update == 0 {
789                let last_commit_index = self.dag_state.read().last_commit_index();
790
791                tracing::info!(
792                    "Leader schedule change triggered at commit index {last_commit_index}"
793                );
794
795                self.leader_schedule
796                    .update_leader_schedule_v2(&self.dag_state);
797
798                let propagation_scores = self
799                    .leader_schedule
800                    .leader_swap_table
801                    .read()
802                    .reputation_scores
803                    .clone();
804                self.ancestor_state_manager
805                    .set_propagation_scores(propagation_scores);
806
807                commits_until_update = self
808                    .leader_schedule
809                    .commits_until_leader_schedule_update(self.dag_state.clone());
810
811                fail_point!("consensus-after-leader-schedule-change");
812            }
813            assert!(commits_until_update > 0);
814
815            // If there are certified commits to process, find out which leaders and commits from them
816            // are decided and use them as the next commits.
817            let (certified_leaders, decided_certified_commits): (
818                Vec<DecidedLeader>,
819                Vec<CertifiedCommit>,
820            ) = self
821                .try_select_certified_leaders(&mut certified_commits, commits_until_update)
822                .into_iter()
823                .unzip();
824
825            // Only accept blocks for the certified commits that we are certain to sequence.
826            // This ensures that only blocks corresponding to committed certified commits are flushed to disk.
827            // Blocks from non-committed certified commits will not be flushed, preventing issues during crash-recovery.
828            // This avoids scenarios where accepting and flushing blocks of non-committed certified commits could lead to
829            // premature commit rule execution. Due to GC, this could cause a panic if the commit rule tries to access
830            // missing causal history from blocks of certified commits.
831            let blocks = decided_certified_commits
832                .iter()
833                .flat_map(|c| c.blocks())
834                .cloned()
835                .collect::<Vec<_>>();
836            self.block_manager.try_accept_committed_blocks(blocks);
837
838            // If there is no certified commit to process, run the decision rule.
839            let (decided_leaders, local) = if certified_leaders.is_empty() {
840                // TODO: limit commits by commits_until_update for efficiency, which may be needed when leader schedule length is reduced.
841                let mut decided_leaders = self.committer.try_decide(self.last_decided_leader);
842                // Truncate the decided leaders to fit the commit schedule limit.
843                if decided_leaders.len() >= commits_until_update {
844                    let _ = decided_leaders.split_off(commits_until_update);
845                }
846                (decided_leaders, true)
847            } else {
848                (certified_leaders, false)
849            };
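            // Illustrative example of the truncation above (hypothetical numbers): if
            // commits_until_update is 3 and the decision rule yields 5 decided leaders, only the
            // first 3 are committed in this iteration; the loop then re-enters, the leader schedule
            // updates (the commits_until_update == 0 branch), and the remaining leaders are
            // re-decided under the new schedule.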
850
851            // If the decided leaders list is empty then just break the loop.
852            let Some(last_decided) = decided_leaders.last().cloned() else {
853                break;
854            };
855
856            self.last_decided_leader = last_decided.slot();
857            self.context
858                .metrics
859                .node_metrics
860                .last_decided_leader_round
861                .set(self.last_decided_leader.round as i64);
862
863            let sequenced_leaders = decided_leaders
864                .into_iter()
865                .filter_map(|leader| leader.into_committed_block())
866                .collect::<Vec<_>>();
867            // It's possible to reach this point when all the decided leaders are "Skip" decisions. In this case there is no
868            // leader to commit and we should break the loop.
869            if sequenced_leaders.is_empty() {
870                break;
871            }
872            tracing::info!(
873                "Committing {} leaders: {}; {} commits before next leader schedule change",
874                sequenced_leaders.len(),
875                sequenced_leaders
876                    .iter()
877                    .map(|b| b.reference().to_string())
878                    .join(","),
879                commits_until_update,
880            );
881
882            // TODO: refcount subdags
883            let subdags = self
884                .commit_observer
885                .handle_commit(sequenced_leaders, local)?;
886
887            // Try to unsuspend blocks if gc_round has advanced.
888            self.block_manager
889                .try_unsuspend_blocks_for_latest_gc_round();
890
891            committed_sub_dags.extend(subdags);
892
893            fail_point!("consensus-after-handle-commit");
894        }
895
896        // Sanity check: for commits that have been linearized using the certified commits, ensure that the same sub dag has been committed.
897        for sub_dag in &committed_sub_dags {
898            if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
899                assert_eq!(
900                    commit_ref, sub_dag.commit_ref,
901                    "Certified commit has different reference than the committed sub dag"
902                );
903            }
904        }
905
906        // Notify about our own committed blocks
907        let committed_block_refs = committed_sub_dags
908            .iter()
909            .flat_map(|sub_dag| sub_dag.blocks.iter())
910            .filter_map(|block| {
911                (block.author() == self.context.own_index).then_some(block.reference())
912            })
913            .collect::<Vec<_>>();
914        self.transaction_consumer
915            .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());
916
917        Ok(committed_sub_dags)
918    }
919
920    pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
921        let _scope = monitored_scope("Core::get_missing_blocks");
922        self.block_manager.missing_blocks()
923    }
924
925    /// Sets whether there is a consumer available to consume blocks produced by the core.
926    pub(crate) fn set_subscriber_exists(&mut self, exists: bool) {
927        info!("Block subscriber exists: {exists}");
928        self.subscriber_exists = exists;
929    }
930
931    /// Sets the delay by round for propagating blocks to a quorum.
932    pub(crate) fn set_propagation_delay(&mut self, delay: Round) {
933        info!("Propagation round delay set to: {delay}");
934        self.propagation_delay = delay;
935    }
936
937    /// Sets the min propose round for the proposer, allowing blocks to be proposed only for round numbers
938    /// `> last_known_proposed_round`. At the moment the method may only be called once; calling it
939    /// multiple times will panic.
940    pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
941        if self.last_known_proposed_round.is_some() {
942            panic!(
943                "Should not attempt to set the last known proposed round if that has been already set"
944            );
945        }
946        self.last_known_proposed_round = Some(round);
947        info!("Last known proposed round set to {round}");
948    }
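    // Note on the amnesia-recovery gating: while `last_known_proposed_round` is `None`,
    // `should_propose()` below returns false, so no proposals are made until the last-block sync
    // sets a value here; after that, proposals are only made for rounds strictly greater than the
    // recovered round.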
949
950    /// Whether the core should propose new blocks.
951    pub(crate) fn should_propose(&self) -> bool {
952        let clock_round = self.dag_state.read().threshold_clock_round();
953        let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;
954
955        if !self.subscriber_exists {
956            debug!("Skip proposing for round {clock_round}, no subscriber exists.");
957            core_skipped_proposals
958                .with_label_values(&["no_subscriber"])
959                .inc();
960            return false;
961        }
962
963        if self.propagation_delay
964            > self
965                .context
966                .parameters
967                .propagation_delay_stop_proposal_threshold
968        {
969            debug!(
970                "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
971                self.propagation_delay,
972                self.context
973                    .parameters
974                    .propagation_delay_stop_proposal_threshold
975            );
976            core_skipped_proposals
977                .with_label_values(&["high_propagation_delay"])
978                .inc();
979            return false;
980        }
981
982        let Some(last_known_proposed_round) = self.last_known_proposed_round else {
983            debug!(
984                "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
985            );
986            core_skipped_proposals
987                .with_label_values(&["no_last_known_proposed_round"])
988                .inc();
989            return false;
990        };
991        if clock_round <= last_known_proposed_round {
992            debug!(
993                "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
994            );
995            core_skipped_proposals
996                .with_label_values(&["higher_last_known_proposed_round"])
997                .inc();
998            return false;
999        }
1000
1001        true
1002    }
1003
1004    // Tries to select a prefix of certified commits to be committed next, respecting the `limit`.
1005    // If the provided `limit` is zero, it will panic.
1006    // The function returns a list of certified leaders and certified commits. If an empty vector is returned, it means that
1007    // there are no certified commits to be committed, as the input `certified_commits` is either empty or all of the certified
1008    // commits have already been committed.
1009    #[tracing::instrument(skip_all)]
1010    fn try_select_certified_leaders(
1011        &mut self,
1012        certified_commits: &mut Vec<CertifiedCommit>,
1013        limit: usize,
1014    ) -> Vec<(DecidedLeader, CertifiedCommit)> {
1015        assert!(limit > 0, "limit should be greater than 0");
1016        if certified_commits.is_empty() {
1017            return vec![];
1018        }
1019
1020        let to_commit = if certified_commits.len() >= limit {
1021            // We keep only the number of leaders as dictated by the `limit`
1022            certified_commits.drain(..limit).collect::<Vec<_>>()
1023        } else {
1024            // Otherwise just take all of them and leave `certified_commits` empty.
1025            std::mem::take(certified_commits)
1026        };
1027
1028        tracing::debug!(
1029            "Selected {} certified leaders: {}",
1030            to_commit.len(),
1031            to_commit.iter().map(|c| c.leader().to_string()).join(",")
1032        );
1033
1034        to_commit
1035            .into_iter()
1036            .map(|commit| {
1037                let leader = commit.blocks().last().expect("Certified commit should have at least one block");
1038                assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
1039                // With certified commits there is no knowledge of whether the commit was direct, so assume an indirect commit.
1040                let leader = DecidedLeader::Commit(leader.clone(), /* direct */ false);
1041                UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
1042                (leader, commit)
1043            })
1044            .collect::<Vec<_>>()
1045    }
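    // Illustrative example (hypothetical numbers): with limit = 2 and 5 pending certified commits,
    // the first 2 are drained and returned as decided leaders, while the remaining 3 stay in
    // `certified_commits` and are picked up by the next iteration of the commit loop in `try_commit`.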
1046
1047    /// Retrieves the next ancestors to propose to form a block at `clock_round` round.
1048    /// If smart selection is enabled then this will try to select the best ancestors
1049    /// based on the propagation scores of the authorities.
1050    fn smart_ancestors_to_propose(
1051        &mut self,
1052        clock_round: Round,
1053        smart_select: bool,
1054    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
1055        let node_metrics = &self.context.metrics.node_metrics;
1056        let _s = node_metrics
1057            .scope_processing_time
1058            .with_label_values(&["Core::smart_ancestors_to_propose"])
1059            .start_timer();
1060
1061        // Now take the ancestors before clock_round (exclusive) for each authority.
1062        let all_ancestors = self
1063            .dag_state
1064            .read()
1065            .get_last_cached_block_per_authority(clock_round);
1066
1067        assert_eq!(
1068            all_ancestors.len(),
1069            self.context.committee.size(),
1070            "Fatal error, number of returned ancestors doesn't match committee size."
1071        );
1072
1073        // Ensure ancestor state is up to date before selecting for proposal.
1074        let accepted_quorum_rounds = self.round_tracker.read().compute_accepted_quorum_rounds();
1075
1076        self.ancestor_state_manager
1077            .update_all_ancestors_state(&accepted_quorum_rounds);
1078
1079        let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();
1080
1081        let quorum_round = clock_round.saturating_sub(1);
1082
1083        let mut score_and_pending_excluded_ancestors = Vec::new();
1084        let mut excluded_and_equivocating_ancestors = BTreeSet::new();
1085
1086        // Propose only ancestors of higher rounds than what has already been proposed.
1087        // And always include own last proposed block first among ancestors.
1088        // Start by only including the high scoring ancestors. Low scoring ancestors
1089        // will be included in a second pass below.
1090        let included_ancestors = iter::once(self.last_proposed_block().clone())
1091            .chain(
1092                all_ancestors
1093                    .into_iter()
1094                    .flat_map(|(ancestor, equivocating_ancestors)| {
1095                        if ancestor.author() == self.context.own_index {
1096                            return None;
1097                        }
1098                        if let Some(last_block_ref) =
1099                            self.last_included_ancestors[ancestor.author()]
1100                            && last_block_ref.round >= ancestor.round() {
1101                                return None;
1102                            }
1103
1104                        // We will never include equivocating ancestors so add them immediately
1105                        excluded_and_equivocating_ancestors.extend(equivocating_ancestors);
1106
1107                        let ancestor_state = ancestor_state_map[ancestor.author()];
1108                        match ancestor_state {
1109                            AncestorState::Include => {
1110                                trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
1111                            }
1112                            AncestorState::Exclude(score) => {
1113                                trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
1114                                score_and_pending_excluded_ancestors.push((score, ancestor));
1115                                return None;
1116                            }
1117                        }
1118
1119                        Some(ancestor)
1120                    }),
1121            )
1122            .collect::<Vec<_>>();
1123
1124        let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();
1125
1126        // Check total stake of high scoring parent round ancestors
1127        for ancestor in included_ancestors
1128            .iter()
1129            .filter(|a| a.round() == quorum_round)
1130        {
1131            parent_round_quorum.add(ancestor.author(), &self.context.committee);
1132        }
1133
1134        if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
1135            node_metrics.smart_selection_wait.inc();
1136            debug!(
1137                "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
1138                parent_round_quorum.stake()
1139            );
1140            return (vec![], BTreeSet::new());
1141        }
1142
1143        // Sort scores descending so we can include the best of the pending excluded
1144        // ancestors first until we reach the threshold.
1145        score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));
1146
1147        let mut ancestors_to_propose = included_ancestors;
1148        let mut excluded_ancestors = Vec::new();
1149        for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
1150            let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
1151            if !parent_round_quorum.reached_threshold(&self.context.committee)
1152                && ancestor.round() == quorum_round
1153            {
1154                debug!(
1155                    "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
1156                );
1157                parent_round_quorum.add(ancestor.author(), &self.context.committee);
1158                ancestors_to_propose.push(ancestor);
1159                node_metrics
1160                    .included_excluded_proposal_ancestors_count_by_authority
1161                    .with_label_values(&[block_hostname, "timeout"])
1162                    .inc();
1163            } else {
1164                excluded_ancestors.push((score, ancestor));
1165            }
1166        }
1167
1168        // Iterate through the excluded ancestors and include either the ancestor itself or an earlier block
1169        // in its chain that has been accepted by a quorum of the network. If the original ancestor itself
1170        // is not included, then it becomes part of the excluded ancestors that are not
1171        // included in the block but are still broadcast to peers.
1172        for (score, ancestor) in excluded_ancestors.iter() {
1173            let excluded_author = ancestor.author();
1174            let block_hostname = &self.context.committee.authority(excluded_author).hostname;
1175            // A quorum of validators reported to have accepted blocks from the excluded_author up to the low quorum round.
1176            let mut accepted_low_quorum_round = accepted_quorum_rounds[excluded_author].0;
1177            // If the accepted quorum round of this ancestor is greater than or equal
1178            // to the clock round then we want to make sure to set it to clock_round - 1
1179            // as that is the max round the new block can include as an ancestor.
1180            accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);
1181
1182            let last_included_round = self.last_included_ancestors[excluded_author]
1183                .map(|block_ref| block_ref.round)
1184                .unwrap_or(GENESIS_ROUND);
1185            if ancestor.round() <= last_included_round {
1186                // This should have already been filtered out when filtering all_ancestors.
1187                // Still, ensure previously included ancestors are filtered out.
1188                continue;
1189            }
1190
1191            if last_included_round >= accepted_low_quorum_round {
1192                excluded_and_equivocating_ancestors.insert(ancestor.reference());
1193                trace!(
1194                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
1195                    ancestor.reference()
1196                );
1197                node_metrics
1198                    .excluded_proposal_ancestors_count_by_authority
1199                    .with_label_values(&[block_hostname])
1200                    .inc();
1201                continue;
1202            }
1203
1204            let ancestor = if ancestor.round() <= accepted_low_quorum_round {
1205                // Include the ancestor block as it has been seen & accepted by a strong quorum.
1206                ancestor.clone()
1207            } else {
1208                // Exclude this ancestor since it hasn't been accepted by a strong quorum
1209                excluded_and_equivocating_ancestors.insert(ancestor.reference());
1210                trace!(
1211                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ",
1212                    ancestor.reference(),
1213                    ancestor.round()
1214                );
1215                node_metrics
1216                    .excluded_proposal_ancestors_count_by_authority
1217                    .with_label_values(&[block_hostname])
1218                    .inc();
1219
1220                // Look for an earlier block in the ancestor chain that we can include as there
1221                // is a gap between the last included round and the accepted low quorum round.
1222                //
1223                // Note: Only cached blocks need to be propagated. Committed and GC'ed blocks
1224                // do not need to be propagated.
1225                match self.dag_state.read().get_last_cached_block_in_range(
1226                    excluded_author,
1227                    last_included_round + 1,
1228                    accepted_low_quorum_round + 1,
1229                ) {
1230                    Some(earlier_ancestor) => {
1231                        // Found an earlier block that has been propagated well - include it instead
1232                        earlier_ancestor
1233                    }
1234                    None => {
1235                        // No suitable earlier block found
1236                        continue;
1237                    }
1238                }
1239            };
1240            self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
1241            ancestors_to_propose.push(ancestor.clone());
1242            trace!(
1243                "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
1244                ancestor.reference()
1245            );
1246            node_metrics
1247                .included_excluded_proposal_ancestors_count_by_authority
1248                .with_label_values(&[block_hostname, "quorum"])
1249                .inc();
1250        }
1251
1252        assert!(
1253            parent_round_quorum.reached_threshold(&self.context.committee),
1254            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
1255        );
1256
1257        debug!(
1258            "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
1259            ancestors_to_propose.len(),
1260            excluded_and_equivocating_ancestors.len()
1261        );
1262
1263        (ancestors_to_propose, excluded_and_equivocating_ancestors)
1264    }
1265
1266    /// Checks whether all the leaders of the round exist.
1267    /// TODO: we can leverage some additional signal here in order to more cleverly adjust the leader timeout later.
1268    /// E.g. if we already have one leader - the first in order - we might not want to wait as much.
1269    fn leaders_exist(&self, round: Round) -> bool {
1270        let dag_state = self.dag_state.read();
1271        for leader in self.leaders(round) {
1272            // Search for all the leaders. If at least one is not found, then return false.
1273            // A linear search should be fine here as the set of elements is expected to be small, and more sophisticated
1274            // data structures would not give us much here.
1275            if !dag_state.contains_cached_block_at_slot(leader) {
1276                return false;
1277            }
1278        }
1279
1280        true
1281    }
1282
1283    /// Returns the leaders of the provided round.
1284    fn leaders(&self, round: Round) -> Vec<Slot> {
1285        self.committer
1286            .get_leaders(round)
1287            .into_iter()
1288            .map(|authority_index| Slot::new(round, authority_index))
1289            .collect()
1290    }
1291
1292    /// Returns the 1st leader of the round.
1293    fn first_leader(&self, round: Round) -> AuthorityIndex {
1294        self.leaders(round).first().unwrap().authority
1295    }
1296
1297    fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
1298        self.last_proposed_block().timestamp_ms()
1299    }
1300
1301    fn last_proposed_round(&self) -> Round {
1302        self.last_proposed_block().round()
1303    }
1304
1305    fn last_proposed_block(&self) -> VerifiedBlock {
1306        self.dag_state.read().get_last_proposed_block()
1307    }
1308}
1309
1310/// Senders of signals from Core, for outputs and events (e.g. new block produced).
1311pub(crate) struct CoreSignals {
1312    tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
1313    new_round_sender: watch::Sender<Round>,
1314    context: Arc<Context>,
1315}
1316
1317impl CoreSignals {
1318    pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
1319        // Blocks buffered in the broadcast channel should be roughly equal to those cached in dag state,
1320        // since the underlying blocks are ref counted, so a lower buffer here will not reduce memory
1321        // usage significantly.
1322        let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
1323            context.parameters.dag_state_cached_rounds as usize,
1324        );
1325        let (new_round_sender, new_round_receiver) = watch::channel(0);
1326
1327        let me = Self {
1328            tx_block_broadcast,
1329            new_round_sender,
1330            context,
1331        };
1332
1333        let receivers = CoreSignalsReceivers {
1334            rx_block_broadcast,
1335            new_round_receiver,
1336        };
1337
1338        (me, receivers)
1339    }
1340
1341    /// Sends a signal to all the waiters that a new block has been produced. Returns an error
1342    /// if the block could not be sent to any receiver, which is treated as a shutdown.
1343    pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
1344        // When there is only one authority in the committee, it is unnecessary to broadcast
1345        // the block, which would fail anyway without subscribers to the signal.
1346        if self.context.committee.size() > 1 {
1347            if extended_block.block.round() == GENESIS_ROUND {
1348                debug!("Ignoring broadcasting genesis block to peers");
1349                return Ok(());
1350            }
1351
1352            if let Err(err) = self.tx_block_broadcast.send(extended_block) {
1353                warn!("Couldn't broadcast the block to any receiver: {err}");
1354                return Err(ConsensusError::Shutdown);
1355            }
1356        } else {
1357            debug!(
1358                "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
1359            );
1360        }
1361        Ok(())
1362    }
1363
1364    /// Sends a signal that the threshold clock has advanced to a new round. The `round_number` is the round
1365    /// to which the threshold clock has advanced.
1366    pub(crate) fn new_round(&mut self, round_number: Round) {
1367        let _ = self.new_round_sender.send_replace(round_number);
1368    }
1369}
1370
1371/// Receivers of signals from Core.
1372/// Intentionally un-clonable. Components should only subscribe to the channels they need.
1373pub(crate) struct CoreSignalsReceivers {
1374    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
1375    new_round_receiver: watch::Receiver<Round>,
1376}
1377
1378impl CoreSignalsReceivers {
1379    pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
1380        self.rx_block_broadcast.resubscribe()
1381    }
1382
1383    pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
1384        self.new_round_receiver.clone()
1385    }
1386}
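
// The two receivers above are meant to be consumed by separate components. A minimal consumption
// sketch, assuming tokio tasks; the `receivers` binding and the reactions in the loop bodies are
// illustrative only:
//
//     let mut rx_blocks = receivers.block_broadcast_receiver();
//     tokio::spawn(async move {
//         while let Ok(_extended_block) = rx_blocks.recv().await {
//             // Stream the newly proposed block to peers.
//         }
//     });
//
//     let mut rx_round = receivers.new_round_receiver();
//     tokio::spawn(async move {
//         while rx_round.changed().await.is_ok() {
//             // The threshold clock advanced; e.g. reset the leader timeout.
//             let _round = *rx_round.borrow_and_update();
//         }
//     });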
1387
1388/// Creates cores for the specified authorities with their corresponding stakes. The cores and their
1389/// respective signal receivers are returned in ascending `AuthorityIndex` order.
1390#[cfg(test)]
1391pub(crate) async fn create_cores(
1392    context: Context,
1393    authorities: Vec<Stake>,
1394) -> Vec<CoreTextFixture> {
1395    let mut cores = Vec::new();
1396
1397    for index in 0..authorities.len() {
1398        let own_index = AuthorityIndex::new_for_test(index as u32);
1399        let core =
1400            CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false).await;
1401        cores.push(core);
1402    }
1403    cores
1404}
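
// For reference, the fixtures returned by `create_cores` are used like this in the tests below
// (see e.g. `test_core_try_new_block_leader_timeout`); a sketch only, stake values are arbitrary:
//
//     let (context, _) = Context::new_for_test(4);
//     let mut all_cores = create_cores(context, vec![1, 1, 1, 1]).await;
//     // all_cores[0] belongs to AuthorityIndex 0, all_cores[1] to AuthorityIndex 1, and so on.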
1405
1406#[cfg(test)]
1407pub(crate) struct CoreTextFixture {
1408    pub(crate) core: Core,
1409    pub(crate) transaction_certifier: TransactionCertifier,
1410    pub(crate) signal_receivers: CoreSignalsReceivers,
1411    pub(crate) block_receiver: broadcast::Receiver<ExtendedBlock>,
1412    pub(crate) _commit_output_receiver: UnboundedReceiver<CommittedSubDag>,
1413    pub(crate) _blocks_output_receiver: UnboundedReceiver<CertifiedBlocksOutput>,
1414    pub(crate) dag_state: Arc<RwLock<DagState>>,
1415    pub(crate) store: Arc<MemStore>,
1416}
1417
1418#[cfg(test)]
1419impl CoreTextFixture {
1420    async fn new(
1421        context: Context,
1422        authorities: Vec<Stake>,
1423        own_index: AuthorityIndex,
1424        sync_last_known_own_block: bool,
1425    ) -> Self {
1426        let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
1427        let mut context = context.clone();
1428        context = context
1429            .with_committee(committee)
1430            .with_authority_index(own_index);
1431        context
1432            .protocol_config
1433            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
1434
1435        let context = Arc::new(context);
1436        let store = Arc::new(MemStore::new());
1437        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1438
1439        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1440        let leader_schedule = Arc::new(
1441            LeaderSchedule::from_store(context.clone(), dag_state.clone())
1442                .with_num_commits_per_schedule(10),
1443        );
1444        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1445        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1446        let (blocks_sender, _blocks_receiver) =
1447            mysten_metrics::monitored_mpsc::unbounded_channel("consensus_block_output");
1448        let transaction_certifier =
1449            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1450        transaction_certifier.recover(&NoopBlockVerifier, 0);
1451        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1452        // Need at least one subscriber to the block broadcast channel.
1453        let block_receiver = signal_receivers.block_broadcast_receiver();
1454
1455        let (commit_consumer, commit_output_receiver, blocks_output_receiver) =
1456            CommitConsumerArgs::new(0, 0);
1457        let commit_observer = CommitObserver::new(
1458            context.clone(),
1459            commit_consumer,
1460            dag_state.clone(),
1461            transaction_certifier.clone(),
1462            leader_schedule.clone(),
1463        )
1464        .await;
1465
1466        let block_signer = signers.remove(own_index.value()).1;
1467
1468        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1469        let core = Core::new(
1470            context,
1471            leader_schedule,
1472            transaction_consumer,
1473            transaction_certifier.clone(),
1474            block_manager,
1475            true,
1476            commit_observer,
1477            signals,
1478            block_signer,
1479            dag_state.clone(),
1480            sync_last_known_own_block,
1481            round_tracker,
1482        );
1483
1484        Self {
1485            core,
1486            transaction_certifier,
1487            signal_receivers,
1488            block_receiver,
1489            _commit_output_receiver: commit_output_receiver,
1490            _blocks_output_receiver: blocks_output_receiver,
1491            dag_state,
1492            store,
1493        }
1494    }
1495
1496    pub(crate) fn add_blocks(
1497        &mut self,
1498        blocks: Vec<VerifiedBlock>,
1499    ) -> ConsensusResult<BTreeSet<BlockRef>> {
1500        self.transaction_certifier
1501            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
1502        self.core.add_blocks(blocks)
1503    }
1504}
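
// A minimal usage sketch for the fixture, mirroring `test_core_propose_once_receiving_a_quorum`
// below; the committee size, stakes and block used here are arbitrary:
//
//     let (context, _) = Context::new_for_test(4);
//     let mut fixture = CoreTextFixture::new(
//         context.clone(),
//         vec![1, 1, 1, 1],
//         AuthorityIndex::new_for_test(0),
//         false,
//     )
//     .await;
//     // Feed a verified block into the fixture; it is recorded in the transaction certifier
//     // (with no reject votes) and then passed to Core.
//     let block = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
//     fixture.add_blocks(vec![block]).unwrap();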
1505
1506#[cfg(test)]
1507mod test {
1508    use std::{collections::BTreeSet, time::Duration};
1509
1510    use consensus_config::{AuthorityIndex, Parameters};
1511    use consensus_types::block::TransactionIndex;
1512    use futures::{StreamExt, stream::FuturesUnordered};
1513    use mysten_metrics::monitored_mpsc;
1514    use sui_protocol_config::ProtocolConfig;
1515    use tokio::time::sleep;
1516
1517    use super::*;
1518    use crate::{
1519        CommitConsumerArgs, CommitIndex,
1520        block::{TestBlock, genesis_blocks},
1521        block_verifier::NoopBlockVerifier,
1522        commit::CommitAPI,
1523        leader_scoring::ReputationScores,
1524        storage::{Store, WriteBatch, mem_store::MemStore},
1525        test_dag_builder::DagBuilder,
1526        test_dag_parser::parse_dag,
1527        transaction::{BlockStatus, TransactionClient},
1528    };
1529
1530    /// Recover Core and continue proposing from the last round which forms a quorum.
1531    #[tokio::test]
1532    async fn test_core_recover_from_store_for_full_round() {
1533        telemetry_subscribers::init_for_testing();
1534        let (context, mut key_pairs) = Context::new_for_test(4);
1535        let context = Arc::new(context);
1536        let store = Arc::new(MemStore::new());
1537        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1538        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1539        let mut block_status_subscriptions = FuturesUnordered::new();
1540
1541        // Create test blocks for all the authorities for 4 rounds and populate them in store
1542        let mut last_round_blocks = genesis_blocks(&context);
1543        let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
1544        for round in 1..=4 {
1545            let mut this_round_blocks = Vec::new();
1546            for (index, _authority) in context.committee.authorities() {
1547                let block = VerifiedBlock::new_for_test(
1548                    TestBlock::new(round, index.value() as u32)
1549                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1550                        .build(),
1551                );
1552
1553                // If it's a round 1 block (which will be committed later on) and it's our "own" block, then subscribe to listen for the block status.
1554                if round == 1 && index == context.own_index {
1555                    let subscription =
1556                        transaction_consumer.subscribe_for_block_status_testing(block.reference());
1557                    block_status_subscriptions.push(subscription);
1558                }
1559
1560                this_round_blocks.push(block);
1561            }
1562            all_blocks.extend(this_round_blocks.clone());
1563            last_round_blocks = this_round_blocks;
1564        }
1565        // write them in store
1566        store
1567            .write(WriteBatch::default().blocks(all_blocks))
1568            .expect("Storage error");
1569
1570        // create dag state after all blocks have been written to store
1571        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1572        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1573        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1574            context.clone(),
1575            dag_state.clone(),
1576        ));
1577        let (blocks_sender, _blocks_receiver) =
1578            monitored_mpsc::unbounded_channel("consensus_block_output");
1579        let transaction_certifier =
1580            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1581
1582        let (commit_consumer, _commit_receiver, _transaction_receiver) =
1583            CommitConsumerArgs::new(0, 0);
1584        let commit_observer = CommitObserver::new(
1585            context.clone(),
1586            commit_consumer,
1587            dag_state.clone(),
1588            transaction_certifier.clone(),
1589            leader_schedule.clone(),
1590        )
1591        .await;
1592
1593        // Check no commits have been persisted to dag_state or store.
1594        let last_commit = store.read_last_commit().unwrap();
1595        assert!(last_commit.is_none());
1596        assert_eq!(dag_state.read().last_commit_index(), 0);
1597
1598        // Now spin up core
1599        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1600        let (blocks_sender, _blocks_receiver) =
1601            monitored_mpsc::unbounded_channel("consensus_block_output");
1602        let transaction_certifier =
1603            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1604        transaction_certifier.recover(&NoopBlockVerifier, 0);
1605        // Need at least one subscriber to the block broadcast channel.
1606        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1607        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1608        let _core = Core::new(
1609            context.clone(),
1610            leader_schedule,
1611            transaction_consumer,
1612            transaction_certifier.clone(),
1613            block_manager,
1614            true,
1615            commit_observer,
1616            signals,
1617            key_pairs.remove(context.own_index.value()).1,
1618            dag_state.clone(),
1619            false,
1620            round_tracker,
1621        );
1622
1623        // New round should be 5
1624        let mut new_round = signal_receivers.new_round_receiver();
1625        assert_eq!(*new_round.borrow_and_update(), 5);
1626
1627        // Block for round 5 should have been proposed.
1628        let proposed_block = block_receiver
1629            .recv()
1630            .await
1631            .expect("A block should have been created");
1632        assert_eq!(proposed_block.block.round(), 5);
1633        let ancestors = proposed_block.block.ancestors();
1634
1635        // Only ancestors of round 4 should be included.
1636        assert_eq!(ancestors.len(), 4);
1637        for ancestor in ancestors {
1638            assert_eq!(ancestor.round, 4);
1639        }
1640
1641        // Flush the DAG state to storage.
1642        dag_state.write().flush();
1643
1644        // There were no commits prior to the core starting up, but there were completed
1645        // rounds up to and including round 4. So we should commit leaders in rounds 1 & 2
1646        // as soon as the new block for round 5 is proposed.
1647        let last_commit = store
1648            .read_last_commit()
1649            .unwrap()
1650            .expect("last commit should be set");
1651        assert_eq!(last_commit.index(), 2);
1652        assert_eq!(dag_state.read().last_commit_index(), 2);
1653        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1654        assert_eq!(all_stored_commits.len(), 2);
1655
1656        // And ensure that our "own" round 1 block was sent to the TransactionConsumer as a notification, along with the gc_round
1657        while let Some(result) = block_status_subscriptions.next().await {
1658            let status = result.unwrap();
1659            assert!(matches!(status, BlockStatus::Sequenced(_)));
1660        }
1661    }
1662
1663    /// Recover Core and continue proposing when the last round is partial (does not form a quorum) and we haven't
1664    /// proposed for that round yet.
1665    #[tokio::test]
1666    async fn test_core_recover_from_store_for_partial_round() {
1667        telemetry_subscribers::init_for_testing();
1668
1669        let (context, mut key_pairs) = Context::new_for_test(4);
1670        let context = Arc::new(context);
1671        let store = Arc::new(MemStore::new());
1672        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1673        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1674
1675        // Create test blocks for all authorities except ours (index = 0).
1676        let mut last_round_blocks = genesis_blocks(&context);
1677        let mut all_blocks = last_round_blocks.clone();
1678        for round in 1..=4 {
1679            let mut this_round_blocks = Vec::new();
1680
1681            // For round 4, only produce f+1 blocks. Skip block creation for our validator 0 and the validator at position 1.
1682            let authorities_to_skip = if round == 4 {
1683                context.committee.validity_threshold() as usize
1684            } else {
1685                // otherwise always skip creating a block for our authority
1686                1
1687            };
1688
1689            for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
1690                let block = TestBlock::new(round, index.value() as u32)
1691                    .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1692                    .build();
1693                this_round_blocks.push(VerifiedBlock::new_for_test(block));
1694            }
1695            all_blocks.extend(this_round_blocks.clone());
1696            last_round_blocks = this_round_blocks;
1697        }
1698
1699        // write them in store
1700        store
1701            .write(WriteBatch::default().blocks(all_blocks))
1702            .expect("Storage error");
1703
1704        // create dag state after all blocks have been written to store
1705        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1706        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1707        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1708            context.clone(),
1709            dag_state.clone(),
1710        ));
1711        let (blocks_sender, _blocks_receiver) =
1712            monitored_mpsc::unbounded_channel("consensus_block_output");
1713        let transaction_certifier =
1714            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1715
1716        let (commit_consumer, _commit_receiver, _transaction_receiver) =
1717            CommitConsumerArgs::new(0, 0);
1718        let commit_observer = CommitObserver::new(
1719            context.clone(),
1720            commit_consumer,
1721            dag_state.clone(),
1722            transaction_certifier.clone(),
1723            leader_schedule.clone(),
1724        )
1725        .await;
1726
1727        // Check no commits have been persisted to dag_state & store
1728        let last_commit = store.read_last_commit().unwrap();
1729        assert!(last_commit.is_none());
1730        assert_eq!(dag_state.read().last_commit_index(), 0);
1731
1732        // Now spin up core
1733        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1734        let (blocks_sender, _blocks_receiver) =
1735            monitored_mpsc::unbounded_channel("consensus_block_output");
1736        let transaction_certifier =
1737            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1738        transaction_certifier.recover(&NoopBlockVerifier, 0);
1739        // Need at least one subscriber to the block broadcast channel.
1740        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1741        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1742        let mut core = Core::new(
1743            context.clone(),
1744            leader_schedule,
1745            transaction_consumer,
1746            transaction_certifier,
1747            block_manager,
1748            true,
1749            commit_observer,
1750            signals,
1751            key_pairs.remove(context.own_index.value()).1,
1752            dag_state.clone(),
1753            false,
1754            round_tracker,
1755        );
1756
1757        // Clock round should have advanced to 5 during recovery because
1758        // a quorum has formed in round 4.
1759        let mut new_round = signal_receivers.new_round_receiver();
1760        assert_eq!(*new_round.borrow_and_update(), 5);
1761
1762        // During recovery, round 4 block should have been proposed.
1763        let proposed_block = block_receiver
1764            .recv()
1765            .await
1766            .expect("A block should have been created");
1767        assert_eq!(proposed_block.block.round(), 4);
1768        let ancestors = proposed_block.block.ancestors();
1769
1770        assert_eq!(ancestors.len(), 4);
1771        for ancestor in ancestors {
1772            if ancestor.author == context.own_index {
1773                assert_eq!(ancestor.round, 0);
1774            } else {
1775                assert_eq!(ancestor.round, 3);
1776            }
1777        }
1778
1779        // Run commit rule.
1780        core.try_commit(vec![]).ok();
1781
1782        // Flush the DAG state to storage.
1783        core.dag_state.write().flush();
1784
1785        // There were no commits prior to the core starting up, but there were completed
1786        // rounds up to round 4. So we should commit leaders in rounds 1 & 2 as soon
1787        // as the new block for round 4 is proposed.
1788        let last_commit = store
1789            .read_last_commit()
1790            .unwrap()
1791            .expect("last commit should be set");
1792        assert_eq!(last_commit.index(), 2);
1793        assert_eq!(dag_state.read().last_commit_index(), 2);
1794        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1795        assert_eq!(all_stored_commits.len(), 2);
1796    }
1797
1798    #[tokio::test]
1799    async fn test_core_propose_after_genesis() {
1800        telemetry_subscribers::init_for_testing();
1801        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1802            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
1803            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
1804            config
1805        });
1806
1807        let (context, mut key_pairs) = Context::new_for_test(4);
1808        let context = Arc::new(context);
1809        let store = Arc::new(MemStore::new());
1810        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1811
1812        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1813        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1814        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1815        let (blocks_sender, _blocks_receiver) =
1816            monitored_mpsc::unbounded_channel("consensus_block_output");
1817        let transaction_certifier =
1818            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1819        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1820        // Need at least one subscriber to the block broadcast channel.
1821        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1822        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1823            context.clone(),
1824            dag_state.clone(),
1825        ));
1826
1827        let (commit_consumer, _commit_receiver, _transaction_receiver) =
1828            CommitConsumerArgs::new(0, 0);
1829        let commit_observer = CommitObserver::new(
1830            context.clone(),
1831            commit_consumer,
1832            dag_state.clone(),
1833            transaction_certifier.clone(),
1834            leader_schedule.clone(),
1835        )
1836        .await;
1837
1838        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1839        let mut core = Core::new(
1840            context.clone(),
1841            leader_schedule,
1842            transaction_consumer,
1843            transaction_certifier,
1844            block_manager,
1845            true,
1846            commit_observer,
1847            signals,
1848            key_pairs.remove(context.own_index.value()).1,
1849            dag_state.clone(),
1850            false,
1851            round_tracker,
1852        );
1853
1854        // Send some transactions
1855        let mut total = 0;
1856        let mut index = 0;
1857        loop {
1858            let transaction =
1859                bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
1860            total += transaction.len();
1861            index += 1;
1862            let _w = transaction_client
1863                .submit_no_wait(vec![transaction])
1864                .await
1865                .unwrap();
1866
1867            // Keep submitting until the total size of transactions reaches 1KB
1868            if total >= 1_000 {
1869                break;
1870            }
1871        }
1872
1873        // a new block should have been created during recovery.
1874        let extended_block = block_receiver
1875            .recv()
1876            .await
1877            .expect("A new block should have been created");
1878
1879        // A new block created - assert the details
1880        assert_eq!(extended_block.block.round(), 1);
1881        assert_eq!(extended_block.block.author().value(), 0);
1882        assert_eq!(extended_block.block.ancestors().len(), 4);
1883
1884        let mut total = 0;
1885        for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
1886            total += transaction.data().len() as u64;
1887            let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
1888            assert_eq!(format!("Transaction {i}"), transaction);
1889        }
1890        assert!(total <= context.protocol_config.max_transactions_in_block_bytes());
1891
1892        // genesis blocks should be referenced
1893        let all_genesis = genesis_blocks(&context);
1894
1895        for ancestor in extended_block.block.ancestors() {
1896            all_genesis
1897                .iter()
1898                .find(|block| block.reference() == *ancestor)
1899                .expect("Block should be found amongst genesis blocks");
1900        }
1901
1902        // Try to propose again - with or without ignoring the leaders check, it will not return any block
1903        assert!(core.try_propose(false).unwrap().is_none());
1904        assert!(core.try_propose(true).unwrap().is_none());
1905
1906        // Flush the DAG state to storage.
1907        dag_state.write().flush();
1908
1909        // Check no commits have been persisted to dag_state & store
1910        let last_commit = store.read_last_commit().unwrap();
1911        assert!(last_commit.is_none());
1912        assert_eq!(dag_state.read().last_commit_index(), 0);
1913    }
1914
1915    #[tokio::test]
1916    async fn test_core_propose_once_receiving_a_quorum() {
1917        telemetry_subscribers::init_for_testing();
1918        let (context, _key_pairs) = Context::new_for_test(4);
1919        let mut core_fixture = CoreTextFixture::new(
1920            context.clone(),
1921            vec![1, 1, 1, 1],
1922            AuthorityIndex::new_for_test(0),
1923            false,
1924        )
1925        .await;
1926        let transaction_certifier = &core_fixture.transaction_certifier;
1927        let store = &core_fixture.store;
1928        let dag_state = &core_fixture.dag_state;
1929        let core = &mut core_fixture.core;
1930
1931        let mut expected_ancestors = BTreeSet::new();
1932
1933        // Adding one block now will trigger the creation of a new block for round 1
1934        let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
1935        expected_ancestors.insert(block_1.reference());
1936        // Wait for min round delay to allow blocks to be proposed.
1937        sleep(context.parameters.min_round_delay).await;
1938        // add blocks to trigger proposal.
1939        transaction_certifier.add_voted_blocks(vec![(block_1.clone(), vec![])]);
1940        _ = core.add_blocks(vec![block_1]);
1941
1942        assert_eq!(core.last_proposed_round(), 1);
1943        expected_ancestors.insert(core.last_proposed_block().reference());
1944        // attempt to create a block - none will be produced.
1945        assert!(core.try_propose(false).unwrap().is_none());
1946
1947        // Adding another block now forms a quorum for round 1, so a block at round 2 will be proposed
1948        let block_2 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
1949        expected_ancestors.insert(block_2.reference());
1950        // Wait for min round delay to allow blocks to be proposed.
1951        sleep(context.parameters.min_round_delay).await;
1952        // add blocks to trigger proposal.
1953        transaction_certifier.add_voted_blocks(vec![(block_2.clone(), vec![1, 4])]);
1954        _ = core.add_blocks(vec![block_2.clone()]);
1955
1956        assert_eq!(core.last_proposed_round(), 2);
1957
1958        let proposed_block = core.last_proposed_block();
1959        assert_eq!(proposed_block.round(), 2);
1960        assert_eq!(proposed_block.author(), context.own_index);
1961        assert_eq!(proposed_block.ancestors().len(), 3);
1962        let ancestors = proposed_block.ancestors();
1963        let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
1964        assert_eq!(ancestors, expected_ancestors);
1965
1966        let transaction_votes = proposed_block.transaction_votes();
1967        assert_eq!(transaction_votes.len(), 1);
1968        let transaction_vote = transaction_votes.first().unwrap();
1969        assert_eq!(transaction_vote.block_ref, block_2.reference());
1970        assert_eq!(transaction_vote.rejects, vec![1, 4]);
1971
1972        // Flush the DAG state to storage.
1973        dag_state.write().flush();
1974
1975        // Check no commits have been persisted to dag_state & store
1976        let last_commit = store.read_last_commit().unwrap();
1977        assert!(last_commit.is_none());
1978        assert_eq!(dag_state.read().last_commit_index(), 0);
1979    }
1980
1981    #[tokio::test]
1982    async fn test_commit_and_notify_for_block_status() {
1983        telemetry_subscribers::init_for_testing();
1984        let (mut context, mut key_pairs) = Context::new_for_test(4);
1985        const GC_DEPTH: u32 = 2;
1986
1987        context
1988            .protocol_config
1989            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1990
1991        let context = Arc::new(context);
1992
1993        let store = Arc::new(MemStore::new());
1994        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1995        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1996        let mut block_status_subscriptions = FuturesUnordered::new();
1997
1998        let dag_str = "DAG {
1999            Round 0 : { 4 },
2000            Round 1 : { * },
2001            Round 2 : { * },
2002            Round 3 : {
2003                A -> [*],
2004                B -> [-A2],
2005                C -> [-A2],
2006                D -> [-A2],
2007            },
2008            Round 4 : { 
2009                B -> [-A3],
2010                C -> [-A3],
2011                D -> [-A3],
2012            },
2013            Round 5 : { 
2014                A -> [A3, B4, C4, D4]
2015                B -> [*],
2016                C -> [*],
2017                D -> [*],
2018            },
2019            Round 6 : { * },
2020            Round 7 : { * },
2021            Round 8 : { * },
2022        }";
2023
2024        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2025        dag_builder.print();
2026
2027        // Subscribe to all created "own" blocks. We know that for our node (A) we'll be able to commit up to round 5.
2028        for block in dag_builder.blocks(1..=5) {
2029            if block.author() == context.own_index {
2030                let subscription =
2031                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
2032                block_status_subscriptions.push(subscription);
2033            }
2034        }
2035
2036        // write them in store
2037        store
2038            .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
2039            .expect("Storage error");
2040
2041        // create dag state after all blocks have been written to store
2042        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2043        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2044        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2045            context.clone(),
2046            dag_state.clone(),
2047        ));
2048        let (blocks_sender, _blocks_receiver) =
2049            monitored_mpsc::unbounded_channel("consensus_block_output");
2050        let transaction_certifier =
2051            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2052
2053        let (commit_consumer, _commit_receiver, _transaction_receiver) =
2054            CommitConsumerArgs::new(0, 0);
2055        let commit_observer = CommitObserver::new(
2056            context.clone(),
2057            commit_consumer,
2058            dag_state.clone(),
2059            transaction_certifier.clone(),
2060            leader_schedule.clone(),
2061        )
2062        .await;
2063
2064        // Flush the DAG state to storage.
2065        dag_state.write().flush();
2066
2067        // Check no commits have been persisted to dag_state or store.
2068        let last_commit = store.read_last_commit().unwrap();
2069        assert!(last_commit.is_none());
2070        assert_eq!(dag_state.read().last_commit_index(), 0);
2071
2072        // Now spin up core
2073        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2074        let (blocks_sender, _blocks_receiver) =
2075            monitored_mpsc::unbounded_channel("consensus_block_output");
2076        let transaction_certifier =
2077            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2078        transaction_certifier.recover(&NoopBlockVerifier, 0);
2079        // Need at least one subscriber to the block broadcast channel.
2080        let _block_receiver = signal_receivers.block_broadcast_receiver();
2081        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2082        let _core = Core::new(
2083            context.clone(),
2084            leader_schedule,
2085            transaction_consumer,
2086            transaction_certifier,
2087            block_manager,
2088            true,
2089            commit_observer,
2090            signals,
2091            key_pairs.remove(context.own_index.value()).1,
2092            dag_state.clone(),
2093            false,
2094            round_tracker,
2095        );
2096
2097        // Flush the DAG state to storage.
2098        dag_state.write().flush();
2099
2100        let last_commit = store
2101            .read_last_commit()
2102            .unwrap()
2103            .expect("last commit should be set");
2104
2105        assert_eq!(last_commit.index(), 5);
2106
2107        while let Some(result) = block_status_subscriptions.next().await {
2108            let status = result.unwrap();
2109
2110            match status {
2111                BlockStatus::Sequenced(block_ref) => {
2112                    assert!(block_ref.round == 1 || block_ref.round == 5);
2113                }
2114                BlockStatus::GarbageCollected(block_ref) => {
2115                    assert!(block_ref.round == 2 || block_ref.round == 3);
2116                }
2117            }
2118        }
2119    }
2120
2121    // Tests that the threshold clock advances when blocks get unsuspended due to GC, and that newly created blocks are always higher
2122    // than the last advanced gc round.
2123    #[tokio::test]
2124    async fn test_multiple_commits_advance_threshold_clock() {
2125        telemetry_subscribers::init_for_testing();
2126        let (mut context, mut key_pairs) = Context::new_for_test(4);
2127        const GC_DEPTH: u32 = 2;
2128
2129        context
2130            .protocol_config
2131            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2132
2133        let context = Arc::new(context);
2134
2135        let store = Arc::new(MemStore::new());
2136        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2137        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2138
2139        // On round 1 we do produce the block for authority D, but we do not link it until round 6. This makes round 6 unable to be processed
2140        // until the leader of round 3 is committed, at which point round 1 gets garbage collected.
2141        // Then we add more rounds so we can trigger a commit for the leader of round 9, which will move the gc round to 7.
2142        let dag_str = "DAG {
2143            Round 0 : { 4 },
2144            Round 1 : { * },
2145            Round 2 : { 
2146                B -> [-D1],
2147                C -> [-D1],
2148                D -> [-D1],
2149            },
2150            Round 3 : {
2151                B -> [*],
2152                C -> [*]
2153                D -> [*],
2154            },
2155            Round 4 : { 
2156                A -> [*],
2157                B -> [*],
2158                C -> [*]
2159                D -> [*],
2160            },
2161            Round 5 : { 
2162                A -> [*],
2163                B -> [*],
2164                C -> [*],
2165                D -> [*],
2166            },
2167            Round 6 : { 
2168                B -> [A5, B5, C5, D1],
2169                C -> [A5, B5, C5, D1],
2170                D -> [A5, B5, C5, D1],
2171            },
2172            Round 7 : { 
2173                B -> [*],
2174                C -> [*],
2175                D -> [*],
2176            },
2177            Round 8 : { 
2178                B -> [*],
2179                C -> [*],
2180                D -> [*],
2181            },
2182            Round 9 : { 
2183                B -> [*],
2184                C -> [*],
2185                D -> [*],
2186            },
2187            Round 10 : { 
2188                B -> [*],
2189                C -> [*],
2190                D -> [*],
2191            },
2192            Round 11 : { 
2193                B -> [*],
2194                C -> [*],
2195                D -> [*],
2196            },
2197        }";
2198
2199        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2200        dag_builder.print();
2201
2202        // create dag state after all blocks have been written to store
2203        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2204        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2205        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2206            context.clone(),
2207            dag_state.clone(),
2208        ));
2209        let (blocks_sender, _blocks_receiver) =
2210            monitored_mpsc::unbounded_channel("consensus_block_output");
2211        let transaction_certifier =
2212            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2213
2214        let (commit_consumer, _commit_receiver, _transaction_receiver) =
2215            CommitConsumerArgs::new(0, 0);
2216        let commit_observer = CommitObserver::new(
2217            context.clone(),
2218            commit_consumer,
2219            dag_state.clone(),
2220            transaction_certifier.clone(),
2221            leader_schedule.clone(),
2222        )
2223        .await;
2224
2225        // Flush the DAG state to storage.
2226        dag_state.write().flush();
2227
2228        // Check no commits have been persisted to dag_state or store.
2229        let last_commit = store.read_last_commit().unwrap();
2230        assert!(last_commit.is_none());
2231        assert_eq!(dag_state.read().last_commit_index(), 0);
2232
2233        // Now spin up core
2234        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2235        let (blocks_sender, _blocks_receiver) =
2236            monitored_mpsc::unbounded_channel("consensus_block_output");
2237        let transaction_certifier =
2238            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2239        // Need at least one subscriber to the block broadcast channel.
2240        let _block_receiver = signal_receivers.block_broadcast_receiver();
2241        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2242        let mut core = Core::new(
2243            context.clone(),
2244            leader_schedule,
2245            transaction_consumer,
2246            transaction_certifier.clone(),
2247            block_manager,
2248            true,
2249            commit_observer,
2250            signals,
2251            key_pairs.remove(context.own_index.value()).1,
2252            dag_state.clone(),
2253            true,
2254            round_tracker,
2255        );
2256        // We set the last known round to 4 so we avoid creating new blocks until then - otherwise it would crash, as the already created DAG contains blocks for this
2257        // authority.
2258        core.set_last_known_proposed_round(4);
2259
2260        // We add all the blocks except D1. The only ones we can immediately accept are the ones up to round 5, as they don't have a dependency on D1. The rest of the blocks have a causal dependency
2261        // on D1, so they can't be processed until the leader of round 3 gets committed and the gc round moves to 1. That will make all the blocks that depend on D1 get accepted.
2262        // However, our threshold clock is now at round 6, as the last quorum that we managed to process was the one at round 5.
2263        // As commits happen, blocks of later rounds get accepted and more leaders get committed. Eventually the leader of round 9 gets committed and the gc round moves to 9 - 2 = 7.
2264        // If our node attempted to produce a block for threshold clock round 6, the acceptance checks would fail, as the gc round has moved far past that round.
2265        let mut all_blocks = dag_builder.blocks(1..=11);
2266        all_blocks.sort_by_key(|b| b.round());
2267        let voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)> =
2268            all_blocks.iter().map(|b| (b.clone(), vec![])).collect();
2269        transaction_certifier.add_voted_blocks(voted_blocks);
2270        let blocks: Vec<VerifiedBlock> = all_blocks
2271            .into_iter()
2272            .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
2273            .collect();
2274        core.add_blocks(blocks).expect("Should not fail");
2275
2276        assert_eq!(core.last_proposed_round(), 12);
2277    }
2278
2279    #[tokio::test]
2280    async fn test_core_set_min_propose_round() {
2281        telemetry_subscribers::init_for_testing();
2282        let (context, mut key_pairs) = Context::new_for_test(4);
2283        let context = Arc::new(context.with_parameters(Parameters {
2284            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2285            ..Default::default()
2286        }));
2287
2288        let store = Arc::new(MemStore::new());
2289        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2290
2291        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2292        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2293            context.clone(),
2294            dag_state.clone(),
2295        ));
2296
2297        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2298        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2299        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2300        let (blocks_sender, _blocks_receiver) =
2301            monitored_mpsc::unbounded_channel("consensus_block_output");
2302        let transaction_certifier =
2303            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2304        // Need at least one subscriber to the block broadcast channel.
2305        let _block_receiver = signal_receivers.block_broadcast_receiver();
2306
2307        let (commit_consumer, _commit_receiver, _transaction_receiver) =
2308            CommitConsumerArgs::new(0, 0);
2309        let commit_observer = CommitObserver::new(
2310            context.clone(),
2311            commit_consumer,
2312            dag_state.clone(),
2313            transaction_certifier.clone(),
2314            leader_schedule.clone(),
2315        )
2316        .await;
2317
2318        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2319        let mut core = Core::new(
2320            context.clone(),
2321            leader_schedule,
2322            transaction_consumer,
2323            transaction_certifier.clone(),
2324            block_manager,
2325            true,
2326            commit_observer,
2327            signals,
2328            key_pairs.remove(context.own_index.value()).1,
2329            dag_state.clone(),
2330            true,
2331            round_tracker,
2332        );
2333
2334        // No new block should have been produced
2335        assert_eq!(
2336            core.last_proposed_round(),
2337            GENESIS_ROUND,
2338            "No block should have been created other than genesis"
2339        );
2340
2341        // Create blocks for the whole network - even for "our" node - in order to replicate an "amnesia" recovery.
2342        assert!(core.try_propose(true).unwrap().is_none());
2343
2344        // Create blocks for the whole network - even "our" node in order to replicate an "amnesia" recovery.
2345        let mut builder = DagBuilder::new(context.clone());
2346        builder.layers(1..=10).build();
2347
2348        let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
2349
2350        // Process all the blocks
2351        transaction_certifier
2352            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2353        assert!(core.add_blocks(blocks).unwrap().is_empty());
2354
2355        core.round_tracker.write().update_from_probe(
2356            vec![
2357                vec![10, 10, 10, 10],
2358                vec![10, 10, 10, 10],
2359                vec![10, 10, 10, 10],
2360                vec![10, 10, 10, 10],
2361            ],
2362            vec![
2363                vec![10, 10, 10, 10],
2364                vec![10, 10, 10, 10],
2365                vec![10, 10, 10, 10],
2366                vec![10, 10, 10, 10],
2367            ],
2368        );
2369
2370        // Try to propose - no block should be produced.
2371        assert!(core.try_propose(true).unwrap().is_none());
2372
2373        // Now set the last known proposed round, which is the highest round for which the network has informed
2374        // us that we have proposed a block.
2375        core.set_last_known_proposed_round(10);
2376
2377        let block = core.try_propose(true).expect("No error").unwrap();
2378        assert_eq!(block.round(), 11);
2379        assert_eq!(block.ancestors().len(), 4);
2380
2381        let our_ancestor_included = block.ancestors()[0];
2382        assert_eq!(our_ancestor_included.author, context.own_index);
2383        assert_eq!(our_ancestor_included.round, 10);
2384    }
2385
2386    #[tokio::test(flavor = "current_thread", start_paused = true)]
2387    async fn test_core_try_new_block_leader_timeout() {
2388        telemetry_subscribers::init_for_testing();
2389
2390        // Since we run the test with start_paused = true, any time-dependent operations using Tokio's time
2391        // facilities, such as tokio::time::sleep or tokio::time::Instant, will not advance. So practically each
2392        // Core's clock may have initialised with a different value, but it never advances.
2393        // To ensure that blocks won't get rejected by cores, we need to manually wait for the time
2394        // diff before processing them. By calling `tokio::time::sleep` we also implicitly advance the
2395        // tokio clock.
2396        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2397            // Simulate the time wait before processing a block to ensure that block.timestamp <= now
2398            let now = context.clock.timestamp_utc_ms();
2399            let max_timestamp = blocks
2400                .iter()
2401                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2402                .map(|block| block.timestamp_ms())
2403                .unwrap_or(0);
2404
2405            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2406            sleep(wait_time).await;
2407        }
2408
2409        let (context, _) = Context::new_for_test(4);
2410        // Create the cores for all authorities
2411        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]).await;
2412
2413        // Create blocks for rounds 1..=3 from all Cores except the last Core of authority 3, so we miss its block. As
2414        // it will be the leader of round 3, no one will be able to progress to round 4 unless we explicitly trigger
2415        // block creation.
2416        // Split off the last core (authority 3) so that it does not create any blocks.
2417        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2418
2419        // Now iterate over a few rounds and ensure the corresponding signals are created while the network advances
2420        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2421        for round in 1..=3 {
2422            let mut this_round_blocks = Vec::new();
2423
2424            for core_fixture in cores.iter_mut() {
2425                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2426
2427                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2428
2429                // Only when round > 1 and using non-genesis parents.
2430                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2431                    assert_eq!(round - 1, r);
2432                    if core_fixture.core.last_proposed_round() == r {
2433                        // Force propose new block regardless of min round delay.
2434                        core_fixture
2435                            .core
2436                            .try_propose(true)
2437                            .unwrap()
2438                            .unwrap_or_else(|| {
2439                                panic!("Block should have been proposed for round {}", round)
2440                            });
2441                    }
2442                }
2443
2444                assert_eq!(core_fixture.core.last_proposed_round(), round);
2445
2446                this_round_blocks.push(core_fixture.core.last_proposed_block());
2447            }
2448
2449            last_round_blocks = this_round_blocks;
2450        }
2451
2452        // Try to create the blocks for round 4 by calling the try_propose() method. No block should be created as the
2453        // leader - authority 3 - hasn't proposed any block.
2454        for core_fixture in cores.iter_mut() {
2455            wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2456
2457            core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2458            assert!(core_fixture.core.try_propose(false).unwrap().is_none());
2459        }
2460
2461        // Now try to create the blocks for round 4 via the leader timeout method which should
2462        // ignore any leader checks or min round delay.
2463        for core_fixture in cores.iter_mut() {
2464            assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
2465            assert_eq!(core_fixture.core.last_proposed_round(), 4);
2466
2467            // Flush the DAG state to storage.
2468            core_fixture.dag_state.write().flush();
2469
2470            // Check commits have been persisted to store
2471            let last_commit = core_fixture
2472                .store
2473                .read_last_commit()
2474                .unwrap()
2475                .expect("last commit should be set");
2476            // There is 1 leader round with rounds completed up to and including
2477            // round 4
2478            assert_eq!(last_commit.index(), 1);
2479            let all_stored_commits = core_fixture
2480                .store
2481                .scan_commits((0..=CommitIndex::MAX).into())
2482                .unwrap();
2483            assert_eq!(all_stored_commits.len(), 1);
2484        }
2485    }
2486
2487    #[tokio::test(flavor = "current_thread", start_paused = true)]
2488    async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
2489        telemetry_subscribers::init_for_testing();
2490
2491        // Since we run the test with start_paused = true, any time-dependent operations using Tokio's time
2492        // facilities, such as tokio::time::sleep or tokio::time::Instant, will not advance. So practically each
2493        // Core's clock may have initialised with a different value, but it never advances.
2494        // To ensure that blocks won't get rejected by cores, we need to manually wait for the time
2495        // diff before processing them. By calling `tokio::time::sleep` we also implicitly advance the
2496        // tokio clock.
2497        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2498            // Simulate the time wait before processing a block to ensure that block.timestamp <= now
2499            let now = context.clock.timestamp_utc_ms();
2500            let max_timestamp = blocks
2501                .iter()
2502                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2503                .map(|block| block.timestamp_ms())
2504                .unwrap_or(0);
2505
2506            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2507            sleep(wait_time).await;
2508        }
2509
2510        let (mut context, _) = Context::new_for_test(5);
2511        context
2512            .protocol_config
2513            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
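        // A threshold of 33 is presumably expressed as a percentage of total stake, so with 5 equally
        // staked authorities at most one authority (20% of stake) can be marked as a bad node.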
2514
2515        // Create the cores for all authorities
2516        let mut all_cores = create_cores(context, vec![1, 1, 1, 1, 1]).await;
2517        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2518
2519        // Create blocks for rounds 1..=30 from all Cores except the last Core (authority 4).
2520        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2521        for round in 1..=30 {
2522            let mut this_round_blocks = Vec::new();
2523
2524            for core_fixture in cores.iter_mut() {
2525                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2526
2527                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2528
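                // The two matrices below are presumably the highest received and highest accepted
                // rounds respectively, one row per reporting peer and one column per authority.
                // Authority 4 is kept at 0 everywhere so its quorum rounds stay behind.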
2529                core_fixture.core.round_tracker.write().update_from_probe(
2530                    vec![
2531                        vec![round, round, round, round, 0],
2532                        vec![round, round, round, round, 0],
2533                        vec![round, round, round, round, 0],
2534                        vec![round, round, round, round, 0],
2535                        vec![0, 0, 0, 0, 0],
2536                    ],
2537                    vec![
2538                        vec![round, round, round, round, 0],
2539                        vec![round, round, round, round, 0],
2540                        vec![round, round, round, round, 0],
2541                        vec![round, round, round, round, 0],
2542                        vec![0, 0, 0, 0, 0],
2543                    ],
2544                );
2545
2546                // Only when round > 1 and using non-genesis parents.
2547                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2548                    assert_eq!(round - 1, r);
2549                    if core_fixture.core.last_proposed_round() == r {
2550                        // Force propose new block regardless of min round delay.
2551                        core_fixture
2552                            .core
2553                            .try_propose(true)
2554                            .unwrap()
2555                            .unwrap_or_else(|| {
2556                                panic!("Block should have been proposed for round {}", round)
2557                            });
2558                    }
2559                }
2560
2561                assert_eq!(core_fixture.core.last_proposed_round(), round);
2562
2563                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2564            }
2565
2566            last_round_blocks = this_round_blocks;
2567        }
2568
2569        // Now produce blocks for all Cores
2570        for round in 31..=40 {
2571            let mut this_round_blocks = Vec::new();
2572
2573            for core_fixture in all_cores.iter_mut() {
2574                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2575
2576                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2577
2578                // Don't update probed rounds for authority 4 so it will remain
2579                // excluded.
2580                core_fixture.core.round_tracker.write().update_from_probe(
2581                    vec![
2582                        vec![round, round, round, round, 0],
2583                        vec![round, round, round, round, 0],
2584                        vec![round, round, round, round, 0],
2585                        vec![round, round, round, round, 0],
2586                        vec![0, 0, 0, 0, 0],
2587                    ],
2588                    vec![
2589                        vec![round, round, round, round, 0],
2590                        vec![round, round, round, round, 0],
2591                        vec![round, round, round, round, 0],
2592                        vec![round, round, round, round, 0],
2593                        vec![0, 0, 0, 0, 0],
2594                    ],
2595                );
2596
2597                // Only when round > 1 and using non-genesis parents.
2598                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2599                    assert_eq!(round - 1, r);
2600                    if core_fixture.core.last_proposed_round() == r {
2601                        // Force propose new block regardless of min round delay.
2602                        core_fixture
2603                            .core
2604                            .try_propose(true)
2605                            .unwrap()
2606                            .unwrap_or_else(|| {
2607                                panic!("Block should have been proposed for round {}", round)
2608                            });
2609                    }
2610                }
2611
2612                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2613
2614                for block in this_round_blocks.iter() {
2615                    if block.author() != AuthorityIndex::new_for_test(4) {
2616                        // Assert blocks created include only 4 ancestors per block as one
2617                        // should be excluded
2618                        assert_eq!(block.ancestors().len(), 4);
2619                    } else {
2620                        // Authority 4 is the low scoring authority, so it will still include
2621                        // its own blocks.
2622                        assert_eq!(block.ancestors().len(), 5);
2623                    }
2624                }
2625            }
2626
2627            last_round_blocks = this_round_blocks;
2628        }
2629    }
2630
2631    #[tokio::test]
2632    async fn test_smart_ancestor_selection() {
2633        telemetry_subscribers::init_for_testing();
2634        let (mut context, mut key_pairs) = Context::new_for_test(7);
2635        context
2636            .protocol_config
2637            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
2638        let context = Arc::new(context.with_parameters(Parameters {
2639            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2640            ..Default::default()
2641        }));
2642
2643        let store = Arc::new(MemStore::new());
2644        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2645
2646        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2647        let leader_schedule = Arc::new(
2648            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2649                .with_num_commits_per_schedule(10),
2650        );
2651
2652        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2653        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2654        let (blocks_sender, _blocks_receiver) =
2655            monitored_mpsc::unbounded_channel("consensus_block_output");
2656        let transaction_certifier =
2657            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2658        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2659        // Need at least one subscriber to the block broadcast channel.
2660        let mut block_receiver = signal_receivers.block_broadcast_receiver();
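        // Keeping `block_receiver` alive presumably prevents the broadcast of proposed blocks from
        // failing for lack of subscribers; it is also what the assertions below read ExtendedBlocks from.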
2661
2662        let (commit_consumer, _commit_receiver, _transaction_receiver) =
2663            CommitConsumerArgs::new(0, 0);
2664        let commit_observer = CommitObserver::new(
2665            context.clone(),
2666            commit_consumer,
2667            dag_state.clone(),
2668            transaction_certifier.clone(),
2669            leader_schedule.clone(),
2670        )
2671        .await;
2672
2673        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2674        let mut core = Core::new(
2675            context.clone(),
2676            leader_schedule,
2677            transaction_consumer,
2678            transaction_certifier.clone(),
2679            block_manager,
2680            true,
2681            commit_observer,
2682            signals,
2683            key_pairs.remove(context.own_index.value()).1,
2684            dag_state.clone(),
2685            true,
2686            round_tracker.clone(),
2687        );
2688
2689        // No new block should have been produced
2690        assert_eq!(
2691            core.last_proposed_round(),
2692            GENESIS_ROUND,
2693            "No block should have been created other than genesis"
2694        );
2695
2696        // Trying to explicitly propose a block will not produce anything
2697        assert!(core.try_propose(true).unwrap().is_none());
2698
2699        // Create blocks for the whole network but not for authority 1
2700        let mut builder = DagBuilder::new(context.clone());
2701        builder
2702            .layers(1..=12)
2703            .authorities(vec![AuthorityIndex::new_for_test(1)])
2704            .skip_block()
2705            .build();
2706        let blocks = builder.blocks(1..=12);
2707        // Process all the blocks
2708        transaction_certifier
2709            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2710        assert!(core.add_blocks(blocks).unwrap().is_empty());
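        // Presumably this tells Core the highest round it is known to have proposed (relevant with the
        // sync_last_known_own_block_timeout parameter set above), so it is free to propose beyond it.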
2711        core.set_last_known_proposed_round(12);
2712
2713        round_tracker.write().update_from_probe(
2714            vec![
2715                vec![12, 12, 12, 12, 12, 12, 12],
2716                vec![0, 0, 0, 0, 0, 0, 0],
2717                vec![12, 12, 12, 12, 12, 12, 12],
2718                vec![12, 12, 12, 12, 12, 12, 12],
2719                vec![12, 12, 12, 12, 12, 12, 12],
2720                vec![12, 12, 12, 12, 12, 12, 12],
2721                vec![12, 12, 12, 12, 12, 12, 12],
2722            ],
2723            vec![
2724                vec![12, 12, 12, 12, 12, 12, 12],
2725                vec![0, 0, 0, 0, 0, 0, 0],
2726                vec![12, 12, 12, 12, 12, 12, 12],
2727                vec![12, 12, 12, 12, 12, 12, 12],
2728                vec![12, 12, 12, 12, 12, 12, 12],
2729                vec![12, 12, 12, 12, 12, 12, 12],
2730                vec![12, 12, 12, 12, 12, 12, 12],
2731            ],
2732        );
2733
2734        let block = core.try_propose(true).expect("No error").unwrap();
2735        assert_eq!(block.round(), 13);
2736        assert_eq!(block.ancestors().len(), 7);
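        // All 7 ancestors are present even though authority 1 skipped rounds 1..=12; its genesis block
        // is presumably still referenced as its latest block.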
2737
2738        // Build blocks for rest of the network other than own index
2739        builder
2740            .layers(13..=14)
2741            .authorities(vec![AuthorityIndex::new_for_test(0)])
2742            .skip_block()
2743            .build();
2744        let blocks = builder.blocks(13..=14);
2745        transaction_certifier
2746            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2747        assert!(core.add_blocks(blocks).unwrap().is_empty());
2748
2749        // We now have triggered a leader schedule change so we should have
2750        // one EXCLUDE authority (1) when we go to select ancestors for the next proposal
2751        let block = core.try_propose(true).expect("No error").unwrap();
2752        assert_eq!(block.round(), 15);
2753        assert_eq!(block.ancestors().len(), 6);
2754
2755        // Build blocks for a quorum of the network including the EXCLUDE authority (1)
2756        // which will trigger smart select and we will not propose a block
2757        let round_14_ancestors = builder.last_ancestors.clone();
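        // Captured so that the second batch of round 15 blocks below can be built on the same round 14
        // parents via `override_last_ancestors`.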
2758        builder
2759            .layer(15)
2760            .authorities(vec![
2761                AuthorityIndex::new_for_test(0),
2762                AuthorityIndex::new_for_test(5),
2763                AuthorityIndex::new_for_test(6),
2764            ])
2765            .skip_block()
2766            .build();
2767        let blocks = builder.blocks(15..=15);
2768        let authority_1_excluded_block_reference = blocks
2769            .iter()
2770            .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2771            .unwrap()
2772            .reference();
2773        // Wait for min round delay to allow blocks to be proposed.
2774        sleep(context.parameters.min_round_delay).await;
2775        // Smart select should be triggered and no block should be proposed.
2776        transaction_certifier
2777            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2778        assert!(core.add_blocks(blocks).unwrap().is_empty());
2779        assert_eq!(core.last_proposed_block().round(), 15);
2780
2781        builder
2782            .layer(15)
2783            .authorities(vec![
2784                AuthorityIndex::new_for_test(0),
2785                AuthorityIndex::new_for_test(1),
2786                AuthorityIndex::new_for_test(2),
2787                AuthorityIndex::new_for_test(3),
2788                AuthorityIndex::new_for_test(4),
2789            ])
2790            .skip_block()
2791            .override_last_ancestors(round_14_ancestors)
2792            .build();
2793        let blocks = builder.blocks(15..=15);
2794        let round_15_ancestors: Vec<BlockRef> = blocks
2795            .iter()
2796            .filter(|block| block.round() == 15)
2797            .map(|block| block.reference())
2798            .collect();
2799        let included_block_references = iter::once(&core.last_proposed_block())
2800            .chain(blocks.iter())
2801            .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2802            .map(|block| block.reference())
2803            .collect::<Vec<_>>();
2804
2805        // Have enough ancestor blocks to propose now.
2806        transaction_certifier
2807            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2808        assert!(core.add_blocks(blocks).unwrap().is_empty());
2809        assert_eq!(core.last_proposed_block().round(), 16);
2810
2811        // Check that a new block has been proposed & signaled.
2812        let extended_block = loop {
2813            let extended_block =
2814                tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2815                    .await
2816                    .unwrap()
2817                    .unwrap();
2818            if extended_block.block.round() == 16 {
2819                break extended_block;
2820            }
2821        };
2822        assert_eq!(extended_block.block.round(), 16);
2823        assert_eq!(extended_block.block.author(), core.context.own_index);
2824        assert_eq!(extended_block.block.ancestors().len(), 6);
2825        assert_eq!(extended_block.block.ancestors(), included_block_references);
2826        assert_eq!(extended_block.excluded_ancestors.len(), 1);
2827        assert_eq!(
2828            extended_block.excluded_ancestors[0],
2829            authority_1_excluded_block_reference
2830        );
2831
2832        // Build blocks for a quorum of the network including the EXCLUDE ancestor,
2833        // which will trigger smart select so we will not propose a block.
2834        // This time we will force a proposal by hitting the leader timeout, which
2835        // should cause us to include this EXCLUDE ancestor.
2836        builder
2837            .layer(16)
2838            .authorities(vec![
2839                AuthorityIndex::new_for_test(0),
2840                AuthorityIndex::new_for_test(5),
2841                AuthorityIndex::new_for_test(6),
2842            ])
2843            .skip_block()
2844            .override_last_ancestors(round_15_ancestors)
2845            .build();
2846        let blocks = builder.blocks(16..=16);
2847        // Wait for the min round delay before simulating the leader timeout proposal below.
2848        sleep(context.parameters.min_round_delay).await;
2849        // Smart select should be triggered and no block should be proposed.
2850        transaction_certifier
2851            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2852        assert!(core.add_blocks(blocks).unwrap().is_empty());
2853        assert_eq!(core.last_proposed_block().round(), 16);
2854
2855        // Simulate a leader timeout and a force proposal where we will include
2856        // one EXCLUDE ancestor when we go to select ancestors for the next proposal
2857        let block = core.try_propose(true).expect("No error").unwrap();
2858        assert_eq!(block.round(), 17);
2859        assert_eq!(block.ancestors().len(), 5);
2860
2861        // Check that a new block has been proposed & signaled.
2862        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2863            .await
2864            .unwrap()
2865            .unwrap();
2866        assert_eq!(extended_block.block.round(), 17);
2867        assert_eq!(extended_block.block.author(), core.context.own_index);
2868        assert_eq!(extended_block.block.ancestors().len(), 5);
2869        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2870
2871        // Excluded authority is locked until round 20, simulate enough rounds to
2872        // unlock
2873        builder
2874            .layers(17..=22)
2875            .authorities(vec![AuthorityIndex::new_for_test(0)])
2876            .skip_block()
2877            .build();
2878        let blocks = builder.blocks(17..=22);
2879
2880        // Simulate updating the received and accepted rounds from the prober.
2881        // New quorum rounds per authority can then be computed, which will unlock
2882        // the excluded authority (1). We should then be able to create a new
2883        // layer of blocks which will all be included as ancestors for the
2884        // next proposal.
2885        round_tracker.write().update_from_probe(
2886            vec![
2887                vec![22, 22, 22, 22, 22, 22, 22],
2888                vec![22, 22, 22, 22, 22, 22, 22],
2889                vec![22, 22, 22, 22, 22, 22, 22],
2890                vec![22, 22, 22, 22, 22, 22, 22],
2891                vec![22, 22, 22, 22, 22, 22, 22],
2892                vec![22, 22, 22, 22, 22, 22, 22],
2893                vec![22, 22, 22, 22, 22, 22, 22],
2894            ],
2895            vec![
2896                vec![22, 22, 22, 22, 22, 22, 22],
2897                vec![22, 22, 22, 22, 22, 22, 22],
2898                vec![22, 22, 22, 22, 22, 22, 22],
2899                vec![22, 22, 22, 22, 22, 22, 22],
2900                vec![22, 22, 22, 22, 22, 22, 22],
2901                vec![22, 22, 22, 22, 22, 22, 22],
2902                vec![22, 22, 22, 22, 22, 22, 22],
2903            ],
2904        );
2905
2906        let included_block_references = iter::once(&core.last_proposed_block())
2907            .chain(blocks.iter())
2908            .filter(|block| block.round() == 22 || block.author() == core.context.own_index)
2909            .map(|block| block.reference())
2910            .collect::<Vec<_>>();
2911
2912        // Have enough ancestor blocks to propose now.
2913        sleep(context.parameters.min_round_delay).await;
2914        transaction_certifier
2915            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2916        assert!(core.add_blocks(blocks).unwrap().is_empty());
2917        assert_eq!(core.last_proposed_block().round(), 23);
2918
2919        // Check that a new block has been proposed & signaled.
2920        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2921            .await
2922            .unwrap()
2923            .unwrap();
2924        assert_eq!(extended_block.block.round(), 23);
2925        assert_eq!(extended_block.block.author(), core.context.own_index);
2926        assert_eq!(extended_block.block.ancestors().len(), 7);
2927        assert_eq!(extended_block.block.ancestors(), included_block_references);
2928        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2929    }
2930
2931    #[tokio::test]
2932    async fn test_excluded_ancestor_limit() {
2933        telemetry_subscribers::init_for_testing();
2934        let (context, mut key_pairs) = Context::new_for_test(4);
2935        let context = Arc::new(context.with_parameters(Parameters {
2936            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2937            ..Default::default()
2938        }));
2939
2940        let store = Arc::new(MemStore::new());
2941        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2942
2943        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2944        let leader_schedule = Arc::new(
2945            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2946                .with_num_commits_per_schedule(10),
2947        );
2948
2949        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2950        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2951        let (blocks_sender, _blocks_receiver) =
2952            monitored_mpsc::unbounded_channel("consensus_block_output");
2953        let transaction_certifier =
2954            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
2955        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2956        // Need at least one subscriber to the block broadcast channel.
2957        let mut block_receiver = signal_receivers.block_broadcast_receiver();
2958
2959        let (commit_consumer, _commit_receiver, _transaction_receiver) =
2960            CommitConsumerArgs::new(0, 0);
2961        let commit_observer = CommitObserver::new(
2962            context.clone(),
2963            commit_consumer,
2964            dag_state.clone(),
2965            transaction_certifier.clone(),
2966            leader_schedule.clone(),
2967        )
2968        .await;
2969
2970        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2971        let mut core = Core::new(
2972            context.clone(),
2973            leader_schedule,
2974            transaction_consumer,
2975            transaction_certifier.clone(),
2976            block_manager,
2977            true,
2978            commit_observer,
2979            signals,
2980            key_pairs.remove(context.own_index.value()).1,
2981            dag_state.clone(),
2982            true,
2983            round_tracker,
2984        );
2985
2986        // No new block should have been produced
2987        assert_eq!(
2988            core.last_proposed_round(),
2989            GENESIS_ROUND,
2990            "No block should have been created other than genesis"
2991        );
2992
2993        // Create blocks for the whole network
2994        let mut builder = DagBuilder::new(context.clone());
2995        builder.layers(1..=3).build();
2996
2997        // This will equivocate 9 blocks for authority 1. They will be excluded from
2998        // the proposal, but because of the configured limit some will be dropped and not included
2999        // as part of the ExtendedBlock structure sent to the rest of the network.
3000        builder
3001            .layer(4)
3002            .authorities(vec![AuthorityIndex::new_for_test(1)])
3003            .equivocate(9)
3004            .build();
3005        let blocks = builder.blocks(1..=4);
3006
3007        // Process all the blocks
3008        transaction_certifier
3009            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
3010        assert!(core.add_blocks(blocks).unwrap().is_empty());
3011        core.set_last_known_proposed_round(3);
3012
3013        let block = core.try_propose(true).expect("No error").unwrap();
3014        assert_eq!(block.round(), 5);
3015        assert_eq!(block.ancestors().len(), 4);
3016
3017        // Check that a new block has been proposed & signaled.
3018        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
3019            .await
3020            .unwrap()
3021            .unwrap();
3022        assert_eq!(extended_block.block.round(), 5);
3023        assert_eq!(extended_block.block.author(), core.context.own_index);
3024        assert_eq!(extended_block.block.ancestors().len(), 4);
3025        assert_eq!(extended_block.excluded_ancestors.len(), 8);
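        // Per the limit described above, not all of authority 1's equivocating block references fit in
        // the ExtendedBlock; only 8 are carried as excluded ancestors.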
3026    }
3027
3028    #[tokio::test]
3029    async fn test_core_set_subscriber_exists() {
3030        telemetry_subscribers::init_for_testing();
3031        let (context, mut key_pairs) = Context::new_for_test(4);
3032        let context = Arc::new(context);
3033        let store = Arc::new(MemStore::new());
3034        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3035
3036        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3037        let leader_schedule = Arc::new(LeaderSchedule::from_store(
3038            context.clone(),
3039            dag_state.clone(),
3040        ));
3041
3042        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3043        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3044        let (blocks_sender, _blocks_receiver) =
3045            monitored_mpsc::unbounded_channel("consensus_block_output");
3046        let transaction_certifier =
3047            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
3048        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3049        // Need at least one subscriber to the block broadcast channel.
3050        let _block_receiver = signal_receivers.block_broadcast_receiver();
3051
3052        let (commit_consumer, _commit_receiver, _transaction_receiver) =
3053            CommitConsumerArgs::new(0, 0);
3054        let commit_observer = CommitObserver::new(
3055            context.clone(),
3056            commit_consumer,
3057            dag_state.clone(),
3058            transaction_certifier.clone(),
3059            leader_schedule.clone(),
3060        )
3061        .await;
3062
3063        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
3064        let mut core = Core::new(
3065            context.clone(),
3066            leader_schedule,
3067            transaction_consumer,
3068            transaction_certifier.clone(),
3069            block_manager,
3070            // Indicate that no subscriber exists initially.
3071            false,
3072            commit_observer,
3073            signals,
3074            key_pairs.remove(context.own_index.value()).1,
3075            dag_state.clone(),
3076            false,
3077            round_tracker,
3078        );
3079
3080        // There is no proposal during recovery because there is no subscriber.
3081        assert_eq!(
3082            core.last_proposed_round(),
3083            GENESIS_ROUND,
3084            "No block should have been created other than genesis"
3085        );
3086
3087        // There is no proposal even with forced proposing.
3088        assert!(core.try_propose(true).unwrap().is_none());
3089
3090        // Let Core know subscriber exists.
3091        core.set_subscriber_exists(true);
3092
3093        // Proposing now would succeed.
3094        assert!(core.try_propose(true).unwrap().is_some());
3095    }
3096
3097    #[tokio::test]
3098    async fn test_core_set_propagation_delay_per_authority() {
3099        // TODO: create helper to avoid the duplicated code here.
3100        telemetry_subscribers::init_for_testing();
3101        let (context, mut key_pairs) = Context::new_for_test(4);
3102        let context = Arc::new(context);
3103        let store = Arc::new(MemStore::new());
3104        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3105
3106        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3107        let leader_schedule = Arc::new(LeaderSchedule::from_store(
3108            context.clone(),
3109            dag_state.clone(),
3110        ));
3111
3112        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3113        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3114        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3115        let (blocks_sender, _blocks_receiver) =
3116            monitored_mpsc::unbounded_channel("consensus_block_output");
3117        let transaction_certifier =
3118            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
3119        // Need at least one subscriber to the block broadcast channel.
3120        let _block_receiver = signal_receivers.block_broadcast_receiver();
3121
3122        let (commit_consumer, _commit_receiver, _transaction_receiver) =
3123            CommitConsumerArgs::new(0, 0);
3124        let commit_observer = CommitObserver::new(
3125            context.clone(),
3126            commit_consumer,
3127            dag_state.clone(),
3128            transaction_certifier.clone(),
3129            leader_schedule.clone(),
3130        )
3131        .await;
3132
3133        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
3134        let mut core = Core::new(
3135            context.clone(),
3136            leader_schedule,
3137            transaction_consumer,
3138            transaction_certifier.clone(),
3139            block_manager,
3140            // Indicate that no subscriber exists initially.
3141            false,
3142            commit_observer,
3143            signals,
3144            key_pairs.remove(context.own_index.value()).1,
3145            dag_state.clone(),
3146            false,
3147            round_tracker.clone(),
3148        );
3149
3150        // There is no proposal during recovery because there is no subscriber.
3151        assert_eq!(
3152            core.last_proposed_round(),
3153            GENESIS_ROUND,
3154            "No block should have been created other than genesis"
3155        );
3156
3157        // Use a large propagation delay to disable proposing.
3158        // This is done by accepting an own block at round 1000 into dag state and
3159        // then simulating a round tracker update of received rounds from the probe, where
3160        // the low quorum round for own index should get calculated as round 0.
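        // In other words: an own block exists at round 1000 while the own low quorum round is 0, so the
        // estimated propagation delay is presumably on the order of 1000 rounds, enough to stop proposing.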
3161        let test_block = VerifiedBlock::new_for_test(TestBlock::new(1000, 0).build());
3162        transaction_certifier.add_voted_blocks(vec![(test_block.clone(), vec![])]);
3163        // Force accepting the block to dag state because its causal history is incomplete.
3164        dag_state.write().accept_block(test_block);
3165
3166        round_tracker.write().update_from_probe(
3167            vec![
3168                vec![0, 0, 0, 0],
3169                vec![0, 0, 0, 0],
3170                vec![0, 0, 0, 0],
3171                vec![0, 0, 0, 0],
3172            ],
3173            vec![
3174                vec![0, 0, 0, 0],
3175                vec![0, 0, 0, 0],
3176                vec![0, 0, 0, 0],
3177                vec![0, 0, 0, 0],
3178            ],
3179        );
3180
3181        // Make propagation delay the only reason for not proposing.
3182        core.set_subscriber_exists(true);
3183
3184        // There is no proposal even with forced proposing.
3185        assert!(core.try_propose(true).unwrap().is_none());
3186
3187        // Let Core know there is no propagation delay.
3188        // This is done by simulating a round tracker update of received rounds from the probe,
3189        // where the low quorum round for own index should get calculated as round 1000.
3190        round_tracker.write().update_from_probe(
3191            vec![
3192                vec![1000, 1000, 1000, 1000],
3193                vec![1000, 1000, 1000, 1000],
3194                vec![1000, 1000, 1000, 1000],
3195                vec![1000, 1000, 1000, 1000],
3196            ],
3197            vec![
3198                vec![1000, 1000, 1000, 1000],
3199                vec![1000, 1000, 1000, 1000],
3200                vec![1000, 1000, 1000, 1000],
3201                vec![1000, 1000, 1000, 1000],
3202            ],
3203        );
3204
3205        // Also add the necessary blocks from round 1000 so core will propose for
3206        // round 1001
3207        for author in 1..4 {
3208            let block = VerifiedBlock::new_for_test(TestBlock::new(1000, author).build());
3209            transaction_certifier.add_voted_blocks(vec![(block.clone(), vec![])]);
3210            // Force accepting the block to dag state because its causal history is incomplete.
3211            dag_state.write().accept_block(block);
3212        }
3213
3214        // Proposing now would succeed.
3215        assert!(core.try_propose(true).unwrap().is_some());
3216    }
3217
3218    #[tokio::test(flavor = "current_thread", start_paused = true)]
3219    async fn test_leader_schedule_change() {
3220        telemetry_subscribers::init_for_testing();
3221        let default_params = Parameters::default();
3222
3223        let (context, _) = Context::new_for_test(4);
3224        // create the cores and their signals for all the authorities
3225        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3226
3227        // Now iterate over a few rounds and ensure the corresponding signals are created while network advances
3228        let mut last_round_blocks = Vec::new();
3229        for round in 1..=30 {
3230            let mut this_round_blocks = Vec::new();
3231
3232            // Wait for min round delay to allow blocks to be proposed.
3233            sleep(default_params.min_round_delay).await;
3234
3235            for core_fixture in &mut cores {
3236                // add the blocks from last round
3237                // this will trigger a block creation for the round and a signal should be emitted
3238                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3239
3240                core_fixture.core.round_tracker.write().update_from_probe(
3241                    vec![
3242                        vec![round, round, round, round],
3243                        vec![round, round, round, round],
3244                        vec![round, round, round, round],
3245                        vec![round, round, round, round],
3246                    ],
3247                    vec![
3248                        vec![round, round, round, round],
3249                        vec![round, round, round, round],
3250                        vec![round, round, round, round],
3251                        vec![round, round, round, round],
3252                    ],
3253                );
3254
3255                // A "new round" signal should be received given that all the blocks of previous round have been processed
3256                let new_round = receive(
3257                    Duration::from_secs(1),
3258                    core_fixture.signal_receivers.new_round_receiver(),
3259                )
3260                .await;
3261                assert_eq!(new_round, round);
3262
3263                // Check that a new block has been proposed.
3264                let extended_block = tokio::time::timeout(
3265                    Duration::from_secs(1),
3266                    core_fixture.block_receiver.recv(),
3267                )
3268                .await
3269                .unwrap()
3270                .unwrap();
3271                assert_eq!(extended_block.block.round(), round);
3272                assert_eq!(
3273                    extended_block.block.author(),
3274                    core_fixture.core.context.own_index
3275                );
3276
3277                // append the new block to this round blocks
3278                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3279
3280                let block = core_fixture.core.last_proposed_block();
3281
3282                // ensure that produced block is referring to the blocks of last_round
3283                assert_eq!(
3284                    block.ancestors().len(),
3285                    core_fixture.core.context.committee.size()
3286                );
3287                for ancestor in block.ancestors() {
3288                    if block.round() > 1 {
3289                        // don't bother with round 1 block which just contains the genesis blocks.
3290                        assert!(
3291                            last_round_blocks
3292                                .iter()
3293                                .any(|block| block.reference() == *ancestor),
3294                            "Reference from previous round should be added"
3295                        );
3296                    }
3297                }
3298            }
3299
3300            last_round_blocks = this_round_blocks;
3301        }
3302
3303        for core_fixture in cores {
3304            // Flush the DAG state to storage.
3305            core_fixture.dag_state.write().flush();
3306
3307            // Check commits have been persisted to store
3308            let last_commit = core_fixture
3309                .store
3310                .read_last_commit()
3311                .unwrap()
3312                .expect("last commit should be set");
3313            // There are 28 leader rounds with rounds completed up to and including
3314            // round 29. Round 30 blocks will only include their own blocks, so the
3315            // 28th leader will not be committed.
3316            assert_eq!(last_commit.index(), 27);
3317            let all_stored_commits = core_fixture
3318                .store
3319                .scan_commits((0..=CommitIndex::MAX).into())
3320                .unwrap();
3321            assert_eq!(all_stored_commits.len(), 27);
3322            assert_eq!(
3323                core_fixture
3324                    .core
3325                    .leader_schedule
3326                    .leader_swap_table
3327                    .read()
3328                    .bad_nodes
3329                    .len(),
3330                1
3331            );
3332            assert_eq!(
3333                core_fixture
3334                    .core
3335                    .leader_schedule
3336                    .leader_swap_table
3337                    .read()
3338                    .good_nodes
3339                    .len(),
3340                1
3341            );
3342            let expected_reputation_scores =
3343                ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
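            // The scores appear to cover the commit range 11..=20, with all four equally staked
            // authorities earning the same score since they all participated in every round.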
3344            assert_eq!(
3345                core_fixture
3346                    .core
3347                    .leader_schedule
3348                    .leader_swap_table
3349                    .read()
3350                    .reputation_scores,
3351                expected_reputation_scores
3352            );
3353        }
3354    }
3355
3356    #[tokio::test]
3357    async fn test_filter_new_commits() {
3358        telemetry_subscribers::init_for_testing();
3359
3360        let (context, _key_pairs) = Context::new_for_test(4);
3361        let context = context.with_parameters(Parameters {
3362            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3363            ..Default::default()
3364        });
3365
3366        let authority_index = AuthorityIndex::new_for_test(0);
3367        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3368        let mut core = core.core;
3369
3370        // No new block should have been produced
3371        assert_eq!(
3372            core.last_proposed_round(),
3373            GENESIS_ROUND,
3374            "No block should have been created other than genesis"
3375        );
3376
3377        // create a DAG of 12 rounds
3378        let mut dag_builder = DagBuilder::new(core.context.clone());
3379        dag_builder.layers(1..=12).build();
3380
3381        // Store all blocks up to round 6 which should be enough to decide up to leader 4
3382        dag_builder.print();
3383        let blocks = dag_builder.blocks(1..=6);
3384
3385        for block in blocks {
3386            core.dag_state.write().accept_block(block);
3387        }
3388
3389        // Get all the committed sub dags up to round 10
3390        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3391
3392        // Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
3393        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3394
3395        // We should have committed up to round 4
3396        assert_eq!(committed_sub_dags.len(), 4);
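        // Blocks through round 6 are presumably enough to directly decide the leaders of rounds 1..=4,
        // since each leader needs the two following rounds of votes and certificates.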
3397
3398        // Now validate the certified commits. We'll try 3 different scenarios:
3399        println!("Case 1. Provide certified commits that are all before the last committed round.");
3400
3401        // Highest certified commit should be for leader of round 4.
3402        let certified_commits = sub_dags_and_commits
3403            .iter()
3404            .take(4)
3405            .map(|(_, c)| c)
3406            .cloned()
3407            .collect::<Vec<_>>();
3408        assert!(
3409            certified_commits.last().unwrap().index()
3410                <= committed_sub_dags.last().unwrap().commit_ref.index,
3411            "Highest certified commit should not be newer than the highest committed index."
3412        );
3413
3414        let certified_commits = core.filter_new_commits(certified_commits).unwrap();
3415
3416        // No commits should be processed
3417        assert!(certified_commits.is_empty());
3418
3419        println!("Case 2. Provide certified commits that are all after the last committed round.");
3420
3421        // Highest certified commit is for the leader of round 5.
3422        let certified_commits = sub_dags_and_commits
3423            .iter()
3424            .take(5)
3425            .map(|(_, c)| c.clone())
3426            .collect::<Vec<_>>();
3427
3428        let certified_commits = core.filter_new_commits(certified_commits.clone()).unwrap();
3429
3430        // The certified commit of index 5 should be processed.
3431        assert_eq!(certified_commits.len(), 1);
3432        assert_eq!(certified_commits.first().unwrap().reference().index, 5);
3433
3434        println!(
3435            "Case 3. Provide certified commits where the first certified commit index is not the last_committed_index + 1."
3436        );
3437
3438        // Provide only the certified commit of index 6, skipping the expected index 5.
3439        let certified_commits = sub_dags_and_commits
3440            .iter()
3441            .skip(5)
3442            .take(1)
3443            .map(|(_, c)| c.clone())
3444            .collect::<Vec<_>>();
3445
3446        let err = core
3447            .filter_new_commits(certified_commits.clone())
3448            .unwrap_err();
3449        match err {
3450            ConsensusError::UnexpectedCertifiedCommitIndex {
3451                expected_commit_index: 5,
3452                commit_index: 6,
3453            } => (),
3454            _ => panic!("Unexpected error: {:?}", err),
3455        }
3456    }
3457
3458    #[tokio::test]
3459    async fn test_add_certified_commits() {
3460        telemetry_subscribers::init_for_testing();
3461
3462        let (context, _key_pairs) = Context::new_for_test(4);
3463        let context = context.with_parameters(Parameters {
3464            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3465            ..Default::default()
3466        });
3467
3468        let authority_index = AuthorityIndex::new_for_test(0);
3469        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3470        let store = core.store.clone();
3471        let mut core = core.core;
3472
3473        // No new block should have been produced
3474        assert_eq!(
3475            core.last_proposed_round(),
3476            GENESIS_ROUND,
3477            "No block should have been created other than genesis"
3478        );
3479
3480        // create a DAG of 12 rounds
3481        let mut dag_builder = DagBuilder::new(core.context.clone());
3482        dag_builder.layers(1..=12).build();
3483
3484        // Store all blocks up to round 6 which should be enough to decide up to leader 4
3485        dag_builder.print();
3486        let blocks = dag_builder.blocks(1..=6);
3487
3488        for block in blocks {
3489            core.dag_state.write().accept_block(block);
3490        }
3491
3492        // Get all the committed sub dags up to round 10
3493        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3494
3495        // Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
3496        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3497
3498        // We should have committed up to round 4
3499        assert_eq!(committed_sub_dags.len(), 4);
3500
3501        // Flush the DAG state to storage.
3502        core.dag_state.write().flush();
3503
3504        println!("Case 1. Provide no certified commits. No commit should happen.");
3505
3506        let last_commit = store
3507            .read_last_commit()
3508            .unwrap()
3509            .expect("Last commit should be set");
3510        assert_eq!(last_commit.reference().index, 4);
3511
3512        println!(
3513            "Case 2. Provide certified commits that are before and after the last committed round, and also additional blocks so the direct decide rule can run as well."
3514        );
3515
3516        // The commits of leader rounds 5-8 should be committed via the certified commits.
3517        let certified_commits = sub_dags_and_commits
3518            .iter()
3519            .skip(3)
3520            .take(5)
3521            .map(|(_, c)| c.clone())
3522            .collect::<Vec<_>>();
3523
3524        // Now only add the blocks of rounds 8..=12. The blocks up to round 7 should be accepted via the certified commits processing.
3525        let blocks = dag_builder.blocks(8..=12);
3526        for block in blocks {
3527            core.dag_state.write().accept_block(block);
3528        }
3529
3530        // The corresponding blocks of the certified commits should be accepted and stored before linearizing and committing the DAG.
3531        core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
3532            .expect("Should not fail");
3533
3534        // Flush the DAG state to storage.
3535        core.dag_state.write().flush();
3536
3537        let commits = store.scan_commits((6..=10).into()).unwrap();
3538
3539        // We expect all the sub dags up to leader round 10 to be committed.
3540        assert_eq!(commits.len(), 5);
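        // Commits 5..=8 presumably come from the provided certified commits (index 4 was filtered out as
        // already committed), while commits 9 and 10 are decided directly from the round 8..=12 blocks
        // added above.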
3541
3542        for i in 6..=10 {
3543            let commit = &commits[i - 6];
3544            assert_eq!(commit.reference().index, i as u32);
3545        }
3546    }
3547
3548    #[tokio::test]
3549    async fn try_commit_with_certified_commits_gced_blocks() {
3550        const GC_DEPTH: u32 = 3;
3551        telemetry_subscribers::init_for_testing();
3552
3553        let (mut context, mut key_pairs) = Context::new_for_test(5);
3554        context
3555            .protocol_config
3556            .set_consensus_gc_depth_for_testing(GC_DEPTH);
3557        let context = Arc::new(context.with_parameters(Parameters {
3558            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3559            ..Default::default()
3560        }));
3561
3562        let store = Arc::new(MemStore::new());
3563        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3564
3565        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3566        let leader_schedule = Arc::new(
3567            LeaderSchedule::from_store(context.clone(), dag_state.clone())
3568                .with_num_commits_per_schedule(10),
3569        );
3570
3571        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3572        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3573        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3574        let (blocks_sender, _blocks_receiver) =
3575            monitored_mpsc::unbounded_channel("consensus_block_output");
3576        let transaction_certifier =
3577            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
3578        // Need at least one subscriber to the block broadcast channel.
3579        let _block_receiver = signal_receivers.block_broadcast_receiver();
3580
3581        let (commit_consumer, _commit_receiver, _transaction_receiver) =
3582            CommitConsumerArgs::new(0, 0);
3583        let commit_observer = CommitObserver::new(
3584            context.clone(),
3585            commit_consumer,
3586            dag_state.clone(),
3587            transaction_certifier.clone(),
3588            leader_schedule.clone(),
3589        )
3590        .await;
3591
3592        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
3593        let mut core = Core::new(
3594            context.clone(),
3595            leader_schedule,
3596            transaction_consumer,
3597            transaction_certifier.clone(),
3598            block_manager,
3599            true,
3600            commit_observer,
3601            signals,
3602            key_pairs.remove(context.own_index.value()).1,
3603            dag_state.clone(),
3604            true,
3605            round_tracker,
3606        );
3607
3608        // No new block should have been produced
3609        assert_eq!(
3610            core.last_proposed_round(),
3611            GENESIS_ROUND,
3612            "No block should have been created other than genesis"
3613        );
3614
3615        let dag_str = "DAG {
3616            Round 0 : { 5 },
3617            Round 1 : { * },
3618            Round 2 : { 
3619                A -> [-E1],
3620                B -> [-E1],
3621                C -> [-E1],
3622                D -> [-E1],
3623            },
3624            Round 3 : {
3625                A -> [*],
3626                B -> [*],
3627                C -> [*],
3628                D -> [*],
3629            },
3630            Round 4 : { 
3631                A -> [*],
3632                B -> [*],
3633                C -> [*],
3634                D -> [*],
3635            },
3636            Round 5 : { 
3637                A -> [*],
3638                B -> [*],
3639                C -> [*],
3640                D -> [*],
3641                E -> [A4, B4, C4, D4, E1]
3642            },
3643            Round 6 : { * },
3644            Round 7 : { * },
3645        }";
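        // In the DAG notation above, "*" connects a block to all blocks of the previous round, "-E1"
        // appears to mean the block omits E's round 1 block from its ancestors, and E5 explicitly lists
        // E1 as an ancestor. With GC_DEPTH = 3, E1 falls below the GC round by the time the later
        // leaders commit, so it must never appear in a committed sub dag (asserted below).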
3646
3647        let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
3648        dag_builder.print();
3649
3650        // Now get all the committed sub dags from the DagBuilder
3651        let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
3652            .get_sub_dag_and_certified_commits(1..=5)
3653            .into_iter()
3654            .unzip();
3655
3656        // Now try to commit up to the latest leader (round = 5) with the provided certified commits. Note that we have not accepted any
3657        // blocks. That should happen during the commit process.
3658        let committed_sub_dags = core.try_commit(certified_commits).unwrap();
3659
3660        // We should have committed up to round 4
3661        assert_eq!(committed_sub_dags.len(), 4);
3662        for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
3663            assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
3664
3665            // ensure that block E1 (authority E, index 4, round 1) has not been committed
3666            for block in committed_sub_dag.blocks.iter() {
3667                if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(4) {
3668                    panic!("Did not expect to commit block E1");
3669                }
3670            }
3671        }
3672    }
3673
3674    #[tokio::test(flavor = "current_thread", start_paused = true)]
3675    async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
3676        parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
3677    }
3678
3679    #[tokio::test(flavor = "current_thread", start_paused = true)]
3680    async fn test_commit_on_leader_schedule_change_boundary_with_multileader() {
3681        parameterized_test_commit_on_leader_schedule_change_boundary(None).await;
3682    }
3683
3684    async fn parameterized_test_commit_on_leader_schedule_change_boundary(
3685        num_leaders_per_round: Option<usize>,
3686    ) {
3687        telemetry_subscribers::init_for_testing();
3688        let default_params = Parameters::default();
3689
3690        let (mut context, _) = Context::new_for_test(6);
3691        context
3692            .protocol_config
3693            .set_mysticeti_num_leaders_per_round_for_testing(num_leaders_per_round);
3694        // create the cores and their signals for all the authorities
3695        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]).await;
3696
3697        // Now iterate over a few rounds and ensure the corresponding signals are created while network advances
3698        let mut last_round_blocks: Vec<VerifiedBlock> = Vec::new();
3699        for round in 1..=33 {
3700            let mut this_round_blocks = Vec::new();
3701
3702            // Wait for min round delay to allow blocks to be proposed.
3703            sleep(default_params.min_round_delay).await;
3704
3705            for core_fixture in &mut cores {
3706                // add the blocks from last round
3707                // this will trigger a block creation for the round and a signal should be emitted
3708                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3709
3710                core_fixture.core.round_tracker.write().update_from_probe(
3711                    vec![
3712                        vec![round, round, round, round, round, round],
3713                        vec![round, round, round, round, round, round],
3714                        vec![round, round, round, round, round, round],
3715                        vec![round, round, round, round, round, round],
3716                        vec![round, round, round, round, round, round],
3717                        vec![round, round, round, round, round, round],
3718                    ],
3719                    vec![
3720                        vec![round, round, round, round, round, round],
3721                        vec![round, round, round, round, round, round],
3722                        vec![round, round, round, round, round, round],
3723                        vec![round, round, round, round, round, round],
3724                        vec![round, round, round, round, round, round],
3725                        vec![round, round, round, round, round, round],
3726                    ],
3727                );
3728
3729                // A "new round" signal should be received given that all the blocks of previous round have been processed
3730                let new_round = receive(
3731                    Duration::from_secs(1),
3732                    core_fixture.signal_receivers.new_round_receiver(),
3733                )
3734                .await;
3735                assert_eq!(new_round, round);
3736
3737                // Check that a new block has been proposed.
3738                let extended_block = tokio::time::timeout(
3739                    Duration::from_secs(1),
3740                    core_fixture.block_receiver.recv(),
3741                )
3742                .await
3743                .unwrap()
3744                .unwrap();
3745                assert_eq!(extended_block.block.round(), round);
3746                assert_eq!(
3747                    extended_block.block.author(),
3748                    core_fixture.core.context.own_index
3749                );
3750
3751                // append the new block to this round blocks
3752                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3753
3754                let block = core_fixture.core.last_proposed_block();
3755
3756                // ensure that produced block is referring to the blocks of last_round
3757                assert_eq!(
3758                    block.ancestors().len(),
3759                    core_fixture.core.context.committee.size()
3760                );
3761                for ancestor in block.ancestors() {
3762                    if block.round() > 1 {
3763                        // Skip round 1 blocks, whose ancestors are just the genesis blocks.
3764                        assert!(
3765                            last_round_blocks
3766                                .iter()
3767                                .any(|block| block.reference() == *ancestor),
3768                            "Reference from previous round should be added"
3769                        );
3770                    }
3771                }
3772            }
3773
3774            last_round_blocks = this_round_blocks;
3775        }
3776
3777        for core_fixture in cores {
3778            // There are 31 leader rounds with rounds completed up to and including
3779            // round 33. Round 33 blocks will only include their own blocks, so there
3780            // should only be 30 commits.
3781            // However, on a leader schedule change boundary it is possible for a
3782            // new leader to be selected for the same round if the elected leader
3783            // gets swapped, allowing multiple leaders to be committed at a round.
3784            // This means that with the number of leaders per round explicitly set to 1
3785            // we will have 30 commits, otherwise 31.
3786            // NOTE: 31 leader rounds are used to specifically trigger the scenario
3787            // where the leader schedule boundary occurs AND there is a swap to a new
3788            // leader for the same round.
3789            let expected_commit_count = match num_leaders_per_round {
3790                Some(1) => 30,
3791                _ => 31,
3792            };
3793
3794            // Flush the DAG state to storage.
3795            core_fixture.dag_state.write().flush();
3796
3797            // Check commits have been persisted to store
3798            let last_commit = core_fixture
3799                .store
3800                .read_last_commit()
3801                .unwrap()
3802                .expect("last commit should be set");
3803            assert_eq!(last_commit.index(), expected_commit_count);
3804            let all_stored_commits = core_fixture
3805                .store
3806                .scan_commits((0..=CommitIndex::MAX).into())
3807                .unwrap();
3808            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
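            // After the leader schedule change, the leader swap table should mark exactly one
            // authority as a bad node and one as a good node.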
3809            assert_eq!(
3810                core_fixture
3811                    .core
3812                    .leader_schedule
3813                    .leader_swap_table
3814                    .read()
3815                    .bad_nodes
3816                    .len(),
3817                1
3818            );
3819            assert_eq!(
3820                core_fixture
3821                    .core
3822                    .leader_schedule
3823                    .leader_swap_table
3824                    .read()
3825                    .good_nodes
3826                    .len(),
3827                1
3828            );
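            // All authorities participate identically in this test, so the reputation scores
            // recorded in the swap table (computed over commits 21..=30) are expected to be
            // equal across the six authorities.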
3829            let expected_reputation_scores =
3830                ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
3831            assert_eq!(
3832                core_fixture
3833                    .core
3834                    .leader_schedule
3835                    .leader_swap_table
3836                    .read()
3837                    .reputation_scores,
3838                expected_reputation_scores
3839            );
3840        }
3841    }
3842
3843    #[tokio::test]
3844    async fn test_core_signals() {
3845        telemetry_subscribers::init_for_testing();
3846        let default_params = Parameters::default();
3847
3848        let (context, _) = Context::new_for_test(4);
3849        // create the cores and their signals for all the authorities
3850        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3851
3852        // Now iterate over a few rounds and ensure the corresponding signals are created while the network advances.
3853        let mut last_round_blocks = Vec::new();
3854        for round in 1..=10 {
3855            let mut this_round_blocks = Vec::new();
3856
3857            // Wait for min round delay to allow blocks to be proposed.
3858            sleep(default_params.min_round_delay).await;
3859
3860            for core_fixture in &mut cores {
3861                // Add the blocks from the last round.
3862                // This will trigger block creation for this round, and a signal should be emitted.
3863                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3864
3865                core_fixture.core.round_tracker.write().update_from_probe(
3866                    vec![
3867                        vec![round, round, round, round],
3868                        vec![round, round, round, round],
3869                        vec![round, round, round, round],
3870                        vec![round, round, round, round],
3871                    ],
3872                    vec![
3873                        vec![round, round, round, round],
3874                        vec![round, round, round, round],
3875                        vec![round, round, round, round],
3876                        vec![round, round, round, round],
3877                    ],
3878                );
3879
3880                // A "new round" signal should be received given that all the blocks of the previous round have been processed.
3881                let new_round = receive(
3882                    Duration::from_secs(1),
3883                    core_fixture.signal_receivers.new_round_receiver(),
3884                )
3885                .await;
3886                assert_eq!(new_round, round);
3887
3888                // Check that a new block has been proposed.
3889                let extended_block = tokio::time::timeout(
3890                    Duration::from_secs(1),
3891                    core_fixture.block_receiver.recv(),
3892                )
3893                .await
3894                .unwrap()
3895                .unwrap();
3896                assert_eq!(extended_block.block.round(), round);
3897                assert_eq!(
3898                    extended_block.block.author(),
3899                    core_fixture.core.context.own_index
3900                );
3901
3902                // Append the new block to this round's blocks.
3903                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3904
3905                let block = core_fixture.core.last_proposed_block();
3906
3907                // Ensure that the produced block refers to the blocks of last_round.
3908                assert_eq!(
3909                    block.ancestors().len(),
3910                    core_fixture.core.context.committee.size()
3911                );
3912                for ancestor in block.ancestors() {
3913                    if block.round() > 1 {
3914                        // Skip round 1 blocks, whose ancestors are just the genesis blocks.
3915                        assert!(
3916                            last_round_blocks
3917                                .iter()
3918                                .any(|block| block.reference() == *ancestor),
3919                            "Reference from previous round should be added"
3920                        );
3921                    }
3922                }
3923            }
3924
3925            last_round_blocks = this_round_blocks;
3926        }
3927
3928        for core_fixture in cores {
3929            // Flush the DAG state to storage.
3930            core_fixture.dag_state.write().flush();
3931            // Check commits have been persisted to store
3932            let last_commit = core_fixture
3933                .store
3934                .read_last_commit()
3935                .unwrap()
3936                .expect("last commit should be set");
3937            // There are 8 leader rounds with rounds completed up to and including
3938            // round 9. Round 10 blocks will only include their own blocks, so the
3939            // 8th leader will not be committed.
3940            assert_eq!(last_commit.index(), 7);
3941            let all_stored_commits = core_fixture
3942                .store
3943                .scan_commits((0..=CommitIndex::MAX).into())
3944                .unwrap();
3945            assert_eq!(all_stored_commits.len(), 7);
3946        }
3947    }
3948
3949    #[tokio::test]
3950    async fn test_core_compress_proposal_references() {
3951        telemetry_subscribers::init_for_testing();
3952        let default_params = Parameters::default();
3953
3954        let (context, _) = Context::new_for_test(4);
3955        // create the cores and their signals for all the authorities
3956        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3957
3958        let mut last_round_blocks = Vec::new();
3959        let mut all_blocks = Vec::new();
3960
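        // Authority 3 does not propose during rounds 1..=10; it is fed all of the produced
        // blocks at once afterwards, which exercises ancestor compression when it finally
        // proposes.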
3961        let excluded_authority = AuthorityIndex::new_for_test(3);
3962
3963        for round in 1..=10 {
3964            let mut this_round_blocks = Vec::new();
3965
3966            for core_fixture in &mut cores {
3967                // do not produce any block for authority 3
3968                if core_fixture.core.context.own_index == excluded_authority {
3969                    continue;
3970                }
3971
3972                // Try to propose, to ensure we cover the case where the leader (authority 3) is missing.
3973                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3974                core_fixture.core.round_tracker.write().update_from_probe(
3975                    vec![
3976                        vec![round, round, round, round],
3977                        vec![round, round, round, round],
3978                        vec![round, round, round, round],
3979                        vec![round, round, round, round],
3980                    ],
3981                    vec![
3982                        vec![round, round, round, round],
3983                        vec![round, round, round, round],
3984                        vec![round, round, round, round],
3985                        vec![round, round, round, round],
3986                    ],
3987                );
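                // The `true` flag below is assumed to force the proposal so a block is still
                // produced without waiting on the missing leader.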
3988                core_fixture.core.new_block(round, true).unwrap();
3989
3990                let block = core_fixture.core.last_proposed_block();
3991                assert_eq!(block.round(), round);
3992
3993                // append the new block to this round blocks
3994                this_round_blocks.push(block.clone());
3995            }
3996
3997            last_round_blocks = this_round_blocks.clone();
3998            all_blocks.extend(this_round_blocks);
3999        }
4000
4001        // Now send all the produced blocks to the core of authority 3. It should produce a new block. If no compression
4002        // were applied, we would expect all the previous blocks from rounds 0..=10 to be referenced. However, since compression
4003        // is applied, only the last round's (10) blocks should be referenced, plus the authority's own block of round 1.
4004        let core_fixture = &mut cores[excluded_authority];
4005        // Wait for min round delay to allow blocks to be proposed.
4006        sleep(default_params.min_round_delay).await;
4007        // add blocks to trigger proposal.
4008        core_fixture.add_blocks(all_blocks).unwrap();
4009
4010        // Assert that a block has been created for round 11 and that it references blocks of round 10 for the other peers,
4011        // and round 1 for its own block (created after recovery).
4012        let block = core_fixture.core.last_proposed_block();
4013        assert_eq!(block.round(), 11);
4014        assert_eq!(block.ancestors().len(), 4);
4015        for block_ref in block.ancestors() {
4016            if block_ref.author == excluded_authority {
4017                assert_eq!(block_ref.round, 1);
4018            } else {
4019                assert_eq!(block_ref.round, 10);
4020            }
4021        }
4022
4023        // Flush the DAG state to storage.
4024        core_fixture.dag_state.write().flush();
4025
4026        // Check commits have been persisted to store
4027        let last_commit = core_fixture
4028            .store
4029            .read_last_commit()
4030            .unwrap()
4031            .expect("last commit should be set");
4032        // There are 8 leader rounds with rounds completed up to and including
4033        // round 10. However, because no blocks were produced for authority 3,
4034        // 2 leader rounds will be skipped.
4035        assert_eq!(last_commit.index(), 6);
4036        let all_stored_commits = core_fixture
4037            .store
4038            .scan_commits((0..=CommitIndex::MAX).into())
4039            .unwrap();
4040        assert_eq!(all_stored_commits.len(), 6);
4041    }
4042
4043    #[tokio::test]
4044    async fn try_select_certified_leaders() {
4045        // GIVEN
4046        telemetry_subscribers::init_for_testing();
4047
4048        let (context, _) = Context::new_for_test(4);
4049
4050        let authority_index = AuthorityIndex::new_for_test(0);
4051        let core =
4052            CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true).await;
4053        let mut core = core.core;
4054
4055        let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
4056        dag_builder.layers(1..=12).build();
4057
4058        let limit = 2;
4059
4060        let blocks = dag_builder.blocks(1..=12);
4061
4062        for block in blocks {
4063            core.dag_state.write().accept_block(block);
4064        }
4065
4066        // WHEN
4067        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
4068        let mut certified_commits = sub_dags_and_commits
4069            .into_iter()
4070            .map(|(_, commit)| commit)
4071            .collect::<Vec<_>>();
4072
4073        let leaders = core.try_select_certified_leaders(&mut certified_commits, limit);
4074
4075        // THEN
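        // With 4 certified commits available and a limit of 2, exactly 2 leaders should be
        // selected and the remaining 2 certified commits should be left in the vector.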
4076        assert_eq!(leaders.len(), 2);
4077        assert_eq!(certified_commits.len(), 2);
4078    }
4079
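    /// Waits up to `timeout` for the watch channel to signal a change and returns the latest
    /// value. Panics if the timeout elapses or the channel is closed.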
4080    pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
4081        tokio::time::timeout(timeout, receiver.changed())
4082            .await
4083            .expect("Timeout while waiting to read from receiver")
4084            .expect("Signal receive channel shouldn't be closed");
4085        *receiver.borrow_and_update()
4086    }
4087}