consensus_core/
core.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet},
    iter,
    sync::Arc,
    time::Duration,
    vec,
};

use consensus_config::{AuthorityIndex, ProtocolKeyPair};
#[cfg(test)]
use consensus_config::{Stake, local_committee_and_keys};
use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
use itertools::Itertools as _;
#[cfg(test)]
use mysten_metrics::monitored_mpsc::UnboundedReceiver;
use mysten_metrics::monitored_scope;
use parking_lot::RwLock;
use sui_macros::fail_point;
use tokio::{
    sync::{broadcast, watch},
    time::Instant,
};
use tracing::{debug, info, trace, warn};

#[cfg(test)]
use crate::{
    CommitConsumerArgs, TransactionClient, block::CertifiedBlocksOutput,
    block_verifier::NoopBlockVerifier, storage::mem_store::MemStore,
};
use crate::{
    ancestor::{AncestorState, AncestorStateManager},
    block::{
        Block, BlockAPI, BlockV1, BlockV2, ExtendedBlock, GENESIS_ROUND, SignedBlock, Slot,
        VerifiedBlock,
    },
    block_manager::BlockManager,
    commit::{
        CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
    },
    commit_observer::CommitObserver,
    context::Context,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    leader_schedule::LeaderSchedule,
    round_tracker::PeerRoundTracker,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    transaction::TransactionConsumer,
    transaction_certifier::TransactionCertifier,
    universal_committer::{
        UniversalCommitter, universal_committer_builder::UniversalCommitterBuilder,
    },
};

// Maximum number of commit votes to include in a block.
// TODO: Move to protocol config, and verify in BlockVerifier.
const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;

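/// `Core` runs the consensus protocol logic for this authority: it accepts verified blocks into the
/// local DAG, runs the commit rule to sequence leaders, and proposes new blocks as the threshold
/// clock advances.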
pub(crate) struct Core {
    context: Arc<Context>,
    /// The consumer used to pull transactions to be included in the next proposals.
    transaction_consumer: TransactionConsumer,
    /// Holds the reject votes on transactions that proposed blocks should include.
    transaction_certifier: TransactionCertifier,
    /// The block manager which is responsible for keeping track of the DAG dependencies when processing new blocks,
    /// accepting them or suspending them if their causal history is missing.
    block_manager: BlockManager,
    /// Estimated delay by round for propagating blocks to a quorum.
    /// Because of the nature of TCP and block streaming, propagation delay is expected to be
    /// 0 in most cases, even when the actual latency of broadcasting blocks is high.
    /// When this value is higher than the `propagation_delay_stop_proposal_threshold`,
    /// it is likely that this validator cannot broadcast blocks to the network at all.
    /// Core stops proposing new blocks in this case.
    propagation_delay: Round,
    /// Used to make commit decisions for leader blocks in the dag.
    committer: UniversalCommitter,
    /// The last new round for which core has sent out a signal.
    last_signaled_round: Round,
    /// The blocks of the last included ancestors per authority. This vector is basically used as a
    /// watermark in order to include in the next block proposal only ancestors of higher rounds.
    /// By default, it is initialised with `None` values.
    last_included_ancestors: Vec<Option<BlockRef>>,
    /// The last decided leader returned from the universal committer. Important to note
    /// that this does not signify that the leader has been persisted yet, as it still has
    /// to go through CommitObserver and have its commit persisted in store. On recovery/restart
    /// the last_decided_leader will be set to the last commit leader in dag state.
    last_decided_leader: Slot,
    /// The consensus leader schedule to be used to resolve the leader for a
    /// given round.
    leader_schedule: Arc<LeaderSchedule>,
    /// The commit observer is responsible for observing the commits and collecting
    /// and sending subdags over the consensus output channel.
    commit_observer: CommitObserver,
    /// Sender of outgoing signals from Core.
    signals: CoreSignals,
    /// The keypair to be used for block signing.
    block_signer: ProtocolKeyPair,
    /// Keeps track of the state of the DAG, including blocks, commits and last committed rounds.
    dag_state: Arc<RwLock<DagState>>,
    /// The last known round for which the node has proposed. Any proposal should be for a round greater than this.
    /// This is currently used to avoid equivocation when a node recovers from amnesia. When the value is None it means that
    /// the last block sync mechanism is enabled, but it hasn't been initialised yet.
    last_known_proposed_round: Option<Round>,
    // The ancestor state manager will keep track of the quality of the authorities
    // based on the distribution of their blocks to the network. It will use this
    // information to decide whether to include that authority's blocks in the next
    // proposal or not.
    ancestor_state_manager: AncestorStateManager,
    // The round tracker keeps track of the highest received and accepted rounds
    // from all authorities. It uses this information to periodically calculate the
    // quorum rounds, which are used across other components to make
    // decisions about block proposals.
    round_tracker: Arc<RwLock<PeerRoundTracker>>,
}

impl Core {
    pub(crate) fn new(
        context: Arc<Context>,
        leader_schedule: Arc<LeaderSchedule>,
        transaction_consumer: TransactionConsumer,
        transaction_certifier: TransactionCertifier,
        block_manager: BlockManager,
        commit_observer: CommitObserver,
        signals: CoreSignals,
        block_signer: ProtocolKeyPair,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
        round_tracker: Arc<RwLock<PeerRoundTracker>>,
    ) -> Self {
        let last_decided_leader = dag_state.read().last_commit_leader();
        let number_of_leaders = context
            .protocol_config
            .mysticeti_num_leaders_per_round()
            .unwrap_or(1);
        let committer = UniversalCommitterBuilder::new(
            context.clone(),
            leader_schedule.clone(),
            dag_state.clone(),
        )
        .with_number_of_leaders(number_of_leaders)
        .with_pipeline(true)
        .build();

        let last_proposed_block = dag_state.read().get_last_proposed_block();

        let last_signaled_round = last_proposed_block.round();

        // Recover the last included ancestor rounds based on the last proposed block. This allows
        // the next block proposal to use ancestor blocks of higher rounds and avoid
        // re-including blocks that have already been included in the last (or an earlier) block proposal.
        // This is only strongly guaranteed for a quorum of ancestors. It is still possible to re-include
        // a block from an authority which hadn't been added as part of the last proposal, hence its
        // latest included ancestor is not accurately captured here. This is considered a small deficiency,
        // and it mostly matters just for this next proposal, without any actual penalties in performance
        // or block proposal.
        let mut last_included_ancestors = vec![None; context.committee.size()];
        for ancestor in last_proposed_block.ancestors() {
            last_included_ancestors[ancestor.author] = Some(*ancestor);
        }

        let min_propose_round = if sync_last_known_own_block {
            None
        } else {
            // If the sync is disabled then we effectively don't want to impose any restriction.
            Some(0)
        };

        let propagation_scores = leader_schedule
            .leader_swap_table
            .read()
            .reputation_scores
            .clone();
        let mut ancestor_state_manager =
            AncestorStateManager::new(context.clone(), dag_state.clone());
        ancestor_state_manager.set_propagation_scores(propagation_scores);

        Self {
            context,
            last_signaled_round,
            last_included_ancestors,
            last_decided_leader,
            leader_schedule,
            transaction_consumer,
            transaction_certifier,
            block_manager,
            propagation_delay: 0,
            committer,
            commit_observer,
            signals,
            block_signer,
            dag_state,
            last_known_proposed_round: min_propose_round,
            ancestor_state_manager,
            round_tracker,
        }
        .recover()
    }

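    /// Recovers Core after a restart: re-runs the commit rule and block proposal in case they had
    /// pending work when the node last stopped, re-broadcasts the last proposed block if no new
    /// block is produced (to preserve liveness), and re-arms the leader timeout signal.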
    fn recover(mut self) -> Self {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::recover"])
            .start_timer();

        // Try to commit and propose, since they may not have run after the last storage write.
        self.try_commit(vec![]).unwrap();

        let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
        {
            last_proposed_block
        } else {
            let last_proposed_block = self.dag_state.read().get_last_proposed_block();

            if self.should_propose() {
                assert!(
                    last_proposed_block.round() > GENESIS_ROUND,
                    "At minimum a block of round higher than genesis should have been produced during recovery"
                );
            }

            // If no new block was proposed, just re-broadcast the last proposed one to ensure liveness.
            self.signals
                .new_block(ExtendedBlock {
                    block: last_proposed_block.clone(),
                    excluded_ancestors: vec![],
                })
                .unwrap();
            last_proposed_block
        };

        // Try to set up leader timeout if needed.
        // This needs to be called after try_commit() and try_propose(), which may
        // have advanced the threshold clock round.
        self.try_signal_new_round();

        info!(
            "Core recovery completed with last proposed block {:?}",
            last_proposed_block
        );

        self
    }

    /// Processes the provided blocks and accepts them if their causal history exists.
    /// The method returns:
    /// - The references of ancestors whose blocks are missing.
    #[tracing::instrument(skip_all)]
    pub(crate) fn add_blocks(
        &mut self,
        blocks: Vec<VerifiedBlock>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_blocks");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::add_blocks"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_add_blocks_batch_size
            .observe(blocks.len() as f64);

        let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);

        if !accepted_blocks.is_empty() {
            trace!(
                "Accepted blocks: {}",
                accepted_blocks
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(",")
            );

            // Try to commit the new blocks if possible.
            self.try_commit(vec![])?;

            // Try to propose now since there are new blocks accepted.
            self.try_propose(false)?;

            // Now set up leader timeout if needed.
            // This needs to be called after try_commit() and try_propose(), which may
            // have advanced the threshold clock round.
            self.try_signal_new_round();
        };

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }

        Ok(missing_block_refs)
    }

    // Adds the certified commits that have been synced via the commit syncer. We use the commit info to skip running the decision
    // rule and immediately commit the corresponding leaders and sub dags. Note that no block acceptance happens here; it happens
    // inside the `try_commit` method, which ensures that each time only the blocks corresponding to the certified commits that are about to
    // be committed are accepted.
    #[tracing::instrument(skip_all)]
    pub(crate) fn add_certified_commits(
        &mut self,
        certified_commits: CertifiedCommits,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::add_certified_commits");

        let votes = certified_commits.votes().to_vec();
        let commits = self
            .filter_new_commits(certified_commits.commits().to_vec())
            .expect("Certified commits validation failed");

        // Try to accept the certified commit votes.
        // Even if they may not be part of a future commit, these blocks are useful for certifying
        // commits when helping peers sync commits.
        let (_, missing_block_refs) = self.block_manager.try_accept_blocks(votes);

        // Try to commit the new blocks. Take into account the trusted commits that have been provided.
        self.try_commit(commits)?;

        // Try to propose now since there are new blocks accepted.
        self.try_propose(false)?;

        // Now set up leader timeout if needed.
        // This needs to be called after try_commit() and try_propose(), which may
        // have advanced the threshold clock round.
        self.try_signal_new_round();

        Ok(missing_block_refs)
    }

    /// Checks if the provided block refs have been accepted. If not, the missing block refs are kept for synchronization.
    /// Returns the references of missing blocks among the input blocks.
    pub(crate) fn check_block_refs(
        &mut self,
        block_refs: Vec<BlockRef>,
    ) -> ConsensusResult<BTreeSet<BlockRef>> {
        let _scope = monitored_scope("Core::check_block_refs");
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::check_block_refs"])
            .start_timer();
        self.context
            .metrics
            .node_metrics
            .core_check_block_refs_batch_size
            .observe(block_refs.len() as f64);

        // Try to find them via the block manager
        let missing_block_refs = self.block_manager.try_find_blocks(block_refs);

        if !missing_block_refs.is_empty() {
            trace!(
                "Missing block refs: {}",
                missing_block_refs.iter().map(|b| b.to_string()).join(", ")
            );
        }
        Ok(missing_block_refs)
    }

    /// If needed, signals a new clock round and sets up leader timeout.
    fn try_signal_new_round(&mut self) {
        // Signal only when the threshold clock round is more advanced than the last signaled round.
        //
        // NOTE: a signal is still sent even when a block has been proposed at the new round.
        // We can consider changing this in the future.
        let new_clock_round = self.dag_state.read().threshold_clock_round();
        if new_clock_round <= self.last_signaled_round {
            return;
        }
        // Then send a signal to set up leader timeout.
        self.signals.new_round(new_clock_round);
        self.last_signaled_round = new_clock_round;

        // Report the threshold clock round
        self.context
            .metrics
            .node_metrics
            .threshold_clock_round
            .set(new_clock_round as i64);
    }

    /// Creates a new block for the dictated round. This is used when a leader timeout occurs, either
    /// when the min or the max timeout expires. When `force = true`, any checks like previous round
    /// leader existence will get skipped.
    pub(crate) fn new_block(
        &mut self,
        round: Round,
        force: bool,
    ) -> ConsensusResult<Option<VerifiedBlock>> {
        let _scope = monitored_scope("Core::new_block");
        if self.last_proposed_round() < round {
            self.context
                .metrics
                .node_metrics
                .leader_timeout_total
                .with_label_values(&[&format!("{force}")])
                .inc();
            let result = self.try_propose(force);
            // The threshold clock round may have advanced, so a signal needs to be sent.
            self.try_signal_new_round();
            return result;
        }
        Ok(None)
    }

    /// Keeps only the certified commits that have a commit index > last commit index.
    /// It also ensures that the first commit in the list is the next one in line, otherwise an error is returned.
    fn filter_new_commits(
        &mut self,
        commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CertifiedCommit>> {
        // Filter out the commits that have already been committed locally and keep only those above the last committed index.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let commits = commits
            .iter()
            .filter(|commit| {
                if commit.index() > last_commit_index {
                    true
                } else {
                    tracing::debug!(
                        "Skip commit for index {} as it is already committed with last commit index {}",
                        commit.index(),
                        last_commit_index
                    );
                    false
                }
            })
            .cloned()
            .collect::<Vec<_>>();

        // Make sure that the first commit we find is the next one in line and there is no gap.
        if let Some(commit) = commits.first()
            && commit.index() != last_commit_index + 1
        {
            return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
                expected_commit_index: last_commit_index + 1,
                commit_index: commit.index(),
            });
        }

        Ok(commits)
    }

    // Attempts to create a new block, persist it and propose it to all peers.
    // When force is true, ignore whether the leader from the last round exists among the ancestors
    // and whether the minimum round delay has passed.
    fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
        if !self.should_propose() {
            return Ok(None);
        }
        if let Some(extended_block) = self.try_new_block(force) {
            self.signals.new_block(extended_block.clone())?;

            fail_point!("consensus-after-propose");

            // The new block may help commit.
            self.try_commit(vec![])?;
            return Ok(Some(extended_block.block));
        }
        Ok(None)
    }

    /// Attempts to propose a new block for the next round. If a block has already been proposed for the latest
    /// or an earlier round, then no block is created and None is returned.
    fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::try_new_block"])
            .start_timer();

        // Ensure the new block has a higher round than the last proposed block.
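        // Note: the threshold clock round advances once a quorum of blocks from the previous round
        // has been accepted, so it is the earliest round this authority can propose at.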
        let clock_round = {
            let dag_state = self.dag_state.read();
            let clock_round = dag_state.threshold_clock_round();
            if clock_round <= dag_state.get_last_proposed_block().round() {
                debug!(
                    "Skipping block proposal for round {} as it is not higher than the last proposed block {}",
                    clock_round,
                    dag_state.get_last_proposed_block().round()
                );
                return None;
            }
            clock_round
        };

        // There must be a quorum of blocks from the previous round.
        let quorum_round = clock_round.saturating_sub(1);

        // Create a new block either because we want to "forcefully" propose a block due to a leader timeout,
        // or because we are actually ready to produce the block (leader exists and min delay has passed).
        if !force {
            if !self.leaders_exist(quorum_round) {
                return None;
            }

            if Duration::from_millis(
                self.context
                    .clock
                    .timestamp_utc_ms()
                    .saturating_sub(self.last_proposed_timestamp_ms()),
            ) < self.context.parameters.min_round_delay
            {
                debug!(
                    "Skipping block proposal for round {} as it is too soon after the last proposed block timestamp {}; min round delay is {}ms",
                    clock_round,
                    self.last_proposed_timestamp_ms(),
                    self.context.parameters.min_round_delay.as_millis(),
                );
                return None;
            }
        }

        // Determine the ancestors to be included in proposal.
        let (ancestors, excluded_and_equivocating_ancestors) =
            self.smart_ancestors_to_propose(clock_round, !force);

        // If we did not find enough good ancestors to propose, continue to wait before proposing.
        if ancestors.is_empty() {
            assert!(
                !force,
                "Ancestors should have been returned if force is true!"
            );
            debug!(
                "Skipping block proposal for round {} because no good ancestor is found",
                clock_round,
            );
            return None;
        }

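        // Cap the number of excluded ancestors advertised alongside the block, to bound the size of
        // the broadcasted ExtendedBlock.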
        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) during proposal due to size limit",
                excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
            );
        }
        let excluded_ancestors = excluded_and_equivocating_ancestors
            .into_iter()
            .take(excluded_ancestors_limit)
            .collect();

        // Update the last included ancestor block refs
        for ancestor in &ancestors {
            self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
        }

        let leader_authority = &self
            .context
            .committee
            .authority(self.first_leader(quorum_round))
            .hostname;
        self.context
            .metrics
            .node_metrics
            .block_proposal_leader_wait_ms
            .with_label_values(&[leader_authority])
            .inc_by(
                Instant::now()
                    .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
                    .as_millis() as u64,
            );
        self.context
            .metrics
            .node_metrics
            .block_proposal_leader_wait_count
            .with_label_values(&[leader_authority])
            .inc();

        self.context
            .metrics
            .node_metrics
            .proposed_block_ancestors
            .observe(ancestors.len() as f64);
        for ancestor in &ancestors {
            let authority = &self.context.committee.authority(ancestor.author()).hostname;
            self.context
                .metrics
                .node_metrics
                .proposed_block_ancestors_depth
                .with_label_values(&[authority])
                .observe(clock_round.saturating_sub(ancestor.round()).into());
        }

        let now = self.context.clock.timestamp_utc_ms();
        ancestors.iter().for_each(|block| {
            if block.timestamp_ms() > now {
                trace!("Ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.", block, block.timestamp_ms(), clock_round);
                let authority = &self.context.committee.authority(block.author()).hostname;
                self.context
                    .metrics
                    .node_metrics
                    .proposed_block_ancestors_timestamp_drift_ms
                    .with_label_values(&[authority])
                    .inc_by(block.timestamp_ms().saturating_sub(now));
            }
        });

        // Consume the next transactions to be included. Do not drop the guards yet as this would acknowledge
        // the inclusion of transactions. This is done at the end of the method.
        let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
        self.context
            .metrics
            .node_metrics
            .proposed_block_transactions
            .observe(transactions.len() as f64);

        // Consume the commit votes to be included.
        let commit_votes = self
            .dag_state
            .write()
            .take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);

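        // With Mysticeti fastpath enabled, collect this authority's own transaction reject votes for
        // the blocks newly linked into the causal history of the chosen ancestors, so they can be
        // carried in the proposed block.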
        let transaction_votes = if self.context.protocol_config.mysticeti_fastpath() {
            let new_causal_history = {
                let mut dag_state = self.dag_state.write();
                ancestors
                    .iter()
                    .flat_map(|ancestor| dag_state.link_causal_history(ancestor.reference()))
                    .collect()
            };
            self.transaction_certifier.get_own_votes(new_causal_history)
        } else {
            vec![]
        };

        // Create the block and insert it into storage.
        let block = if self.context.protocol_config.mysticeti_fastpath() {
            Block::V2(BlockV2::new(
                self.context.committee.epoch(),
                clock_round,
                self.context.own_index,
                now,
                ancestors.iter().map(|b| b.reference()).collect(),
                transactions,
                commit_votes,
                transaction_votes,
                vec![],
            ))
        } else {
            Block::V1(BlockV1::new(
                self.context.committee.epoch(),
                clock_round,
                self.context.own_index,
                now,
                ancestors.iter().map(|b| b.reference()).collect(),
                transactions,
                commit_votes,
                vec![],
            ))
        };
        let signed_block =
            SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
        let serialized = signed_block
            .serialize()
            .expect("Block serialization failed.");
        self.context
            .metrics
            .node_metrics
            .proposed_block_size
            .observe(serialized.len() as f64);
        // Own blocks are assumed to be valid.
        let verified_block = VerifiedBlock::new_verified(signed_block, serialized);

        // Record the interval from the last proposal, before accepting the proposed block.
        let last_proposed_block = self.last_proposed_block();
        if last_proposed_block.round() > 0 {
            self.context
                .metrics
                .node_metrics
                .block_proposal_interval
                .observe(
                    Duration::from_millis(
                        verified_block
                            .timestamp_ms()
                            .saturating_sub(last_proposed_block.timestamp_ms()),
                    )
                    .as_secs_f64(),
                );
        }

        // Accept the block into BlockManager and DagState.
        let (accepted_blocks, missing) = self
            .block_manager
            .try_accept_blocks(vec![verified_block.clone()]);
        assert_eq!(accepted_blocks.len(), 1);
        assert!(missing.is_empty());

        // The block must be added to transaction certifier before it is broadcasted or added to DagState.
        // Update proposed state of blocks in local DAG.
        // TODO(fastpath): move this logic and the logic afterwards to proposed block handler.
        if self.context.protocol_config.mysticeti_fastpath() {
            self.transaction_certifier
                .add_voted_blocks(vec![(verified_block.clone(), vec![])]);
            self.dag_state
                .write()
                .link_causal_history(verified_block.reference());
        }

        // Ensure the new block and its ancestors are persisted, before broadcasting it.
        self.dag_state.write().flush();

        // Now acknowledge the transactions for their inclusion in the block.
        ack_transactions(verified_block.reference());

        info!("Created block {verified_block:?} for round {clock_round}");

        self.context
            .metrics
            .node_metrics
            .proposed_blocks
            .with_label_values(&[&force.to_string()])
            .inc();

        let extended_block = ExtendedBlock {
            block: verified_block,
            excluded_ancestors,
        };

        // Update the round tracker with our own highest accepted blocks.
        self.round_tracker
            .write()
            .update_from_verified_block(&extended_block);

        Some(extended_block)
    }

    /// Runs the commit rule to attempt to commit additional blocks from the DAG. If any `certified_commits` are provided, then
    /// it will attempt to commit those first before trying to commit any further leaders.
    fn try_commit(
        &mut self,
        mut certified_commits: Vec<CertifiedCommit>,
    ) -> ConsensusResult<Vec<CommittedSubDag>> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["Core::try_commit"])
            .start_timer();

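        // Remember the certified commit references so the resulting sub dags can be cross-checked
        // against them after the commit loop below.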
        let mut certified_commits_map = BTreeMap::new();
        for c in &certified_commits {
            certified_commits_map.insert(c.index(), c.reference());
        }

        if !certified_commits.is_empty() {
            info!(
                "Processing synced commits: {:?}",
                certified_commits
                    .iter()
                    .map(|c| (c.index(), c.leader()))
                    .collect::<Vec<_>>()
            );
        }

        let mut committed_sub_dags = Vec::new();
        // TODO: Add optimization to abort early without quorum for a round.
        loop {
            // LeaderSchedule has a limit on how many sequenced leaders can be committed
            // before a change is triggered. Calling into the leader schedule returns
            // how many commits remain until the next leader change. We will loop back and recalculate
            // any discarded leaders with the new schedule.
            let mut commits_until_update = self
                .leader_schedule
                .commits_until_leader_schedule_update(self.dag_state.clone());

            if commits_until_update == 0 {
                let last_commit_index = self.dag_state.read().last_commit_index();

                tracing::info!(
                    "Leader schedule change triggered at commit index {last_commit_index}"
                );

                self.leader_schedule
                    .update_leader_schedule_v2(&self.dag_state);

                let propagation_scores = self
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores
                    .clone();
                self.ancestor_state_manager
                    .set_propagation_scores(propagation_scores);

                commits_until_update = self
                    .leader_schedule
                    .commits_until_leader_schedule_update(self.dag_state.clone());

                fail_point!("consensus-after-leader-schedule-change");
            }
            assert!(commits_until_update > 0);

            // If there are certified commits to process, find out which leaders and commits from them
            // are decided and use them as the next commits.
            let (certified_leaders, decided_certified_commits): (
                Vec<DecidedLeader>,
                Vec<CertifiedCommit>,
            ) = self
                .try_select_certified_leaders(&mut certified_commits, commits_until_update)
                .into_iter()
                .unzip();

            // Only accept blocks for the certified commits that we are certain to sequence.
            // This ensures that only blocks corresponding to committed certified commits are flushed to disk.
            // Blocks from non-committed certified commits will not be flushed, preventing issues during crash-recovery.
            // This avoids scenarios where accepting and flushing blocks of non-committed certified commits could lead to
            // premature commit rule execution. Due to GC, this could cause a panic if the commit rule tries to access
            // missing causal history from blocks of certified commits.
            let blocks = decided_certified_commits
                .iter()
                .flat_map(|c| c.blocks())
                .cloned()
                .collect::<Vec<_>>();
            self.block_manager.try_accept_committed_blocks(blocks);

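            // `local` records whether the leaders below were decided by the local decision rule
            // (true) or taken from synced certified commits (false), and is passed through to the
            // commit observer.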
            // If there is no certified commit to process, run the decision rule.
            let (decided_leaders, local) = if certified_leaders.is_empty() {
                // TODO: limit commits by commits_until_update for efficiency, which may be needed when leader schedule length is reduced.
                let mut decided_leaders = self.committer.try_decide(self.last_decided_leader);
                // Truncate the decided leaders to fit the commit schedule limit.
                if decided_leaders.len() >= commits_until_update {
                    let _ = decided_leaders.split_off(commits_until_update);
                }
                (decided_leaders, true)
            } else {
                (certified_leaders, false)
            };

            // If the decided leaders list is empty then just break the loop.
            let Some(last_decided) = decided_leaders.last().cloned() else {
                break;
            };

            self.last_decided_leader = last_decided.slot();
            self.context
                .metrics
                .node_metrics
                .last_decided_leader_round
                .set(self.last_decided_leader.round as i64);

            let sequenced_leaders = decided_leaders
                .into_iter()
                .filter_map(|leader| leader.into_committed_block())
                .collect::<Vec<_>>();
            // It's possible to reach this point when all the decided leaders are "Skip" decisions. In this case there is no
            // leader to commit and we should break the loop.
            if sequenced_leaders.is_empty() {
                break;
            }
            tracing::info!(
                "Committing {} leaders: {}; {} commits before next leader schedule change",
                sequenced_leaders.len(),
                sequenced_leaders
                    .iter()
                    .map(|b| b.reference().to_string())
                    .join(","),
                commits_until_update,
            );

            // TODO: refcount subdags
            let subdags = self
                .commit_observer
                .handle_commit(sequenced_leaders, local)?;

            // Try to unsuspend blocks if gc_round has advanced.
            self.block_manager
                .try_unsuspend_blocks_for_latest_gc_round();

            committed_sub_dags.extend(subdags);

            fail_point!("consensus-after-handle-commit");
        }

        // Sanity check: for commits that have been linearized using the certified commits, ensure that the same sub dag has been committed.
        for sub_dag in &committed_sub_dags {
            if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
                assert_eq!(
                    commit_ref, sub_dag.commit_ref,
                    "Certified commit has different reference than the committed sub dag"
                );
            }
        }

        // Notify about our own committed blocks
        let committed_block_refs = committed_sub_dags
            .iter()
            .flat_map(|sub_dag| sub_dag.blocks.iter())
            .filter_map(|block| {
                (block.author() == self.context.own_index).then_some(block.reference())
            })
            .collect::<Vec<_>>();
        self.transaction_consumer
            .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());

        Ok(committed_sub_dags)
    }

    pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
        let _scope = monitored_scope("Core::get_missing_blocks");
        self.block_manager.missing_blocks()
    }

    /// Sets the delay by round for propagating blocks to a quorum.
    pub(crate) fn set_propagation_delay(&mut self, delay: Round) {
        info!("Propagation round delay set to: {delay}");
        self.propagation_delay = delay;
    }

    /// Sets the minimum propose round for the proposer, allowing blocks to be proposed only for round numbers
    /// `> last_known_proposed_round`. At the moment the method may be called only once; attempting to
    /// call it multiple times leads to a panic.
    pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
        if self.last_known_proposed_round.is_some() {
            panic!(
                "Should not attempt to set the last known proposed round if that has been already set"
            );
        }
        self.last_known_proposed_round = Some(round);
        info!("Last known proposed round set to {round}");
    }

    /// Whether the core should propose new blocks.
    pub(crate) fn should_propose(&self) -> bool {
        let clock_round = self.dag_state.read().threshold_clock_round();
        let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;

        if self.propagation_delay
            > self
                .context
                .parameters
                .propagation_delay_stop_proposal_threshold
        {
            debug!(
                "Skip proposing for round {clock_round}, high propagation delay {} > {}.",
                self.propagation_delay,
                self.context
                    .parameters
                    .propagation_delay_stop_proposal_threshold
            );
            core_skipped_proposals
                .with_label_values(&["high_propagation_delay"])
                .inc();
            return false;
        }

        let Some(last_known_proposed_round) = self.last_known_proposed_round else {
            debug!(
                "Skip proposing for round {clock_round}, last known proposed round has not been synced yet."
            );
            core_skipped_proposals
                .with_label_values(&["no_last_known_proposed_round"])
                .inc();
            return false;
        };
        if clock_round <= last_known_proposed_round {
            debug!(
                "Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}"
            );
            core_skipped_proposals
                .with_label_values(&["higher_last_known_proposed_round"])
                .inc();
            return false;
        }

        true
    }

    // Tries to select a prefix of certified commits to be committed next, respecting the `limit`.
    // Panics if the provided `limit` is zero.
    // The function returns a list of certified leaders and certified commits. If an empty vector is returned, it means that
    // there are no certified commits to be committed, as the input `certified_commits` is either empty or all of the certified
    // commits have already been committed.
    #[tracing::instrument(skip_all)]
    fn try_select_certified_leaders(
        &mut self,
        certified_commits: &mut Vec<CertifiedCommit>,
        limit: usize,
    ) -> Vec<(DecidedLeader, CertifiedCommit)> {
        assert!(limit > 0, "limit should be greater than 0");
        if certified_commits.is_empty() {
            return vec![];
        }

        let to_commit = if certified_commits.len() >= limit {
            // We keep only the number of leaders as dictated by the `limit`.
            certified_commits.drain(..limit).collect::<Vec<_>>()
        } else {
            // Otherwise just take all of them and leave `certified_commits` empty.
            std::mem::take(certified_commits)
        };

        tracing::debug!(
            "Selected {} certified leaders: {}",
            to_commit.len(),
            to_commit.iter().map(|c| c.leader().to_string()).join(",")
        );

        to_commit
            .into_iter()
            .map(|commit| {
                let leader = commit.blocks().last().expect("Certified commit should have at least one block");
                assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
                // There is no knowledge of direct commit with certified commits, so assuming indirect commit.
                let leader = DecidedLeader::Commit(leader.clone(), /* direct */ false);
                UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
                (leader, commit)
            })
            .collect::<Vec<_>>()
    }

    /// Retrieves the next ancestors to propose to form a block at `clock_round` round.
    /// If smart selection is enabled then this will try to select the best ancestors
    /// based on the propagation scores of the authorities.
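    ///
    /// Selection proceeds in passes: first include our own last proposed block plus all new
    /// high-scoring (INCLUDE state) ancestors; then, if needed, pull in temporarily excluded
    /// parent-round ancestors until a quorum of parent-round stake is reached; finally, include
    /// low-scoring ancestors (or an earlier block of theirs) only if a quorum of the network has
    /// reported accepting them.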
    fn smart_ancestors_to_propose(
        &mut self,
        clock_round: Round,
        smart_select: bool,
    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
        let node_metrics = &self.context.metrics.node_metrics;
        let _s = node_metrics
            .scope_processing_time
            .with_label_values(&["Core::smart_ancestors_to_propose"])
            .start_timer();

        // Now take the ancestors before the clock_round (exclusive) for each authority.
        let all_ancestors = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(clock_round);

        assert_eq!(
            all_ancestors.len(),
            self.context.committee.size(),
            "Fatal error, number of returned ancestors doesn't match committee size."
        );

        // Ensure ancestor state is up to date before selecting for proposal.
        let accepted_quorum_rounds = self.round_tracker.read().compute_accepted_quorum_rounds();

        self.ancestor_state_manager
            .update_all_ancestors_state(&accepted_quorum_rounds);

        let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();

        let quorum_round = clock_round.saturating_sub(1);

        let mut score_and_pending_excluded_ancestors = Vec::new();
        let mut excluded_and_equivocating_ancestors = BTreeSet::new();

        // Propose only ancestors of higher rounds than what has already been proposed.
        // And always include own last proposed block first among ancestors.
        // Start by only including the high scoring ancestors. Low scoring ancestors
        // will be included in a second pass below.
        let included_ancestors = iter::once(self.last_proposed_block().clone())
            .chain(
                all_ancestors
                    .into_iter()
                    .flat_map(|(ancestor, equivocating_ancestors)| {
                        if ancestor.author() == self.context.own_index {
                            return None;
                        }
                        if let Some(last_block_ref) =
                            self.last_included_ancestors[ancestor.author()]
                            && last_block_ref.round >= ancestor.round() {
                                return None;
                            }

                        // We will never include equivocating ancestors, so add them immediately.
                        excluded_and_equivocating_ancestors.extend(equivocating_ancestors);

                        let ancestor_state = ancestor_state_map[ancestor.author()];
                        match ancestor_state {
                            AncestorState::Include => {
                                trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
                            }
                            AncestorState::Exclude(score) => {
                                trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
                                score_and_pending_excluded_ancestors.push((score, ancestor));
                                return None;
                            }
                        }

                        Some(ancestor)
                    }),
            )
            .collect::<Vec<_>>();

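        // First pass complete: `included_ancestors` now holds our own last proposed block plus all
        // new high-scoring (INCLUDE state) ancestors.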
        let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();

        // Check total stake of high scoring parent round ancestors
        for ancestor in included_ancestors
            .iter()
            .filter(|a| a.round() == quorum_round)
        {
            parent_round_quorum.add(ancestor.author(), &self.context.committee);
        }

        if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
            node_metrics.smart_selection_wait.inc();
            debug!(
                "Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.",
                parent_round_quorum.stake()
            );
            return (vec![], BTreeSet::new());
        }

        // Sort scores descending so we can include the best of the pending excluded
        // ancestors first until we reach the threshold.
        score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));

        let mut ancestors_to_propose = included_ancestors;
        let mut excluded_ancestors = Vec::new();
        for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
            let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
            if !parent_round_quorum.reached_threshold(&self.context.committee)
                && ancestor.round() == quorum_round
            {
                debug!(
                    "Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}"
                );
                parent_round_quorum.add(ancestor.author(), &self.context.committee);
                ancestors_to_propose.push(ancestor);
                node_metrics
                    .included_excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname, "timeout"])
                    .inc();
            } else {
                excluded_ancestors.push((score, ancestor));
            }
        }

        // Iterate through excluded ancestors and include the ancestor or the ancestor's ancestor
        // that has been accepted by a quorum of the network. If the original ancestor itself
        // is not included then it will be part of excluded ancestors that are not
        // included in the block but will still be broadcasted to peers.
        for (score, ancestor) in excluded_ancestors.iter() {
            let excluded_author = ancestor.author();
            let block_hostname = &self.context.committee.authority(excluded_author).hostname;
            // A quorum of validators have reported accepting blocks from the excluded_author up to the low quorum round.
            let mut accepted_low_quorum_round = accepted_quorum_rounds[excluded_author].0;
            // If the accepted quorum round of this ancestor is greater than or equal
            // to the clock round then we want to make sure to set it to clock_round - 1
            // as that is the max round the new block can include as an ancestor.
            accepted_low_quorum_round = accepted_low_quorum_round.min(quorum_round);

            let last_included_round = self.last_included_ancestors[excluded_author]
                .map(|block_ref| block_ref.round)
                .unwrap_or(GENESIS_ROUND);
            if ancestor.round() <= last_included_round {
                // This should have already been filtered out when filtering all_ancestors.
                // Still, ensure previously included ancestors are filtered out.
                continue;
            }

            if last_included_round >= accepted_low_quorum_round {
                excluded_and_equivocating_ancestors.insert(ancestor.reference());
                trace!(
                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}",
                    ancestor.reference()
                );
                node_metrics
                    .excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname])
                    .inc();
                continue;
            }

            let ancestor = if ancestor.round() <= accepted_low_quorum_round {
                // Include the ancestor block as it has been seen & accepted by a strong quorum.
                ancestor.clone()
            } else {
                // Exclude this ancestor since it hasn't been accepted by a strong quorum.
                excluded_and_equivocating_ancestors.insert(ancestor.reference());
                trace!(
                    "Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ",
                    ancestor.reference(),
                    ancestor.round()
                );
                node_metrics
                    .excluded_proposal_ancestors_count_by_authority
                    .with_label_values(&[block_hostname])
                    .inc();

                // Look for an earlier block in the ancestor chain that we can include as there
                // is a gap between the last included round and the accepted low quorum round.
                //
                // Note: Only cached blocks need to be propagated. Committed and GC'ed blocks
                // do not need to be propagated.
                match self.dag_state.read().get_last_cached_block_in_range(
                    excluded_author,
                    last_included_round + 1,
                    accepted_low_quorum_round + 1,
                ) {
                    Some(earlier_ancestor) => {
                        // Found an earlier block that has been propagated well - include it instead
                        earlier_ancestor
                    }
                    None => {
                        // No suitable earlier block found
                        continue;
                    }
                }
            };
            self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
            ancestors_to_propose.push(ancestor.clone());
            trace!(
                "Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}",
                ancestor.reference()
            );
            node_metrics
                .included_excluded_proposal_ancestors_count_by_authority
                .with_label_values(&[block_hostname, "quorum"])
                .inc();
        }

        assert!(
            parent_round_quorum.reached_threshold(&self.context.committee),
            "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core."
        );

        debug!(
            "Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
            ancestors_to_propose.len(),
            excluded_and_equivocating_ancestors.len()
        );

        (ancestors_to_propose, excluded_and_equivocating_ancestors)
    }
1244
1245    /// Checks whether all the leaders of the round exist.
1246    /// TODO: we can leverage some additional signal here in order to more cleverly manipulate later the leader timeout
1247    /// Ex if we already have one leader - the first in order - we might don't want to wait as much.
1248    fn leaders_exist(&self, round: Round) -> bool {
1249        let dag_state = self.dag_state.read();
1250        for leader in self.leaders(round) {
1251            // Search for all the leaders. If at least one is not found, then return false.
1252            // A linear search should be fine here as the set of elements is expected to be small, so more sophisticated
1253            // data structures would not give us much here.
1254            if !dag_state.contains_cached_block_at_slot(leader) {
1255                return false;
1256            }
1257        }
1258
1259        true
1260    }
1261
1262    /// Returns the leaders of the provided round.
1263    fn leaders(&self, round: Round) -> Vec<Slot> {
1264        self.committer
1265            .get_leaders(round)
1266            .into_iter()
1267            .map(|authority_index| Slot::new(round, authority_index))
1268            .collect()
1269    }
1270
1271    /// Returns the 1st leader of the round.
1272    fn first_leader(&self, round: Round) -> AuthorityIndex {
1273        self.leaders(round).first().unwrap().authority
1274    }
1275
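    /// Returns the timestamp of the last block proposed by this authority.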
1276    fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
1277        self.last_proposed_block().timestamp_ms()
1278    }
1279
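    /// Returns the round of the last block proposed by this authority.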
1280    fn last_proposed_round(&self) -> Round {
1281        self.last_proposed_block().round()
1282    }
1283
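    /// Returns the last block proposed by this authority, as recorded in DagState.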
1284    fn last_proposed_block(&self) -> VerifiedBlock {
1285        self.dag_state.read().get_last_proposed_block()
1286    }
1287}
1288
1289/// Senders of signals from Core, for outputs and events (e.g. new block produced).
1290pub(crate) struct CoreSignals {
1291    tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
1292    new_round_sender: watch::Sender<Round>,
1293    context: Arc<Context>,
1294}
1295
1296impl CoreSignals {
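    /// Creates the signal senders along with the receivers that downstream components subscribe to.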
1297    pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
1298        // Blocks buffered in the broadcast channel should be roughly equal to those cached in dag state,
1299        // since the underlying blocks are ref counted, so a lower buffer here will not reduce memory
1300        // usage significantly.
1301        let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
1302            context.parameters.dag_state_cached_rounds as usize,
1303        );
1304        let (new_round_sender, new_round_receiver) = watch::channel(0);
1305
1306        let me = Self {
1307            tx_block_broadcast,
1308            new_round_sender,
1309            context,
1310        };
1311
1312        let receivers = CoreSignalsReceivers {
1313            rx_block_broadcast,
1314            new_round_receiver,
1315        };
1316
1317        (me, receivers)
1318    }
1319
1320    /// Sends a signal to all the waiters that a new block has been produced. Returns `Ok(())` if the block
1321    /// reached at least one subscriber or broadcasting was skipped, and an error otherwise.
1322    pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
1323        // When there is only one authority in the committee, it is unnecessary to broadcast
1324        // the block, which would fail anyway without subscribers to the signal.
1325        if self.context.committee.size() > 1 {
1326            if extended_block.block.round() == GENESIS_ROUND {
1327                debug!("Ignoring broadcasting genesis block to peers");
1328                return Ok(());
1329            }
1330
1331            if let Err(err) = self.tx_block_broadcast.send(extended_block) {
1332                warn!("Couldn't broadcast the block to any receiver: {err}");
1333                return Err(ConsensusError::Shutdown);
1334            }
1335        } else {
1336            debug!(
1337                "Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
1338            );
1339        }
1340        Ok(())
1341    }
1342
1343    /// Sends a signal that the threshold clock has advanced to a new round. `round_number` is the round
1344    /// to which the threshold clock has advanced.
1345    pub(crate) fn new_round(&mut self, round_number: Round) {
1346        let _ = self.new_round_sender.send_replace(round_number);
1347    }
1348}
1349
1350/// Receivers of signals from Core.
1351/// Intentionally un-clonable. Components should only subscribe to the channels they need.
1352pub(crate) struct CoreSignalsReceivers {
1353    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
1354    new_round_receiver: watch::Receiver<Round>,
1355}
1356
1357impl CoreSignalsReceivers {
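    /// Returns a fresh receiver subscribed to the block broadcast channel.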
1358    pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
1359        self.rx_block_broadcast.resubscribe()
1360    }
1361
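    /// Returns a watch receiver that observes the latest round signaled by Core.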
1362    pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
1363        self.new_round_receiver.clone()
1364    }
1365}
1366
1367/// Creates cores for the specified authorities with their corresponding stakes. The cores and their
1368/// respective signal receivers are returned in ascending `AuthorityIndex` order.
1369#[cfg(test)]
1370pub(crate) async fn create_cores(
1371    context: Context,
1372    authorities: Vec<Stake>,
1373) -> Vec<CoreTextFixture> {
1374    let mut cores = Vec::new();
1375
1376    for index in 0..authorities.len() {
1377        let own_index = AuthorityIndex::new_for_test(index as u32);
1378        let core =
1379            CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false).await;
1380        cores.push(core);
1381    }
1382    cores
1383}
1384
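/// Test fixture bundling a Core instance together with the channels, DAG state and in-memory store
/// needed to drive it from unit tests.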
1385#[cfg(test)]
1386pub(crate) struct CoreTextFixture {
1387    pub(crate) core: Core,
1388    pub(crate) transaction_certifier: TransactionCertifier,
1389    pub(crate) signal_receivers: CoreSignalsReceivers,
1390    pub(crate) block_receiver: broadcast::Receiver<ExtendedBlock>,
1391    pub(crate) _commit_output_receiver: UnboundedReceiver<CommittedSubDag>,
1392    pub(crate) _blocks_output_receiver: UnboundedReceiver<CertifiedBlocksOutput>,
1393    pub(crate) dag_state: Arc<RwLock<DagState>>,
1394    pub(crate) store: Arc<MemStore>,
1395}
1396
1397#[cfg(test)]
1398impl CoreTextFixture {
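    /// Builds a Core with an in-memory store and a local committee derived from the given stakes,
    /// wiring up the transaction pipeline, commit observer and signal channels used in tests.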
1399    async fn new(
1400        context: Context,
1401        authorities: Vec<Stake>,
1402        own_index: AuthorityIndex,
1403        sync_last_known_own_block: bool,
1404    ) -> Self {
1405        let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
1406        let mut context = context.clone();
1407        context = context
1408            .with_committee(committee)
1409            .with_authority_index(own_index);
1410        context
1411            .protocol_config
1412            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
1413
1414        let context = Arc::new(context);
1415        let store = Arc::new(MemStore::new());
1416        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1417
1418        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1419        let leader_schedule = Arc::new(
1420            LeaderSchedule::from_store(context.clone(), dag_state.clone())
1421                .with_num_commits_per_schedule(10),
1422        );
1423        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1424        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1425        let (blocks_sender, _blocks_receiver) =
1426            mysten_metrics::monitored_mpsc::unbounded_channel("consensus_block_output");
1427        let transaction_certifier = TransactionCertifier::new(
1428            context.clone(),
1429            Arc::new(NoopBlockVerifier {}),
1430            dag_state.clone(),
1431            blocks_sender,
1432        );
1433        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1434        // Need at least one subscriber to the block broadcast channel.
1435        let block_receiver = signal_receivers.block_broadcast_receiver();
1436
1437        let (commit_consumer, commit_output_receiver, blocks_output_receiver) =
1438            CommitConsumerArgs::new(0, 0);
1439        let commit_observer = CommitObserver::new(
1440            context.clone(),
1441            commit_consumer,
1442            dag_state.clone(),
1443            transaction_certifier.clone(),
1444            leader_schedule.clone(),
1445        )
1446        .await;
1447
1448        let block_signer = signers.remove(own_index.value()).1;
1449
1450        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1451        let core = Core::new(
1452            context,
1453            leader_schedule,
1454            transaction_consumer,
1455            transaction_certifier.clone(),
1456            block_manager,
1457            commit_observer,
1458            signals,
1459            block_signer,
1460            dag_state.clone(),
1461            sync_last_known_own_block,
1462            round_tracker,
1463        );
1464
1465        Self {
1466            core,
1467            transaction_certifier,
1468            signal_receivers,
1469            block_receiver,
1470            _commit_output_receiver: commit_output_receiver,
1471            _blocks_output_receiver: blocks_output_receiver,
1472            dag_state,
1473            store,
1474        }
1475    }
1476
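    /// Registers the blocks with the transaction certifier (with empty reject votes) and then adds them to Core.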
1477    pub(crate) fn add_blocks(
1478        &mut self,
1479        blocks: Vec<VerifiedBlock>,
1480    ) -> ConsensusResult<BTreeSet<BlockRef>> {
1481        self.transaction_certifier
1482            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
1483        self.core.add_blocks(blocks)
1484    }
1485}
1486
1487#[cfg(test)]
1488mod test {
1489    use std::{collections::BTreeSet, time::Duration};
1490
1491    use consensus_config::{AuthorityIndex, Parameters};
1492    use consensus_types::block::TransactionIndex;
1493    use futures::{StreamExt, stream::FuturesUnordered};
1494    use mysten_metrics::monitored_mpsc;
1495    use sui_protocol_config::ProtocolConfig;
1496    use tokio::time::sleep;
1497
1498    use super::*;
1499    use crate::{
1500        CommitConsumerArgs, CommitIndex,
1501        block::{TestBlock, genesis_blocks},
1502        block_verifier::NoopBlockVerifier,
1503        commit::CommitAPI,
1504        leader_scoring::ReputationScores,
1505        storage::{Store, WriteBatch, mem_store::MemStore},
1506        test_dag_builder::DagBuilder,
1507        test_dag_parser::parse_dag,
1508        transaction::{BlockStatus, TransactionClient},
1509    };
1510
1511    /// Recover Core and continue proposing from the last round which forms a quorum.
1512    #[tokio::test]
1513    async fn test_core_recover_from_store_for_full_round() {
1514        telemetry_subscribers::init_for_testing();
1515        let (context, mut key_pairs) = Context::new_for_test(4);
1516        let context = Arc::new(context);
1517        let store = Arc::new(MemStore::new());
1518        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1519        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1520        let mut block_status_subscriptions = FuturesUnordered::new();
1521
1522        // Create test blocks for all the authorities for 4 rounds and populate them in store
1523        let mut last_round_blocks = genesis_blocks(&context);
1524        let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
1525        for round in 1..=4 {
1526            let mut this_round_blocks = Vec::new();
1527            for (index, _authority) in context.committee.authorities() {
1528                let block = VerifiedBlock::new_for_test(
1529                    TestBlock::new(round, index.value() as u32)
1530                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1531                        .build(),
1532                );
1533
1534                // If it's a round 1 block, which will be committed later on, and it's our "own" block, then subscribe to listen for its block status.
1535                if round == 1 && index == context.own_index {
1536                    let subscription =
1537                        transaction_consumer.subscribe_for_block_status_testing(block.reference());
1538                    block_status_subscriptions.push(subscription);
1539                }
1540
1541                this_round_blocks.push(block);
1542            }
1543            all_blocks.extend(this_round_blocks.clone());
1544            last_round_blocks = this_round_blocks;
1545        }
1546        // write them in store
1547        store
1548            .write(WriteBatch::default().blocks(all_blocks))
1549            .expect("Storage error");
1550
1551        // create dag state after all blocks have been written to store
1552        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1553        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1554        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1555            context.clone(),
1556            dag_state.clone(),
1557        ));
1558        let (blocks_sender, _blocks_receiver) =
1559            monitored_mpsc::unbounded_channel("consensus_block_output");
1560        let transaction_certifier = TransactionCertifier::new(
1561            context.clone(),
1562            Arc::new(NoopBlockVerifier {}),
1563            dag_state.clone(),
1564            blocks_sender,
1565        );
1566
1567        let (commit_consumer, _commit_receiver, _transaction_receiver) =
1568            CommitConsumerArgs::new(0, 0);
1569        let commit_observer = CommitObserver::new(
1570            context.clone(),
1571            commit_consumer,
1572            dag_state.clone(),
1573            transaction_certifier.clone(),
1574            leader_schedule.clone(),
1575        )
1576        .await;
1577
1578        // Check no commits have been persisted to dag_state or store.
1579        let last_commit = store.read_last_commit().unwrap();
1580        assert!(last_commit.is_none());
1581        assert_eq!(dag_state.read().last_commit_index(), 0);
1582
1583        // Now spin up core
1584        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1585        let (blocks_sender, _blocks_receiver) =
1586            monitored_mpsc::unbounded_channel("consensus_block_output");
1587        let transaction_certifier = TransactionCertifier::new(
1588            context.clone(),
1589            Arc::new(NoopBlockVerifier {}),
1590            dag_state.clone(),
1591            blocks_sender,
1592        );
1593        transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
1594        // Need at least one subscriber to the block broadcast channel.
1595        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1596        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1597        let _core = Core::new(
1598            context.clone(),
1599            leader_schedule,
1600            transaction_consumer,
1601            transaction_certifier.clone(),
1602            block_manager,
1603            commit_observer,
1604            signals,
1605            key_pairs.remove(context.own_index.value()).1,
1606            dag_state.clone(),
1607            false,
1608            round_tracker,
1609        );
1610
1611        // New round should be 5
1612        let mut new_round = signal_receivers.new_round_receiver();
1613        assert_eq!(*new_round.borrow_and_update(), 5);
1614
1615        // Block for round 5 should have been proposed.
1616        let proposed_block = block_receiver
1617            .recv()
1618            .await
1619            .expect("A block should have been created");
1620        assert_eq!(proposed_block.block.round(), 5);
1621        let ancestors = proposed_block.block.ancestors();
1622
1623        // Only ancestors of round 4 should be included.
1624        assert_eq!(ancestors.len(), 4);
1625        for ancestor in ancestors {
1626            assert_eq!(ancestor.round, 4);
1627        }
1628
1629        // Flush the DAG state to storage.
1630        dag_state.write().flush();
1631
1632        // There were no commits prior to the core starting up, but there were completed
1633        // rounds up to and including round 4. So we should commit the leaders of rounds 1 & 2
1634        // as soon as the new block for round 5 is proposed.
1635        let last_commit = store
1636            .read_last_commit()
1637            .unwrap()
1638            .expect("last commit should be set");
1639        assert_eq!(last_commit.index(), 2);
1640        assert_eq!(dag_state.read().last_commit_index(), 2);
1641        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1642        assert_eq!(all_stored_commits.len(), 2);
1643
1644        // And ensure that our "own" round 1 block was sent to the TransactionConsumer as a notification alongside the gc_round.
1645        while let Some(result) = block_status_subscriptions.next().await {
1646            let status = result.unwrap();
1647            assert!(matches!(status, BlockStatus::Sequenced(_)));
1648        }
1649    }
1650
1651    /// Recover Core and continue proposing when the last round is partial (doesn't form a quorum) and we haven't
1652    /// proposed for that round yet.
1653    #[tokio::test]
1654    async fn test_core_recover_from_store_for_partial_round() {
1655        telemetry_subscribers::init_for_testing();
1656
1657        let (context, mut key_pairs) = Context::new_for_test(4);
1658        let context = Arc::new(context);
1659        let store = Arc::new(MemStore::new());
1660        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1661        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1662
1663        // Create test blocks for all authorities except ours (index = 0).
1664        let mut last_round_blocks = genesis_blocks(&context);
1665        let mut all_blocks = last_round_blocks.clone();
1666        for round in 1..=4 {
1667            let mut this_round_blocks = Vec::new();
1668
1669            // For round 4 only produce f+1 blocks. Skip our validator (index 0) and the one at position 1 from creating blocks.
1670            let authorities_to_skip = if round == 4 {
1671                context.committee.validity_threshold() as usize
1672            } else {
1673                // otherwise always skip creating a block for our authority
1674                1
1675            };
1676
1677            for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
1678                let block = TestBlock::new(round, index.value() as u32)
1679                    .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
1680                    .build();
1681                this_round_blocks.push(VerifiedBlock::new_for_test(block));
1682            }
1683            all_blocks.extend(this_round_blocks.clone());
1684            last_round_blocks = this_round_blocks;
1685        }
1686
1687        // write them in store
1688        store
1689            .write(WriteBatch::default().blocks(all_blocks))
1690            .expect("Storage error");
1691
1692        // create dag state after all blocks have been written to store
1693        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1694        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1695        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1696            context.clone(),
1697            dag_state.clone(),
1698        ));
1699        let (blocks_sender, _blocks_receiver) =
1700            monitored_mpsc::unbounded_channel("consensus_block_output");
1701        let transaction_certifier = TransactionCertifier::new(
1702            context.clone(),
1703            Arc::new(NoopBlockVerifier {}),
1704            dag_state.clone(),
1705            blocks_sender,
1706        );
1707
1708        let (commit_consumer, _commit_receiver, _transaction_receiver) =
1709            CommitConsumerArgs::new(0, 0);
1710        let commit_observer = CommitObserver::new(
1711            context.clone(),
1712            commit_consumer,
1713            dag_state.clone(),
1714            transaction_certifier.clone(),
1715            leader_schedule.clone(),
1716        )
1717        .await;
1718
1719        // Check no commits have been persisted to dag_state & store
1720        let last_commit = store.read_last_commit().unwrap();
1721        assert!(last_commit.is_none());
1722        assert_eq!(dag_state.read().last_commit_index(), 0);
1723
1724        // Now spin up core
1725        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1726        let (blocks_sender, _blocks_receiver) =
1727            monitored_mpsc::unbounded_channel("consensus_block_output");
1728        let transaction_certifier = TransactionCertifier::new(
1729            context.clone(),
1730            Arc::new(NoopBlockVerifier {}),
1731            dag_state.clone(),
1732            blocks_sender,
1733        );
1734        transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
1735        // Need at least one subscriber to the block broadcast channel.
1736        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1737        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1738        let mut core = Core::new(
1739            context.clone(),
1740            leader_schedule,
1741            transaction_consumer,
1742            transaction_certifier,
1743            block_manager,
1744            commit_observer,
1745            signals,
1746            key_pairs.remove(context.own_index.value()).1,
1747            dag_state.clone(),
1748            false,
1749            round_tracker,
1750        );
1751
1752        // Clock round should have advanced to 5 during recovery because
1753        // a quorum has formed in round 4.
1754        let mut new_round = signal_receivers.new_round_receiver();
1755        assert_eq!(*new_round.borrow_and_update(), 5);
1756
1757        // During recovery, round 4 block should have been proposed.
1758        let proposed_block = block_receiver
1759            .recv()
1760            .await
1761            .expect("A block should have been created");
1762        assert_eq!(proposed_block.block.round(), 4);
1763        let ancestors = proposed_block.block.ancestors();
1764
1765        assert_eq!(ancestors.len(), 4);
1766        for ancestor in ancestors {
1767            if ancestor.author == context.own_index {
1768                assert_eq!(ancestor.round, 0);
1769            } else {
1770                assert_eq!(ancestor.round, 3);
1771            }
1772        }
1773
1774        // Run commit rule.
1775        core.try_commit(vec![]).ok();
1776
1777        // Flush the DAG state to storage.
1778        core.dag_state.write().flush();
1779
1780        // There were no commits prior to the core starting up, but there were completed
1781        // rounds up to round 4. So we should commit the leaders of rounds 1 & 2 as soon
1782        // as the new block for round 4 is proposed.
1783        let last_commit = store
1784            .read_last_commit()
1785            .unwrap()
1786            .expect("last commit should be set");
1787        assert_eq!(last_commit.index(), 2);
1788        assert_eq!(dag_state.read().last_commit_index(), 2);
1789        let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
1790        assert_eq!(all_stored_commits.len(), 2);
1791    }
1792
1793    #[tokio::test]
1794    async fn test_core_propose_after_genesis() {
1795        telemetry_subscribers::init_for_testing();
1796        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
1797            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
1798            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
1799            config
1800        });
1801
1802        let (context, mut key_pairs) = Context::new_for_test(4);
1803        let context = Arc::new(context);
1804        let store = Arc::new(MemStore::new());
1805        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1806
1807        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
1808        let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1809        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1810        let (blocks_sender, _blocks_receiver) =
1811            monitored_mpsc::unbounded_channel("consensus_block_output");
1812        let transaction_certifier = TransactionCertifier::new(
1813            context.clone(),
1814            Arc::new(NoopBlockVerifier {}),
1815            dag_state.clone(),
1816            blocks_sender,
1817        );
1818        let (signals, signal_receivers) = CoreSignals::new(context.clone());
1819        // Need at least one subscriber to the block broadcast channel.
1820        let mut block_receiver = signal_receivers.block_broadcast_receiver();
1821        let leader_schedule = Arc::new(LeaderSchedule::from_store(
1822            context.clone(),
1823            dag_state.clone(),
1824        ));
1825
1826        let (commit_consumer, _commit_receiver, _transaction_receiver) =
1827            CommitConsumerArgs::new(0, 0);
1828        let commit_observer = CommitObserver::new(
1829            context.clone(),
1830            commit_consumer,
1831            dag_state.clone(),
1832            transaction_certifier.clone(),
1833            leader_schedule.clone(),
1834        )
1835        .await;
1836
1837        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1838        let mut core = Core::new(
1839            context.clone(),
1840            leader_schedule,
1841            transaction_consumer,
1842            transaction_certifier,
1843            block_manager,
1844            commit_observer,
1845            signals,
1846            key_pairs.remove(context.own_index.value()).1,
1847            dag_state.clone(),
1848            false,
1849            round_tracker,
1850        );
1851
1852        // Send some transactions
1853        let mut total = 0;
1854        let mut index = 0;
1855        loop {
1856            let transaction =
1857                bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
1858            total += transaction.len();
1859            index += 1;
1860            let _w = transaction_client
1861                .submit_no_wait(vec![transaction])
1862                .await
1863                .unwrap();
1864
1865            // Create total size of transactions up to 1KB
1866            if total >= 1_000 {
1867                break;
1868            }
1869        }
1870
1871        // A new block should have been created during recovery.
1872        let extended_block = block_receiver
1873            .recv()
1874            .await
1875            .expect("A new block should have been created");
1876
1877        // A new block created - assert the details
1878        assert_eq!(extended_block.block.round(), 1);
1879        assert_eq!(extended_block.block.author().value(), 0);
1880        assert_eq!(extended_block.block.ancestors().len(), 4);
1881
1882        let mut total = 0;
1883        for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
1884            total += transaction.data().len() as u64;
1885            let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
1886            assert_eq!(format!("Transaction {i}"), transaction);
1887        }
1888        assert!(total <= context.protocol_config.max_transactions_in_block_bytes());
1889
1890        // genesis blocks should be referenced
1891        let all_genesis = genesis_blocks(&context);
1892
1893        for ancestor in extended_block.block.ancestors() {
1894            all_genesis
1895                .iter()
1896                .find(|block| block.reference() == *ancestor)
1897                .expect("Block should be found amongst genesis blocks");
1898        }
1899
1900        // Try to propose again - with or without ignore leaders check, it will not return any block
1901        assert!(core.try_propose(false).unwrap().is_none());
1902        assert!(core.try_propose(true).unwrap().is_none());
1903
1904        // Flush the DAG state to storage.
1905        dag_state.write().flush();
1906
1907        // Check no commits have been persisted to dag_state & store
1908        let last_commit = store.read_last_commit().unwrap();
1909        assert!(last_commit.is_none());
1910        assert_eq!(dag_state.read().last_commit_index(), 0);
1911    }
1912
1913    #[tokio::test]
1914    async fn test_core_propose_once_receiving_a_quorum() {
1915        telemetry_subscribers::init_for_testing();
1916        let (context, _key_pairs) = Context::new_for_test(4);
1917        let mut core_fixture = CoreTextFixture::new(
1918            context.clone(),
1919            vec![1, 1, 1, 1],
1920            AuthorityIndex::new_for_test(0),
1921            false,
1922        )
1923        .await;
1924        let transaction_certifier = &core_fixture.transaction_certifier;
1925        let store = &core_fixture.store;
1926        let dag_state = &core_fixture.dag_state;
1927        let core = &mut core_fixture.core;
1928
1929        let mut expected_ancestors = BTreeSet::new();
1930
1931        // Adding one block now will trigger the creation of new block for round 1
1932        let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
1933        expected_ancestors.insert(block_1.reference());
1934        // Wait for min round delay to allow blocks to be proposed.
1935        sleep(context.parameters.min_round_delay).await;
1936        // add blocks to trigger proposal.
1937        transaction_certifier.add_voted_blocks(vec![(block_1.clone(), vec![])]);
1938        _ = core.add_blocks(vec![block_1]);
1939
1940        assert_eq!(core.last_proposed_round(), 1);
1941        expected_ancestors.insert(core.last_proposed_block().reference());
1942        // attempt to create a block - none will be produced.
1943        assert!(core.try_propose(false).unwrap().is_none());
1944
1945        // Adding another block now forms a quorum for round 1, so a block at round 2 will be proposed.
1946        let block_2 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
1947        expected_ancestors.insert(block_2.reference());
1948        // Wait for min round delay to allow blocks to be proposed.
1949        sleep(context.parameters.min_round_delay).await;
1950        // add blocks to trigger proposal.
1951        transaction_certifier.add_voted_blocks(vec![(block_2.clone(), vec![1, 4])]);
1952        _ = core.add_blocks(vec![block_2.clone()]);
1953
1954        assert_eq!(core.last_proposed_round(), 2);
1955
1956        let proposed_block = core.last_proposed_block();
1957        assert_eq!(proposed_block.round(), 2);
1958        assert_eq!(proposed_block.author(), context.own_index);
1959        assert_eq!(proposed_block.ancestors().len(), 3);
1960        let ancestors = proposed_block.ancestors();
1961        let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
1962        assert_eq!(ancestors, expected_ancestors);
1963
1964        let transaction_votes = proposed_block.transaction_votes();
1965        assert_eq!(transaction_votes.len(), 1);
1966        let transaction_vote = transaction_votes.first().unwrap();
1967        assert_eq!(transaction_vote.block_ref, block_2.reference());
1968        assert_eq!(transaction_vote.rejects, vec![1, 4]);
1969
1970        // Flush the DAG state to storage.
1971        dag_state.write().flush();
1972
1973        // Check no commits have been persisted to dag_state & store
1974        let last_commit = store.read_last_commit().unwrap();
1975        assert!(last_commit.is_none());
1976        assert_eq!(dag_state.read().last_commit_index(), 0);
1977    }
1978
1979    #[tokio::test]
1980    async fn test_commit_and_notify_for_block_status() {
1981        telemetry_subscribers::init_for_testing();
1982        let (mut context, mut key_pairs) = Context::new_for_test(4);
1983        const GC_DEPTH: u32 = 2;
1984
1985        context
1986            .protocol_config
1987            .set_consensus_gc_depth_for_testing(GC_DEPTH);
1988
1989        let context = Arc::new(context);
1990
1991        let store = Arc::new(MemStore::new());
1992        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
1993        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
1994        let mut block_status_subscriptions = FuturesUnordered::new();
1995
1996        let dag_str = "DAG {
1997            Round 0 : { 4 },
1998            Round 1 : { * },
1999            Round 2 : { * },
2000            Round 3 : {
2001                A -> [*],
2002                B -> [-A2],
2003                C -> [-A2],
2004                D -> [-A2],
2005            },
2006            Round 4 : { 
2007                B -> [-A3],
2008                C -> [-A3],
2009                D -> [-A3],
2010            },
2011            Round 5 : { 
2012                A -> [A3, B4, C4, D4]
2013                B -> [*],
2014                C -> [*],
2015                D -> [*],
2016            },
2017            Round 6 : { * },
2018            Round 7 : { * },
2019            Round 8 : { * },
2020        }";
2021
2022        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2023        dag_builder.print();
2024
2025        // Subscribe to all created "own" blocks. We know that for our node (A) we'll be able to commit up to round 5.
2026        for block in dag_builder.blocks(1..=5) {
2027            if block.author() == context.own_index {
2028                let subscription =
2029                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
2030                block_status_subscriptions.push(subscription);
2031            }
2032        }
2033
2034        // write them in store
2035        store
2036            .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
2037            .expect("Storage error");
2038
2039        // create dag state after all blocks have been written to store
2040        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2041        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2042        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2043            context.clone(),
2044            dag_state.clone(),
2045        ));
2046        let (blocks_sender, _blocks_receiver) =
2047            monitored_mpsc::unbounded_channel("consensus_block_output");
2048        let transaction_certifier = TransactionCertifier::new(
2049            context.clone(),
2050            Arc::new(NoopBlockVerifier {}),
2051            dag_state.clone(),
2052            blocks_sender,
2053        );
2054
2055        let (commit_consumer, _commit_receiver, _transaction_receiver) =
2056            CommitConsumerArgs::new(0, 0);
2057        let commit_observer = CommitObserver::new(
2058            context.clone(),
2059            commit_consumer,
2060            dag_state.clone(),
2061            transaction_certifier.clone(),
2062            leader_schedule.clone(),
2063        )
2064        .await;
2065
2066        // Flush the DAG state to storage.
2067        dag_state.write().flush();
2068
2069        // Check no commits have been persisted to dag_state or store.
2070        let last_commit = store.read_last_commit().unwrap();
2071        assert!(last_commit.is_none());
2072        assert_eq!(dag_state.read().last_commit_index(), 0);
2073
2074        // Now recover Core and other components.
2075        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2076        let (blocks_sender, _blocks_receiver) =
2077            monitored_mpsc::unbounded_channel("consensus_block_output");
2078        let transaction_certifier = TransactionCertifier::new(
2079            context.clone(),
2080            Arc::new(NoopBlockVerifier {}),
2081            dag_state.clone(),
2082            blocks_sender,
2083        );
2084        transaction_certifier.recover_blocks_after_round(dag_state.read().gc_round());
2085        // Need at least one subscriber to the block broadcast channel.
2086        let _block_receiver = signal_receivers.block_broadcast_receiver();
2087        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2088        let _core = Core::new(
2089            context.clone(),
2090            leader_schedule,
2091            transaction_consumer,
2092            transaction_certifier,
2093            block_manager,
2094            commit_observer,
2095            signals,
2096            key_pairs.remove(context.own_index.value()).1,
2097            dag_state.clone(),
2098            false,
2099            round_tracker,
2100        );
2101
2102        // Flush the DAG state to storage.
2103        dag_state.write().flush();
2104
2105        let last_commit = store
2106            .read_last_commit()
2107            .unwrap()
2108            .expect("last commit should be set");
2109
2110        assert_eq!(last_commit.index(), 5);
2111
2112        while let Some(result) = block_status_subscriptions.next().await {
2113            let status = result.unwrap();
2114
2115            match status {
2116                BlockStatus::Sequenced(block_ref) => {
2117                    assert!(block_ref.round == 1 || block_ref.round == 5);
2118                }
2119                BlockStatus::GarbageCollected(block_ref) => {
2120                    assert!(block_ref.round == 2 || block_ref.round == 3);
2121                }
2122            }
2123        }
2124    }
2125
2126    // Tests that the threshold clock advances when blocks get unsuspended due to GC'ed blocks, and that newly
2127    // created blocks are always higher than the last advanced gc round.
2128    #[tokio::test]
2129    async fn test_multiple_commits_advance_threshold_clock() {
2130        telemetry_subscribers::init_for_testing();
2131        let (mut context, mut key_pairs) = Context::new_for_test(4);
2132        const GC_DEPTH: u32 = 2;
2133
2134        context
2135            .protocol_config
2136            .set_consensus_gc_depth_for_testing(GC_DEPTH);
2137
2138        let context = Arc::new(context);
2139
2140        let store = Arc::new(MemStore::new());
2141        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2142        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2143
2144        // On round 1 we do produce the block for authority D but we do not link it until round 6. This makes round 6 unable to be processed
2145        // until the leader of round 3 is committed, at which point round 1 gets garbage collected.
2146        // Then we add more rounds so we can trigger a commit for the leader of round 9, which will move the gc round to 7.
2147        let dag_str = "DAG {
2148            Round 0 : { 4 },
2149            Round 1 : { * },
2150            Round 2 : { 
2151                B -> [-D1],
2152                C -> [-D1],
2153                D -> [-D1],
2154            },
2155            Round 3 : {
2156                B -> [*],
2157                C -> [*]
2158                D -> [*],
2159            },
2160            Round 4 : { 
2161                A -> [*],
2162                B -> [*],
2163                C -> [*]
2164                D -> [*],
2165            },
2166            Round 5 : { 
2167                A -> [*],
2168                B -> [*],
2169                C -> [*],
2170                D -> [*],
2171            },
2172            Round 6 : { 
2173                B -> [A5, B5, C5, D1],
2174                C -> [A5, B5, C5, D1],
2175                D -> [A5, B5, C5, D1],
2176            },
2177            Round 7 : { 
2178                B -> [*],
2179                C -> [*],
2180                D -> [*],
2181            },
2182            Round 8 : { 
2183                B -> [*],
2184                C -> [*],
2185                D -> [*],
2186            },
2187            Round 9 : { 
2188                B -> [*],
2189                C -> [*],
2190                D -> [*],
2191            },
2192            Round 10 : { 
2193                B -> [*],
2194                C -> [*],
2195                D -> [*],
2196            },
2197            Round 11 : { 
2198                B -> [*],
2199                C -> [*],
2200                D -> [*],
2201            },
2202        }";
2203
2204        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
2205        dag_builder.print();
2206
2207        // create dag state after all blocks have been written to store
2208        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2209        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2210        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2211            context.clone(),
2212            dag_state.clone(),
2213        ));
2214        let (blocks_sender, _blocks_receiver) =
2215            monitored_mpsc::unbounded_channel("consensus_block_output");
2216        let transaction_certifier = TransactionCertifier::new(
2217            context.clone(),
2218            Arc::new(NoopBlockVerifier {}),
2219            dag_state.clone(),
2220            blocks_sender,
2221        );
2222
2223        let (commit_consumer, _commit_receiver, _transaction_receiver) =
2224            CommitConsumerArgs::new(0, 0);
2225        let commit_observer = CommitObserver::new(
2226            context.clone(),
2227            commit_consumer,
2228            dag_state.clone(),
2229            transaction_certifier.clone(),
2230            leader_schedule.clone(),
2231        )
2232        .await;
2233
2234        // Flush the DAG state to storage.
2235        dag_state.write().flush();
2236
2237        // Check no commits have been persisted to dag_state or store.
2238        let last_commit = store.read_last_commit().unwrap();
2239        assert!(last_commit.is_none());
2240        assert_eq!(dag_state.read().last_commit_index(), 0);
2241
2242        // Now spin up core
2243        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2244        let (blocks_sender, _blocks_receiver) =
2245            monitored_mpsc::unbounded_channel("consensus_block_output");
2246        let transaction_certifier = TransactionCertifier::new(
2247            context.clone(),
2248            Arc::new(NoopBlockVerifier {}),
2249            dag_state.clone(),
2250            blocks_sender,
2251        );
2252        // Need at least one subscriber to the block broadcast channel.
2253        let _block_receiver = signal_receivers.block_broadcast_receiver();
2254        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2255        let mut core = Core::new(
2256            context.clone(),
2257            leader_schedule,
2258            transaction_consumer,
2259            transaction_certifier.clone(),
2260            block_manager,
2261            commit_observer,
2262            signals,
2263            key_pairs.remove(context.own_index.value()).1,
2264            dag_state.clone(),
2265            true,
2266            round_tracker,
2267        );
2268        // We set the last known round to 4 so we avoid creating new blocks until then, otherwise it would crash as the already created DAG contains blocks for this
2269        // authority.
2270        core.set_last_known_proposed_round(4);
2271
2272        // We add all the blocks except D1. The only ones we can immediately accept are the ones up to round 5, as they don't depend on D1. The rest of the blocks do have a causal dependency
2273        // on D1, so they can't be processed until the leader of round 3 gets committed and the gc round moves to 1. That will make all the blocks that depend on D1 get accepted.
2274        // However, our threshold clock is now at round 6, as the last quorum that we managed to process was round 5.
2275        // As commits happen, blocks of later rounds get accepted and more leaders get committed. Eventually the leader of round 9 gets committed and gc is moved to 9 - 2 = 7.
2276        // If our node attempted to produce a block for threshold clock round 6, the acceptance checks would fail as gc has now moved far past this round.
2277        let mut all_blocks = dag_builder.blocks(1..=11);
2278        all_blocks.sort_by_key(|b| b.round());
2279        let voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)> =
2280            all_blocks.iter().map(|b| (b.clone(), vec![])).collect();
2281        transaction_certifier.add_voted_blocks(voted_blocks);
2282        let blocks: Vec<VerifiedBlock> = all_blocks
2283            .into_iter()
2284            .filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
2285            .collect();
2286        core.add_blocks(blocks).expect("Should not fail");
2287
2288        assert_eq!(core.last_proposed_round(), 12);
2289    }
2290
2291    #[tokio::test]
2292    async fn test_core_set_min_propose_round() {
2293        telemetry_subscribers::init_for_testing();
2294        let (context, mut key_pairs) = Context::new_for_test(4);
2295        let context = Arc::new(context.with_parameters(Parameters {
2296            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2297            ..Default::default()
2298        }));
2299
2300        let store = Arc::new(MemStore::new());
2301        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2302
2303        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2304        let leader_schedule = Arc::new(LeaderSchedule::from_store(
2305            context.clone(),
2306            dag_state.clone(),
2307        ));
2308
2309        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2310        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2311        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2312        let (blocks_sender, _blocks_receiver) =
2313            monitored_mpsc::unbounded_channel("consensus_block_output");
2314        let transaction_certifier = TransactionCertifier::new(
2315            context.clone(),
2316            Arc::new(NoopBlockVerifier {}),
2317            dag_state.clone(),
2318            blocks_sender,
2319        );
2320        // Need at least one subscriber to the block broadcast channel.
2321        let _block_receiver = signal_receivers.block_broadcast_receiver();
2322
2323        let (commit_consumer, _commit_receiver, _transaction_receiver) =
2324            CommitConsumerArgs::new(0, 0);
2325        let commit_observer = CommitObserver::new(
2326            context.clone(),
2327            commit_consumer,
2328            dag_state.clone(),
2329            transaction_certifier.clone(),
2330            leader_schedule.clone(),
2331        )
2332        .await;
2333
2334        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2335        let mut core = Core::new(
2336            context.clone(),
2337            leader_schedule,
2338            transaction_consumer,
2339            transaction_certifier.clone(),
2340            block_manager,
2341            commit_observer,
2342            signals,
2343            key_pairs.remove(context.own_index.value()).1,
2344            dag_state.clone(),
2345            true,
2346            round_tracker,
2347        );
2348
2349        // No new block should have been produced
2350        assert_eq!(
2351            core.last_proposed_round(),
2352            GENESIS_ROUND,
2353            "No block should have been created other than genesis"
2354        );
2355
2356        // Trying to explicitly propose a block will not produce anything
2357        assert!(core.try_propose(true).unwrap().is_none());
2358
2359        // Create blocks for the whole network - even "our" node in order to replicate an "amnesia" recovery.
2360        let mut builder = DagBuilder::new(context.clone());
2361        builder.layers(1..=10).build();
2362
2363        let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
2364
2365        // Process all the blocks
2366        transaction_certifier
2367            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2368        assert!(core.add_blocks(blocks).unwrap().is_empty());
2369
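        // Simulate probe results where every authority reports received and accepted rounds of 10 for all peers.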
2370        core.round_tracker.write().update_from_probe(
2371            vec![
2372                vec![10, 10, 10, 10],
2373                vec![10, 10, 10, 10],
2374                vec![10, 10, 10, 10],
2375                vec![10, 10, 10, 10],
2376            ],
2377            vec![
2378                vec![10, 10, 10, 10],
2379                vec![10, 10, 10, 10],
2380                vec![10, 10, 10, 10],
2381                vec![10, 10, 10, 10],
2382            ],
2383        );
2384
2385        // Try to propose - no block should be produced.
2386        assert!(core.try_propose(true).unwrap().is_none());
2387
2388        // Now set the last known proposed round, which is the highest round for which the network has informed
2389        // us that we have proposed a block.
2390        core.set_last_known_proposed_round(10);
2391
2392        let block = core.try_propose(true).expect("No error").unwrap();
2393        assert_eq!(block.round(), 11);
2394        assert_eq!(block.ancestors().len(), 4);
2395
2396        let our_ancestor_included = block.ancestors()[0];
2397        assert_eq!(our_ancestor_included.author, context.own_index);
2398        assert_eq!(our_ancestor_included.round, 10);
2399    }
2400
2401    #[tokio::test(flavor = "current_thread", start_paused = true)]
2402    async fn test_core_try_new_block_leader_timeout() {
2403        telemetry_subscribers::init_for_testing();
2404
2405        // Since we run the test with start_paused = true, any time-dependent operations using Tokio's time
2406        // facilities, such as tokio::time::sleep or tokio::time::Instant, will not advance. So in practice each
2407        // Core's clock may have been initialised with a different value, but it never advances.
2408        // To ensure that blocks won't get rejected by cores we'll need to manually wait for the time
2409        // diff before processing them. By calling `tokio::time::sleep` we also implicitly advance the
2410        // tokio clock.
2411        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2412            // Simulate the time wait before processing a block to ensure that block.timestamp <= now
2413            let now = context.clock.timestamp_utc_ms();
2414            let max_timestamp = blocks
2415                .iter()
2416                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2417                .map(|block| block.timestamp_ms())
2418                .unwrap_or(0);
2419
2420            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2421            sleep(wait_time).await;
2422        }
2423
2424        let (context, _) = Context::new_for_test(4);
2425        // Create the cores for all authorities
2426        let mut all_cores = create_cores(context, vec![1, 1, 1, 1]).await;
2427
2428        // Create blocks for rounds 1..=3 from all Cores except the last Core of authority 3, so we miss its block. Since
2429        // it will be the leader of round 3, no one will be able to progress to round 4 unless we explicitly trigger
2430        // the block creation.
2431        // Split off the last core (authority 3), which will not be producing blocks.
2432        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2433
2434        // Now iterate over a few rounds and ensure the corresponding signals are created while network advances
2435        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2436        for round in 1..=3 {
2437            let mut this_round_blocks = Vec::new();
2438
2439            for core_fixture in cores.iter_mut() {
2440                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2441
2442                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2443
2444                // Only when round > 1 and using non-genesis parents.
2445                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2446                    assert_eq!(round - 1, r);
2447                    if core_fixture.core.last_proposed_round() == r {
2448                        // Force propose new block regardless of min round delay.
2449                        core_fixture
2450                            .core
2451                            .try_propose(true)
2452                            .unwrap()
2453                            .unwrap_or_else(|| {
2454                                panic!("Block should have been proposed for round {}", round)
2455                            });
2456                    }
2457                }
2458
2459                assert_eq!(core_fixture.core.last_proposed_round(), round);
2460
2461                this_round_blocks.push(core_fixture.core.last_proposed_block());
2462            }
2463
2464            last_round_blocks = this_round_blocks;
2465        }
2466
2467        // Try to create the blocks for round 4 by calling the try_propose() method. No block should be created as the
2468        // leader - authority 3 - hasn't proposed any block.
2469        for core_fixture in cores.iter_mut() {
2470            wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2471
2472            core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2473            assert!(core_fixture.core.try_propose(false).unwrap().is_none());
2474        }
2475
2476        // Now try to create the blocks for round 4 via the leader timeout method which should
2477        // ignore any leader checks or min round delay.
2478        for core_fixture in cores.iter_mut() {
2479            assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
2480            assert_eq!(core_fixture.core.last_proposed_round(), 4);
2481
2482            // Flush the DAG state to storage.
2483            core_fixture.dag_state.write().flush();
2484
2485            // Check commits have been persisted to store
2486            let last_commit = core_fixture
2487                .store
2488                .read_last_commit()
2489                .unwrap()
2490                .expect("last commit should be set");
2491            // There is 1 leader round with rounds completed up to and including
2492            // round 4.
2493            assert_eq!(last_commit.index(), 1);
2494            let all_stored_commits = core_fixture
2495                .store
2496                .scan_commits((0..=CommitIndex::MAX).into())
2497                .unwrap();
2498            assert_eq!(all_stored_commits.len(), 1);
2499        }
2500    }
2501
2502    #[tokio::test(flavor = "current_thread", start_paused = true)]
2503    async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
2504        telemetry_subscribers::init_for_testing();
2505
2506        // Since we run the test with start_paused = true, any time-dependent operations using Tokio's time
2507        // facilities, such as tokio::time::sleep or tokio::time::Instant, will not advance. So in practice each
2508        // Core's clock may have been initialised with a different value, but it never advances.
2509        // To ensure that blocks won't get rejected by cores we'll need to manually wait for the time
2510        // diff before processing them. By calling `tokio::time::sleep` we also implicitly advance the
2511        // tokio clock.
2512        async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
2513            // Simulate the time wait before processing a block to ensure that block.timestamp <= now
2514            let now = context.clock.timestamp_utc_ms();
2515            let max_timestamp = blocks
2516                .iter()
2517                .max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
2518                .map(|block| block.timestamp_ms())
2519                .unwrap_or(0);
2520
2521            let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
2522            sleep(wait_time).await;
2523        }
2524
2525        let (mut context, _) = Context::new_for_test(5);
2526        context
2527            .protocol_config
2528            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
2529
2530        // Create the cores for all authorities
2531        let mut all_cores = create_cores(context, vec![1, 1, 1, 1, 1]).await;
2532        let (_last_core, cores) = all_cores.split_last_mut().unwrap();
2533
2534        // Create blocks for rounds 1..=30 from all Cores except last Core of authority 4.
2535        let mut last_round_blocks = Vec::<VerifiedBlock>::new();
2536        for round in 1..=30 {
2537            let mut this_round_blocks = Vec::new();
2538
2539            for core_fixture in cores.iter_mut() {
2540                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2541
2542                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2543
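                // Simulate probe results: the first matrix holds the received rounds and the
                // second the accepted rounds that each peer reports per authority. The row and
                // column for authority 4 stay at 0, since it is not producing blocks in these rounds.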
2544                core_fixture.core.round_tracker.write().update_from_probe(
2545                    vec![
2546                        vec![round, round, round, round, 0],
2547                        vec![round, round, round, round, 0],
2548                        vec![round, round, round, round, 0],
2549                        vec![round, round, round, round, 0],
2550                        vec![0, 0, 0, 0, 0],
2551                    ],
2552                    vec![
2553                        vec![round, round, round, round, 0],
2554                        vec![round, round, round, round, 0],
2555                        vec![round, round, round, round, 0],
2556                        vec![round, round, round, round, 0],
2557                        vec![0, 0, 0, 0, 0],
2558                    ],
2559                );
2560
2561                // Only when round > 1 and using non-genesis parents.
2562                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2563                    assert_eq!(round - 1, r);
2564                    if core_fixture.core.last_proposed_round() == r {
2565                        // Force propose new block regardless of min round delay.
2566                        core_fixture
2567                            .core
2568                            .try_propose(true)
2569                            .unwrap()
2570                            .unwrap_or_else(|| {
2571                                panic!("Block should have been proposed for round {}", round)
2572                            });
2573                    }
2574                }
2575
2576                assert_eq!(core_fixture.core.last_proposed_round(), round);
2577
2578                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2579            }
2580
2581            last_round_blocks = this_round_blocks;
2582        }
2583
2584        // Now produce blocks for all Cores
2585        for round in 31..=40 {
2586            let mut this_round_blocks = Vec::new();
2587
2588            for core_fixture in all_cores.iter_mut() {
2589                wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
2590
2591                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
2592
2593                // Don't update probed rounds for authority 4 so it will remain
2594                // excluded
2595                core_fixture.core.round_tracker.write().update_from_probe(
2596                    vec![
2597                        vec![round, round, round, round, 0],
2598                        vec![round, round, round, round, 0],
2599                        vec![round, round, round, round, 0],
2600                        vec![round, round, round, round, 0],
2601                        vec![0, 0, 0, 0, 0],
2602                    ],
2603                    vec![
2604                        vec![round, round, round, round, 0],
2605                        vec![round, round, round, round, 0],
2606                        vec![round, round, round, round, 0],
2607                        vec![round, round, round, round, 0],
2608                        vec![0, 0, 0, 0, 0],
2609                    ],
2610                );
2611
2612                // Only when round > 1 and using non-genesis parents.
2613                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
2614                    assert_eq!(round - 1, r);
2615                    if core_fixture.core.last_proposed_round() == r {
2616                        // Force propose new block regardless of min round delay.
2617                        core_fixture
2618                            .core
2619                            .try_propose(true)
2620                            .unwrap()
2621                            .unwrap_or_else(|| {
2622                                panic!("Block should have been proposed for round {}", round)
2623                            });
2624                    }
2625                }
2626
2627                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
2628
2629                for block in this_round_blocks.iter() {
2630                    if block.author() != AuthorityIndex::new_for_test(4) {
2631                        // Assert blocks created include only 4 ancestors per block as one
2632                        // should be excluded
2633                        assert_eq!(block.ancestors().len(), 4);
2634                    } else {
2635                        // Authority 4 is the low scoring authority so it will still include
2636                        // its own blocks.
2637                        assert_eq!(block.ancestors().len(), 5);
2638                    }
2639                }
2640            }
2641
2642            last_round_blocks = this_round_blocks;
2643        }
2644    }
2645
2646    #[tokio::test]
2647    async fn test_smart_ancestor_selection() {
2648        telemetry_subscribers::init_for_testing();
2649        let (mut context, mut key_pairs) = Context::new_for_test(7);
2650        context
2651            .protocol_config
2652            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
2653        let context = Arc::new(context.with_parameters(Parameters {
2654            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2655            ..Default::default()
2656        }));
2657
2658        let store = Arc::new(MemStore::new());
2659        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2660
2661        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2662        let leader_schedule = Arc::new(
2663            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2664                .with_num_commits_per_schedule(10),
2665        );
2666
2667        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2668        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2669        let (blocks_sender, _blocks_receiver) =
2670            monitored_mpsc::unbounded_channel("consensus_block_output");
2671        let transaction_certifier = TransactionCertifier::new(
2672            context.clone(),
2673            Arc::new(NoopBlockVerifier {}),
2674            dag_state.clone(),
2675            blocks_sender,
2676        );
2677        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2678        // Need at least one subscriber to the block broadcast channel.
2679        let mut block_receiver = signal_receivers.block_broadcast_receiver();
2680
2681        let (commit_consumer, _commit_receiver, _transaction_receiver) =
2682            CommitConsumerArgs::new(0, 0);
2683        let commit_observer = CommitObserver::new(
2684            context.clone(),
2685            commit_consumer,
2686            dag_state.clone(),
2687            transaction_certifier.clone(),
2688            leader_schedule.clone(),
2689        )
2690        .await;
2691
2692        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2693        let mut core = Core::new(
2694            context.clone(),
2695            leader_schedule,
2696            transaction_consumer,
2697            transaction_certifier.clone(),
2698            block_manager,
2699            commit_observer,
2700            signals,
2701            key_pairs.remove(context.own_index.value()).1,
2702            dag_state.clone(),
2703            true,
2704            round_tracker.clone(),
2705        );
2706
2707        // No new block should have been produced
2708        assert_eq!(
2709            core.last_proposed_round(),
2710            GENESIS_ROUND,
2711            "No block should have been created other than genesis"
2712        );
2713
2714        // Trying to explicitly propose a block will not produce anything
2715        assert!(core.try_propose(true).unwrap().is_none());
2716
2717        // Create blocks for the whole network but not for authority 1
2718        let mut builder = DagBuilder::new(context.clone());
2719        builder
2720            .layers(1..=12)
2721            .authorities(vec![AuthorityIndex::new_for_test(1)])
2722            .skip_block()
2723            .build();
2724        let blocks = builder.blocks(1..=12);
2725        // Process all the blocks
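        // Record the blocks in the transaction certifier with no reject votes before
        // adding them to Core.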
2726        transaction_certifier
2727            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2728        assert!(core.add_blocks(blocks).unwrap().is_empty());
2729        core.set_last_known_proposed_round(12);
2730
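        // Simulate a probe update: every peer reports rounds of 12 for all authorities,
        // except authority 1 which reports all zeros since it has not produced any blocks.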
2731        round_tracker.write().update_from_probe(
2732            vec![
2733                vec![12, 12, 12, 12, 12, 12, 12],
2734                vec![0, 0, 0, 0, 0, 0, 0],
2735                vec![12, 12, 12, 12, 12, 12, 12],
2736                vec![12, 12, 12, 12, 12, 12, 12],
2737                vec![12, 12, 12, 12, 12, 12, 12],
2738                vec![12, 12, 12, 12, 12, 12, 12],
2739                vec![12, 12, 12, 12, 12, 12, 12],
2740            ],
2741            vec![
2742                vec![12, 12, 12, 12, 12, 12, 12],
2743                vec![0, 0, 0, 0, 0, 0, 0],
2744                vec![12, 12, 12, 12, 12, 12, 12],
2745                vec![12, 12, 12, 12, 12, 12, 12],
2746                vec![12, 12, 12, 12, 12, 12, 12],
2747                vec![12, 12, 12, 12, 12, 12, 12],
2748                vec![12, 12, 12, 12, 12, 12, 12],
2749            ],
2750        );
2751
2752        let block = core.try_propose(true).expect("No error").unwrap();
2753        assert_eq!(block.round(), 13);
2754        assert_eq!(block.ancestors().len(), 7);
2755
2756        // Build blocks for rest of the network other than own index
2757        builder
2758            .layers(13..=14)
2759            .authorities(vec![AuthorityIndex::new_for_test(0)])
2760            .skip_block()
2761            .build();
2762        let blocks = builder.blocks(13..=14);
2763        transaction_certifier
2764            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2765        assert!(core.add_blocks(blocks).unwrap().is_empty());
2766
2767        // We have now triggered a leader schedule change, so there should be
2768        // one EXCLUDE authority (1) when we select ancestors for the next proposal.
2769        let block = core.try_propose(true).expect("No error").unwrap();
2770        assert_eq!(block.round(), 15);
2771        assert_eq!(block.ancestors().len(), 6);
2772
2773        // Build blocks for a quorum of the network including the EXCLUDE authority (1),
2774        // which will trigger smart select, so no block will be proposed.
2775        let round_14_ancestors = builder.last_ancestors.clone();
2776        builder
2777            .layer(15)
2778            .authorities(vec![
2779                AuthorityIndex::new_for_test(0),
2780                AuthorityIndex::new_for_test(5),
2781                AuthorityIndex::new_for_test(6),
2782            ])
2783            .skip_block()
2784            .build();
2785        let blocks = builder.blocks(15..=15);
2786        let authority_1_excluded_block_reference = blocks
2787            .iter()
2788            .find(|block| block.author() == AuthorityIndex::new_for_test(1))
2789            .unwrap()
2790            .reference();
2791        // Wait for min round delay to allow blocks to be proposed.
2792        sleep(context.parameters.min_round_delay).await;
2793        // Smart select should be triggered and no block should be proposed.
2794        transaction_certifier
2795            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2796        assert!(core.add_blocks(blocks).unwrap().is_empty());
2797        assert_eq!(core.last_proposed_block().round(), 15);
2798
2799        builder
2800            .layer(15)
2801            .authorities(vec![
2802                AuthorityIndex::new_for_test(0),
2803                AuthorityIndex::new_for_test(1),
2804                AuthorityIndex::new_for_test(2),
2805                AuthorityIndex::new_for_test(3),
2806                AuthorityIndex::new_for_test(4),
2807            ])
2808            .skip_block()
2809            .override_last_ancestors(round_14_ancestors)
2810            .build();
2811        let blocks = builder.blocks(15..=15);
2812        let round_15_ancestors: Vec<BlockRef> = blocks
2813            .iter()
2814            .filter(|block| block.round() == 15)
2815            .map(|block| block.reference())
2816            .collect();
2817        let included_block_references = iter::once(&core.last_proposed_block())
2818            .chain(blocks.iter())
2819            .filter(|block| block.author() != AuthorityIndex::new_for_test(1))
2820            .map(|block| block.reference())
2821            .collect::<Vec<_>>();
2822
2823        // Have enough ancestor blocks to propose now.
2824        transaction_certifier
2825            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2826        assert!(core.add_blocks(blocks).unwrap().is_empty());
2827        assert_eq!(core.last_proposed_block().round(), 16);
2828
2829        // Check that a new block has been proposed & signaled.
2830        let extended_block = loop {
2831            let extended_block =
2832                tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2833                    .await
2834                    .unwrap()
2835                    .unwrap();
2836            if extended_block.block.round() == 16 {
2837                break extended_block;
2838            }
2839        };
2840        assert_eq!(extended_block.block.round(), 16);
2841        assert_eq!(extended_block.block.author(), core.context.own_index);
2842        assert_eq!(extended_block.block.ancestors().len(), 6);
2843        assert_eq!(extended_block.block.ancestors(), included_block_references);
2844        assert_eq!(extended_block.excluded_ancestors.len(), 1);
2845        assert_eq!(
2846            extended_block.excluded_ancestors[0],
2847            authority_1_excluded_block_reference
2848        );
2849
2850        // Build blocks for a quorum of the network including the EXCLUDE ancestor,
2851        // which will trigger smart select, so no block will be proposed.
2852        // This time we will force a proposal by hitting the leader timeout, which
2853        // should cause us to include this EXCLUDE ancestor.
2854        builder
2855            .layer(16)
2856            .authorities(vec![
2857                AuthorityIndex::new_for_test(0),
2858                AuthorityIndex::new_for_test(5),
2859                AuthorityIndex::new_for_test(6),
2860            ])
2861            .skip_block()
2862            .override_last_ancestors(round_15_ancestors)
2863            .build();
2864        let blocks = builder.blocks(16..=16);
2865        // Wait for leader timeout to force blocks to be proposed.
2866        sleep(context.parameters.min_round_delay).await;
2867        // Smart select should be triggered and no block should be proposed.
2868        transaction_certifier
2869            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2870        assert!(core.add_blocks(blocks).unwrap().is_empty());
2871        assert_eq!(core.last_proposed_block().round(), 16);
2872
2873        // Simulate a leader timeout and a force proposal where we will include
2874        // one EXCLUDE ancestor when we go to select ancestors for the next proposal
2875        let block = core.try_propose(true).expect("No error").unwrap();
2876        assert_eq!(block.round(), 17);
2877        assert_eq!(block.ancestors().len(), 5);
2878
2879        // Check that a new block has been proposed & signaled.
2880        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2881            .await
2882            .unwrap()
2883            .unwrap();
2884        assert_eq!(extended_block.block.round(), 17);
2885        assert_eq!(extended_block.block.author(), core.context.own_index);
2886        assert_eq!(extended_block.block.ancestors().len(), 5);
2887        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2888
2889        // The excluded authority is locked until round 20; simulate enough rounds
2890        // to unlock it.
2891        builder
2892            .layers(17..=22)
2893            .authorities(vec![AuthorityIndex::new_for_test(0)])
2894            .skip_block()
2895            .build();
2896        let blocks = builder.blocks(17..=22);
2897
2898        // Simulate updating the received and accepted rounds from the prober.
2899        // New quorum rounds per authority can then be computed, which will unlock
2900        // the excluded authority (1). We should then be able to create a new
2901        // layer of blocks which will all be included as ancestors for the
2902        // next proposal.
2903        round_tracker.write().update_from_probe(
2904            vec![
2905                vec![22, 22, 22, 22, 22, 22, 22],
2906                vec![22, 22, 22, 22, 22, 22, 22],
2907                vec![22, 22, 22, 22, 22, 22, 22],
2908                vec![22, 22, 22, 22, 22, 22, 22],
2909                vec![22, 22, 22, 22, 22, 22, 22],
2910                vec![22, 22, 22, 22, 22, 22, 22],
2911                vec![22, 22, 22, 22, 22, 22, 22],
2912            ],
2913            vec![
2914                vec![22, 22, 22, 22, 22, 22, 22],
2915                vec![22, 22, 22, 22, 22, 22, 22],
2916                vec![22, 22, 22, 22, 22, 22, 22],
2917                vec![22, 22, 22, 22, 22, 22, 22],
2918                vec![22, 22, 22, 22, 22, 22, 22],
2919                vec![22, 22, 22, 22, 22, 22, 22],
2920                vec![22, 22, 22, 22, 22, 22, 22],
2921            ],
2922        );
2923
2924        let included_block_references = iter::once(&core.last_proposed_block())
2925            .chain(blocks.iter())
2926            .filter(|block| block.round() == 22 || block.author() == core.context.own_index)
2927            .map(|block| block.reference())
2928            .collect::<Vec<_>>();
2929
2930        // Have enough ancestor blocks to propose now.
2931        sleep(context.parameters.min_round_delay).await;
2932        transaction_certifier
2933            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
2934        assert!(core.add_blocks(blocks).unwrap().is_empty());
2935        assert_eq!(core.last_proposed_block().round(), 23);
2936
2937        // Check that a new block has been proposed & signaled.
2938        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
2939            .await
2940            .unwrap()
2941            .unwrap();
2942        assert_eq!(extended_block.block.round(), 23);
2943        assert_eq!(extended_block.block.author(), core.context.own_index);
2944        assert_eq!(extended_block.block.ancestors().len(), 7);
2945        assert_eq!(extended_block.block.ancestors(), included_block_references);
2946        assert_eq!(extended_block.excluded_ancestors.len(), 0);
2947    }
2948
2949    #[tokio::test]
2950    async fn test_excluded_ancestor_limit() {
2951        telemetry_subscribers::init_for_testing();
2952        let (context, mut key_pairs) = Context::new_for_test(4);
2953        let context = Arc::new(context.with_parameters(Parameters {
2954            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2955            ..Default::default()
2956        }));
2957
2958        let store = Arc::new(MemStore::new());
2959        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
2960
2961        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
2962        let leader_schedule = Arc::new(
2963            LeaderSchedule::from_store(context.clone(), dag_state.clone())
2964                .with_num_commits_per_schedule(10),
2965        );
2966
2967        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
2968        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
2969        let (blocks_sender, _blocks_receiver) =
2970            monitored_mpsc::unbounded_channel("consensus_block_output");
2971        let transaction_certifier = TransactionCertifier::new(
2972            context.clone(),
2973            Arc::new(NoopBlockVerifier {}),
2974            dag_state.clone(),
2975            blocks_sender,
2976        );
2977        let (signals, signal_receivers) = CoreSignals::new(context.clone());
2978        // Need at least one subscriber to the block broadcast channel.
2979        let mut block_receiver = signal_receivers.block_broadcast_receiver();
2980
2981        let (commit_consumer, _commit_receiver, _transaction_receiver) =
2982            CommitConsumerArgs::new(0, 0);
2983        let commit_observer = CommitObserver::new(
2984            context.clone(),
2985            commit_consumer,
2986            dag_state.clone(),
2987            transaction_certifier.clone(),
2988            leader_schedule.clone(),
2989        )
2990        .await;
2991
2992        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
2993        let mut core = Core::new(
2994            context.clone(),
2995            leader_schedule,
2996            transaction_consumer,
2997            transaction_certifier.clone(),
2998            block_manager,
2999            commit_observer,
3000            signals,
3001            key_pairs.remove(context.own_index.value()).1,
3002            dag_state.clone(),
3003            true,
3004            round_tracker,
3005        );
3006
3007        // No new block should have been produced
3008        assert_eq!(
3009            core.last_proposed_round(),
3010            GENESIS_ROUND,
3011            "No block should have been created other than genesis"
3012        );
3013
3014        // Create blocks for the whole network
3015        let mut builder = DagBuilder::new(context.clone());
3016        builder.layers(1..=3).build();
3017
3018        // This will equivocate 9 blocks for authority 1, which will be excluded from the
3019        // proposal. Because of the limit set, the excess excluded ancestors will be dropped
3020        // and not included as part of the ExtendedBlock structure sent to the rest of the network.
3021        builder
3022            .layer(4)
3023            .authorities(vec![AuthorityIndex::new_for_test(1)])
3024            .equivocate(9)
3025            .build();
3026        let blocks = builder.blocks(1..=4);
3027
3028        // Process all the blocks
3029        transaction_certifier
3030            .add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
3031        assert!(core.add_blocks(blocks).unwrap().is_empty());
3032        core.set_last_known_proposed_round(3);
3033
3034        let block = core.try_propose(true).expect("No error").unwrap();
3035        assert_eq!(block.round(), 5);
3036        assert_eq!(block.ancestors().len(), 4);
3037
3038        // Check that a new block has been proposed & signaled.
3039        let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
3040            .await
3041            .unwrap()
3042            .unwrap();
3043        assert_eq!(extended_block.block.round(), 5);
3044        assert_eq!(extended_block.block.author(), core.context.own_index);
3045        assert_eq!(extended_block.block.ancestors().len(), 4);
3046        assert_eq!(extended_block.excluded_ancestors.len(), 8);
3047    }
3048
3049    #[tokio::test]
3050    async fn test_core_set_propagation_delay_per_authority() {
3051        // TODO: create helper to avoid the duplicated code here.
3052        telemetry_subscribers::init_for_testing();
3053        let (context, mut key_pairs) = Context::new_for_test(4);
3054        let context = Arc::new(context);
3055        let store = Arc::new(MemStore::new());
3056        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3057
3058        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3059        let leader_schedule = Arc::new(LeaderSchedule::from_store(
3060            context.clone(),
3061            dag_state.clone(),
3062        ));
3063
3064        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3065        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3066        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3067        let (blocks_sender, _blocks_receiver) =
3068            monitored_mpsc::unbounded_channel("consensus_block_output");
3069        let transaction_certifier = TransactionCertifier::new(
3070            context.clone(),
3071            Arc::new(NoopBlockVerifier {}),
3072            dag_state.clone(),
3073            blocks_sender,
3074        );
3075        // Need at least one subscriber to the block broadcast channel.
3076        let _block_receiver = signal_receivers.block_broadcast_receiver();
3077
3078        let (commit_consumer, _commit_receiver, _transaction_receiver) =
3079            CommitConsumerArgs::new(0, 0);
3080        let commit_observer = CommitObserver::new(
3081            context.clone(),
3082            commit_consumer,
3083            dag_state.clone(),
3084            transaction_certifier.clone(),
3085            leader_schedule.clone(),
3086        )
3087        .await;
3088
3089        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
3090        let mut core = Core::new(
3091            context.clone(),
3092            leader_schedule,
3093            transaction_consumer,
3094            transaction_certifier.clone(),
3095            block_manager,
3096            commit_observer,
3097            signals,
3098            key_pairs.remove(context.own_index.value()).1,
3099            dag_state.clone(),
3100            false,
3101            round_tracker.clone(),
3102        );
3103
3104        // Use a large propagation delay to disable proposing.
3105        // This is done by accepting an own block at round 1000 into dag state and
3106        // then simulating a round tracker probe update where the low quorum round
3107        // for our own index gets calculated as round 0.
3108        let test_block = VerifiedBlock::new_for_test(TestBlock::new(1000, 0).build());
3109        transaction_certifier.add_voted_blocks(vec![(test_block.clone(), vec![])]);
3110        // Force accepting the block to dag state because its causal history is incomplete.
3111        dag_state.write().accept_block(test_block);
3112
3113        round_tracker.write().update_from_probe(
3114            vec![
3115                vec![0, 0, 0, 0],
3116                vec![0, 0, 0, 0],
3117                vec![0, 0, 0, 0],
3118                vec![0, 0, 0, 0],
3119            ],
3120            vec![
3121                vec![0, 0, 0, 0],
3122                vec![0, 0, 0, 0],
3123                vec![0, 0, 0, 0],
3124                vec![0, 0, 0, 0],
3125            ],
3126        );
3127
3128        // There is no proposal even with forced proposing.
3129        assert!(core.try_propose(true).unwrap().is_none());
3130
3131        // Let Core know there is no propagation delay.
3132        // This is done by simulating a round tracker probe update where the low
3133        // quorum round for our own index gets calculated as round 1000.
3134        round_tracker.write().update_from_probe(
3135            vec![
3136                vec![1000, 1000, 1000, 1000],
3137                vec![1000, 1000, 1000, 1000],
3138                vec![1000, 1000, 1000, 1000],
3139                vec![1000, 1000, 1000, 1000],
3140            ],
3141            vec![
3142                vec![1000, 1000, 1000, 1000],
3143                vec![1000, 1000, 1000, 1000],
3144                vec![1000, 1000, 1000, 1000],
3145                vec![1000, 1000, 1000, 1000],
3146            ],
3147        );
3148
3149        // Also add the necessary blocks from round 1000 so core will propose for
3150        // round 1001
3151        for author in 1..4 {
3152            let block = VerifiedBlock::new_for_test(TestBlock::new(1000, author).build());
3153            transaction_certifier.add_voted_blocks(vec![(block.clone(), vec![])]);
3154            // Force accepting the block to dag state because its causal history is incomplete.
3155            dag_state.write().accept_block(block);
3156        }
3157
3158        // Proposing now would succeed.
3159        assert!(core.try_propose(true).unwrap().is_some());
3160    }
3161
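    // A minimal illustrative sketch (not used by the tests in this module): the matrices
    // passed to `update_from_probe` in these tests are square, with one row per reporting
    // peer and one entry per authority. Assuming every peer reports the same round for every
    // authority, such a matrix could be built with a helper like this (the name is
    // hypothetical and plain `u32` is used for round values).
    #[allow(dead_code)]
    fn uniform_probe_matrix(num_authorities: usize, round: u32) -> Vec<Vec<u32>> {
        vec![vec![round; num_authorities]; num_authorities]
    }
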
3162    #[tokio::test(flavor = "current_thread", start_paused = true)]
3163    async fn test_leader_schedule_change() {
3164        telemetry_subscribers::init_for_testing();
3165        let default_params = Parameters::default();
3166
3167        let (context, _) = Context::new_for_test(4);
3168        // create the cores and their signals for all the authorities
3169        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
3170
3171        // Now iterate over a few rounds and ensure the corresponding signals are created while network advances
3172        let mut last_round_blocks = Vec::new();
3173        for round in 1..=30 {
3174            let mut this_round_blocks = Vec::new();
3175
3176            // Wait for min round delay to allow blocks to be proposed.
3177            sleep(default_params.min_round_delay).await;
3178
3179            for core_fixture in &mut cores {
3180                // add the blocks from last round
3181                // this will trigger a block creation for the round and a signal should be emitted
3182                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3183
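                // Simulate a probe update where every peer reports `round` as both its
                // received and accepted rounds for all authorities.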
3184                core_fixture.core.round_tracker.write().update_from_probe(
3185                    vec![
3186                        vec![round, round, round, round],
3187                        vec![round, round, round, round],
3188                        vec![round, round, round, round],
3189                        vec![round, round, round, round],
3190                    ],
3191                    vec![
3192                        vec![round, round, round, round],
3193                        vec![round, round, round, round],
3194                        vec![round, round, round, round],
3195                        vec![round, round, round, round],
3196                    ],
3197                );
3198
3199                // A "new round" signal should be received given that all the blocks of previous round have been processed
3200                let new_round = receive(
3201                    Duration::from_secs(1),
3202                    core_fixture.signal_receivers.new_round_receiver(),
3203                )
3204                .await;
3205                assert_eq!(new_round, round);
3206
3207                // Check that a new block has been proposed.
3208                let extended_block = tokio::time::timeout(
3209                    Duration::from_secs(1),
3210                    core_fixture.block_receiver.recv(),
3211                )
3212                .await
3213                .unwrap()
3214                .unwrap();
3215                assert_eq!(extended_block.block.round(), round);
3216                assert_eq!(
3217                    extended_block.block.author(),
3218                    core_fixture.core.context.own_index
3219                );
3220
3221                // append the new block to this round blocks
3222                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3223
3224                let block = core_fixture.core.last_proposed_block();
3225
3226                // ensure that produced block is referring to the blocks of last_round
3227                assert_eq!(
3228                    block.ancestors().len(),
3229                    core_fixture.core.context.committee.size()
3230                );
3231                for ancestor in block.ancestors() {
3232                    if block.round() > 1 {
3233                        // don't bother with round 1 block which just contains the genesis blocks.
3234                        assert!(
3235                            last_round_blocks
3236                                .iter()
3237                                .any(|block| block.reference() == *ancestor),
3238                            "Reference from previous round should be added"
3239                        );
3240                    }
3241                }
3242            }
3243
3244            last_round_blocks = this_round_blocks;
3245        }
3246
3247        for core_fixture in cores {
3248            // Flush the DAG state to storage.
3249            core_fixture.dag_state.write().flush();
3250
3251            // Check commits have been persisted to store
3252            let last_commit = core_fixture
3253                .store
3254                .read_last_commit()
3255                .unwrap()
3256                .expect("last commit should be set");
3257            // There are 28 leader rounds with rounds completed up to and including
3258            // round 29. Round 30 blocks will only include their own blocks, so the
3259            // 28th leader will not be committed.
3260            assert_eq!(last_commit.index(), 27);
3261            let all_stored_commits = core_fixture
3262                .store
3263                .scan_commits((0..=CommitIndex::MAX).into())
3264                .unwrap();
3265            assert_eq!(all_stored_commits.len(), 27);
3266            assert_eq!(
3267                core_fixture
3268                    .core
3269                    .leader_schedule
3270                    .leader_swap_table
3271                    .read()
3272                    .bad_nodes
3273                    .len(),
3274                1
3275            );
3276            assert_eq!(
3277                core_fixture
3278                    .core
3279                    .leader_schedule
3280                    .leader_swap_table
3281                    .read()
3282                    .good_nodes
3283                    .len(),
3284                1
3285            );
3286            let expected_reputation_scores =
3287                ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
3288            assert_eq!(
3289                core_fixture
3290                    .core
3291                    .leader_schedule
3292                    .leader_swap_table
3293                    .read()
3294                    .reputation_scores,
3295                expected_reputation_scores
3296            );
3297        }
3298    }
3299
3300    #[tokio::test]
3301    async fn test_filter_new_commits() {
3302        telemetry_subscribers::init_for_testing();
3303
3304        let (context, _key_pairs) = Context::new_for_test(4);
3305        let context = context.with_parameters(Parameters {
3306            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3307            ..Default::default()
3308        });
3309
3310        let authority_index = AuthorityIndex::new_for_test(0);
3311        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3312        let mut core = core.core;
3313
3314        // No new block should have been produced
3315        assert_eq!(
3316            core.last_proposed_round(),
3317            GENESIS_ROUND,
3318            "No block should have been created other than genesis"
3319        );
3320
3321        // create a DAG of 12 rounds
3322        let mut dag_builder = DagBuilder::new(core.context.clone());
3323        dag_builder.layers(1..=12).build();
3324
3325        // Store all blocks up to round 6 which should be enough to decide up to leader 4
3326        dag_builder.print();
3327        let blocks = dag_builder.blocks(1..=6);
3328
3329        for block in blocks {
3330            core.dag_state.write().accept_block(block);
3331        }
3332
3333        // Get all the committed sub dags up to round 10
3334        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3335
3336        // Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
3337        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3338
3339        // We should have committed up to round 4
3340        assert_eq!(committed_sub_dags.len(), 4);
3341
3342        // Now validate the certified commits. We'll try 3 different scenarios:
3343        println!("Case 1. Provide certified commits that are all before the last committed round.");
3344
3345        // Highest certified commit should be for leader of round 4.
3346        let certified_commits = sub_dags_and_commits
3347            .iter()
3348            .take(4)
3349            .map(|(_, c)| c)
3350            .cloned()
3351            .collect::<Vec<_>>();
3352        assert!(
3353            certified_commits.last().unwrap().index()
3354                <= committed_sub_dags.last().unwrap().commit_ref.index,
3355            "Highest certified commit should older than the highest committed index."
3356        );
3357
3358        let certified_commits = core.filter_new_commits(certified_commits).unwrap();
3359
3360        // No commits should be processed
3361        assert!(certified_commits.is_empty());
3362
3363        println!("Case 2. Provide certified commits that are all after the last committed round.");
3364
3365        // The highest certified commit here has index 5, newer than the last committed index (4).
3366        let certified_commits = sub_dags_and_commits
3367            .iter()
3368            .take(5)
3369            .map(|(_, c)| c.clone())
3370            .collect::<Vec<_>>();
3371
3372        let certified_commits = core.filter_new_commits(certified_commits.clone()).unwrap();
3373
3374        // The certified commit of index 5 should be processed.
3375        assert_eq!(certified_commits.len(), 1);
3376        assert_eq!(certified_commits.first().unwrap().reference().index, 5);
3377
3378        println!(
3379            "Case 3. Provide certified commits where the first certified commit index is not the last_commited_index + 1."
3380        );
3381
3382        // Provide only the certified commit with index 6, skipping the expected index 5.
3383        let certified_commits = sub_dags_and_commits
3384            .iter()
3385            .skip(5)
3386            .take(1)
3387            .map(|(_, c)| c.clone())
3388            .collect::<Vec<_>>();
3389
3390        let err = core
3391            .filter_new_commits(certified_commits.clone())
3392            .unwrap_err();
3393        match err {
3394            ConsensusError::UnexpectedCertifiedCommitIndex {
3395                expected_commit_index: 5,
3396                commit_index: 6,
3397            } => (),
3398            _ => panic!("Unexpected error: {:?}", err),
3399        }
3400    }
3401
3402    #[tokio::test]
3403    async fn test_add_certified_commits() {
3404        telemetry_subscribers::init_for_testing();
3405
3406        let (context, _key_pairs) = Context::new_for_test(4);
3407        let context = context.with_parameters(Parameters {
3408            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3409            ..Default::default()
3410        });
3411
3412        let authority_index = AuthorityIndex::new_for_test(0);
3413        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
3414        let store = core.store.clone();
3415        let mut core = core.core;
3416
3417        // No new block should have been produced
3418        assert_eq!(
3419            core.last_proposed_round(),
3420            GENESIS_ROUND,
3421            "No block should have been created other than genesis"
3422        );
3423
3424        // create a DAG of 12 rounds
3425        let mut dag_builder = DagBuilder::new(core.context.clone());
3426        dag_builder.layers(1..=12).build();
3427
3428        // Store all blocks up to round 6 which should be enough to decide up to leader 4
3429        dag_builder.print();
3430        let blocks = dag_builder.blocks(1..=6);
3431
3432        for block in blocks {
3433            core.dag_state.write().accept_block(block);
3434        }
3435
3436        // Get all the committed sub dags up to round 10
3437        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
3438
3439        // Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
3440        let committed_sub_dags = core.try_commit(vec![]).unwrap();
3441
3442        // We should have committed up to round 4
3443        assert_eq!(committed_sub_dags.len(), 4);
3444
3445        // Flush the DAG state to storage.
3446        core.dag_state.write().flush();
3447
3448        println!("Case 1. Provide no certified commits. No commit should happen.");
3449
3450        let last_commit = store
3451            .read_last_commit()
3452            .unwrap()
3453            .expect("Last commit should be set");
3454        assert_eq!(last_commit.reference().index, 4);
3455
3456        println!(
3457            "Case 2. Provide certified commits that before and after the last committed round and also there are additional blocks so can run the direct decide rule as well."
3458        );
3459
3460        // The commits of leader rounds 5-8 should be committed via the certified commits.
3461        let certified_commits = sub_dags_and_commits
3462            .iter()
3463            .skip(3)
3464            .take(5)
3465            .map(|(_, c)| c.clone())
3466            .collect::<Vec<_>>();
3467
3468        // Now only add the blocks of rounds 8..=12. The blocks up to round 7 should be accepted via the certified commits processing.
3469        let blocks = dag_builder.blocks(8..=12);
3470        for block in blocks {
3471            core.dag_state.write().accept_block(block);
3472        }
3473
3474        // The corresponding blocks of the certified commits should be accepted and stored before linearizing and committing the DAG.
3475        core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
3476            .expect("Should not fail");
3477
3478        // Flush the DAG state to storage.
3479        core.dag_state.write().flush();
3480
3481        let commits = store.scan_commits((6..=10).into()).unwrap();
3482
3483        // We expect all the sub dags up to leader round 10 to be committed.
3484        assert_eq!(commits.len(), 5);
3485
3486        for i in 6..=10 {
3487            let commit = &commits[i - 6];
3488            assert_eq!(commit.reference().index, i as u32);
3489        }
3490    }
3491
3492    #[tokio::test]
3493    async fn try_commit_with_certified_commits_gced_blocks() {
3494        const GC_DEPTH: u32 = 3;
3495        telemetry_subscribers::init_for_testing();
3496
3497        let (mut context, mut key_pairs) = Context::new_for_test(5);
3498        context
3499            .protocol_config
3500            .set_consensus_gc_depth_for_testing(GC_DEPTH);
3501        let context = Arc::new(context.with_parameters(Parameters {
3502            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
3503            ..Default::default()
3504        }));
3505
3506        let store = Arc::new(MemStore::new());
3507        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
3508
3509        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
3510        let leader_schedule = Arc::new(
3511            LeaderSchedule::from_store(context.clone(), dag_state.clone())
3512                .with_num_commits_per_schedule(10),
3513        );
3514
3515        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
3516        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
3517        let (signals, signal_receivers) = CoreSignals::new(context.clone());
3518        let (blocks_sender, _blocks_receiver) =
3519            monitored_mpsc::unbounded_channel("consensus_block_output");
3520        let transaction_certifier = TransactionCertifier::new(
3521            context.clone(),
3522            Arc::new(NoopBlockVerifier {}),
3523            dag_state.clone(),
3524            blocks_sender,
3525        );
3526        // Need at least one subscriber to the block broadcast channel.
3527        let _block_receiver = signal_receivers.block_broadcast_receiver();
3528
3529        let (commit_consumer, _commit_receiver, _transaction_receiver) =
3530            CommitConsumerArgs::new(0, 0);
3531        let commit_observer = CommitObserver::new(
3532            context.clone(),
3533            commit_consumer,
3534            dag_state.clone(),
3535            transaction_certifier.clone(),
3536            leader_schedule.clone(),
3537        )
3538        .await;
3539
3540        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
3541        let mut core = Core::new(
3542            context.clone(),
3543            leader_schedule,
3544            transaction_consumer,
3545            transaction_certifier.clone(),
3546            block_manager,
3547            commit_observer,
3548            signals,
3549            key_pairs.remove(context.own_index.value()).1,
3550            dag_state.clone(),
3551            true,
3552            round_tracker,
3553        );
3554
3555        // No new block should have been produced
3556        assert_eq!(
3557            core.last_proposed_round(),
3558            GENESIS_ROUND,
3559            "No block should have been created other than genesis"
3560        );
3561
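        // Build a DAG where the round 2 blocks of authorities A-D skip E's round 1 block
        // ("-E1") as an ancestor, and only E's round 5 block links back to E1. With
        // GC_DEPTH = 3, E1 should fall below the GC round and never be committed.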
3562        let dag_str = "DAG {
3563            Round 0 : { 5 },
3564            Round 1 : { * },
3565            Round 2 : { 
3566                A -> [-E1],
3567                B -> [-E1],
3568                C -> [-E1],
3569                D -> [-E1],
3570            },
3571            Round 3 : {
3572                A -> [*],
3573                B -> [*],
3574                C -> [*],
3575                D -> [*],
3576            },
3577            Round 4 : { 
3578                A -> [*],
3579                B -> [*],
3580                C -> [*],
3581                D -> [*],
3582            },
3583            Round 5 : { 
3584                A -> [*],
3585                B -> [*],
3586                C -> [*],
3587                D -> [*],
3588                E -> [A4, B4, C4, D4, E1]
3589            },
3590            Round 6 : { * },
3591            Round 7 : { * },
3592        }";
3593
3594        let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
3595        dag_builder.print();
3596
3597        // Now get all the committed sub dags from the DagBuilder
3598        let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
3599            .get_sub_dag_and_certified_commits(1..=5)
3600            .into_iter()
3601            .unzip();
3602
3603        // Now try to commit up to the latest leader (round = 5) with the provided certified commits. Note that we have not accepted any
3604        // blocks. That should happen during the commit process.
3605        let committed_sub_dags = core.try_commit(certified_commits).unwrap();
3606
3607        // We should have committed up to round 4
3608        assert_eq!(committed_sub_dags.len(), 4);
3609        for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
3610            assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
3611
3612            // ensure that the block E1 has not been committed
3613            for block in committed_sub_dag.blocks.iter() {
3614                if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(5) {
3615                    panic!("Did not expect to commit block E1");
3616                }
3617            }
3618        }
3619    }
3620
3621    #[tokio::test(flavor = "current_thread", start_paused = true)]
3622    async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
3623        parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
3624    }
3625
3626    #[tokio::test(flavor = "current_thread", start_paused = true)]
3627    async fn test_commit_on_leader_schedule_change_boundary_with_multileader() {
3628        parameterized_test_commit_on_leader_schedule_change_boundary(None).await;
3629    }
3630
3631    async fn parameterized_test_commit_on_leader_schedule_change_boundary(
3632        num_leaders_per_round: Option<usize>,
3633    ) {
3634        telemetry_subscribers::init_for_testing();
3635        let default_params = Parameters::default();
3636
3637        let (mut context, _) = Context::new_for_test(6);
3638        context
3639            .protocol_config
3640            .set_mysticeti_num_leaders_per_round_for_testing(num_leaders_per_round);
3641        // create the cores and their signals for all the authorities
3642        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]).await;
3643
3644        // Now iterate over a few rounds and ensure the corresponding signals are created while network advances
3645        let mut last_round_blocks: Vec<VerifiedBlock> = Vec::new();
3646        for round in 1..=33 {
3647            let mut this_round_blocks = Vec::new();
3648
3649            // Wait for min round delay to allow blocks to be proposed.
3650            sleep(default_params.min_round_delay).await;
3651
3652            for core_fixture in &mut cores {
3653                // add the blocks from last round
3654                // this will trigger a block creation for the round and a signal should be emitted
3655                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
3656
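                // Simulate a probe update where all peers report `round` for every
                // authority, both for received and accepted rounds.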
3657                core_fixture.core.round_tracker.write().update_from_probe(
3658                    vec![
3659                        vec![round, round, round, round, round, round],
3660                        vec![round, round, round, round, round, round],
3661                        vec![round, round, round, round, round, round],
3662                        vec![round, round, round, round, round, round],
3663                        vec![round, round, round, round, round, round],
3664                        vec![round, round, round, round, round, round],
3665                    ],
3666                    vec![
3667                        vec![round, round, round, round, round, round],
3668                        vec![round, round, round, round, round, round],
3669                        vec![round, round, round, round, round, round],
3670                        vec![round, round, round, round, round, round],
3671                        vec![round, round, round, round, round, round],
3672                        vec![round, round, round, round, round, round],
3673                    ],
3674                );
3675
3676                // A "new round" signal should be received given that all the blocks of previous round have been processed
3677                let new_round = receive(
3678                    Duration::from_secs(1),
3679                    core_fixture.signal_receivers.new_round_receiver(),
3680                )
3681                .await;
3682                assert_eq!(new_round, round);
3683
3684                // Check that a new block has been proposed.
3685                let extended_block = tokio::time::timeout(
3686                    Duration::from_secs(1),
3687                    core_fixture.block_receiver.recv(),
3688                )
3689                .await
3690                .unwrap()
3691                .unwrap();
3692                assert_eq!(extended_block.block.round(), round);
3693                assert_eq!(
3694                    extended_block.block.author(),
3695                    core_fixture.core.context.own_index
3696                );
3697
3698                // append the new block to this round blocks
3699                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
3700
3701                let block = core_fixture.core.last_proposed_block();
3702
3703                // ensure that produced block is referring to the blocks of last_round
3704                assert_eq!(
3705                    block.ancestors().len(),
3706                    core_fixture.core.context.committee.size()
3707                );
3708                for ancestor in block.ancestors() {
3709                    if block.round() > 1 {
3710                        // don't bother with round 1 block which just contains the genesis blocks.
3711                        assert!(
3712                            last_round_blocks
3713                                .iter()
3714                                .any(|block| block.reference() == *ancestor),
3715                            "Reference from previous round should be added"
3716                        );
3717                    }
3718                }
3719            }
3720
3721            last_round_blocks = this_round_blocks;
3722        }
3723
3724        for core_fixture in cores {
3725            // There are 31 leader rounds with rounds completed up to and including
3726            // round 33. Round 33 blocks will only include their own blocks, so there
3727            // should only be 30 commits.
3728            // However, on a leader schedule change boundary it is possible for a
3729            // new leader to get selected for the same round if the elected leader
3730            // gets swapped, allowing multiple leaders to be committed at a round.
3731            // This means that with multi-leader per round explicitly set to 1 we
3732            // will have 30 commits, otherwise 31.
3733            // NOTE: We used 31 leader rounds to specifically trigger the scenario
3734            // where the leader schedule boundary occurred AND we had a swap to a new
3735            // leader for the same round.
3736            let expected_commit_count = match num_leaders_per_round {
3737                Some(1) => 30,
3738                _ => 31,
3739            };

            // Flush the DAG state to storage.
            core_fixture.dag_state.write().flush();

            // Check commits have been persisted to store
            let last_commit = core_fixture
                .store
                .read_last_commit()
                .unwrap()
                .expect("last commit should be set");
            assert_eq!(last_commit.index(), expected_commit_count);
            let all_stored_commits = core_fixture
                .store
                .scan_commits((0..=CommitIndex::MAX).into())
                .unwrap();
            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .bad_nodes
                    .len(),
                1
            );
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .good_nodes
                    .len(),
                1
            );
            let expected_reputation_scores =
                ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
            assert_eq!(
                core_fixture
                    .core
                    .leader_schedule
                    .leader_swap_table
                    .read()
                    .reputation_scores,
                expected_reputation_scores
            );
        }
    }
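
    // A minimal sketch of the commit-count arithmetic asserted above. This helper is
    // hypothetical and for illustration only (it is not part of Core): the last leader
    // round is not yet committable, and a leader swap at the schedule boundary can add
    // one extra committed leader when more than one leader per round is allowed.
    #[allow(dead_code)]
    fn expected_commit_count_sketch(leader_rounds: u32, num_leaders_per_round: Option<usize>) -> u32 {
        // The final leader round cannot be committed yet.
        let base = leader_rounds.saturating_sub(1);
        match num_leaders_per_round {
            // With exactly one leader per round, the boundary swap does not add a commit.
            Some(1) => base,
            // Otherwise the swapped-in leader at the boundary round can also be committed.
            _ => base + 1,
        }
    }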

    #[tokio::test]
    async fn test_core_signals() {
        telemetry_subscribers::init_for_testing();
        let default_params = Parameters::default();

        let (context, _) = Context::new_for_test(4);
        // create the cores and their signals for all the authorities
        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;

        // Now iterate over a few rounds and ensure the corresponding signals are created while the network advances.
        let mut last_round_blocks = Vec::new();
        for round in 1..=10 {
            let mut this_round_blocks = Vec::new();

            // Wait for min round delay to allow blocks to be proposed.
            sleep(default_params.min_round_delay).await;

            for core_fixture in &mut cores {
                // add the blocks from last round
                // this will trigger a block creation for the round and a signal should be emitted
                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();

                core_fixture.core.round_tracker.write().update_from_probe(
                    vec![
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                    ],
                    vec![
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                    ],
                );

                // A "new round" signal should be received given that all the blocks of the previous round have been processed.
                let new_round = receive(
                    Duration::from_secs(1),
                    core_fixture.signal_receivers.new_round_receiver(),
                )
                .await;
                assert_eq!(new_round, round);

                // Check that a new block has been proposed.
                let extended_block = tokio::time::timeout(
                    Duration::from_secs(1),
                    core_fixture.block_receiver.recv(),
                )
                .await
                .unwrap()
                .unwrap();
                assert_eq!(extended_block.block.round(), round);
                assert_eq!(
                    extended_block.block.author(),
                    core_fixture.core.context.own_index
                );

                // append the new block to this round blocks
                this_round_blocks.push(core_fixture.core.last_proposed_block().clone());

                let block = core_fixture.core.last_proposed_block();

                // ensure that the produced block refers to the blocks of last_round
                assert_eq!(
                    block.ancestors().len(),
                    core_fixture.core.context.committee.size()
                );
                for ancestor in block.ancestors() {
                    if block.round() > 1 {
                        // don't bother with the round 1 block, which just contains the genesis blocks.
                        assert!(
                            last_round_blocks
                                .iter()
                                .any(|block| block.reference() == *ancestor),
                            "Reference from previous round should be added"
                        );
                    }
                }
            }

            last_round_blocks = this_round_blocks;
        }

        for core_fixture in cores {
            // Flush the DAG state to storage.
            core_fixture.dag_state.write().flush();
            // Check commits have been persisted to store
            let last_commit = core_fixture
                .store
                .read_last_commit()
                .unwrap()
                .expect("last commit should be set");
            // There are 8 leader rounds with rounds completed up to and including
            // round 9. Round 10 blocks will only include their own blocks, so the
            // 8th leader will not be committed.
            assert_eq!(last_commit.index(), 7);
            let all_stored_commits = core_fixture
                .store
                .scan_commits((0..=CommitIndex::MAX).into())
                .unwrap();
            assert_eq!(all_stored_commits.len(), 7);
        }
    }
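
    // A small illustrative helper (hypothetical, not used by the tests in this file) that
    // builds the square matrix shape passed to `update_from_probe` in `test_core_signals`:
    // a committee-sized vector of committee-sized vectors with every entry set to the same
    // round.
    #[allow(dead_code)]
    fn uniform_round_matrix(committee_size: usize, round: u32) -> Vec<Vec<u32>> {
        vec![vec![round; committee_size]; committee_size]
    }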

    #[tokio::test]
    async fn test_core_compress_proposal_references() {
        telemetry_subscribers::init_for_testing();
        let default_params = Parameters::default();

        let (context, _) = Context::new_for_test(4);
        // create the cores and their signals for all the authorities
        let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;

        let mut last_round_blocks = Vec::new();
        let mut all_blocks = Vec::new();

        let excluded_authority = AuthorityIndex::new_for_test(3);

        for round in 1..=10 {
            let mut this_round_blocks = Vec::new();

            for core_fixture in &mut cores {
                // do not produce any block for authority 3
                if core_fixture.core.context.own_index == excluded_authority {
                    continue;
                }

                // try to propose, to ensure we also cover the case where leader authority 3 is missed
                core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
                core_fixture.core.round_tracker.write().update_from_probe(
                    vec![
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                    ],
                    vec![
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                        vec![round, round, round, round],
                    ],
                );
                core_fixture.core.new_block(round, true).unwrap();

                let block = core_fixture.core.last_proposed_block();
                assert_eq!(block.round(), round);

                // append the new block to this round blocks
                this_round_blocks.push(block.clone());
            }

            last_round_blocks = this_round_blocks.clone();
            all_blocks.extend(this_round_blocks);
        }

        // Now send all the produced blocks to the core of authority 3. It should produce a new block. If no compression were
        // applied, then we would expect all the previous blocks from rounds 0..=10 to be referenced. However, since compression
        // is applied, only the last round's (10) blocks should be referenced, plus the authority's own block of round 1
        // (see the compression sketch after this test).
        let core_fixture = &mut cores[excluded_authority];
        // Wait for min round delay to allow blocks to be proposed.
        sleep(default_params.min_round_delay).await;
        // add blocks to trigger proposal.
        core_fixture.add_blocks(all_blocks).unwrap();

        // Assert that a block has been created for round 11 and that it references the blocks of round 10 for the other peers,
        // and round 1 for its own block (created after recovery).
        let block = core_fixture.core.last_proposed_block();
        assert_eq!(block.round(), 11);
        assert_eq!(block.ancestors().len(), 4);
        for block_ref in block.ancestors() {
            if block_ref.author == excluded_authority {
                assert_eq!(block_ref.round, 1);
            } else {
                assert_eq!(block_ref.round, 10);
            }
        }

        // Flush the DAG state to storage.
        core_fixture.dag_state.write().flush();

        // Check commits have been persisted to store
        let last_commit = core_fixture
            .store
            .read_last_commit()
            .unwrap()
            .expect("last commit should be set");
        // There are 8 leader rounds with rounds completed up to and including
        // round 10. However, because there were no blocks produced for authority 3,
        // 2 leader rounds will be skipped.
        assert_eq!(last_commit.index(), 6);
        let all_stored_commits = core_fixture
            .store
            .scan_commits((0..=CommitIndex::MAX).into())
            .unwrap();
        assert_eq!(all_stored_commits.len(), 6);
    }
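
    // A minimal sketch of the ancestor-compression idea exercised above, using only std
    // types instead of the production `BlockRef`: when proposing, keep only the
    // highest-round reference per authority rather than every known block.
    #[allow(dead_code)]
    fn compress_ancestors_sketch(candidates: &[(u32, u32)]) -> Vec<(u32, u32)> {
        use std::collections::BTreeMap;

        // Each candidate is an (authority index, round) pair; track the highest round per authority.
        let mut highest: BTreeMap<u32, u32> = BTreeMap::new();
        for &(authority, round) in candidates {
            let entry = highest.entry(authority).or_insert(round);
            *entry = (*entry).max(round);
        }
        highest.into_iter().collect()
    }

    #[test]
    fn compress_ancestors_sketch_example() {
        // Authorities 0..=2 produced blocks in rounds 1..=10, while authority 3 only has its
        // round 1 block, mirroring the scenario in the test above.
        let mut candidates = Vec::new();
        for authority in 0u32..3 {
            for round in 1u32..=10 {
                candidates.push((authority, round));
            }
        }
        candidates.push((3, 1));

        let compressed = compress_ancestors_sketch(&candidates);
        assert_eq!(compressed, vec![(0, 10), (1, 10), (2, 10), (3, 1)]);
    }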

    #[tokio::test]
    async fn try_select_certified_leaders() {
        // GIVEN
        telemetry_subscribers::init_for_testing();

        let (context, _) = Context::new_for_test(4);

        let authority_index = AuthorityIndex::new_for_test(0);
        let core =
            CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true).await;
        let mut core = core.core;

        let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
        dag_builder.layers(1..=12).build();

        let limit = 2;

        let blocks = dag_builder.blocks(1..=12);

        for block in blocks {
            core.dag_state.write().accept_block(block);
        }

        // WHEN
        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
        let mut certified_commits = sub_dags_and_commits
            .into_iter()
            .map(|(_, commit)| commit)
            .collect::<Vec<_>>();

        let leaders = core.try_select_certified_leaders(&mut certified_commits, limit);

        // THEN
        assert_eq!(leaders.len(), 2);
        assert_eq!(certified_commits.len(), 2);
    }
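
    // A minimal sketch (std types only, not the production commit types) of the behaviour
    // asserted above: selecting certified leaders consumes at most `limit` commits from the
    // front of the queue and leaves the remainder in place.
    #[test]
    fn take_up_to_limit_sketch() {
        let mut certified_commits = vec!["commit-1", "commit-2", "commit-3", "commit-4"];
        let limit = 2;

        // Drain at most `limit` entries from the front; the rest stay in the vector.
        let take = limit.min(certified_commits.len());
        let selected: Vec<_> = certified_commits.drain(..take).collect();

        assert_eq!(selected.len(), 2);
        assert_eq!(certified_commits.len(), 2);
    }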

    pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
        tokio::time::timeout(timeout, receiver.changed())
            .await
            .expect("Timeout while waiting to read from receiver")
            .expect("Signal receive channel shouldn't be closed");
        *receiver.borrow_and_update()
    }
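
    // A minimal, self-contained sketch of the watch-channel pattern that `receive` above is
    // built on: the sender publishes the latest value and `receive` waits for a change
    // notification before reading it. The names `round_tx` and `round_rx` are illustrative
    // and not part of the production code.
    #[tokio::test]
    async fn receive_watch_signal_sketch() {
        let (round_tx, round_rx) = watch::channel(0u32);

        // Simulate a "new round" signal being published.
        round_tx.send(5).unwrap();

        // `receive` resolves once the receiver observes the change and returns the latest value.
        let round = receive(Duration::from_secs(1), round_rx).await;
        assert_eq!(round, 5);
    }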
}